From 56fcb82e68e2d218ee127677fbd4d5f78c556f6d Mon Sep 17 00:00:00 2001 From: ColtenOuO Date: Wed, 20 May 2026 01:06:29 +0800 Subject: [PATCH] Replace per-TI session.merge() with a single UPDATE keyed on TI.id when a dagrun_timeout fires. --- .../src/airflow/jobs/scheduler_job_runner.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 9a650b110c963..1641dfcc8aad0 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2403,10 +2403,14 @@ def _schedule_dag_run( key=lambda ti: ti.start_date or timezone.make_aware(datetime.min), default=None, ) - for task_instance in unfinished_task_instances: - task_instance.state = TaskInstanceState.SKIPPED - session.merge(task_instance) - session.flush() + if unfinished_task_instances: + session.execute( + update(TI) + .where(TI.id.in_(ti.id for ti in unfinished_task_instances)) + .values(state=TaskInstanceState.SKIPPED) + .execution_options(synchronize_session="fetch") + ) + session.flush() self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) if dag_run.state in State.finished_dr_states and dag_run.run_type in (