6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -430,6 +430,12 @@ repos:
^airflow-core/src/airflow/migrations/versions/.*$|
^airflow-core/src/airflow/migrations/versions|
^airflow-core/src/airflow/utils/db\.py$
- id: check-no-orm-in-migrations
name: Check that Alembic migrations do not use ORM models
language: python
entry: ./dev/check_no_orm_in_migrations.py
pass_filenames: false
files: ^airflow-core/src/airflow/migrations/versions/.*\.py$
- id: update-version
name: Update versions in docs
entry: ./scripts/ci/prek/update_versions.py
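The `check_no_orm_in_migrations.py` script referenced by the new hook is not part of this excerpt. A rough sketch of what such a check could do (the path, the AST-based approach, and the exit behaviour are assumptions, not the actual script) is to flag `airflow.models` imports inside migration files:

```python
# Hypothetical sketch of a "no ORM in migrations" check; the real dev/check_no_orm_in_migrations.py
# may differ. It walks the migration files and fails if any of them import airflow.models.*.
from __future__ import annotations

import ast
import sys
from pathlib import Path

MIGRATIONS_DIR = Path("airflow-core/src/airflow/migrations/versions")  # assumed location


def orm_imports(path: Path) -> list[str]:
    """Return "file:line imports module" entries for airflow.models imports in one file."""
    offenders: list[str] = []
    for node in ast.walk(ast.parse(path.read_text(), filename=str(path))):
        if isinstance(node, ast.ImportFrom) and (node.module or "").startswith("airflow.models"):
            offenders.append(f"{path}:{node.lineno} imports {node.module}")
        elif isinstance(node, ast.Import):
            offenders.extend(
                f"{path}:{node.lineno} imports {alias.name}"
                for alias in node.names
                if alias.name.startswith("airflow.models")
            )
    return offenders


if __name__ == "__main__":
    problems = [msg for f in sorted(MIGRATIONS_DIR.glob("*.py")) for msg in orm_imports(f)]
    if problems:
        print("\n".join(problems))
    sys.exit(1 if problems else 0)
```

With `pass_filenames: false` in the hook definition, pre-commit invokes the script once and lets it scan the whole directory itself, while the `files:` pattern only controls when the hook is triggered.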
@@ -1753,7 +1753,10 @@ paths:
tags:
- DagRun
summary: Delete Dag Run
description: Delete a DAG Run entry.
description: |
Delete a DAG Run entry.

Only DAG runs in the following states can be deleted: "queued", "success", or "failed". Attempting to delete a DAG run in any other state (e.g., "running") will result in a 400 Bad Request response.
operationId: delete_dag_run
security:
- OAuth2PasswordBearer: []
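As an illustration of the documented behaviour, deleting a run that is still in the "running" state should now return 400 instead of removing the row. The host, token, DAG id, and run id in this sketch are placeholders for an assumed local deployment, not values from this PR:

```python
# Illustrative only: exercising the delete endpoint against an assumed local API server.
import requests

BASE_URL = "http://localhost:8080/api/v2"  # assumed deployment URL and API version prefix
HEADERS = {"Authorization": "Bearer <access-token>"}  # assumed auth setup

resp = requests.delete(
    f"{BASE_URL}/dags/example_dag/dagRuns/manual__2025-01-01T00:00:00",
    headers=HEADERS,
)
if resp.status_code == 400:
    # Expected while the run is in a non-deletable state such as "running".
    print("Refused:", resp.json().get("detail"))
elif resp.status_code == 204:
    print("DAG run deleted")
```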
@@ -138,6 +138,15 @@ def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep):
status.HTTP_404_NOT_FOUND,
f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found",
)

# Only allow deletion if dag_run.state is in DAGRunPatchStates
allowed_states = {state.value for state in DAGRunPatchStates}
if dag_run.state not in allowed_states:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"Cannot delete DagRun in state '{dag_run.state}'. Only allowed in states: {', '.join(allowed_states)}."
)

session.delete(dag_run)


16 changes: 16 additions & 0 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -318,8 +318,16 @@ def deactivate_stale_dags(
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold configured to prevent a
# significant delay in deactivation of stale dags when a large timeout is configured

# Skip DAGs with None relative_fileloc - they will be updated when parsed
if dag.relative_fileloc is None:
continue

file_info = DagFileInfo(rel_path=Path(dag.relative_fileloc), bundle_name=dag.bundle_name)
if last_finish_time := last_parsed.get(file_info, None):
# Skip DAGs with None last_parsed_time - they will be updated when parsed
if dag.last_parsed_time is None:
continue
if dag.last_parsed_time + timedelta(seconds=self.stale_dag_threshold) < last_finish_time:
self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)
@@ -616,6 +624,14 @@ def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:

def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
"""Deactivate DAGs that come from files that are no longer present in bundle."""
# Guard against empty file list - don't mark all DAGs stale if no files found.
# This could happen due to transient filesystem issues (NFS delays, volume mount issues, etc.)
if not present:
self.log.warning(
"No DAG files found in bundle %s. Skipping deactivation to avoid marking all DAGs stale.",
bundle_name,
)
return

def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
"""
36 changes: 33 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -48,6 +48,7 @@
)
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
from airflow.dag_processing.collection import _get_latest_runs_stmt
from airflow.exceptions import DagNotFound
from airflow.executors import workloads
from airflow.jobs.base_job_runner import BaseJobRunner
@@ -80,6 +81,7 @@
from airflow.serialization.definitions.notset import NOTSET
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.timetables.base import DataInterval
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.traces import utils as trace_utils
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
@@ -1716,6 +1718,25 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now

def _get_latest_automated_dagrun_data_interval(
self, dag_id: str, dag: SerializedDAG, session: Session
) -> DataInterval | None:
"""
Query and return the data interval of the latest automated DagRun for a given DAG.

This ensures we always use the actual latest run when calculating next dagrun fields,
avoiding issues where next_dagrun could be set back in time in a distributed system.

:param dag_id: The DAG ID to query for
:param dag: The DAG object (needed for timetable)
:param session: The database session
:return: DataInterval of the latest automated run, or None if no runs exist
"""
latest_run = session.scalar(_get_latest_runs_stmt([dag_id]))
if latest_run is None:
return None
return get_run_data_interval(dag.timetable, latest_run)

@add_debug_span
def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
"""Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
@@ -1798,7 +1819,10 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
active_non_backfill_runs=active_runs_of_dags[dag.dag_id],
session=session,
):
dag_model.calculate_dagrun_date_fields(dag, data_interval)
latest_data_interval = self._get_latest_automated_dagrun_data_interval(
dag.dag_id, dag, session
)
dag_model.calculate_dagrun_date_fields(dag, latest_data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
# memory for larger dags? or expunge_all()

@@ -2135,7 +2159,10 @@ def _schedule_dag_run(
if self._should_update_dag_next_dagruns(
dag, dag_model, last_dag_run=dag_run, session=session
):
dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
latest_data_interval = self._get_latest_automated_dagrun_data_interval(
dag.dag_id, dag, session
)
dag_model.calculate_dagrun_date_fields(dag, latest_data_interval)

callback_to_execute = DagCallbackRequest(
filepath=dag_model.relative_fileloc or "",
@@ -2196,7 +2223,10 @@ def _schedule_dag_run(
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)

if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
latest_data_interval = self._get_latest_automated_dagrun_data_interval(
dag.dag_id, dag, session
)
dag_model.calculate_dagrun_date_fields(dag, latest_data_interval)
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the logical dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
@@ -32,9 +32,7 @@

import sqlalchemy as sa
from alembic import context, op
from sqlalchemy.orm import lazyload

from airflow.models.trigger import Trigger
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.utils.sqlalchemy import ExtendedJSON

@@ -46,25 +44,14 @@
airflow_version = "2.9.0"


def get_session() -> sa.orm.Session:
conn = op.get_bind()
sessionmaker = sa.orm.sessionmaker()
return sessionmaker(bind=conn)


def upgrade():
"""Update trigger kwargs type to string and encrypt."""
with op.batch_alter_table("trigger") as batch_op:
batch_op.alter_column("kwargs", type_=sa.Text(), existing_nullable=False)

if not context.is_offline_mode():
session = get_session()
try:
for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
trigger.kwargs = trigger.kwargs
session.commit()
finally:
session.close()
# Note: The original migration used the ORM to trigger re-encryption, but it was effectively a
# no-op since it only reassigned trigger.kwargs = trigger.kwargs. Encryption is applied by the
# encrypted column type on the model, so the column type change above is sufficient here.


def downgrade():
@@ -78,14 +65,8 @@ def downgrade():
------------
""")
)
else:
session = get_session()
try:
for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
trigger.encrypted_kwargs = json.dumps(BaseSerialization.serialize(trigger.kwargs))
session.commit()
finally:
session.close()
# Note: The original migration used the ORM to re-serialize kwargs, but that is not necessary
# for the downgrade; the column type change below handles the conversion.

with op.batch_alter_table("trigger") as batch_op:
batch_op.alter_column(
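If a future migration genuinely needs to rewrite rows, the pattern the new pre-commit hook steers toward is SQLAlchemy Core with a lightweight table definition instead of importing ORM models such as `Trigger`. A minimal sketch under that assumption (the per-row transformation is a stand-in, not logic from this migration):

```python
# Sketch only: updating rows inside a migration without importing ORM models.
# Table and column stubs are declared inline; the per-row transformation is a placeholder.
import sqlalchemy as sa
from alembic import op


def upgrade():
    trigger = sa.table("trigger", sa.column("id", sa.Integer), sa.column("kwargs", sa.Text))
    conn = op.get_bind()
    rows = conn.execute(sa.select(trigger.c.id, trigger.c.kwargs)).fetchall()
    for trigger_id, kwargs in rows:
        new_kwargs = kwargs  # replace with the real per-row transformation
        conn.execute(
            sa.update(trigger).where(trigger.c.id == trigger_id).values(kwargs=new_kwargs)
        )
```

Because only `sa.table()`/`sa.column()` stubs are involved, the migration stays decoupled from the current model definitions, which is what the new check enforces.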
@@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Increase external_executor_id length to 1000.

Revision ID: f1a2b3c4d5e6
Revises: 665854ef0536
Create Date: 2025-12-26 00:00:00.000000

"""

from __future__ import annotations

from alembic import op

from airflow.migrations.db_types import StringID

# revision identifiers, used by Alembic.
revision = "f1a2b3c4d5e6"
down_revision = "665854ef0536"
branch_labels = None
depends_on = None
airflow_version = "3.2.0"


def upgrade():
"""Increase external_executor_id column length from 250 to 1000 chars."""
with op.batch_alter_table("task_instance") as batch_op:
batch_op.alter_column(
"external_executor_id",
type_=StringID(length=1000),
existing_nullable=True,
)

with op.batch_alter_table("task_instance_history") as batch_op:
batch_op.alter_column(
"external_executor_id",
type_=StringID(length=1000),
existing_nullable=True,
)


def downgrade():
"""Revert external_executor_id column length from 1000 to 250 chars."""
with op.batch_alter_table("task_instance") as batch_op:
batch_op.alter_column(
"external_executor_id",
type_=StringID(length=250),
existing_nullable=True,
)

with op.batch_alter_table("task_instance_history") as batch_op:
batch_op.alter_column(
"external_executor_id",
type_=StringID(length=250),
existing_nullable=True,
)
15 changes: 9 additions & 6 deletions airflow-core/src/airflow/models/dag.py
@@ -586,6 +586,9 @@ def deactivate_deleted_dags(

any_deactivated = False
for dm in dag_models:
# Skip DAGs with None relative_fileloc - they will be updated when parsed
if dm.relative_fileloc is None:
continue
if dm.relative_fileloc not in rel_filelocs:
dm.is_stale = True
any_deactivated = True
@@ -693,22 +696,22 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict[AssetUniqueKey, bool]
def calculate_dagrun_date_fields(
self,
dag: SerializedDAG,
last_automated_dag_run: None | DataInterval,
last_automated_data_interval: None | DataInterval,
) -> None:
"""
Calculate ``next_dagrun`` and `next_dagrun_create_after``.

:param dag: The DAG object
:param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none
if not yet scheduled.
:param last_automated_data_interval: DataInterval of the most recent automated run of this dag,
or None if not yet scheduled. This should be the data interval of the actual latest
automated run (SCHEDULED or BACKFILL_JOB), not just any run the scheduler happens to be
processing, to avoid setting next_dagrun fields incorrectly.
"""
last_automated_data_interval: DataInterval | None
if isinstance(last_automated_dag_run, datetime):
if isinstance(last_automated_data_interval, datetime):
raise ValueError(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields` is not supported. "
"Provide a data interval instead."
)
last_automated_data_interval = last_automated_dag_run
next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
if next_dagrun_info is None:
self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstance.py
@@ -427,7 +427,7 @@ class TaskInstance(Base, LoggingMixin):
String(250), server_default=SpanStatus.NOT_STARTED, nullable=False
)

external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
external_executor_id: Mapped[str | None] = mapped_column(StringID(length=1000), nullable=True)

# The trigger to resume on if we are in state DEFERRED
trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstancehistory.py
@@ -105,7 +105,7 @@ class TaskInstanceHistory(Base):
String(250), server_default=SpanStatus.NOT_STARTED, nullable=False
)

external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
external_executor_id: Mapped[str | None] = mapped_column(StringID(length=1000), nullable=True)
trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
trigger_timeout: Mapped[DateTime | None] = mapped_column(DateTime, nullable=True)
next_method: Mapped[str | None] = mapped_column(String(1000), nullable=True)
@@ -20,7 +20,7 @@ import { Box } from "@chakra-ui/react";
import { useTranslation } from "react-i18next";
import { LuUserRoundPen } from "react-icons/lu";

import { useTaskInstanceServiceGetHitlDetails } from "openapi/queries";
import { useTaskInstanceServiceGetHitlDetails, useTaskInstanceServiceGetTaskInstances } from "openapi/queries";
import { useAutoRefresh } from "src/utils/query";

import { StatsCard } from "./StatsCard";
@@ -36,6 +36,24 @@ export const NeedsReviewButton = ({
}) => {
const refetchInterval = useAutoRefresh({ checkPendingRuns: true, dagId });

// First check if there are any deferred task instances before querying hitlDetails
const { data: deferredTasksData } = useTaskInstanceServiceGetTaskInstances(
{
dagId: dagId ?? "~",
dagRunId: runId ?? "~",
limit: 1,
state: ["deferred"],
taskId,
},
undefined,
{
refetchInterval,
},
);

const hasDeferredTasks = (deferredTasksData?.total_entries ?? 0) > 0;

// Only fetch hitlDetails if there are deferred tasks
const { data: hitlStatsData, isLoading } = useTaskInstanceServiceGetHitlDetails(
{
dagId: dagId ?? "~",
Expand All @@ -46,6 +64,7 @@ export const NeedsReviewButton = ({
},
undefined,
{
enabled: hasDeferredTasks,
refetchInterval,
},
);