Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/64311.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add database indexes on ``serialized_dag(dag_id, last_updated)``, ``dag_code(dag_id)``, and ``task_instance(dag_version_id)`` to improve query performance.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add performance indexes on serialized_dag, dag_code, and task_instance.

Revision ID: 76984aa0347c
Revises: a4c2d171ae18
Create Date: 2026-03-27 00:00:00.000000

"""

from __future__ import annotations

from alembic import op

# revision identifiers, used by Alembic.
# ``revision`` / ``down_revision`` mirror the "Revision ID" / "Revises" lines
# in the module docstring above; keep the two in sync when rebasing.
revision = "76984aa0347c"
down_revision = "a4c2d171ae18"
branch_labels = None
depends_on = None
# Airflow release this revision is associated with.
airflow_version = "3.3.0"


def upgrade():
    """
    Create the query-performance indexes.

    Adds, in order:

    * ``idx_serialized_dag_dag_id_last_updated`` on ``serialized_dag(dag_id, last_updated)``
    * ``idx_dag_code_dag_id`` on ``dag_code(dag_id)``
    * ``idx_task_instance_dag_version_id`` on ``task_instance(dag_version_id)``
      (skipped when the bound dialect is MySQL)
    """
    # (table, index name, indexed columns) for the indexes created on every backend.
    unconditional_indexes = (
        ("serialized_dag", "idx_serialized_dag_dag_id_last_updated", ["dag_id", "last_updated"]),
        ("dag_code", "idx_dag_code_dag_id", ["dag_id"]),
    )
    for table_name, index_name, column_names in unconditional_indexes:
        with op.batch_alter_table(table_name, schema=None) as table_batch:
            table_batch.create_index(index_name, column_names, unique=False)

    # MySQL auto-creates an index on FK columns, so dag_version_id is already
    # indexed there; adding ours would only duplicate it.
    if op.get_bind().dialect.name == "mysql":
        return

    with op.batch_alter_table("task_instance", schema=None) as ti_batch:
        ti_batch.create_index("idx_task_instance_dag_version_id", ["dag_version_id"], unique=False)


def downgrade():
    """
    Drop the query-performance indexes, in the reverse order of ``upgrade``.

    The ``task_instance`` index is only dropped on non-MySQL backends, matching
    the condition under which ``upgrade`` creates it.
    """
    dialect_name = op.get_bind().dialect.name

    # MySQL requires an index on FK columns, and dag_version_id carries a FK
    # constraint, so the index on that column is left untouched on MySQL.
    if dialect_name != "mysql":
        with op.batch_alter_table("task_instance", schema=None) as ti_batch:
            ti_batch.drop_index("idx_task_instance_dag_version_id")

    # (table, index name) for the indexes that exist on every backend.
    unconditional_indexes = (
        ("dag_code", "idx_dag_code_dag_id"),
        ("serialized_dag", "idx_serialized_dag_dag_id_last_updated"),
    )
    for table_name, index_name in unconditional_indexes:
        with op.batch_alter_table(table_name, schema=None) as table_batch:
            table_batch.drop_index(index_name)
4 changes: 3 additions & 1 deletion airflow-core/src/airflow/models/dagcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import sqlalchemy as sa
import uuid6
from sqlalchemy import ForeignKey, String, Text, select
from sqlalchemy import ForeignKey, Index, String, Text, select
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.expression import literal
Expand Down Expand Up @@ -71,6 +71,8 @@ class DagCode(Base):
)
dag_version = relationship("DagVersion", back_populates="dag_code", uselist=False)

__table_args__ = (Index("idx_dag_code_dag_id", dag_id),)

def __init__(self, dag_version, full_filepath: str, source_code: str | None = None):
self.dag_version = dag_version
self.fileloc = full_filepath
Expand Down
4 changes: 3 additions & 1 deletion airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from uuid import UUID

import uuid6
from sqlalchemy import JSON, ForeignKey, LargeBinary, String, Uuid, exists, select, tuple_, update
from sqlalchemy import JSON, ForeignKey, Index, LargeBinary, String, Uuid, exists, select, tuple_, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, backref, foreign, mapped_column, relationship
from sqlalchemy.sql.expression import func, literal
Expand Down Expand Up @@ -304,6 +304,8 @@ class SerializedDagModel(Base):
)
dag_hash: Mapped[str] = mapped_column(String(32), nullable=False)

__table_args__ = (Index("idx_serialized_dag_dag_id_last_updated", dag_id, last_updated),)

dag_runs = relationship(
DagRun,
primaryjoin=dag_id == foreign(DagRun.dag_id), # type: ignore[has-type]
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
Index("ti_pool", pool, state, priority_weight),
Index("ti_trigger_id", trigger_id),
Index("ti_heartbeat", last_heartbeat_at),
Index("idx_task_instance_dag_version_id", dag_version_id),
PrimaryKeyConstraint("id", name="task_instance_pkey"),
UniqueConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_composite_key"),
ForeignKeyConstraint(
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class MappedClassProtocol(Protocol):
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
"3.2.0": "1d6611b6ab7c",
"3.3.0": "a4c2d171ae18",
"3.3.0": "76984aa0347c",
}

# Prefix used to identify tables holding data moved during migration.
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/tests/unit/utils/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self, initialized_db):
lambda t: t[0] == "add_index" and t[1].name == "idx_ab_user_username",
lambda t: t[0] == "remove_index" and t[1].name == "idx_ab_register_user_username",
lambda t: t[0] == "remove_index" and t[1].name == "idx_ab_user_username",
# MySQL auto-creates an index on FK columns; the migration skips creating this
# index on MySQL to avoid a duplicate, but the ORM always declares it.
lambda t: t[0] == "add_index" and t[1].name == "idx_task_instance_dag_version_id",
]

for ignore in ignores:
Expand Down
Loading