Persist Celery external executor ids on send#64997
Open
DaveT1991 wants to merge 5 commits intoapache:mainfrom
Open
Persist Celery external executor ids on send#64997DaveT1991 wants to merge 5 commits intoapache:mainfrom
DaveT1991 wants to merge 5 commits intoapache:mainfrom
Conversation
Prab-27
reviewed
Apr 10, 2026
| from sqlalchemy import update | ||
|
|
||
| from airflow.models.taskinstance import TaskInstance as TI | ||
| from airflow.models.taskinstancekey import TaskInstanceKey |
Contributor
There was a problem hiding this comment.
@DaveT1991 These imports statemtns are defined above. Could we use them here ?
Contributor
Author
There was a problem hiding this comment.
Of course, can you check now and let me know?
Contributor
Author
There was a problem hiding this comment.
@Prab-27 I did some minor import refactoring to solve the failed testing issue.
Per Prab-27 feedback, update and TaskInstance/TaskInstanceKey were re-imported inside _persist_task_external_executor_id even though they belong at the module level. Promote them out of the local scope (and out of the TYPE_CHECKING block for the model imports) so there is a single definition at the top of the file. Drop the TI alias in favour of the full TaskInstance name now that it is always in scope.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR addresses a CeleryExecutor resiliency gap by persisting the Celery task_id into task_instance.external_executor_id immediately after a successful send, so that a replacement scheduler can adopt in-flight tasks after a crash (instead of resetting/re-queuing them).
Changes:
- Persist
external_executor_idfor task-instance workloads during_send_workloads()right afterapply_async()succeeds. - Add a helper to update the
task_instancerow for the relevantTaskInstanceKey. - Add a unit regression test verifying the DB persistence and the existing event-buffer behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
providers/celery/src/airflow/providers/celery/executors/celery_executor.py |
Writes Celery task_id to TaskInstance.external_executor_id on send via a new persistence helper. |
providers/celery/tests/unit/celery/executors/test_celery_executor.py |
Adds regression coverage ensuring external_executor_id is persisted immediately after send. |
providers/celery/src/airflow/providers/celery/executors/celery_executor.py
Show resolved
Hide resolved
providers/celery/src/airflow/providers/celery/executors/celery_executor.py
Show resolved
Hide resolved
providers/celery/src/airflow/providers/celery/executors/celery_executor.py
Show resolved
Hide resolved
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Persist the Celery task id to
task_instance.external_executor_idas soon as a workload is successfully submitted.Today the Celery executor stores the returned
task_idinevent_buffer, and the scheduler later writes it to the database while handling the queued/running event. If the scheduler dies in that window, the replacement scheduler cannot adopt the in-flight task becauseexternal_executor_idis stillNULL, so the task may be reset and re-queued instead of adopted.This change persists the Celery task id immediately after a successful send for task-instance workloads, while keeping the existing event-buffer path as the normal reconciliation flow.
Closes #64971
Testing
providers/celery/tests/unit/celery/executors/test_celery_executor.pygit diff --check