Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ def process_executor_events(
1. **Normal task completion**: Updates task states for successful/failed tasks
2. **External termination**: Detects tasks killed outside Airflow and marks them as failed
3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors,
and tasks moved to ``scheduled`` after a trigger fired so a stale executor success from the
and tasks moved to ``scheduled`` or ``queued`` after a trigger fired so a stale executor success from the
pre-deferral worker exit does not fail the task instance
4. **Callback processing**: Sends task callback requests to DAG Processor for execution
5. **Email notifications**: Sends email notification requests to DAG Processor
Expand Down Expand Up @@ -1370,8 +1370,8 @@ def process_executor_events(
# - in this case we should mark it as failed here.
# 2) the TI has been requeued after getting deferred - in this case either our executor has it
# or the TI is queued by another job. Either way we should not fail it.
# 3) the trigger already put the TI back to scheduled (resume after defer) but the executor success
# from the worker exit after defer() has not been processed yet - should not fail it.
# 3) the trigger already put the TI back to scheduled/queued (resume after defer) but the executor
# success from the worker exit after defer() has not been processed yet - should not fail it.

# All of this could also happen if the state is "running",
# but that is handled by the scheduler detecting task instances without heartbeats.
Expand All @@ -1386,9 +1386,9 @@ def process_executor_events(
ti.queued_by_job_id != job_id # Another scheduler has queued this task again
or executor.has_task(ti) # This scheduler has this task already
or (
# Resume-after-defer: trigger moved TI to scheduled (next_method set) before we saw the
# Resume-after-defer: trigger moved TI to scheduled/queued (next_method set) before we saw the
# executor success from the defer exit for the same try_number.
ti.state == TaskInstanceState.SCHEDULED
ti.state in (TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED)
and state == TaskInstanceState.SUCCESS
and ti.next_method is not None
)
Expand Down
18 changes: 10 additions & 8 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,17 +819,19 @@ def test_process_executor_events_ti_requeued(

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow._shared.observability.metrics.stats._get_backend")
def test_process_executor_events_stale_success_when_scheduled_after_defer(
self, mock_get_backend, mock_task_callback, dag_maker
@pytest.mark.parametrize("rescheduled_state", [State.SCHEDULED, State.QUEUED])
def test_process_executor_events_stale_success_when_rescheduled_after_defer(
self, mock_get_backend, mock_task_callback, dag_maker, rescheduled_state
):
"""
Trigger moved TI to scheduled (resume after defer) before executor success from defer exit arrived.
Trigger moved TI to scheduled/queued (resume after defer) before executor success from defer exit arrived.

Regression for https://github.com/apache/airflow/issues/66374 — must not treat as state mismatch.
Regression for https://github.com/apache/airflow/issues/66374 and
https://github.com/apache/airflow/issues/67287 — must not treat as state mismatch.
"""
mock_stats = mock.MagicMock(spec=StatsLogger)
mock_get_backend.return_value = mock_stats
dag_id = "test_process_executor_events_stale_success_scheduled_after_defer"
dag_id = "test_process_executor_events_stale_success_rescheduled_after_defer"
task_id_1 = "dummy_task"

session = settings.Session()
Expand All @@ -845,7 +847,7 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer(
session.flush()
self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor])

ti1.state = State.SCHEDULED
ti1.state = rescheduled_state
ti1.next_method = "execute_callback"
ti1.queued_by_job_id = scheduler_job.id
ti1.try_number = 1
Expand All @@ -858,11 +860,11 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer(

self.job_runner._process_executor_events(executor=executor, session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.SCHEDULED
assert ti1.state == rescheduled_state
self.job_runner.executor.callback_sink.send.assert_not_called()
mock_stats.incr.assert_not_called()

# Without next_method, scheduled + stale success is still a mismatch (e.g. external kill).
# Without next_method, scheduled/queued + stale success is still a mismatch (e.g. external kill).
ti1.next_method = None
session.merge(ti1)
session.commit()
Expand Down
Loading