diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index d04ee85c2022c..9182f76b43dbd 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1212,7 +1212,7 @@ def process_executor_events( 1. **Normal task completion**: Updates task states for successful/failed tasks 2. **External termination**: Detects tasks killed outside Airflow and marks them as failed 3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors, - and tasks moved to ``scheduled`` after a trigger fired so a stale executor success from the + and tasks moved to ``scheduled`` or ``queued`` after a trigger fired so a stale executor success from the pre-deferral worker exit does not fail the task instance 4. **Callback processing**: Sends task callback requests to DAG Processor for execution 5. **Email notifications**: Sends email notification requests to DAG Processor @@ -1370,8 +1370,8 @@ def process_executor_events( # - in this case we should mark it as failed here. # 2) the TI has been requeued after getting deferred - in this case either our executor has it # or the TI is queued by another job. Either ways we should not fail it. - # 3) the trigger already put the TI back to scheduled (resume after defer) but the executor success - # from the worker exit after defer() has not been processed yet - should not fail it. + # 3) the trigger already put the TI back to scheduled/queued (resume after defer) but the executor + # success from the worker exit after defer() has not been processed yet - should not fail it. # All of this could also happen if the state is "running", # but that is handled by the scheduler detecting task instances without heartbeats. @@ -1386,9 +1386,9 @@ def process_executor_events( ti.queued_by_job_id != job_id # Another scheduler has queued this task again or executor.has_task(ti) # This scheduler has this task already or ( - # Resume-after-defer: trigger moved TI to scheduled (next_method set) before we saw the + # Resume-after-defer: trigger moved TI to scheduled/queued (next_method set) before we saw the # executor success from the defer exit for the same try_number. - ti.state == TaskInstanceState.SCHEDULED + ti.state in (TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED) and state == TaskInstanceState.SUCCESS and ti.next_method is not None ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 3580136ad70b1..66e6df70cd998 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -819,17 +819,19 @@ def test_process_executor_events_ti_requeued( @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow._shared.observability.metrics.stats._get_backend") - def test_process_executor_events_stale_success_when_scheduled_after_defer( - self, mock_get_backend, mock_task_callback, dag_maker + @pytest.mark.parametrize("rescheduled_state", [State.SCHEDULED, State.QUEUED]) + def test_process_executor_events_stale_success_when_rescheduled_after_defer( + self, mock_get_backend, mock_task_callback, dag_maker, rescheduled_state ): """ - Trigger moved TI to scheduled (resume after defer) before executor success from defer exit arrived. + Trigger moved TI to scheduled/queued (resume after defer) before executor success from defer exit arrived. - Regression for https://github.com/apache/airflow/issues/66374 — must not treat as state mismatch. + Regression for https://github.com/apache/airflow/issues/66374 and + https://github.com/apache/airflow/issues/67287 — must not treat as state mismatch. """ mock_stats = mock.MagicMock(spec=StatsLogger) mock_get_backend.return_value = mock_stats - dag_id = "test_process_executor_events_stale_success_scheduled_after_defer" + dag_id = "test_process_executor_events_stale_success_rescheduled_after_defer" task_id_1 = "dummy_task" session = settings.Session() @@ -845,7 +847,7 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer( session.flush() self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) - ti1.state = State.SCHEDULED + ti1.state = rescheduled_state ti1.next_method = "execute_callback" ti1.queued_by_job_id = scheduler_job.id ti1.try_number = 1 @@ -858,11 +860,11 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer( self.job_runner._process_executor_events(executor=executor, session=session) ti1.refresh_from_db(session=session) - assert ti1.state == State.SCHEDULED + assert ti1.state == rescheduled_state self.job_runner.executor.callback_sink.send.assert_not_called() mock_stats.incr.assert_not_called() - # Without next_method, scheduled + stale success is still a mismatch (e.g. external kill). + # Without next_method, scheduled/queued + stale success is still a mismatch (e.g. external kill). ti1.next_method = None session.merge(ti1) session.commit()