Fix EMR container job not cancelled on deferral timeout #64770
aurangzaib048 wants to merge 12 commits into apache:main from
Conversation
- Wrap stop_query() in try/except inside the CancelledError handler to ensure CancelledError is always re-raised even if cancellation fails
- Wrap the safe_to_cancel() call to prevent DB/API errors from replacing the CancelledError
- Wrap stop_query() in execute_complete() to preserve the original error
- Narrow the exception handler in run() from Exception to AirflowException to match base class behavior and avoid swallowing programming bugs
- Narrow the except in get_task_state() to (KeyError, TypeError) with proper exception chaining via 'from e'
- Use "error" status in the failure TriggerEvent to match the base class convention and include return_key/return_value
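The first two bullets can be sketched as follows. This is an illustrative pattern only, not the provider's actual code: `poll`, `stop_query`, and `safe_to_cancel` are hypothetical stand-ins for the trigger's polling loop, `hook.stop_query()`, and the DB safety check.

```python
import asyncio


async def run_with_cancel_guard(poll, stop_query, safe_to_cancel):
    """Poll a remote job; on cancellation, try to stop the job but always
    re-raise CancelledError so the task is still seen as cancelled."""
    try:
        return await poll()
    except asyncio.CancelledError:
        try:
            # Guard both calls: a DB/API error here must not replace the
            # CancelledError we are about to re-raise.
            if safe_to_cancel():
                stop_query()
        except Exception:
            pass
        raise  # CancelledError always propagates, even if cancellation failed
```

The key point is the bare `raise` after the inner try/except: whatever happens while stopping the job, the cancellation itself is never swallowed.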
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
The base class hook() method is typed to return AwsGenericHook[Any], which does not expose the stop_query() method specific to EmrContainerHook. Add an explicit type annotation with a type ignore to satisfy mypy.
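A runnable sketch of that workaround, using minimal stand-in classes rather than the real provider ones: the base property is typed with the generic hook, so the subclass narrows the return annotation and silences mypy's override complaint. The `cached_property` shape and the class bodies here are assumptions for illustration.

```python
from functools import cached_property
from typing import Any


class AwsGenericHook:
    """Stand-in for the generic base hook; no stop_query() here."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class EmrContainerHook(AwsGenericHook):
    """Stand-in for the container hook, which does expose stop_query()."""

    def stop_query(self, job_id: str) -> str:
        return f"stopped {job_id}"


class BaseOperatorSketch:
    @cached_property
    def hook(self) -> AwsGenericHook:  # base typing hides stop_query()
        return AwsGenericHook()


class EmrContainerOperatorSketch(BaseOperatorSketch):
    def __init__(self, virtual_cluster_id: str) -> None:
        self.virtual_cluster_id = virtual_cluster_id

    @cached_property
    def hook(self) -> EmrContainerHook:  # type: ignore[override]
        # Narrowed return type lets mypy see stop_query() on self.hook.
        return EmrContainerHook(virtual_cluster_id=self.virtual_cluster_id)
```

With the narrowed annotation, call sites like `self.hook.stop_query(job_id)` type-check without casts.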
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
Fixes orphaned EMR on EKS container jobs when deferrable tasks time out or are killed by adding cancel-on-cancel semantics to the trigger and ensuring the operator cancels the job on terminal error events.
Changes:
- Add `cancel_on_kill` support to `EmrContainerTrigger`, including a `safe_to_cancel()` check and explicit `CancelledError` handling to stop the remote job.
- Update `EmrContainerOperator.execute_complete()` to stop the EMR job when the trigger reports failure/timeout.
- Add/extend unit tests to cover trigger serialization and cancellation behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| providers/amazon/src/airflow/providers/amazon/aws/triggers/emr.py | Adds cancel-on-kill behavior and safe cancellation checks in EmrContainerTrigger. |
| providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py | Cancels EMR container jobs in execute_complete() on non-success events; passes cancel_on_kill=True to trigger. |
| providers/amazon/tests/unit/amazon/aws/triggers/test_emr.py | Adds serialization and cancellation-path unit tests for EmrContainerTrigger. |
| providers/amazon/tests/unit/amazon/aws/operators/test_emr_containers.py | Adds unit tests verifying job cancellation on execute_complete() failure. |
- Add an except Exception fallback in the trigger's run() to match base class behavior for non-AirflowException errors
- Wrap the sync get_task_instance() and stop_query() with sync_to_async to avoid blocking the triggerer event loop
- Add a cancel_on_kill parameter to EmrContainerOperator for user opt-out
- Add a test verifying the original error is preserved when stop_query fails
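The commit wraps the blocking hook calls with asgiref's `sync_to_async` so they run off the triggerer event loop; the stdlib `asyncio.to_thread` used below illustrates the same idea. `stop_query_sync` is a hypothetical stand-in for the blocking hook method.

```python
import asyncio


def stop_query_sync(job_id: str) -> str:
    # Imagine a blocking boto3 call here; running it directly inside a
    # coroutine would stall every other trigger on the event loop.
    return f"stopped {job_id}"


async def cancel_job(job_id: str) -> str:
    # Offload the blocking call to a worker thread so the loop stays free.
    return await asyncio.to_thread(stop_query_sync, job_id)
```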
…omplete
- Pass virtual_cluster_id to EmrContainerHook in hook() so stop_query() can call cancel_job_run with the correct cluster ID
- Use validated_event.get("job_id") in execute_complete() as the primary source, since self.job_id may be None after deferral reconstruction
- Move 'from sqlalchemy import select' to a module-level conditional block to avoid repeated function-scope imports
- Add spec=EmrContainerHook to MagicMock instances in trigger tests to catch attribute/typo bugs
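The job-id resolution order described above can be sketched as a small helper. The name `resolve_job_id` is illustrative, not the operator's actual API: prefer the id carried in the trigger event, and fall back to the operator attribute, since `self.job_id` may be `None` after the operator is rebuilt post-deferral.

```python
from typing import Optional


def resolve_job_id(validated_event: dict, operator_job_id: Optional[str]) -> Optional[str]:
    # Event payload wins; the operator attribute is only a fallback because
    # it may not survive deferral reconstruction on a different worker.
    return validated_event.get("job_id") or operator_job_id
```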
- Merge except AirflowException and except Exception into a single except Exception handler, since both yield identical error events
- Remove test_serialization_includes_cancel_on_kill, since the existing test_serialization already covers the cancel_on_kill=True default
- Remove the unused AirflowException import from triggers/emr.py
The execution API uses task_id as key for non-mapped tasks but
"{task_id}_{map_index}" for mapped tasks (map_index >= 0). Use the
correct key format so safe_to_cancel works for mapped deferrable
EMR container jobs.
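The key format described in that commit can be sketched as a one-line helper; `task_instance_key` is an illustrative name, not the execution API's actual function.

```python
def task_instance_key(task_id: str, map_index: int) -> str:
    # Non-mapped tasks (map_index == -1) are keyed by task_id alone;
    # mapped tasks (map_index >= 0) use "{task_id}_{map_index}".
    return f"{task_id}_{map_index}" if map_index >= 0 else task_id
```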
vincbeck left a comment
I understand the goal here but I am also wondering if it is worth it. You're adding a lot of code just for that use case, which creates significant complexity in the operator. By any chance, is there any way to make it simpler?
The worker running execute_complete may differ from the one that ran execute, so self.job_id cannot be relied upon after deferral.
```python
if job_id:
    self.log.info("Cancelling EMR container job %s", job_id)
    try:
        self.hook.stop_query(job_id)
```
You seem to cancel the job regardless of cancel_on_kill value?
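The guard the reviewer is asking about could look like the sketch below: only stop the remote job when `cancel_on_kill` is enabled. `maybe_cancel` and its `stop_query` parameter are hypothetical stand-ins; this paraphrases, rather than quotes, the operator code under review.

```python
from typing import Callable, Optional


def maybe_cancel(
    job_id: Optional[str],
    cancel_on_kill: bool,
    stop_query: Callable[[str], None],
) -> bool:
    """Stop the remote job only if there is a job id and opt-out is not set.

    Returns True when a cancellation was actually issued.
    """
    if job_id and cancel_on_kill:
        stop_query(job_id)
        return True
    return False
```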
When `EmrContainerOperator` runs in deferrable mode and the trigger times out or the task is killed, the EMR job keeps running on the cluster. This leads to orphaned jobs consuming resources and duplicate executions on retry.

This PR adds cancel-on-kill support to `EmrContainerTrigger`, following the same proven pattern as `EmrServerlessStartJobTrigger` (PR #51883):
- Override `run()` in `EmrContainerTrigger` to catch `asyncio.CancelledError` and cancel the EMR job via `hook.stop_query()` when safe to do so
- Add a `safe_to_cancel()` check to distinguish user-initiated kills from triggerer restarts (avoids cancelling jobs during triggerer restart)
- Add a `cancel_on_kill` parameter (default `True`) for opt-out
- Update `EmrContainerOperator.execute_complete()` to cancel the job when the trigger reports a failure/timeout event

closes: #60517
Was generative AI tooling used to co-author this PR?