Added JOB_STATE_CANCELLED and pool_sleep GCP Dataflow Operators#37364
Added JOB_STATE_CANCELLED and pool_sleep GCP Dataflow Operators#37364jcamatta wants to merge 5 commits intoapache:mainfrom jcamatta:google-dataflow-add-status_cancelled-pool_sleep
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
hussein-awala
left a comment
There was a problem hiding this comment.
Could you please add a test for this change?
|
@hussein-awala done. |
…n DataflowStartFlexTemplateOperator
DataflowStartFlexTemplateOperator, DataflowTemplatedJobStartOperator
|
I fixed some of the static checks and rebased. |
|
Hi, @eladkal . If you dont mind, I will try to take this in the weekend since right now i am full of work. |
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions. |
Added an 'elif' block in the 'async def run(self)' method of 'TemplateJobStartTrigger', which captures 'JOB_STATE_CANCELLED'. This prevents the triggerer from looping indefinitely when a dataflow is cancelled. This also required updating the 'execute_complete' method of both 'DataflowStartFlexTemplateOperator' and 'DataflowTemplatedJobStartOperator', which now raise an 'AirflowException(event["message"])' when the job has the status "cancelled". So, if you cancel the job in the Google console, this will be reflected in the Airflow task execution.
Added 'pool_sleep' as an optional argument to the constructor of 'DataflowStartFlexTemplateOperator', preserving its default value of 10 seconds. This argument is passed to the 'TemplateJobStartTrigger' constructor when the 'self.defer' method is called in 'execute'.
Added tests to TemplateJobStartTrigger, DataflowStartFlexTemplateOperator and DataflowTemplatedJobStartOperator.