Possible race condition in triggerer when running multiple instances #38599

Description

@Lioscro

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3

What happened?

Our Airflow deployment runs 4 triggerer instances. Under high load, we would often see triggerer containers sporadically crashing (and being restarted by our container orchestration system) with the following error.

Traceback (most recent call last):
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 72, in run
    reraise(*_capture_exception())
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/_compat.py", line 60, in reraise
    raise value
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 70, in run
    return old_run_func(self, *a, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 461, in run
    asyncio.run(self.arun())
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 475, in arun
    await self.create_triggers()
  File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 500, in create_triggers
    dag_id = task_instance.dag_id
             ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'dag_id'

This is the same error and behavior that @tomrutter observed in #32091.

I believe I've identified the root cause as a race condition that can occur especially when the TriggererJobRunner's main loop (_run_trigger_loop) is delayed due to high load. For context, our triggerer containers share a host with Celery worker containers. There is a small window (which grows with load) where, if you are running multiple triggerer instances, one triggerer can clear the TaskInstance.trigger_id while another is about to start executing the same trigger. Consider the following scenario (a minimal sketch reproducing the sequence follows the list).

  1. TaskInstance TI1 defers itself, which creates Trigger T1, which holds a reference to TI1.
  2. T1 gets picked up by TriggererJobRunner TJR1 and starts running T1.
  3. TJR1 misses a heartbeat, most likely because high host load delays each iteration of TriggererJobRunner._run_trigger_loop.
  4. A second TriggererJobRunner TJR2 notices that the job running T1 has missed its heartbeat, so it starts the process of picking up any Triggers that TJR1 may have owned, including T1.
  5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and cleans it up by clearing the trigger_id of TI1.
  6. TJR2 tries to execute T1, but it crashes (with the above error) while trying to look up TI1 (because T1 no longer has a TaskInstance linked to it).
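
To make the window concrete, here is a minimal, self-contained sketch of the same sequence in plain Python. None of this is Airflow code; FakeTaskInstance, FakeTrigger, and the runner names are illustrative stand-ins for the objects above.

from dataclasses import dataclass
from typing import Optional

@dataclass
class FakeTaskInstance:
    dag_id: str
    trigger_id: Optional[int] = None

@dataclass
class FakeTrigger:
    id: int
    task_instance: Optional[FakeTaskInstance] = None

# Steps 1-2: TI1 defers, creating T1; TJR1 picks up T1 and starts running it.
ti1 = FakeTaskInstance(dag_id="example_dag", trigger_id=1)
t1 = FakeTrigger(id=1, task_instance=ti1)

# Steps 3-4: TJR1 misses a heartbeat, so TJR2 decides to adopt T1 as well.
adopted_by_tjr2 = [t1]

# Step 5: TJR1 finishes T1 and cleans up, clearing the TaskInstance link
# (analogous to the trigger_id of TI1 being cleared).
ti1.trigger_id = None
t1.task_instance = None

# Step 6: TJR2 dereferences the now-missing TaskInstance and crashes.
try:
    for trigger in adopted_by_tjr2:
        dag_id = trigger.task_instance.dag_id
except AttributeError as err:
    print(err)  # 'NoneType' object has no attribute 'dag_id'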

When a new Trigger is created, it is guaranteed to have a linked TaskInstance because the creation of the Trigger and update to the TaskInstance's trigger_id field are committed together.

self._defer_task(defer=defer, session=session)
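
As a rough illustration of why that guarantee holds: the deferral path amounts to creating the Trigger row and pointing the TaskInstance at it within the same session, so both changes are committed together. The sketch below is hypothetical and simplified, not the actual airflow.models.taskinstance code; defer_task and the string state values are illustrative.

from sqlalchemy.orm import Session

def defer_task(task_instance, trigger_row, session: Session) -> None:
    # Hypothetical sketch; names and structure are illustrative only.
    # Create the Trigger row and flush so it receives a primary key.
    session.add(trigger_row)
    session.flush()

    # Point the TaskInstance at the new Trigger within the same transaction,
    # so a committed Trigger always starts out with a linked TaskInstance.
    task_instance.trigger_id = trigger_row.id
    task_instance.state = "deferred"
    # The caller commits the session, persisting both changes together.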

The only places I could find where the TaskInstance's trigger_id is cleared are Trigger.submit_event and Trigger.submit_failure:
def submit_event(cls, trigger_id, event, session: Session = NEW_SESSION) -> None:

def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) -> None:
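
Both methods boil down to the same basic pattern: find the deferred TaskInstances that point at the trigger, detach them, and hand them back to the scheduler. The detachment is what leaves the Trigger row without a linked TaskInstance. The sketch below is an illustrative paraphrase under that assumption, not the exact Airflow source; resolve_trigger and the string state values are made up for this example.

from airflow.models.taskinstance import TaskInstance
from sqlalchemy.orm import Session

def resolve_trigger(trigger_id: int, session: Session) -> None:
    # Illustrative paraphrase of the clean-up pattern, not the exact Airflow source.
    deferred_tis = (
        session.query(TaskInstance)
        .filter(TaskInstance.trigger_id == trigger_id, TaskInstance.state == "deferred")
        .all()
    )
    for ti in deferred_tis:
        ti.trigger_id = None    # the Trigger loses its linked TaskInstance here
        ti.state = "scheduled"  # hand the task back to the scheduler to resume or fail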

So it seems safe to assume that any Trigger without an associated TaskInstance has already been handled by another TriggererJobRunner. I was able to fix this behavior in our deployment with a simple patch that skips Triggers without a TaskInstance.

diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index c1168a09b1..1f71b2abe7 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -689,6 +689,16 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.failed_triggers.append((new_id, e))
                 continue
 
+            # If new_trigger_orm.task_instance is None, this means the TaskInstance
+            # row was updated by either Trigger.submit_event or Trigger.submit_failure
+            # and can happen when a single trigger Job is being run on multiple TriggerRunners
+            # in a High-Availability setup.
+            if new_trigger_orm.task_instance is None:
+                self.log.warning(
+                    "TaskInstance for Trigger ID %s is None. Skipping trigger instantiation.", new_id
+                )
+                continue
+
             try:
                 new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
             except TypeError as err:

Happy to submit a PR!

What do you think should happen instead?

Triggerer containers shouldn't crash due to a race condition.

How to reproduce

This happens very sporadically and intermittently, so it is difficult to reproduce. I've also played around with a toy Airflow deployment locally but haven't found a way to reproduce it there. Suggestions welcome!

Operating System

Ubuntu 22.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
