Skip to content
1 change: 1 addition & 0 deletions airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
93f81a375991749656fa7cdb8bc8b2f875ca1f57fcc1824bff6039e12a1fcb9b
5 changes: 4 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``6222ce48e289`` (head) | ``134de42d3cb0`` | ``3.2.0`` | Add partition fields to DagModel. |
| ``c9e9e8c38cc7`` (head) | ``6222ce48e289`` | ``3.2.0`` | Add indexes on dag_version_id columns for db cleanup |
| | | | performance. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``6222ce48e289`` | ``134de42d3cb0`` | ``3.2.0`` | Add partition fields to DagModel. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``134de42d3cb0`` | ``e42d9fcd10d9`` | ``3.2.0`` | Add partition_key to backfill_dag_run. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add indexes on dag_version_id columns for db cleanup performance.

When running `airflow db clean -t dag_version`, the cleanup process checks
foreign key references from task_instance and dag_run. These columns are
unindexed, so the checks can degrade into full table scans.

This migration adds the missing indexes to speed up dag_version cleanup.

See: https://github.com/apache/airflow/issues/60145

Revision ID: c9e9e8c38cc7
Revises: 6222ce48e289
Create Date: 2026-03-11 23:10:00.000000

"""

from __future__ import annotations

from alembic import op

revision = "c9e9e8c38cc7"
down_revision = "6222ce48e289"
branch_labels = None
depends_on = None
airflow_version = "3.2.0"


def upgrade():
"""Add dag_version cleanup indexes."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.create_index("idx_task_instance_dag_version_id", ["dag_version_id"], unique=False)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.create_index("idx_dag_run_created_dag_version_id", ["created_dag_version_id"], unique=False)


def downgrade():
"""Remove dag_version cleanup indexes."""
conn = op.get_bind()

# MySQL can bind a foreign key to the newly-created index, so dropping that index
# requires dropping and re-creating the foreign key constraint.
if conn.dialect.name == "mysql":
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("created_dag_version_id_fkey", type_="foreignkey")
batch_op.drop_index("idx_dag_run_created_dag_version_id")
batch_op.create_foreign_key(
"created_dag_version_id_fkey",
"dag_version",
["created_dag_version_id"],
["id"],
ondelete="set null",
)

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.drop_constraint(batch_op.f("task_instance_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_index("idx_task_instance_dag_version_id")
batch_op.create_foreign_key(
batch_op.f("task_instance_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="RESTRICT",
)
return

op.drop_index("idx_dag_run_created_dag_version_id", table_name="dag_run")
op.drop_index("idx_task_instance_dag_version_id", table_name="task_instance")
1 change: 1 addition & 0 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class DagRun(Base, LoggingMixin):
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"),
Index("idx_dag_run_dag_id", dag_id),
Index("idx_dag_run_created_dag_version_id", created_dag_version_id),
Index("idx_dag_run_run_after", run_after),
Index(
"idx_dag_run_running_dags",
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
__table_args__ = (
Index("ti_dag_state", dag_id, state),
Index("ti_dag_run", dag_id, run_id),
Index("idx_task_instance_dag_version_id", dag_version_id),
Index("ti_state", state),
Index("ti_state_lkp", dag_id, task_id, run_id, state),
Index("ti_pool", pool, state, priority_weight),
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class MappedClassProtocol(Protocol):
"3.0.3": "fe199e1abd77",
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
"3.2.0": "6222ce48e289",
"3.2.0": "c9e9e8c38cc7",
}

# Prefix used to identify tables holding data moved during migration.
Expand Down
Loading