TriggerCommsDecoder.asend() counter increment outside async lock causes RuntimeError under greenback portal #67380

@my2038

Apache Airflow version
3.2.1
What happened
Any deferrable trigger that uses asyncio.to_thread() in its run() body crashes the
triggerer process with:

RuntimeError: Response read out of order! Got frame.id=2, expect_id=5

The crash originates in TriggerCommsDecoder._aget_response() in
airflow/jobs/triggerer_job_runner.py.
What you expected to happen
A deferrable trigger using asyncio.to_thread() for synchronous I/O — the pattern
described in Airflow documentation and consistent with the Airflow 3 trigger contract
— should run stably in the triggerer without corrupting the supervisor communication
channel.
How to reproduce
Write a deferrable trigger that wraps any synchronous blocking call in
asyncio.to_thread:

from airflow.triggers.base import BaseTrigger, TriggerEvent
import asyncio

class MyTrigger(BaseTrigger):
    def serialize(self):
        return ("my_module.MyTrigger", {})

    async def run(self):
        while True:
            # Any synchronous blocking call wrapped in to_thread
            result = await asyncio.to_thread(self._do_sync_work)
            if result is not None:
                yield TriggerEvent({"status": "done", "result": result})
                return
            await asyncio.sleep(2)

    def _do_sync_work(self):
        # Simulate synchronous I/O (database call, HTTP, Kafka poll, etc.)
        import time
        time.sleep(0.05)
        return None

Deploy this trigger in a deferrable operator. Run the triggerer. Under Airflow 3.2.1
the triggerer crashes with RuntimeError: Response read out of order within seconds
to minutes, depending on polling interval and system load. The crash is
non-deterministic in timing but reliably reproducible under sustained operation.
Root cause analysis
The crash is caused by an interaction between two pieces of Airflow's own code:
run_trigger unconditionally installing a greenback portal, and asend placing
next(self.id_counter) outside async with self._async_lock.
Step 1 — greenback portal installation
run_trigger installs a greenback coroutine shim on every trigger task:

# airflow/jobs/triggerer_job_runner.py
async def run_trigger(self, trigger_id, trigger, timeout_after=None):
    if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
        import greenback
        await greenback.ensure_portal()   # ← installs shim on this asyncio Task

    async for event in trigger.run():    # ← iterates the trigger
        ...

ensure_portal replaces the asyncio Task's coroutine with a greenback shim that
intercepts every step of the task as it passes through the event loop. This is
necessary to allow synchronous code inside the trigger to call await via
greenback.await_(). The shim is active for the entire lifetime of the trigger.
Step 2 — asyncio.to_thread creates a thread-completion Future
Inside the trigger's run():

result = await asyncio.to_thread(self._do_sync_work)

asyncio.to_thread submits _do_sync_work to the default ThreadPoolExecutor.
When the OS thread completes, it calls loop.call_soon_threadsafe(callback) to
schedule the Future's done callback on the event loop.
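The hand-off in Step 2 can be observed with plain asyncio and no Airflow or greenback involved. The sketch below is illustrative only: `blocking_call`, `heartbeat` and `demo` are hypothetical names, and the sleep durations are arbitrary. It shows that the blocking call occupies a worker thread while the event loop keeps running other coroutines, and that the result is handed back to the awaiting coroutine on completion.

```python
import asyncio
import time


def blocking_call() -> str:
    # Stand-in for synchronous I/O; blocks only the worker thread.
    time.sleep(0.2)
    return "done"


async def heartbeat(ticks: list) -> None:
    # Runs on the event loop; it only advances if the loop is not blocked.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def demo() -> tuple:
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The coroutine suspends here until the worker thread finishes.
    result = await asyncio.to_thread(blocking_call)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The heartbeat count is greater than zero because the loop stayed responsive for the whole duration of the blocking call.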
Step 3 — greenback delivers the callback while asend is suspended
TriggerCommsDecoder.asend is the method that sends messages from the trigger runner
to its supervisor over a socket. It is called via arun() → sync_state_to_supervisor()
during normal trigger lifecycle management (heartbeats, log lines, state updates).
The current implementation:

async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
    frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())  # ← (A)
    bytes = frame.as_bytes()

    async with self._async_lock:                        # ← (B)
        self._async_writer.write(bytes)
        return await self._aget_response(frame.id)      # ← (C) suspended here

The sequence that triggers the bug:

1. asend Call 1 executes line (A): next(self.id_counter) → id=2. Counter is now at 2.
2. Call 1 acquires _async_lock at (B), writes frame id=2, and suspends at (C) while waiting for the supervisor's response.
3. The greenback shim intercepts the to_thread Future's completion callback and delivers it into the trigger Task's greenlet context, injecting execution while Call 1 is suspended at (C).
4. This re-enters arun() → sync_state_to_supervisor() → asend().
5. asend Call 2 executes line (A): next(self.id_counter) → id=3. Counter is now at 3. Call 1's frame id=2 has not yet received a response.
6. Call 2 tries to acquire _async_lock and blocks, because Call 1 holds it.
7. More callbacks arrive; Calls 3, 4, 5 enter, each advancing the counter to 4, 5, 6.
8. Eventually Call 1 receives its response (for id=2) and releases the lock.
9. Call 2 acquires the lock, writes frame id=3, and suspends at (C).
10. The supervisor responds to id=3. Call 2 calls _aget_response(3).
11. But meanwhile Call 5 (id=6) has acquired the lock and written frame id=6. The supervisor's next response is for id=6, not id=3.

async def _aget_response(self, expect_id: int) -> ToTriggerRunner | None:
    frame = await self._aread_frame()
    if frame.id != expect_id:
        raise RuntimeError(
            f"Response read out of order! Got {frame.id=}, {expect_id=}"
        )

Crash: RuntimeError: Response read out of order! Got frame.id=6, expect_id=3.
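To connect the check quoted above with the exact text of the crash, here is a small standalone sketch of the same comparison. `check_frame` and `message_for` are hypothetical helpers and the frame is a `SimpleNamespace` stand-in, not Airflow's frame type. The `=` specifier in the f-string is what produces the `frame.id=6, expect_id=3` form seen in the log.

```python
from types import SimpleNamespace


def check_frame(frame, expect_id: int) -> None:
    # Same comparison and message format as the quoted _aget_response excerpt.
    if frame.id != expect_id:
        raise RuntimeError(
            f"Response read out of order! Got {frame.id=}, {expect_id=}"
        )


def message_for(got: int, expected: int) -> str:
    # Returns the error text for a mismatched pair, or "ok" when ids agree.
    try:
        check_frame(SimpleNamespace(id=got), expected)
    except RuntimeError as exc:
        return str(exc)
    return "ok"


if __name__ == "__main__":
    print(message_for(6, 3))
```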
Why this is a re-entrancy bug, not a race condition
This is not a threading race condition. Python's asyncio event loop is
single-threaded; under normal cooperative scheduling, nothing can execute between
line (A) and line (B) because there is no await between them. The asyncio.Lock
would be sufficient protection in a normal asyncio program.
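A toy model can illustrate why the existing layout is safe under ordinary asyncio scheduling. This is a sketch under stated assumptions, not Airflow code: `FakeDecoder` is hypothetical, the "socket" is an in-memory deque that echoes ids in write order, and `asyncio.sleep(0)` stands in for the suspended read at (C). Because there is no await between (A) and (B), and asyncio.Lock wakes waiters in FIFO order, many concurrent callers still write and read in counter order.

```python
import asyncio
import itertools
from collections import deque


class FakeDecoder:
    """Toy model: counter outside the lock, FIFO echo in place of a supervisor."""

    def __init__(self) -> None:
        self.id_counter = itertools.count(1)
        self._lock = asyncio.Lock()
        self._wire: deque = deque()  # ids "written", answered in write order

    async def asend(self) -> int:
        frame_id = next(self.id_counter)   # (A) outside the lock
        async with self._lock:             # (B)
            self._wire.append(frame_id)    # "write" the frame
            await asyncio.sleep(0)         # (C) yield, as a socket read would
            got = self._wire.popleft()
            if got != frame_id:
                raise RuntimeError(f"out of order: got {got}, expected {frame_id}")
            return got


async def run_locked(n: int) -> list:
    decoder = FakeDecoder()
    # n independent tasks call asend concurrently on one event loop.
    return await asyncio.gather(*(decoder.asend() for _ in range(n)))


if __name__ == "__main__":
    print(asyncio.run(run_locked(5)))
```

No mismatch is raised here, which matches the claim that the lock alone suffices when nothing can run between the counter increment and the lock acquisition.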
Greenback breaks the no-interruption guarantee. Its coroutine shim can deliver
thread-completion callbacks into the Task's execution context between any two
coroutine steps — including the step between line (A) and the lock acquisition at
(B). This makes asend effectively re-entrant from the perspective of the counter
and socket state, even though only one OS thread is involved.
The correct analogy is a signal handler re-entrancy bug: a signal fires
mid-function, the signal handler calls the same function again, and two live
invocations share mutable state (self.id_counter) without mutual exclusion.
Critically: both halves of the bug are in Airflow's own code. run_trigger
unconditionally installs the greenback portal. asend places the counter increment
outside the lock. A user-written trigger using asyncio.to_thread — which is the
documented, correct pattern for synchronous I/O in deferrable triggers — has no
means to protect itself from this interaction.

Proposed fix: moving the counter inside the lock is insufficient; queue-based serialisation is the correct approach

Why moving the counter inside the lock deadlocks

The intuitive fix is:

async def asend(self, msg):
    async with self._async_lock:
        frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
        self._async_writer.write(frame.as_bytes())
        return await self._aget_response(frame.id)  # ← injection point, still inside lock

This trades the crash for a deadlock. asyncio.Lock is not reentrant. When greenback injects a second asend call while the first is suspended at await self._aget_response(frame.id) inside the lock, the injected call hits async with self._async_lock and tries to acquire a lock already held by the same asyncio Task. The Task suspends waiting for itself to release the lock and never does.
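The non-reentrancy of asyncio.Lock is easy to confirm in isolation. The sketch below is a plain-asyncio illustration with a hypothetical function name; it uses a timeout so that the self-deadlock surfaces as a timeout rather than hanging the program.

```python
import asyncio


async def reacquire_times_out() -> bool:
    lock = asyncio.Lock()
    async with lock:
        try:
            # Second acquisition from the same Task: asyncio.Lock has no
            # owner tracking, so this waits for a release that cannot happen.
            await asyncio.wait_for(lock.acquire(), timeout=0.05)
        except asyncio.TimeoutError:
            return True
        # Not reached with a non-reentrant lock; kept for completeness.
        lock.release()
        return False


if __name__ == "__main__":
    print(asyncio.run(reacquire_times_out()))
```

Without the timeout the inner acquire would wait forever, which is the deadlock described above.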

Correct fix: per-instance ordered queue

TriggerCommsDecoder is an attrs class, so _pending_ids is cleanly declared as a proper field initialised at construction. Replace asend with a queue-based implementation and add an additive _aget_queued_response method. The original _aget_response is not modified.

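import asyncio
from collections import deque

import attrs
from pydantic import TypeAdapter

# CommsDecoder, _RequestFrame, ToTriggerRunner and ToTriggerSupervisor are the
# existing Airflow definitions already used by TriggerCommsDecoder.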

@attrs.define(kw_only=True)
class TriggerCommsDecoder(CommsDecoder[ToTriggerRunner, ToTriggerSupervisor]):
    _async_writer: asyncio.StreamWriter = attrs.field(alias="async_writer")
    _async_reader: asyncio.StreamReader = attrs.field(alias="async_reader")
    body_decoder: TypeAdapter[ToTriggerRunner] = attrs.field(
        factory=lambda: TypeAdapter(ToTriggerRunner), repr=False
    )
    _async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
    _pending_ids: deque = attrs.field(factory=deque, repr=False)  # ← ADD

    async def asend(self, msg):
        frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
        # ── synchronous block — no await, no injection point ──
        evt = asyncio.Event()
        self._pending_ids.append((frame.id, evt))
        self._async_writer.write(frame.as_bytes())
        # ──────────────────────────────────────────────────────
        return await self._aget_queued_response(frame.id, evt)

    async def _aget_queued_response(self, expected_id: int, evt: asyncio.Event):
        # Wait until we are at the front of the queue.
        while self._pending_ids[0][0] != expected_id:
            await evt.wait()

        # Sole reader — call original _aget_response unmodified.
        result = await self._aget_response(expected_id)

        # Vacate front and signal next waiter.
        self._pending_ids.popleft()
        if self._pending_ids:
            _, next_evt = self._pending_ids[0]
            next_evt.set()

        return result

Why this is correct

  • next(self.id_counter), deque.append, and _async_writer.write are all synchronous — no await between them, so greenback cannot inject between counter increment and enqueue. Enqueue order == counter order == socket arrival order.
  • If greenback injects a second asend while _aget_queued_response is awaiting, the injected call enqueues its (frame_id, Event) tuple and writes its frame, then suspends at evt.wait() because it is not at the front. It never touches the socket reader.
  • When the front-of-queue caller finishes reading its response it calls next_evt.set() on the next waiter's Event before returning. The next caller wakes without new IO — its response is already buffered.
  • _aget_response is called by exactly one caller at a time. The method is not modified.
  • _pending_ids is per-instance — no cross-pod or cross-process coordination needed.

Validated against Airflow 3.1.0, 3.1.8, 3.2.0, 3.2.1 — asend is structurally identical across all four versions.
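The ordering argument can be exercised without Airflow using a toy model of the same scheme. Everything here is an assumption made for illustration: `QueuedSender` is hypothetical, the socket is an in-memory deque that answers in write order, and the try/finally around the read is an extra hardening step that is not part of the proposal above. The sketch records how many frames were written before any response was read, to show that writes are no longer serialised by a lock while reads still happen one at a time and in order.

```python
import asyncio
import itertools
from collections import deque


class QueuedSender:
    """Toy model of the queue-based asend; not the Airflow class."""

    def __init__(self) -> None:
        self.id_counter = itertools.count(1)
        self._pending_ids: deque = deque()  # (frame_id, Event) in write order
        self._wire: deque = deque()         # hypothetical stand-in for the socket
        self.max_in_flight = 0

    async def asend(self) -> int:
        # Synchronous block: id, enqueue and "write" with no await in between.
        frame_id = next(self.id_counter)
        evt = asyncio.Event()
        self._pending_ids.append((frame_id, evt))
        self._wire.append(frame_id)
        self.max_in_flight = max(self.max_in_flight, len(self._wire))

        # Wait until this call is at the front of the queue.
        while self._pending_ids[0][0] != frame_id:
            await evt.wait()

        try:
            await asyncio.sleep(0)       # simulated socket read
            got = self._wire.popleft()   # responses arrive in write order
            if got != frame_id:
                raise RuntimeError(f"out of order: got {got}, expected {frame_id}")
            return got
        finally:
            # Assumption beyond the proposal: vacate the front even on failure.
            self._pending_ids.popleft()
            if self._pending_ids:
                self._pending_ids[0][1].set()


async def run_queued(n: int) -> tuple:
    sender = QueuedSender()
    results = await asyncio.gather(*(sender.asend() for _ in range(n)))
    return results, sender.max_in_flight


if __name__ == "__main__":
    print(asyncio.run(run_queued(5)))
```

Cancellation of a caller that is still waiting behind the front of the queue is not handled in this sketch; a production version would need to decide how to remove such an entry.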

Why the AIRFLOW_DISABLE_GREENBACK_PORTAL escape hatch is not an adequate workaround:
Disabling the greenback portal at triggerer-process scope has unverified side effects
on other Airflow internals that depend on the portal for sync-from-async bridging
(Task SDK calls from synchronous trigger setup code, observability integrations).
The escape hatch is process-wide, not per-trigger. The code fix is the correct
resolution.
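For reference, the condition quoted from run_trigger in Step 1 only treats the variable as set when its value is the string "true" in any letter case. The helper below is a hypothetical restatement of that one expression, taking a mapping so it can be checked without touching the real environment.

```python
def portal_disabled(env) -> bool:
    # Same expression as the quoted run_trigger check, applied to a mapping.
    return env.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true"


if __name__ == "__main__":
    print(portal_disabled({"AIRFLOW_DISABLE_GREENBACK_PORTAL": "True"}))
    print(portal_disabled({"AIRFLOW_DISABLE_GREENBACK_PORTAL": "1"}))
```

Values such as "1" or "yes" leave the portal installed, so anyone testing with the escape hatch should set the value to "true" exactly.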
Additional context
  • Verified against Airflow 3.2.1 source: airflow/jobs/triggerer_job_runner.py lines 812–859 (TriggerCommsDecoder), lines 1204–1209 (run_trigger greenback installation).
  • asyncio.to_thread and loop.run_in_executor are equivalent in this context: both submit work to a thread pool and return a Future whose completion callback is delivered via loop.call_soon_threadsafe. Both exhibit the crash.
  • confluent_kafka.aio.AIOConsumer also uses loop.run_in_executor internally (_call method) and exhibits the same crash profile. Any thread-pool-based async wrapper over a synchronous library is affected.
  • The crash is non-deterministic in exact timing but reliable under sustained operation. It depends on the thread completing and the callback being ready to deliver precisely while asend is suspended inside _aget_response. Under load or with longer blocking calls this window widens and the crash frequency increases.
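The equivalence of the two thread-pool entry points can be checked with a short plain-asyncio sketch. `worker_threads` is a hypothetical name; passing None to run_in_executor selects the loop's default executor, which is also what asyncio.to_thread uses. Both calls run the function on a thread other than the event loop's.

```python
import asyncio
import threading


async def worker_threads() -> dict:
    loop = asyncio.get_running_loop()
    return {
        # Thread that runs the event loop itself.
        "loop": threading.get_ident(),
        # Thread chosen by asyncio.to_thread.
        "to_thread": await asyncio.to_thread(threading.get_ident),
        # Thread chosen by the default executor via run_in_executor.
        "run_in_executor": await loop.run_in_executor(None, threading.get_ident),
    }


if __name__ == "__main__":
    print(asyncio.run(worker_threads()))
```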
Environment

Airflow version: 3.2.1
Python version: 3.11
Executor: LocalExecutor
Deployment: Vanilla Airflow, official Docker image (apache/airflow:3.2.1)
OS: Linux (Docker container)
Install method: Official Docker image, no third-party distribution
The reproduction was performed against the official apache/airflow:3.2.1 Docker
image with no Astronomer or other vendor-specific modifications. This confirms the
bug is in core Airflow and not specific to any managed deployment platform.

Labels: area:Triggerer, kind:bug, priority:high
