[AIRFLOW-5716][part of AIRFLOW-5697] Simplify DataflowJobsController logic #6386
potiuk merged 1 commit into apache:master
Conversation
airflow/gcp/hooks/dataflow.py
The clean-up in this PR LGTM. My only thought for further clean-up: IMO this function is a misnomer. It is called `_start_dataflow`, but it actually does two things, start and wait_for_done. Detangling this so the hook provides one function for starting and one for waiting, leaving the details to the operator's `execute`, would make it simpler once we sort out my reschedule poking operator PR.
Another place it could be useful: we could allow the hook to start a Dataflow streaming job without waiting on it, until some other system cancels it. I think this could be cool for streaming jobs we only need running at certain times of day. Of course, we'd have to add a function to the hook to stop/drain a Dataflow streaming job. This could be interesting if you are using a Dataflow job to do streaming analytics on IoT data, but only during an 8-hour working day: your DAG could be `@daily`, start the Dataflow job, and then have a stop-Dataflow-job task that reschedules itself for 8 hours after the start task succeeds. This "ephemeral streaming job" is a rather contrived use case, but it demonstrates the additional value of separating the start and wait_for_done operations in hooks like this one.
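A minimal sketch of the split suggested above, assuming a hook whose start and wait steps are decoupled. The class, method names, and the fake client below are all illustrative inventions for this comment, not the actual Airflow hook API.

```python
import time


class FakeDataflowClient:
    """Stand-in for the Dataflow API client; reports a terminal state after a few polls."""

    def __init__(self):
        self._polls = 0

    def submit(self, body):
        return {"id": "job-123", "currentState": "JOB_STATE_RUNNING"}

    def get_state(self, job_id):
        self._polls += 1
        return "JOB_STATE_DONE" if self._polls >= 3 else "JOB_STATE_RUNNING"


class DataflowHookSketch:
    """Hypothetical hook where starting and waiting are separate methods,
    so an operator (or a rescheduling sensor) can call either independently."""

    def __init__(self, client, poll_interval=0.01):
        self.client = client
        self.poll_interval = poll_interval

    def start_job(self, body):
        # Submit the job and return its id immediately, without blocking.
        return self.client.submit(body)["id"]

    def wait_for_done(self, job_id):
        # Poll until the job reaches a terminal state.
        while True:
            state = self.client.get_state(job_id)
            if state in ("JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"):
                return state
            time.sleep(self.poll_interval)


hook = DataflowHookSketch(FakeDataflowClient())
job_id = hook.start_job({"name": "example"})
print(job_id)                      # job-123
print(hook.wait_for_done(job_id))  # JOB_STATE_DONE
```

With this shape, an operator's `execute` could call `start_job` and return, while a separate task (or sensor) calls `wait_for_done` later, which is what the ephemeral-streaming-job idea needs.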
Circulating this bounded streaming pipeline idea internally it doesn't seem like there's been real use cases for it in the field.
We can't split this one method into two, because a local process is run that supervises the task. Unfortunately, this is a limitation of Apache Beam, which does not offer the option of handing supervision to an external system. In any case, we must wait until the Apache Beam system process completes to be sure the job is finished.
This operator can also be used to initiate streaming jobs, but we lack an operator to stop the task if we want to fully manage its lifecycle.
https://github.com/apache/airflow/blob/master/tests/gcp/hooks/test_dataflow.py#L490-L520
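To illustrate the constraint described above: when the runner launches the pipeline as a local child process, the hook has to block on that process to know the submission finished. A tiny self-contained sketch (the `python -c` child stands in for the actual Beam invocation):

```python
import subprocess
import sys

# The hook spawns the runner as a local child process. Beam provides no way to
# hand supervision to an external system, so the only reliable signal that the
# submission completed is the child's exit.
proc = subprocess.Popen(
    [sys.executable, "-c", "print('pipeline submitted')"],
    stdout=subprocess.PIPE,
    text=True,
)
out, _ = proc.communicate()  # block until the child process exits
print(out.strip())       # pipeline submitted
print(proc.returncode)   # 0
```

This is why start and wait cannot simply be split for these "normal" jobs: the waiting is tied to the lifetime of the supervising process, not just to the remote job state.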
My understanding is that this client-side controller process that supervises the job only applies to "normal" jobs that would be submitted with DataflowPythonOperator or DataflowJavaOperator. Templates, however, can be instantiated and then polled for completion separately (see running templates).
You added a comment about the function that is responsible for running tasks on the local machine, so I was slightly confused.
If you would like to create an asynchronous operator, you would have to change the `_start_template_dataflow` method so that it does not start the waiting process. In the next step, you would use the `is_job_dataflow_running` method to poll the job status. Currently, most hook methods for the GCP integrations are synchronous, because that was part of the practice my team used. I don't think this is explicitly written in the integration guide, and we should update it for this issue.
https://docs.google.com/document/d/1_rTdJSLCt0eyrAylmmgYc3yZr-_h51fVlnvMmWqhCkY/edit?ts=5bb72dfd#
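The asynchronous pattern described above could be sketched like this. Everything here is illustrative: the fake API, the function names, and the poke loop are assumptions standing in for a modified `_start_template_dataflow` and an `is_job_dataflow_running`-style check, not the real hook code.

```python
class FakeJobsApi:
    """Stand-in for the Dataflow jobs API; the job finishes after two status checks."""

    def __init__(self):
        self._checks = 0

    def launch_template(self, name):
        return {"id": "tmpl-job-1"}

    def state(self, job_id):
        self._checks += 1
        return "JOB_STATE_RUNNING" if self._checks < 3 else "JOB_STATE_DONE"


def start_template_dataflow_async(api, name):
    # Hypothetical variant of _start_template_dataflow that skips the waiting loop.
    return api.launch_template(name)["id"]


def is_job_dataflow_running(api, job_id):
    # Poke-style check, the kind a sensor's poke() method would call.
    return api.state(job_id) == "JOB_STATE_RUNNING"


api = FakeJobsApi()
job_id = start_template_dataflow_async(api, "my-template")
pokes = 0
while is_job_dataflow_running(api, job_id):
    pokes += 1  # a real sensor would reschedule itself here instead of looping
print(job_id, pokes)  # tmpl-job-1 2
```

Because templates are submitted server-side, no local supervising process is needed, so this start-then-poke split works for them even though it does not for locally-run pipelines.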
Codecov Report
```diff
@@            Coverage Diff            @@
##           master    #6386     +/-  ##
=========================================
- Coverage    83.8%   83.74%   -0.07%
=========================================
  Files         635      635
  Lines       36750    36743       -7
=========================================
- Hits        30800    30769      -31
- Misses       5950     5974      +24
```
Continue to review full report at Codecov.
potiuk left a comment
I like it. Really nice simplification!
This PR is one of a series that aims to improve this integration:
https://issues.apache.org/jira/browse/AIRFLOW-5697
Make sure you have checked all steps below.
Jira
Description
Tests
Commits
Documentation