-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix ECS Executor compatibility with Airflow 3.x in try_adopt_task_instances #58207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fix ECS Executor compatibility with Airflow 3.x in try_adopt_task_instances #58207
Conversation
…tances The try_adopt_task_instances method was calling ti.command_as_list() which doesn't exist in Airflow 3.x due to Task SDK changes. This caused scheduler to crash with AttributeError when trying to adopt orphaned ECS tasks. This fix: - Checks AIRFLOW_V_3_0_PLUS to determine Airflow version - For Airflow 3.x: Reconstructs the command list manually from task instance attributes - For Airflow 2.x: Uses the existing command_as_list() method Fixes apache#58205
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
In Airflow 3.x, the TaskInstance.execution_date attribute was renamed to logical_date. This updates the fix to use the correct attribute name.
|
Updated: Also fixed The TaskInstance attribute |
| command = [ | ||
| "airflow", | ||
| "tasks", | ||
| "run", | ||
| ti.dag_id, | ||
| ti.task_id, | ||
| ti.logical_date.isoformat(), | ||
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not how you want to generate a cmd for an Airflow 3.x task. The commands look different in AF3.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could consider making this a helper function now as well, instead of copy/pasting the implementation here.
Match the command generation pattern used for newly queued tasks. Creates ExecuteTask workload, serializes it as JSON, and constructs the proper SDK command structure.
|
Updated to use proper Task SDK workload serialization pattern. Now matches the command generation used for newly queued tasks (lines 509-518):
|
o-nikolas
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a unit tests we can add for this?
| command = [ | ||
| "airflow", | ||
| "tasks", | ||
| "run", | ||
| ti.dag_id, | ||
| ti.task_id, | ||
| ti.logical_date.isoformat(), | ||
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could consider making this a helper function now as well, instead of copy/pasting the implementation here.
Removing request for change as the critical piece was addressed
|
@iamapez Are you planning to get back to this one? I've removed my request for changes since you've addressed the main issue. But I still think tidying the command code into a helper since it's now copied in two places is worth doing. |
Address code review feedback by consolidating duplicated command generation into reusable helper methods and adding comprehensive test coverage. Changes: - Add _serialize_workload_to_command() static method for workload serialization - Add _build_task_command() method for version-aware command generation - Refactor execute_async() to use helper method (removed 9 lines of duplication) - Refactor try_adopt_task_instances() to use helper method (removed 18 lines) - Add comprehensive unit tests for both Airflow 2.x and 3.x compatibility - Update test_try_adopt_task_instances and remove skip marker for Airflow 3.x - Add Tradepost Markets Inc to INTHEWILD.md Test coverage: - test_serialize_workload_to_command: validates workload-to-command conversion - test_build_task_command_airflow3: validates Airflow 3.x command generation - test_build_task_command_airflow2: validates Airflow 2.x command generation - test_try_adopt_task_instances: updated for Airflow 3.x support
3fd19b5 to
ca4bdf8
Compare
|
Consolidated the command generation logic into helper methods and added unit tests as requested. @o-nikolas |
|
checking in on this pr @o-nikolas |
providers/amazon/src/airflow/providers/amazon/aws/exceptions.py
Outdated
Show resolved
Hide resolved
d1bc3ee to
59231e2
Compare
|
sorry @o-nikolas accidental commit to this branch - i reverted the changes |
@iamapez Looks like there are many test failures. Both static checks and unit test failures. Can you please have a look? |
Summary
Fixes #58205
The
try_adopt_task_instancesmethod in the ECS Executor was callingti.command_as_list()which doesn't exist in Airflow 3.x due to Task SDK changes. This caused the scheduler to crash withAttributeErrorwhen trying to adopt orphaned ECS tasks.The Problem
TaskInstance.command_as_list()method was removed as part of the Task SDK refactoringtry_adopt_task_instancesmethod still calls this non-existent method at line 598The Solution
This PR adds version-aware code that:
AIRFLOW_V_3_0_PLUSto determine the Airflow versioncommand_as_list()methodTesting
py_compileChecklist