Add logging to detect try number race#62703
Add logging to detect try number race#62703ephraimbuddy wants to merge 2 commits intoapache:mainfrom
Conversation
This adds more logging to select places that try_number mismatch could happen and would help us detect and fix the issue. Related: apache#57618
There was a problem hiding this comment.
Pull request overview
This PR adds targeted logging (and unit tests) to help detect try_number mismatches/races in the scheduler flow, particularly around TI scheduling and executor event processing (related to #57618).
Changes:
- Add a debug-gated post-update DB read in
DagRun.schedule_tis()to warn when the persistedtry_numberdiffers from the expected value. - Add additional scheduler logs/warnings around queueing workloads and handling executor events with mismatched/multiple
try_numbers. - Add/extend unit tests to assert the new warnings/logging behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
airflow-core/src/airflow/models/dagrun.py |
Adds debug-gated DB verification and warning logging for try_number mismatches after scheduling. |
airflow-core/src/airflow/jobs/scheduler_job_runner.py |
Adds more context-rich logs for queueing/scheduling and warnings for executor events with conflicting try_numbers. |
airflow-core/tests/unit/models/test_dagrun.py |
Adds tests validating warning behavior for schedule_tis() try-number mismatch checks. |
airflow-core/tests/unit/jobs/test_scheduler_job.py |
Extends/adds tests asserting new scheduler warnings via caplog. |
| dr: DagRun = dag_maker.create_dagrun(session=session) | ||
| ti = dr.get_task_instance("task_1", session=session) | ||
| assert ti is not None | ||
|
|
||
| original_execute = session.execute | ||
|
|
||
| class _FakeSelectResult: | ||
| def all(self): | ||
| return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)] | ||
|
|
||
| def execute_with_mismatch(statement, *args, **kwargs): | ||
| if getattr(statement, "is_select", False): | ||
| return _FakeSelectResult() | ||
| return original_execute(statement, *args, **kwargs) | ||
|
|
||
| monkeypatch.setattr(session, "execute", execute_with_mismatch) | ||
|
|
||
| with ( | ||
| mock.patch.object(dr.log, "isEnabledFor", return_value=True), | ||
| mock.patch.object(dr.log, "warning") as warning_mock, | ||
| ): | ||
| dr.schedule_tis((ti,), session=session) | ||
|
|
There was a problem hiding this comment.
These new tests call DagRun.schedule_tis() without explicitly setting ti.task, even though schedule_tis() assumes each TI has its task attribute populated. This currently works only if the TI instance happens to already have task set in the session identity map; please set ti.task = dr.dag.get_task("task_1") (or similar) in the test to avoid brittleness and better reflect the method contract.
| dr: DagRun = dag_maker.create_dagrun(session=session) | ||
| ti = dr.get_task_instance("task_1", session=session) | ||
| assert ti is not None | ||
|
|
||
| with ( | ||
| mock.patch.object(dr.log, "isEnabledFor", return_value=True), | ||
| mock.patch.object(dr.log, "warning") as warning_mock, | ||
| ): | ||
| dr.schedule_tis((ti,), session=session) | ||
|
|
There was a problem hiding this comment.
This test calls DagRun.schedule_tis() without explicitly setting ti.task. schedule_tis() relies on ti.is_schedulable, which assumes ti.task is populated; please assign ti.task from the DAG (e.g. dr.dag.get_task("task_1")) to avoid relying on session identity-map side effects.
| dr: DagRun = dag_maker.create_dagrun(session=session) | ||
| ti = dr.get_task_instance("task_1", session=session) | ||
| assert ti is not None | ||
|
|
||
| original_execute = session.execute | ||
| select_calls = 0 | ||
|
|
||
| def execute_with_counter(statement, *args, **kwargs): | ||
| nonlocal select_calls | ||
| if getattr(statement, "is_select", False): | ||
| select_calls += 1 | ||
| return original_execute(statement, *args, **kwargs) | ||
|
|
||
| monkeypatch.setattr(session, "execute", execute_with_counter) | ||
|
|
||
| with mock.patch.object(dr.log, "isEnabledFor", return_value=False): | ||
| dr.schedule_tis((ti,), session=session) | ||
|
|
There was a problem hiding this comment.
This test calls DagRun.schedule_tis() without ensuring ti.task is set. Since schedule_tis() assumes each TI has a populated task, the test can become flaky if the TI is reloaded or the session identity map changes; explicitly set ti.task from dr.dag before invoking schedule_tis().
| cls.logger().warning( | ||
| "Multiple executor events for same TI with different try_numbers! " | ||
| "primary_key=%s existing_try_number=%d new_try_number=%d new_state=%s. ", | ||
| key.primary, | ||
| existing_try, | ||
| key.try_number, | ||
| state, | ||
| ) |
There was a problem hiding this comment.
The warning message for multiple try_numbers has a trailing space in the format string (ends with new_state=%s. ). This creates inconsistent log output and makes log matching harder; please remove the trailing whitespace (and consider dropping the extra period) from the message.
This adds more logging to select places that try_number mismatch could happen and would help us detect and fix the issue.
Related: #57618
Was generative AI tooling used to co-author this PR?
GPT-5.3-codex