Pre-assign Celery task ID at queuing time to prevent duplicate execution on scheduler crash#65594
Conversation
Pre-assign Celery task ID at queuing time to prevent duplicate execution on scheduler crash

When a scheduler crashes between dispatching a task to Celery and processing the QUEUED event that persists `external_executor_id`, the replacement scheduler cannot adopt the in-flight task. Without the Celery task ID in the database, `try_adopt_task_instances` has no `AsyncResult` to look up, so the task is reset and re-queued, causing duplicate execution of an already-running task.

Fix this by generating `external_executor_id` via a DB-side UUID function (`gen_random_uuid()` on PostgreSQL, `UUID()` on MySQL, a Python `uuid4` registered on SQLite) in the same bulk UPDATE that sets state=QUEUED. The ID is committed atomically with the state transition: no second write, no race window. RETURNING is used on PostgreSQL and SQLite to read back the generated UUIDs without a second round-trip; MySQL falls back to a SELECT.

The CeleryExecutor passes the pre-assigned ID to `apply_async()` as the Celery `task_id`, making it deterministic from DB state. Other executors ignore it and overwrite it with their own ID (e.g. an ECS task ARN) during event processing.

This also fixes the separate race in apache#55004 where `external_executor_id` is lost when the task instance row is locked during event processing. `process_executor_events` uses `skip_locked=True`, and `get_event_buffer()` flushes the executor's in-memory buffer into a local variable. If a TI is locked and skipped, its QUEUED event is consumed from the buffer but never processed; the event and its task ID are silently dropped. With the ID now written to the database before the task is even sent to Celery, adoption no longer depends on the event being processed.

Closes: apache#55004
Closes: apache#58570
Closes: apache#64971
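The single-statement, dialect-aware approach the commit message describes can be sketched against SQLite. This is a minimal sketch with an illustrative schema and a hypothetical helper name (`queue_task_instances`), not Airflow's actual `task_instance` table or scheduler code; as the message notes, SQLite has no built-in UUID function, so a Python `uuid4` is registered under the PostgreSQL name:

```python
import sqlite3
import uuid

def queue_task_instances(conn: sqlite3.Connection, ti_ids: list[int]) -> dict[int, str]:
    """Atomically set state='queued' and assign external_executor_id.

    The UUID is generated by a DB-side function inside the same UPDATE
    that flips the state, and RETURNING reads the generated IDs back
    without a second round-trip. (RETURNING needs SQLite >= 3.35;
    PostgreSQL would use gen_random_uuid(), MySQL would use UUID()
    with a SELECT fallback instead of RETURNING.)
    """
    # Non-deterministic SQLite functions are invoked once per row,
    # so every updated row receives a distinct UUID.
    conn.create_function("gen_random_uuid", 0, lambda: str(uuid.uuid4()))
    placeholders = ",".join("?" for _ in ti_ids)
    rows = conn.execute(
        "UPDATE task_instance "
        "SET state = 'queued', external_executor_id = gen_random_uuid() "
        f"WHERE id IN ({placeholders}) "
        "RETURNING id, external_executor_id",
        ti_ids,
    ).fetchall()
    conn.commit()
    return dict(rows)

# Demo with a toy schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(id INTEGER PRIMARY KEY, state TEXT, external_executor_id TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance (id, state) VALUES (?, 'scheduled')", [(1,), (2,)]
)
assigned = queue_task_instances(conn, [1, 2])
```

Because the ID and the QUEUED state commit in one statement, there is no moment at which a crash leaves a queued row without its executor ID.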
Hi maintainer, this PR was merged without a milestone set.
Backport failed to create: v3-2-test. View the failure log for run details. Note: for merging PRs targeted for Airflow 3.X, in case of doubt please ask in the #release-management Slack channel.

You can attempt to backport this manually by running:

`cherry_picker 3b188b9 v3-2-test`

This should apply the commit to the v3-2-test branch and leave the commit in conflict state, marking the conflicts. After you have resolved the conflicts, you can continue the backport process by running:

`cherry_picker --continue`

If you don't have cherry-picker installed, see the installation guide.
When a scheduler crashes between dispatching a task to Celery and processing the QUEUED event that persists `external_executor_id`, the replacement scheduler cannot adopt the in-flight task. Without the Celery task ID in the database, `try_adopt_task_instances` has no `AsyncResult` to look up, so the task is reset and re-queued, causing duplicate execution of an already-running task.

Fix this by generating a UUID for `external_executor_id` at queuing time (in `_enqueue_task_instances_with_queued_state`), committed to the database atomically with the QUEUED state transition. The same ID is carried through the workload and passed as `task_id` to Celery's `apply_async()`, making the Celery task ID deterministic from database state. A fresh UUID is generated on every queuing, including reschedule sensor re-queuing, avoiding stale result backend collisions.

This also fixes the separate race in #55004 where `external_executor_id` is lost when the task instance row is locked during event processing. `process_executor_events` uses `skip_locked=True`, and `get_event_buffer()` flushes the executor's in-memory buffer into a local variable. If a TI is locked and skipped, its QUEUED event is consumed from the buffer but never processed; the event and its task ID are silently dropped. With the ID now written to the database before the task is even sent to Celery, adoption no longer depends on the event being processed.

The ID is added to `TaskInstanceDTO` with `Field(exclude=True)` (the same pattern as `executor_config`) so it is available on the in-memory model but excluded from the JSON payload sent to workers.

Closes: #55004
Closes: #58570
Closes: #64997