Deadline breach callback fires multiple times with multiple scheduler replicas (race condition in handle_miss) #64710
Description
When Airflow is running with multiple scheduler replicas (the default on Astronomer Hosted, where 2 scheduler replicas always run), the deadline check loop in scheduler_job_runner.py has a race condition that causes duplicate CallbackTrigger entries to be created for the same missed deadline. This results in the deadline breach callback firing multiple times — producing duplicate alerts/notifications.
With a single triggerer replica the duplicates may go unnoticed (they fire in rapid succession from the same pod). With two or more triggerer replicas, each triggerer pod claims one of the duplicate triggers and fires its callback independently, making the problem clearly visible to users.
Steps to Reproduce
- Use Airflow 3 with a DAG that defines a `deadline` using `DeadlineAlert` and a callback (e.g. `AsyncCallback`):

```python
from datetime import timedelta

from airflow.sdk import DAG, AsyncCallback
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference

with DAG(
    dag_id="crm-braze-cdi-daily",
    schedule="@daily",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(hours=1),
        callback=AsyncCallback(path="my_package.callbacks.on_deadline_breach"),
    ),
) as dag:
    ...
```
- Run Airflow with 2 scheduler replicas (the default on Astro Hosted).
- Run Airflow with 2 triggerer replicas (e.g. `ASTRO_TRIGGERER_REPLICAS=2`).
- Allow a DAG run to exceed its deadline.
Expected: The deadline breach callback fires once.
Actual: The deadline breach callback fires twice — once per triggerer replica.
Root Cause Analysis
1. The scheduler deadline check loop lacks row-level locking
In scheduler_job_runner.py, the scheduling loop checks for missed deadlines on every iteration:
```python
# airflow/jobs/scheduler_job_runner.py, lines 1626-1635
with create_session() as session:
    # Only retrieve expired deadlines that haven't been processed yet.
    # `missed` is False by default until the handler sets it.
    for deadline in session.scalars(
        select(Deadline)
        .where(Deadline.deadline_time < datetime.now(timezone.utc))
        .where(~Deadline.missed)  # ← NO SELECT FOR UPDATE
        .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
    ):
        deadline.handle_miss(session)
```

There is no `with_for_update()` (pessimistic locking) on this query. Under PostgreSQL's default READ COMMITTED isolation, two scheduler replicas executing this query concurrently can both read the same `Deadline` row with `missed=False` before either has committed its transaction.
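The stale read is easy to reproduce outside Airflow. Below is a minimal standalone sketch (not Airflow code; the `deadline_demo` table, model, and connection string are illustrative, and it assumes a reachable PostgreSQL instance) in which two sessions, standing in for the two scheduler replicas, both observe `missed=False`:

```python
# Standalone sketch of the stale read under READ COMMITTED.
# The table, model, and DSN below are illustrative, not Airflow's real schema.
from sqlalchemy import Boolean, Column, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Deadline(Base):
    __tablename__ = "deadline_demo"
    id = Column(Integer, primary_key=True)
    missed = Column(Boolean, nullable=False, default=False)

engine = create_engine("postgresql+psycopg2://localhost/airflow_race")  # assumed DSN
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

with Session(engine) as setup:
    setup.add(Deadline(id=1, missed=False))
    setup.commit()

# Two sessions stand in for the two scheduler replicas.
a, b = Session(engine), Session(engine)

# Both run the unlocked query before either commits; READ COMMITTED lets
# both reads succeed because neither row is locked.
row_a = a.scalars(select(Deadline).where(~Deadline.missed)).one()
row_b = b.scalars(select(Deadline).where(~Deadline.missed)).one()
print(row_a.missed, row_b.missed)  # False False -> both would queue a trigger

row_a.missed = True
a.commit()
row_b.missed = True
b.commit()  # succeeds silently; the duplicate side effect has already happened
a.close()
b.close()
```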
2. handle_miss() creates a new Trigger record each time it is called
```python
# airflow/models/deadline.py, lines 216-264
def handle_miss(self, session: Session):
    ...
    if isinstance(self.callback, TriggererCallback):
        self.callback.queue()  # ← creates a NEW Trigger row in the DB
        session.add(self.callback)
        session.flush()
    ...
    self.missed = True
    session.add(self)
```

`TriggererCallback.queue()` calls `Trigger.from_object(CallbackTrigger(...))`, inserting a new row into the `trigger` table:
```python
# airflow/models/callback.py, lines 214-224
def queue(self):
    from airflow.models.trigger import Trigger
    from airflow.triggers.callback import CallbackTrigger

    self.trigger = Trigger.from_object(
        CallbackTrigger(
            callback_path=self.data["path"],
            callback_kwargs=self.data["kwargs"],
        )
    )
    super().queue()
```
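Nothing in `handle_miss()` consults the committed state of `missed` before queueing, so any caller holding a stale row queues again. A hypothetical guard like the following (illustrative, not current Airflow code; it assumes the `Deadline` model's `id` and `missed` columns from the snippets above) would make a second concurrent call a no-op even without locking the outer query:

```python
# Hypothetical guard (not in Airflow today): atomically flip missed from
# False to True; exactly one concurrent caller observes rowcount == 1 and
# only that caller proceeds to queue the callback trigger.
from sqlalchemy import update
from sqlalchemy.orm import Session

from airflow.models.deadline import Deadline  # model shown in the snippets above

def claim_miss(session: Session, deadline_id: int) -> bool:
    result = session.execute(
        update(Deadline)
        .where(Deadline.id == deadline_id, ~Deadline.missed)
        .values(missed=True)
    )
    # Under READ COMMITTED, PostgreSQL re-checks the WHERE clause against the
    # latest committed row version, so the loser of the race gets rowcount == 0.
    return result.rowcount == 1
```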
3. Race condition sequence (2 scheduler replicas)

| Time | Scheduler A | Scheduler B |
|---|---|---|
| T1 | reads Deadline X (`missed=False`) | |
| T1 | | reads Deadline X (`missed=False`) |
| T2 | calls `handle_miss()` → Trigger 1 created, `missed=True`, COMMIT | |
| T3 | | calls `handle_miss()` → Trigger 2 created, `missed=True`, COMMIT |

At T3, Scheduler B's session still holds the stale `missed=False` object it loaded at T1 (the ORM does not re-read the row within the transaction), so nothing stops it from processing the deadline again. B's update to `missed=True` succeeds silently because the write is idempotent, but Trigger 2 already exists in the database.
4. Two triggerer replicas pick up one trigger each
`Trigger.assign_unassigned()` distributes unassigned triggers across available triggerer instances. With 2 triggerer replicas, each pod claims one of the two `CallbackTrigger` rows and executes the deadline breach callback independently → 2 notifications sent.
With only 1 triggerer replica, both triggers are picked up by the same pod and fire back-to-back. This is still a bug (fires twice), but it is much harder to notice because the notifications arrive nearly simultaneously.
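To confirm the duplication in an affected deployment, the queued `CallbackTrigger` rows can be counted directly in the metadata database (a diagnostic sketch; the connection string is illustrative, and the classpath follows the import shown above):

```python
# Diagnostic sketch: after a single deadline miss there should be exactly
# one CallbackTrigger row in the trigger table; two indicates this bug.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://localhost/airflow")  # assumed DSN
with engine.connect() as conn:
    count = conn.execute(
        text(
            "SELECT count(*) FROM trigger "
            "WHERE classpath = 'airflow.triggers.callback.CallbackTrigger'"
        )
    ).scalar_one()
    print(f"{count} CallbackTrigger row(s) currently queued")
```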
Proposed Fix
Add `.with_for_update(skip_locked=True)` to the deadline query in the scheduler loop. This ensures that when Scheduler A is processing a deadline, Scheduler B skips it rather than both processing it simultaneously:

```python
# scheduler_job_runner.py
with create_session() as session:
    for deadline in session.scalars(
        select(Deadline)
        .where(Deadline.deadline_time < datetime.now(timezone.utc))
        .where(~Deadline.missed)
        .with_for_update(skip_locked=True)  # ← ADD THIS
        .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun))
    ):
        deadline.handle_miss(session)
```

`skip_locked=True` is preferred over `nowait=True` because it prevents the second scheduler from erroring out: it simply skips any row already locked by the first scheduler, which is the correct behaviour here.
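To verify what SQLAlchemy emits for this, the statement can be compiled against the PostgreSQL dialect (a standalone sketch with illustrative column definitions):

```python
# Sketch: confirm that with_for_update(skip_locked=True) compiles to
# FOR UPDATE SKIP LOCKED on PostgreSQL. The columns here are illustrative.
from sqlalchemy import Boolean, column, select, table
from sqlalchemy.dialects import postgresql

deadline = table("deadline", column("id"), column("missed", Boolean()))

stmt = select(deadline).where(~deadline.c.missed).with_for_update(skip_locked=True)
print(stmt.compile(dialect=postgresql.dialect()))
# SELECT deadline.id, deadline.missed
# FROM deadline
# WHERE NOT deadline.missed FOR UPDATE SKIP LOCKED
```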
Alternatively, a unique constraint on `(dagrun_id, deadline_alert_id)` in the deadline table, combined with an upsert pattern, would also prevent duplicate trigger creation, but the SELECT FOR UPDATE / SKIP LOCKED approach is lower risk and more consistent with how similar patterns are handled elsewhere in Airflow (e.g. trigger assignment).
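The upsert details are left open here; one reading, sketched below with hypothetical constraint and helper names, pairs the unique constraint with PostgreSQL's `ON CONFLICT DO NOTHING` so the losing scheduler's write becomes a no-op:

```python
# Sketch of the alternative fix (hypothetical constraint/helper names; not
# current Airflow code). First, on the Deadline model, enforce one row per
# (dagrun_id, deadline_alert_id):
#
#     from sqlalchemy import UniqueConstraint
#     __table_args__ = (
#         UniqueConstraint("dagrun_id", "deadline_alert_id",
#                          name="uq_deadline_dagrun_alert"),
#     )
#
# Then let the losing scheduler's insert become a silent no-op:
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session

from airflow.models.deadline import Deadline

def upsert_deadline(session: Session, **values) -> bool:
    stmt = (
        pg_insert(Deadline.__table__)
        .values(**values)
        .on_conflict_do_nothing(constraint="uq_deadline_dagrun_alert")
    )
    result = session.execute(stmt)
    return result.rowcount == 1  # False: another scheduler already wrote the row
```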
Impact
| Scenario | Behaviour |
|---|---|
| 1 scheduler, 1 triggerer | No visible duplicate (correct) |
| 2 schedulers, 1 triggerer | 2 triggers created; same pod fires callback twice (may be invisible) |
| 2 schedulers, 2 triggerers | 2 triggers created; each pod fires callback once → user sees 2 notifications |
The bug is latent whenever multiple scheduler replicas run (the default on Astronomer Hosted and most HA Airflow deployments). Adding a second triggerer makes it clearly visible because the two duplicate triggers are now distributed across two independent pods.
Environment
- Airflow version: 3.0.x / 3.1.x (confirmed in the apache/airflow `main` branch as of 2026-04-02)
- Database: PostgreSQL (READ COMMITTED isolation level)
- Scheduler replicas: 2 (standard Astro Hosted configuration)
- Triggerer replicas: 2 (set via `ASTRO_TRIGGERER_REPLICAS=2`)
- Executor: AstroExecutor
Related Issues / PRs
- apache/airflow#64620 — Triggerer `TriggerCommsDecoder` `asyncio.Lock` race condition (a separate but related triggerer stability issue on the same deployment)