
Wait for pipeline state in Data Fusion operators #8954

Merged: turbaszek merged 3 commits into apache:master on Jun 15, 2020

Conversation

@turbaszek (Member) commented May 21, 2020:

Closes: #8673


Make sure to mark the boxes below before creating PR:

  • [x] Description above provides context of the change
  • [x] Unit tests coverage for changes (not needed for documentation changes)
  • [x] Target GitHub ISSUE in description if exists
  • [x] Commits follow "How to write a good git commit message"
  • [x] Relevant documentation is updated including usage instructions.
  • [x] I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg bot added the provider:google label (Google (including GCP) related issues) on May 21, 2020
[Seven resolved review threads on airflow/providers/google/cloud/hooks/datafusion.py (outdated)]
pipeline_name=self.pipeline_name,
instance_url=api_url,
namespace=self.namespace,
runtime_args=self.runtime_args,
)
self.log.info("Pipeline started")
hook.wait_for_pipeline_state(
success_states=[PipelineStates.COMPLETED],
Contributor:

This operator is named CloudDataFusionStartPipelineOperator (key word: start).
IMO that means PipelineStates.RUNNING should be a success state by default.
However, it might be worth allowing the user to optionally control these success states to span more use cases.

Member Author:

I've added success_states as an optional argument. If not provided, the operator will wait for the pipeline to be RUNNING.
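For illustration, a minimal usage sketch with the new argument. The DAG context and the parameter values are assumed examples, not taken from this PR's diff; only success_states, pipeline_timeout, and PipelineStates.COMPLETED appear in the discussion itself.

from airflow.providers.google.cloud.hooks.datafusion import PipelineStates
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

start_pipeline = CloudDataFusionStartPipelineOperator(
    task_id="start_pipeline",
    pipeline_name="example_pipeline",  # assumed example value
    instance_name="example-instance",  # assumed example value
    location="europe-west1",           # assumed example value
    # Optional: if omitted, the operator waits only for the RUNNING state.
    success_states=[PipelineStates.COMPLETED],
    pipeline_timeout=10 * 60,
)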

[Two more resolved review threads on airflow/providers/google/cloud/hooks/datafusion.py (outdated)]
runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
if runtime_args[job_id_key] == faux_pipeline_id:
    return pipe["runid"]
sleep(10)
Contributor:

How often do you notice the first request failing? If it's more than 50% of the time, then the program run doesn't show up for some time (seconds), the first iteration can be expected to fail, and this loop will likely run at least 2-3 times.
Could we move the sleep to the beginning of the loop body, to increase the likelihood that the loop exits on an earlier iteration? That should save API calls we don't expect to yield a successful GET on the program run.

Member Author:

Usually it's successful on the 2nd iteration, but sometimes I get to the 3rd. I will move the sleep to the beginning as you suggested, which should decrease the number of requests.
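A sketch of the adjusted loop, with the sleep moved to the top of the body as agreed. This paraphrases the hook fragments quoted in this thread, so the method name, the response payload handling, and the final exception are assumptions rather than the merged code.

import json
from time import sleep

from airflow.exceptions import AirflowException


def _get_workflow_run_id(self, url: str, job_id_key: str, faux_pipeline_id: str) -> str:
    """Hook-method sketch: poll the .../runs endpoint until our run shows up."""
    for _ in range(5):
        # The program run may not be present instantly, so wait *before* each
        # request rather than after it; this usually saves one API call.
        sleep(10)
        response = self._cdap_request(url=url, method="GET")
        if response.status != 200:
            continue
        for pipe in json.loads(response.data):
            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
            # Match the run by the faux pipeline id injected into its
            # runtime arguments when the pipeline was started.
            if runtime_args.get(job_id_key) == faux_pipeline_id:
                return pipe["runid"]
    raise AirflowException("Could not retrieve the pipeline run id")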

# may not be present instantly
for _ in range(5):
    response = self._cdap_request(url=url, method="GET")
    if response.status != 200:
Contributor:

I'm not sure this will have the behavior you expect.

What happens when you do a GET on a program run id that isn't present yet? A 404 or an empty body?

I personally get a 404 on a 6.1.1 instance for a random UUID I generated. Has the API behavior changed?

[Screenshot: a CDAP REST API call for a random run id returning 404]

Contributor:

FYI, you can get to this convenient UI by clicking System Admin > Configuration > Make HTTP Calls.
It's very useful for learning and testing the CDAP REST API.

Member Author:

You are requesting .../runs/run-id; the code here is calling .../runs to get the list of all runs, because we don't yet know the proper CDAP run-id. I assume this request should be successful unless something is wrong with the API or network.

Contributor:

Ah I see, my mistake.
You'll get a 200 and an empty collection if there are no runs.

@turbaszek (Member Author) commented May 25, 2020:

Yes, so I will retry the request. I am not sure there's anything more we can do about this; when we call this method we are expecting to see some runs.

Contributor:

@jaketf the API that CDAP exposes covers the basic building blocks of programs, which are workflows, Spark jobs, MapReduce jobs, etc. Data Fusion pipelines use workflows for batch jobs and Spark streaming jobs for realtime. The operators should wait for the batch jobs and not wait for the streaming ones.

Member Author:

@sreevatsanraman how can we distinguish those two types? According to the Data Fusion CDAP API reference, users should use the same endpoint to start both batch and streaming pipelines:
https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline

Contributor:

I think we should just handle batch pipelines in this PR (as this is implicitly all the current operator does). Also, anecdotally, I think this covers 90% of use cases for Airflow; in the field I have not seen a lot of streaming orchestration with Airflow.

namespace,
"apps",
pipeline_name,
"workflows",
Contributor:

This CDAP API is very convoluted.

It seems there are several program types, and this is hard-coding workflows and DataPipelineWorkflow.

I think DataPipelineWorkflow will not be present for streaming pipelines; instead you have to poll a Spark program.

I'm not sure how many other scenarios require other program types. It would be good to get someone from the CDAP community to review this.

[Two screenshots of CDAP API responses showing program types]
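To make the distinction concrete, here is a sketch of how the polling path would differ per program type. The batch path (workflows / DataPipelineWorkflow) matches the code under review; the streaming program type and name ("spark" / "DataStreamsSparkStreaming") are my assumption about CDAP realtime pipelines and are not implemented in this PR.

from urllib.parse import quote


def runs_url(instance_url: str, namespace: str, pipeline_name: str,
             streaming: bool = False) -> str:
    """Build the CDAP .../runs URL for a pipeline's underlying program."""
    if streaming:
        # Assumption: realtime pipelines run a Spark streaming program.
        program_type, program_id = "spark", "DataStreamsSparkStreaming"
    else:
        # Batch pipelines run a workflow, as hard-coded in this PR.
        program_type, program_id = "workflows", "DataPipelineWorkflow"
    path = "/".join(
        quote(part) for part in (
            "v3", "namespaces", namespace, "apps", pipeline_name,
            program_type, program_id, "runs",
        )
    )
    return f"{instance_url}/{path}"

Quoting each path segment matches the "Use quote to encode url parts" commit in this PR.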

@turbaszek (Member Author) commented May 24, 2020:

This seems to be a bigger change, because we would have to adjust each method that uses DataPipelineWorkflow in the URI. So I would say we can do this, but in a follow-up PR.

Btw, I was relying on the Google docs: https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline

Contributor:

As long as we cover batch pipelines (with a Spark or MR backend), I think we should be good.

@turbaszek (Member Author):

Hi @jaketf @sreevatsanraman, what should we do to move this forward?

@jaketf (Contributor) commented Jun 9, 2020:

Thanks for following up @turbaszek.
tl;dr I think we should merge this PR as it fixes the immediate issue. We can file a lower-priority issue to handle streaming pipelines in the future; that can be an additional kwarg that accepts a streaming flag and uses different paths for polling.

I've updated the threads. I agree we should keep this PR small and focused on patching the existing operator for starting Data Fusion batch pipelines.

In general I think batch is more used than streaming, and Spark is more used than MR.
In batch, both MR and Spark can be polled at the .../DataPipelineWorkflow/runs/run_id endpoint.

@turbaszek turbaszek marked this pull request as ready for review June 10, 2020 13:33
@turbaszek turbaszek requested a review from mik-laj June 10, 2020 14:42
namespace: str = "default",
pipeline_timeout: int = 10 * 60,
Contributor:

If the success state is COMPLETED, we will time out before the pipeline run completes.
It can take more than 5 minutes just to provision a Data Fusion pipeline run, and some pipelines can take hours to complete.
Can we increase the default timeout to 1 hour?

Contributor:

The contract of this operator is to start a pipeline, not wait until pipeline completion; 10 minutes is a reasonable timeout.
COMPLETED is just a success state in case it's a super quick pipeline that completes between polls.
We can add a sensor for waiting on pipeline completion (which should use reschedule mode if it expects to wait that long).

Contributor:

As part of these changes you can now pass in a parameter to have the operator wait for pipeline completion (not just pipeline start).
A sensor + reschedule mode sounds like a good suggestion, thanks.
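A hypothetical sensor along the lines suggested here; nothing like it is added by this PR, and the class name, constructor, and get_pipeline_workflow call are illustrative assumptions about the hook's API.

from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook, PipelineStates
from airflow.sensors.base_sensor_operator import BaseSensorOperator


class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
    """Waits until a Data Fusion pipeline run reaches COMPLETED."""

    def __init__(self, *, pipeline_name: str, pipeline_id: str, instance_url: str,
                 namespace: str = "default", gcp_conn_id: str = "google_cloud_default",
                 **kwargs) -> None:
        # Reschedule mode frees the worker slot between pokes, which matters
        # for pipelines that may run for hours.
        kwargs.setdefault("mode", "reschedule")
        super().__init__(**kwargs)
        self.pipeline_name = pipeline_name
        self.pipeline_id = pipeline_id
        self.instance_url = instance_url
        self.namespace = namespace
        self.gcp_conn_id = gcp_conn_id

    def poke(self, context: dict) -> bool:
        hook = DataFusionHook(gcp_conn_id=self.gcp_conn_id)
        # Assumption: get_pipeline_workflow returns the run object,
        # including its "status" field.
        workflow = hook.get_pipeline_workflow(
            pipeline_name=self.pipeline_name,
            instance_url=self.instance_url,
            pipeline_id=self.pipeline_id,
            namespace=self.namespace,
        )
        return workflow.get("status") == PipelineStates.COMPLETED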

Member Author:

I've created an issue to limit the scope of this PR: #9300

@@ -616,6 +625,9 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
:type pipeline_name: str
:param instance_name: The name of the instance.
:type instance_name: str
:param success_states: If provided the operator will wait for pipeline to be in one of
the provided states.
:type success_states: List[str]
Contributor:

Missing info for the new pipeline_timeout parameter.
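The missing lines might look like the following (the wording is mine, not taken from the diff):

    :param pipeline_timeout: How long (in seconds) the operator should wait for the
        pipeline to be in one of the success states.
    :type pipeline_timeout: int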

"programId": "DataPipelineWorkflow",
"runtimeargs": runtime_args
}]
response = self._cdap_request(url=url, method="POST", body=body)
Contributor:

Just an FYI: this is the API request to start multiple pipelines.
There will eventually be a fix to return the run id as part of the API request to run a single pipeline; we can revert to your original URL when this is available. For context:
https://issues.cask.co/browse/CDAP-7641
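For context, a sketch of the full request that the snippet above belongs to, using CDAP's batch "start multiple programs" endpoint referenced in this thread. The URL construction and the surrounding method signature are assumptions rather than the exact merged code.

from urllib.parse import quote


def start_pipeline(self, pipeline_name: str, instance_url: str,
                   namespace: str = "default", runtime_args: dict = None):
    """Hook-method sketch: start a batch pipeline via the batch-start endpoint."""
    # Workaround until CDAP-7641 makes the single-program start call
    # return the run id.
    url = f"{instance_url}/v3/namespaces/{quote(namespace)}/start"
    body = [{
        "appId": pipeline_name,
        "programType": "workflow",
        "programId": "DataPipelineWorkflow",
        "runtimeargs": runtime_args or {},
    }]
    # Response parsing (recovering the run id) is omitted here.
    return self._cdap_request(url=url, method="POST", body=body)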

fixup! Wait for pipeline state in Data Fusion operators

fixup! fixup! Wait for pipeline state in Data Fusion operators

fixup! fixup! fixup! Wait for pipeline state in Data Fusion operators
@turbaszek turbaszek merged commit aee6ab9 into apache:master Jun 15, 2020
@turbaszek turbaszek deleted the improve-datafusion-ops branch June 15, 2020 10:09
kaxil pushed a commit to kaxil/airflow that referenced this pull request Jun 27, 2020
* Wait for pipeline state in Data Fusion operators

fixup! Wait for pipeline state in Data Fusion operators

fixup! fixup! Wait for pipeline state in Data Fusion operators

fixup! fixup! fixup! Wait for pipeline state in Data Fusion operators

* Use quote to encode url parts

* fixup! Use quote to encode url parts
Labels: provider:google (Google (including GCP) related issues)

Successfully merging this pull request may close these issues:

Data Fusion Hook Start pipeline will succeed before pipeline is in RUNNING state

5 participants