diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 2bfa785367b31..ac9ae2564b815 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -430,6 +430,12 @@ repos:
^airflow-core/src/airflow/migrations/versions/.*$|
^airflow-core/src/airflow/migrations/versions|
^airflow-core/src/airflow/utils/db\.py$
+ - id: check-no-orm-in-migrations
+ name: Check that Alembic migrations do not use ORM models
+ language: python
+ entry: ./dev/check_no_orm_in_migrations.py
+ pass_filenames: false
+ files: ^airflow-core/src/airflow/migrations/versions/.*\.py$
- id: update-version
name: Update versions in docs
entry: ./scripts/ci/prek/update_versions.py
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index d1ed9ba9f3e0e..482316b60d665 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -1753,7 +1753,10 @@ paths:
tags:
- DagRun
summary: Delete Dag Run
- description: Delete a DAG Run entry.
+ description: |
+ Delete a DAG Run entry.
+
+ Only DAG runs in the following states can be deleted: "queued", "success", or "failed". Attempting to delete a DAG run in any other state (e.g., "running") will result in a 400 Bad Request response.
operationId: delete_dag_run
security:
- OAuth2PasswordBearer: []
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index ae644e6cfc22d..cd1ed0f090191 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -138,6 +138,15 @@ def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep):
status.HTTP_404_NOT_FOUND,
f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found",
)
+
+ # Only allow deletion if dag_run.state is in DAGRunPatchStates
+ allowed_states = {state.value for state in DAGRunPatchStates}
+ if dag_run.state not in allowed_states:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ f"Cannot delete DagRun in state '{dag_run.state}'. Only allowed in states: {', '.join(allowed_states)}."
+ )
+
session.delete(dag_run)
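
For reference, a minimal client sketch of the new behaviour, assuming the standard `/api/v2` base path, the `requests` library, and placeholder identifiers and token (none of these values come from this PR):

```python
# Hedged client sketch: base URL, token, and dag/run IDs below are placeholders.
import requests

BASE_URL = "http://localhost:8080/api/v2"
HEADERS = {"Authorization": "Bearer <access-token>"}

resp = requests.delete(f"{BASE_URL}/dags/example_dag/dagRuns/manual__2025-01-01", headers=HEADERS)
if resp.status_code == 204:
    print("DAG run deleted")
elif resp.status_code == 400:
    # Run is still in a non-deletable state (e.g. running); wait for it to finish or mark it failed first.
    print(resp.json()["detail"])
```
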
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index ae45a690074c0..53e47b906226d 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -318,8 +318,16 @@ def deactivate_stale_dags(
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold configured to prevent a
# significant delay in deactivation of stale dags when a large timeout is configured
+
+ # Skip DAGs with None relative_fileloc - they will be updated when parsed
+ if dag.relative_fileloc is None:
+ continue
+
file_info = DagFileInfo(rel_path=Path(dag.relative_fileloc), bundle_name=dag.bundle_name)
if last_finish_time := last_parsed.get(file_info, None):
+ # Skip DAGs with None last_parsed_time - they will be updated when parsed
+ if dag.last_parsed_time is None:
+ continue
if dag.last_parsed_time + timedelta(seconds=self.stale_dag_threshold) < last_finish_time:
self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)
@@ -616,6 +624,14 @@ def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
"""Deactivate DAGs that come from files that are no longer present in bundle."""
+ # Guard against empty file list - don't mark all DAGs stale if no files found.
+ # This could happen due to transient filesystem issues (NFS delays, volume mount issues, etc.)
+ if not present:
+ self.log.warning(
+ "No DAG files found in bundle %s. Skipping deactivation to avoid marking all DAGs stale.",
+ bundle_name,
+ )
+ return
def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
"""
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 87ed4646774e1..bc6a21470c5db 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -48,6 +48,7 @@
)
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
+from airflow.dag_processing.collection import _get_latest_runs_stmt
from airflow.exceptions import DagNotFound
from airflow.executors import workloads
from airflow.jobs.base_job_runner import BaseJobRunner
@@ -80,6 +81,7 @@
from airflow.serialization.definitions.notset import NOTSET
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
+from airflow.timetables.base import DataInterval
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.traces import utils as trace_utils
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
@@ -1716,6 +1718,25 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now
+ def _get_latest_automated_dagrun_data_interval(
+ self, dag_id: str, dag: SerializedDAG, session: Session
+ ) -> DataInterval | None:
+ """
+ Query and return the data interval of the latest automated DagRun for a given DAG.
+
+ This ensures we always use the actual latest run when calculating next dagrun fields,
+ avoiding issues where next_dagrun could be set back in time in a distributed system.
+
+ :param dag_id: The DAG ID to query for
+ :param dag: The DAG object (needed for timetable)
+ :param session: The database session
+ :return: DataInterval of the latest automated run, or None if no runs exist
+ """
+ latest_run = session.scalar(_get_latest_runs_stmt([dag_id]))
+ if latest_run is None:
+ return None
+ return get_run_data_interval(dag.timetable, latest_run)
+
@add_debug_span
def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
"""Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
@@ -1798,7 +1819,10 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
active_non_backfill_runs=active_runs_of_dags[dag.dag_id],
session=session,
):
- dag_model.calculate_dagrun_date_fields(dag, data_interval)
+ latest_data_interval = self._get_latest_automated_dagrun_data_interval(
+ dag.dag_id, dag, session
+ )
+ dag_model.calculate_dagrun_date_fields(dag, latest_data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
# memory for larger dags? or expunge_all()
@@ -2135,7 +2159,10 @@ def _schedule_dag_run(
if self._should_update_dag_next_dagruns(
dag, dag_model, last_dag_run=dag_run, session=session
):
- dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
+ latest_data_interval = self._get_latest_automated_dagrun_data_interval(
+ dag.dag_id, dag, session
+ )
+ dag_model.calculate_dagrun_date_fields(dag, latest_data_interval)
callback_to_execute = DagCallbackRequest(
filepath=dag_model.relative_fileloc or "",
@@ -2196,7 +2223,10 @@ def _schedule_dag_run(
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
- dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
+ latest_data_interval = self._get_latest_automated_dagrun_data_interval(
+ dag.dag_id, dag, session
+ )
+ dag_model.calculate_dagrun_date_fields(dag, latest_data_interval)
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the logical dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
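
A simplified illustration of the race this change avoids (toy sketch, not scheduler code): if next-run fields are derived from whichever run the scheduler happens to be finalizing, an old run finishing late can move the schedule backwards; deriving them from the latest automated run in the database keeps the schedule at the frontier.

```python
# Toy sketch of the ordering problem; the intervals and timetable logic are made up.
from datetime import datetime, timedelta

automated_interval_ends = [
    datetime(2025, 1, 1),
    datetime(2025, 1, 3),  # the latest automated run already in the DB
]
run_being_finalized = datetime(2025, 1, 1)  # an older run that happens to finish late

def next_dagrun_after(interval_end: datetime) -> datetime:
    # Stand-in for the timetable: next run starts one day after the given interval ends.
    return interval_end + timedelta(days=1)

# Previous behaviour: based on the run being finalized, so next_dagrun could move backwards.
assert next_dagrun_after(run_being_finalized) == datetime(2025, 1, 2)
# New behaviour: based on the latest automated run in the DB, so it stays at the frontier.
assert next_dagrun_after(max(automated_interval_ends)) == datetime(2025, 1, 4)
```
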
diff --git a/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py b/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py
index 08843afb0e3b2..36b28e68eaffe 100644
--- a/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py
+++ b/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py
@@ -32,9 +32,7 @@
import sqlalchemy as sa
from alembic import context, op
-from sqlalchemy.orm import lazyload
-from airflow.models.trigger import Trigger
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.utils.sqlalchemy import ExtendedJSON
@@ -46,25 +44,14 @@
airflow_version = "2.9.0"
-def get_session() -> sa.orm.Session:
- conn = op.get_bind()
- sessionmaker = sa.orm.sessionmaker()
- return sessionmaker(bind=conn)
-
-
def upgrade():
"""Update trigger kwargs type to string and encrypt."""
with op.batch_alter_table("trigger") as batch_op:
batch_op.alter_column("kwargs", type_=sa.Text(), existing_nullable=False)
- if not context.is_offline_mode():
- session = get_session()
- try:
- for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
- trigger.kwargs = trigger.kwargs
- session.commit()
- finally:
- session.close()
+ # Note: the original migration used the ORM to trigger re-encryption, but it was effectively a
+ # no-op since it only reassigned trigger.kwargs = trigger.kwargs. Encryption is handled by the
+ # model's encrypted kwargs handling when rows are written, so no ORM pass is needed after the
+ # column type change above.
def downgrade():
@@ -78,14 +65,8 @@ def downgrade():
------------
""")
)
- else:
- session = get_session()
- try:
- for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
- trigger.encrypted_kwargs = json.dumps(BaseSerialization.serialize(trigger.kwargs))
- session.commit()
- finally:
- session.close()
+ # Note: the original migration used the ORM to re-serialize kwargs, but that pass is not
+ # necessary for the downgrade; the column type change below handles the conversion.
with op.batch_alter_table("trigger") as batch_op:
batch_op.alter_column(
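
Should a future migration genuinely need to touch row data, the pattern the new `check-no-orm-in-migrations` hook steers toward is table reflection or SQLAlchemy Core rather than ORM models. A hedged sketch (the specific UPDATE below is hypothetical and not part of this migration):

```python
# Sketch of a data migration without ORM imports; the WHERE/VALUES below are illustrative only.
import sqlalchemy as sa
from alembic import op


def upgrade():
    conn = op.get_bind()
    # Reflect the table from the live schema instead of importing airflow.models.trigger
    trigger = sa.Table("trigger", sa.MetaData(), autoload_with=conn)
    conn.execute(
        sa.update(trigger).where(trigger.c.kwargs.is_(None)).values(kwargs="{}")
    )
```
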
diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_increase_external_executor_id_length.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_increase_external_executor_id_length.py
new file mode 100644
index 0000000000000..5b0859c28e30c
--- /dev/null
+++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_increase_external_executor_id_length.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Increase external_executor_id length to 1000.
+
+Revision ID: f1a2b3c4d5e6
+Revises: 665854ef0536
+Create Date: 2025-12-26 00:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+from alembic import op
+
+from airflow.migrations.db_types import StringID
+
+# revision identifiers, used by Alembic.
+revision = "f1a2b3c4d5e6"
+down_revision = "665854ef0536"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"
+
+
+def upgrade():
+ """Increase external_executor_id column length from 250 to 1000 chars."""
+ with op.batch_alter_table("task_instance") as batch_op:
+ batch_op.alter_column(
+ "external_executor_id",
+ type_=StringID(length=1000),
+ existing_nullable=True,
+ )
+
+ with op.batch_alter_table("task_instance_history") as batch_op:
+ batch_op.alter_column(
+ "external_executor_id",
+ type_=StringID(length=1000),
+ existing_nullable=True,
+ )
+
+
+def downgrade():
+ """Revert external_executor_id column length from 1000 to 250 chars."""
+ with op.batch_alter_table("task_instance") as batch_op:
+ batch_op.alter_column(
+ "external_executor_id",
+ type_=StringID(length=250),
+ existing_nullable=True,
+ )
+
+ with op.batch_alter_table("task_instance_history") as batch_op:
+ batch_op.alter_column(
+ "external_executor_id",
+ type_=StringID(length=250),
+ existing_nullable=True,
+ )
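
After running `airflow db migrate`, the widened column can be sanity-checked with a quick reflection query (sketch; the connection URL is an assumption):

```python
# Sketch: inspect the column length after the migration; the URL below is a placeholder.
from sqlalchemy import create_engine, inspect

engine = create_engine("postgresql://airflow:airflow@localhost/airflow")
columns = inspect(engine).get_columns("task_instance")
ext_id = next(col for col in columns if col["name"] == "external_executor_id")
print(ext_id["type"].length)  # expected: 1000 after upgrade, 250 after downgrade
```
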
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 852ec45d8fb67..6d21be0278e60 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -586,6 +586,9 @@ def deactivate_deleted_dags(
any_deactivated = False
for dm in dag_models:
+ # Skip DAGs with None relative_fileloc - they will be updated when parsed
+ if dm.relative_fileloc is None:
+ continue
if dm.relative_fileloc not in rel_filelocs:
dm.is_stale = True
any_deactivated = True
@@ -693,22 +696,22 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict[AssetUniqueKey, bool]
def calculate_dagrun_date_fields(
self,
dag: SerializedDAG,
- last_automated_dag_run: None | DataInterval,
+ last_automated_data_interval: None | DataInterval,
) -> None:
"""
Calculate ``next_dagrun`` and `next_dagrun_create_after``.
:param dag: The DAG object
- :param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none
- if not yet scheduled.
+ :param last_automated_data_interval: DataInterval of the most recent automated run of this dag,
+ or None if not yet scheduled. This should be the data interval of the actual latest
+ automated run (SCHEDULED or BACKFILL_JOB), not just any run the scheduler happens to be
+ processing, to avoid setting next_dagrun fields incorrectly.
"""
- last_automated_data_interval: DataInterval | None
- if isinstance(last_automated_dag_run, datetime):
+ if isinstance(last_automated_data_interval, datetime):
raise ValueError(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields` is not supported. "
"Provide a data interval instead."
)
- last_automated_data_interval = last_automated_dag_run
next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
if next_dagrun_info is None:
self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None
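
A short usage sketch of the stricter signature (the model and serialized DAG objects are assumed placeholders; only the interval construction is shown runnable):

```python
# Sketch of the new calling convention; dag_model and serialized_dag are assumed to exist elsewhere.
import pendulum
from airflow.timetables.base import DataInterval

latest_interval = DataInterval(
    start=pendulum.datetime(2025, 1, 1, tz="UTC"),
    end=pendulum.datetime(2025, 1, 2, tz="UTC"),
)
# dag_model.calculate_dagrun_date_fields(serialized_dag, latest_interval)
# Passing a bare datetime (the old ambiguous form) now raises ValueError instead of being coerced.
```
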
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 2c794a709895f..aec477d158804 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -427,7 +427,7 @@ class TaskInstance(Base, LoggingMixin):
String(250), server_default=SpanStatus.NOT_STARTED, nullable=False
)
- external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
+ external_executor_id: Mapped[str | None] = mapped_column(StringID(length=1000), nullable=True)
# The trigger to resume on if we are in state DEFERRED
trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py
index 3e885aec0956d..8374db3250503 100644
--- a/airflow-core/src/airflow/models/taskinstancehistory.py
+++ b/airflow-core/src/airflow/models/taskinstancehistory.py
@@ -105,7 +105,7 @@ class TaskInstanceHistory(Base):
String(250), server_default=SpanStatus.NOT_STARTED, nullable=False
)
- external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
+ external_executor_id: Mapped[str | None] = mapped_column(StringID(length=1000), nullable=True)
trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
trigger_timeout: Mapped[DateTime | None] = mapped_column(DateTime, nullable=True)
next_method: Mapped[str | None] = mapped_column(String(1000), nullable=True)
diff --git a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
index c0ec70f44ddcf..51582c67a95ef 100644
--- a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
+++ b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
@@ -20,7 +20,7 @@ import { Box } from "@chakra-ui/react";
import { useTranslation } from "react-i18next";
import { LuUserRoundPen } from "react-icons/lu";
-import { useTaskInstanceServiceGetHitlDetails } from "openapi/queries";
+import { useTaskInstanceServiceGetHitlDetails, useTaskInstanceServiceGetTaskInstances } from "openapi/queries";
import { useAutoRefresh } from "src/utils/query";
import { StatsCard } from "./StatsCard";
@@ -36,6 +36,24 @@ export const NeedsReviewButton = ({
}) => {
const refetchInterval = useAutoRefresh({ checkPendingRuns: true, dagId });
+ // First check if there are any deferred task instances before querying hitlDetails
+ const { data: deferredTasksData } = useTaskInstanceServiceGetTaskInstances(
+ {
+ dagId: dagId ?? "~",
+ dagRunId: runId ?? "~",
+ limit: 1,
+ state: ["deferred"],
+ taskId,
+ },
+ undefined,
+ {
+ refetchInterval,
+ },
+ );
+
+ const hasDeferredTasks = (deferredTasksData?.total_entries ?? 0) > 0;
+
+ // Only fetch hitlDetails if there are deferred tasks
const { data: hitlStatsData, isLoading } = useTaskInstanceServiceGetHitlDetails(
{
dagId: dagId ?? "~",
@@ -46,6 +64,7 @@ export const NeedsReviewButton = ({
},
undefined,
{
+ enabled: hasDeferredTasks,
refetchInterval,
},
);
diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.test.ts b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.test.ts
new file mode 100644
index 0000000000000..8a0850fd1b0ec
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.test.ts
@@ -0,0 +1,231 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { describe, it, expect, vi, beforeEach } from "vitest";
+import { renderHook, waitFor } from "@testing-library/react";
+import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
+import type { ReactNode } from "react";
+
+import { useRefreshOnNewDagRuns } from "./useRefreshOnNewDagRuns";
+import * as openApiQueries from "../openapi/queries";
+import * as useConfigModule from "./useConfig";
+
+// Mock the dependencies
+vi.mock("../openapi/queries", () => ({
+ useDagServiceGetDagDetailsKey: "dag-details-key",
+ UseDagRunServiceGetDagRunsKeyFn: vi.fn(() => ["dag-runs-key"]),
+ UseDagServiceGetDagDetailsKeyFn: vi.fn(() => ["dag-details-fn-key"]),
+ useDagServiceGetDagsUi: "dags-ui-key",
+ UseTaskInstanceServiceGetTaskInstancesKeyFn: vi.fn(() => ["task-instances-key"]),
+ UseGridServiceGetDagStructureKeyFn: vi.fn(() => ["grid-structure-key"]),
+ UseGridServiceGetGridRunsKeyFn: vi.fn(() => ["grid-runs-key"]),
+ useDagServiceGetLatestRunInfo: vi.fn(),
+}));
+
+vi.mock("./useConfig", () => ({
+ useConfig: vi.fn(),
+}));
+
+describe("useRefreshOnNewDagRuns", () => {
+ let queryClient: QueryClient;
+ let wrapper: ({ children }: { children: ReactNode }) => JSX.Element;
+
+ beforeEach(() => {
+ // Create a new QueryClient for each test
+ queryClient = new QueryClient({
+ defaultOptions: {
+ queries: {
+ retry: false,
+ },
+ },
+ });
+
+ // Create wrapper with QueryClientProvider
+ wrapper = ({ children }: { children: ReactNode }) => (
+ <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
+ );
+
+ // Mock useConfig to return auto refresh interval
+ vi.mocked(useConfigModule.useConfig).mockReturnValue(5);
+
+ // Reset all mocks
+ vi.clearAllMocks();
+ });
+
+ it("should not invalidate queries on initial load when latestDagRunId is undefined", () => {
+ const dagId = "test_dag";
+ const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+ // Mock useDagServiceGetLatestRunInfo to return undefined initially
+ vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+ data: undefined,
+ } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+ renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+ // Should not invalidate queries when data is undefined on initial load
+ expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+ });
+
+ it("should not invalidate queries on initial load when latestDagRunId is first fetched", () => {
+ const dagId = "test_dag";
+ const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+ // Mock useDagServiceGetLatestRunInfo to return a run_id on first render
+ vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+ data: { run_id: "run_123" },
+ } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+ renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+ // Should not invalidate queries on initial load even when latestDagRunId is present
+ expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+ });
+
+ it("should invalidate queries when a new DAG run appears", async () => {
+ const dagId = "test_dag";
+ const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+ // Start with initial run_id
+ vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+ data: { run_id: "run_123" },
+ } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+ const { rerender } = renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+ // Initial render should not invalidate
+ expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+
+ // Update to a new run_id
+ vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+ data: { run_id: "run_456" },
+ } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+ rerender();
+
+ // Should now invalidate queries because run_id changed
+ await waitFor(() => {
+ expect(invalidateQueriesSpy).toHaveBeenCalled();
+ });
+
+ // Verify all the expected query keys were invalidated
+ expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dags-ui-key"] });
+ expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dag-details-key"] });
+ expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dag-details-fn-key"] });
+ expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dag-runs-key"] });
+ expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["task-instances-key"] });
+ expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["grid-structure-key"] });
+ expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["grid-runs-key"] });
+ });
+
+ it("should not invalidate queries when latestDagRunId remains the same", () => {
+ const dagId = "test_dag";
+ const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+ // Mock with same run_id
+ vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+ data: { run_id: "run_123" },
+ } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+ const { rerender } = renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+ // Initial render - no invalidation
+ expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+
+ // Rerender with same run_id
+ rerender();
+
+ // Should still not invalidate because run_id hasn't changed
+ expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+ });
+
+ it("should handle transition from undefined to defined run_id without invalidation on first occurrence", async () => {
+ const dagId = "test_dag";
+ const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+ // Start with undefined
+ vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+ data: undefined,
+ } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+ const { rerender } = renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+ // No invalidation on initial undefined
+ expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+
+ // Update to have a run_id (simulating data being fetched)
+ vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+ data: { run_id: "run_123" },
+ } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+ rerender();
+
+ // Should not invalidate on first transition from undefined to defined
+ expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+ });
+
+ it("should not fetch latest run info when hasPendingRuns is true", () => {
+ const dagId = "test_dag";
+
+ renderHook(() => useRefreshOnNewDagRuns(dagId, true), { wrapper });
+
+ // Verify useDagServiceGetLatestRunInfo was called with enabled: false
+ expect(openApiQueries.useDagServiceGetLatestRunInfo).toHaveBeenCalledWith(
+ { dagId },
+ undefined,
+ expect.objectContaining({
+ enabled: false,
+ }),
+ );
+ });
+
+ it("should use custom auto refresh interval from config", () => {
+ const dagId = "test_dag";
+ const customInterval = 10;
+
+ vi.mocked(useConfigModule.useConfig).mockReturnValue(customInterval);
+
+ renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+ // Verify useDagServiceGetLatestRunInfo was called with custom interval
+ expect(openApiQueries.useDagServiceGetLatestRunInfo).toHaveBeenCalledWith(
+ { dagId },
+ undefined,
+ expect.objectContaining({
+ refetchInterval: customInterval * 1000, // Should be in milliseconds
+ }),
+ );
+ });
+
+ it("should use default 5 second interval when auto_refresh_interval is not set", () => {
+ const dagId = "test_dag";
+
+ vi.mocked(useConfigModule.useConfig).mockReturnValue(0);
+
+ renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+ // Verify useDagServiceGetLatestRunInfo was called with default 5000ms
+ expect(openApiQueries.useDagServiceGetLatestRunInfo).toHaveBeenCalledWith(
+ { dagId },
+ undefined,
+ expect.objectContaining({
+ refetchInterval: 5000,
+ }),
+ );
+ });
+});
diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
index 5ceae8bbf6921..8e8dc573b8b1f 100644
--- a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
@@ -45,6 +45,13 @@ export const useRefreshOnNewDagRuns = (dagId: string, hasPendingRuns: boolean |
useEffect(() => {
const latestDagRunId = latestDagRun?.run_id;
+ // On initial load, just set the ref without invalidating cache
+ if (previousDagRunIdRef.current === undefined) {
+ previousDagRunIdRef.current = latestDagRunId;
+ return;
+ }
+
+ // Only invalidate cache when there's a new DAG run (not on initial load)
if ((latestDagRunId ?? "") && previousDagRunIdRef.current !== latestDagRunId) {
previousDagRunIdRef.current = latestDagRunId;
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 642fa1524c44e..4583db2c969fb 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -457,14 +457,14 @@ def test_limit_and_offset(self, test_client, query_params, expected_dag_id_order
[
{
"type": "greater_than_equal",
- "loc": ["query", "limit"],
+ "loc": ["query", "offset"],
"msg": "Input should be greater than or equal to 0",
"input": "-1",
"ctx": {"ge": 0},
},
{
"type": "greater_than_equal",
- "loc": ["query", "offset"],
+ "loc": ["query", "limit"],
"msg": "Input should be greater than or equal to 0",
"input": "-1",
"ctx": {"ge": 0},
@@ -713,7 +713,7 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail):
[DAG1_RUN2_ID],
), # Test for debug key
("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version"
- ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist
+ ("~", {"conf_contains": "non_existent_key"}, []), # Test for a key that doesn't exist
],
)
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
@@ -1333,6 +1333,40 @@ def test_should_respond_403(self, unauthorized_test_client):
response = unauthorized_test_client.delete(f"/dags/{DAG1_ID}/dagRuns/invalid")
assert response.status_code == 403
+ def test_delete_dag_run_running_state(self, test_client, dag_maker, session):
+ """Should not allow deleting a dag run in RUNNING state."""
+ with dag_maker(DAG1_ID, schedule=None, start_date=START_DATE1, serialized=True):
+ EmptyOperator(task_id="task_1")
+ dag_maker.create_dagrun(
+ run_id="run_running",
+ state=DagRunState.RUNNING,
+ run_type=DagRunType.MANUAL,
+ triggered_by=DagRunTriggeredByType.UI,
+ logical_date=LOGICAL_DATE1,
+ )
+ session.flush()
+ response = test_client.delete(f"/dags/{DAG1_ID}/dagRuns/run_running")
+ assert response.status_code == 400
+ assert "Cannot delete DagRun in state" in response.json()["detail"]
+
+ def test_delete_dag_run_allowed_states(self, test_client, dag_maker, session):
+ """Should allow deleting dag runs in allowed states (QUEUED, SUCCESS, FAILED)."""
+ allowed_states = [DagRunState.QUEUED, DagRunState.SUCCESS, DagRunState.FAILED]
+ for state in allowed_states:
+ run_id = f"run_{state}"
+ with dag_maker(DAG1_ID, schedule=None, start_date=START_DATE1, serialized=True):
+ EmptyOperator(task_id="task_1")
+ dag_maker.create_dagrun(
+ run_id=run_id,
+ state=state,
+ run_type=DagRunType.MANUAL,
+ triggered_by=DagRunTriggeredByType.UI,
+ logical_date=LOGICAL_DATE1,
+ )
+ session.flush()
+ response = test_client.delete(f"/dags/{DAG1_ID}/dagRuns/{run_id}")
+ assert response.status_code == 204
+
class TestGetDagRunAssetTriggerEvents:
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
@@ -1753,136 +1787,6 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, sessio
== "DAG with dag_id: 'import_errors' has import errors and cannot be triggered"
)
- @time_machine.travel(timezone.utcnow(), tick=False)
- @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
- def test_should_response_409_for_duplicate_logical_date(self, test_client):
- RUN_ID_1 = "random_1"
- RUN_ID_2 = "random_2"
- now = timezone.utcnow().isoformat().replace("+00:00", "Z")
- note = "duplicate logical date test"
- response_1 = test_client.post(
- f"/dags/{DAG1_ID}/dagRuns",
- json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now},
- )
- response_2 = test_client.post(
- f"/dags/{DAG1_ID}/dagRuns",
- json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now},
- )
-
- assert response_1.status_code == 200
- assert response_1.json() == {
- "bundle_version": None,
- "dag_display_name": DAG1_DISPLAY_NAME,
- "dag_run_id": RUN_ID_1,
- "dag_id": DAG1_ID,
- "dag_versions": mock.ANY,
- "logical_date": now,
- "queued_at": now,
- "start_date": None,
- "end_date": None,
- "duration": None,
- "run_after": now,
- "data_interval_start": now,
- "data_interval_end": now,
- "last_scheduling_decision": None,
- "run_type": "manual",
- "state": "queued",
- "triggered_by": "rest_api",
- "triggering_user_name": "test",
- "conf": {},
- "note": note,
- "partition_key": None,
- }
-
- assert response_2.status_code == 409
-
- @pytest.mark.parametrize(
- ("data_interval_start", "data_interval_end"),
- [
- (
- LOGICAL_DATE1.isoformat(),
- None,
- ),
- (
- None,
- LOGICAL_DATE1.isoformat(),
- ),
- ],
- )
- def test_should_response_422_for_missing_start_date_or_end_date(
- self, test_client, data_interval_start, data_interval_end
- ):
- now = timezone.utcnow().isoformat()
- response = test_client.post(
- f"/dags/{DAG1_ID}/dagRuns",
- json={
- "data_interval_start": data_interval_start,
- "data_interval_end": data_interval_end,
- "logical_date": now,
- },
- )
- assert response.status_code == 422
- assert (
- response.json()["detail"][0]["msg"]
- == "Value error, Either both data_interval_start and data_interval_end must be provided or both must be None"
- )
-
- def test_raises_validation_error_for_invalid_params(self, test_client):
- now = timezone.utcnow().isoformat()
- response = test_client.post(
- f"/dags/{DAG2_ID}/dagRuns",
- json={"conf": {"validated_number": 5000}, "logical_date": now},
- )
- assert response.status_code == 400
- assert "Invalid input for param validated_number" in response.json()["detail"]
-
- def test_response_404(self, test_client):
- now = timezone.utcnow().isoformat()
- response = test_client.post("/dags/randoms/dagRuns", json={"logical_date": now})
- assert response.status_code == 404
- assert response.json()["detail"] == "DAG with dag_id: 'randoms' not found"
-
- def test_response_409(self, test_client):
- now = timezone.utcnow().isoformat()
- response = test_client.post(
- f"/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID, "logical_date": now}
- )
- assert response.status_code == 409
- response_json = response.json()
- assert "detail" in response_json
- assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"]
-
- @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
- def test_should_respond_200_with_null_logical_date(self, test_client):
- response = test_client.post(
- f"/dags/{DAG1_ID}/dagRuns",
- json={"logical_date": None},
- )
- assert response.status_code == 200
- assert response.json() == {
- "bundle_version": None,
- "dag_display_name": DAG1_DISPLAY_NAME,
- "dag_run_id": mock.ANY,
- "dag_id": DAG1_ID,
- "dag_versions": mock.ANY,
- "logical_date": None,
- "queued_at": mock.ANY,
- "run_after": mock.ANY,
- "start_date": None,
- "end_date": None,
- "duration": None,
- "data_interval_start": mock.ANY,
- "data_interval_end": mock.ANY,
- "last_scheduling_decision": None,
- "run_type": "manual",
- "state": "queued",
- "triggered_by": "rest_api",
- "triggering_user_name": "test",
- "conf": {},
- "note": None,
- "partition_key": None,
- }
-
@time_machine.travel("2025-10-02 12:00:00", tick=False)
@pytest.mark.usefixtures("custom_timetable_plugin")
def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, test_client, session):
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 6bb1aea8d8f1d..266ad1bedc593 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -838,6 +838,170 @@ def test_manager_deactivate_deleted_dags_cleanup_behavior(
else:
mock_remove_references.assert_not_called()
+ def test_deactivate_deleted_dags_skips_empty_file_list(self, dag_maker, session, caplog):
+ """Test that deactivate_deleted_dags skips deactivation when file list is empty.
+
+ This is a safeguard against transient filesystem issues (NFS delays, volume mounts)
+ that could cause an empty file list to be returned, which would otherwise mark
+ all DAGs in the bundle as stale.
+ """
+ with dag_maker("test_dag1") as dag1:
+ dag1.relative_fileloc = "test_dag1.py"
+ with dag_maker("test_dag2") as dag2:
+ dag2.relative_fileloc = "test_dag2.py"
+ dag_maker.sync_dagbag_to_db()
+
+ manager = DagFileProcessorManager(max_runs=1)
+
+ # Pass an empty set of files - this simulates a transient filesystem issue
+ manager.deactivate_deleted_dags("dag_maker", set())
+
+ # Both DAGs should still be active since we skipped deactivation
+ assert session.get(DagModel, "test_dag1").is_stale is False
+ assert session.get(DagModel, "test_dag2").is_stale is False
+
+ # Check that a warning was logged
+ assert "No DAG files found in bundle" in caplog.text
+ assert "Skipping deactivation" in caplog.text
+
+ def test_deactivate_deleted_dags_skips_null_relative_fileloc(self, dag_maker, session):
+ """Test that DAGs with None relative_fileloc are not marked stale.
+
+ A DAG with relative_fileloc=None should be skipped during the deleted dags check,
+ as it will be updated when the DAG file is properly parsed.
+ """
+ # Create a DAG with relative_fileloc set
+ with dag_maker("test_dag_with_fileloc") as dag1:
+ dag1.relative_fileloc = "test_dag1.py"
+ dag_maker.sync_dagbag_to_db()
+
+ # Create a DAG model with None relative_fileloc (manually)
+ dag_model_null = DagModel(
+ dag_id="test_dag_null_fileloc",
+ bundle_name="dag_maker",
+ relative_fileloc=None,
+ )
+ session.add(dag_model_null)
+ session.commit()
+
+ # Active files only include test_dag1.py
+ active_files = [
+ DagFileInfo(
+ bundle_name="dag_maker",
+ rel_path=Path("test_dag1.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ ),
+ ]
+
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.deactivate_deleted_dags("dag_maker", active_files)
+
+ # The DAG with relative_fileloc should remain active
+ assert session.get(DagModel, "test_dag_with_fileloc").is_stale is False
+ # The DAG with None relative_fileloc should also remain active (skipped)
+ assert session.get(DagModel, "test_dag_null_fileloc").is_stale is False
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_scan_stale_dags_skips_null_last_parsed_time(self, session):
+ """Test that DAGs with None last_parsed_time are not marked stale.
+
+ A DAG with last_parsed_time=None should be skipped during the stale dags check,
+ as it will be updated when the DAG file is properly parsed.
+ """
+ manager = DagFileProcessorManager(
+ max_runs=1,
+ processor_timeout=10 * 60,
+ )
+ bundle = MagicMock()
+ bundle.name = "testing"
+ manager._dag_bundles = [bundle]
+
+ # Create a DAG model with None last_parsed_time
+ dag_model = DagModel(
+ dag_id="test_dag_null_last_parsed",
+ bundle_name="testing",
+ relative_fileloc="test_dag.py",
+ last_parsed_time=None, # Explicitly None
+ )
+ session.add(dag_model)
+ session.commit()
+
+ test_dag_path = DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("test_dag.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ )
+
+ # Add file stats that would trigger stale detection if last_parsed_time was set
+ stat = DagFileStat(
+ num_dags=1,
+ import_errors=0,
+ last_finish_time=timezone.utcnow() + timedelta(hours=1),
+ last_duration=1,
+ run_count=1,
+ last_num_of_db_queries=1,
+ )
+ manager._file_stats[test_dag_path] = stat
+
+ # Before running scan, the DAG should be active
+ assert session.get(DagModel, "test_dag_null_last_parsed").is_stale is False
+
+ manager._scan_stale_dags()
+
+ # After running scan, the DAG should still be active (skipped due to None last_parsed_time)
+ session.expire_all() # Clear cache
+ assert session.get(DagModel, "test_dag_null_last_parsed").is_stale is False
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_scan_stale_dags_skips_null_relative_fileloc(self, session):
+ """Test that DAGs with None relative_fileloc are not marked stale in scan_stale_dags.
+
+ A DAG with relative_fileloc=None should be skipped during the stale dags check
+ because we can't construct a valid DagFileInfo to look up in file stats.
+ """
+ manager = DagFileProcessorManager(
+ max_runs=1,
+ processor_timeout=10 * 60,
+ )
+ bundle = MagicMock()
+ bundle.name = "testing"
+ manager._dag_bundles = [bundle]
+
+ # Create a DAG model with None relative_fileloc
+ dag_model = DagModel(
+ dag_id="test_dag_null_fileloc",
+ bundle_name="testing",
+ relative_fileloc=None, # Explicitly None
+ last_parsed_time=timezone.utcnow() - timedelta(hours=2),
+ )
+ session.add(dag_model)
+ session.commit()
+
+ # Add some file stats (not related to the DAG with null fileloc)
+ test_dag_path = DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("other_dag.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ )
+ stat = DagFileStat(
+ num_dags=1,
+ import_errors=0,
+ last_finish_time=timezone.utcnow() + timedelta(hours=1),
+ last_duration=1,
+ run_count=1,
+ last_num_of_db_queries=1,
+ )
+ manager._file_stats[test_dag_path] = stat
+
+ # Before running scan, the DAG should be active
+ assert session.get(DagModel, "test_dag_null_fileloc").is_stale is False
+
+ manager._scan_stale_dags()
+
+ # After running scan, the DAG should still be active (skipped due to None relative_fileloc)
+ session.expire_all() # Clear cache
+ assert session.get(DagModel, "test_dag_null_fileloc").is_stale is False
+
@conf_vars({("core", "load_examples"): "False"})
def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle):
"""Test _fetch_callbacks returns callbacks ordered by priority_weight desc."""
diff --git a/dev/check_no_orm_in_migrations.py b/dev/check_no_orm_in_migrations.py
new file mode 100644
index 0000000000000..8bc88e4403280
--- /dev/null
+++ b/dev/check_no_orm_in_migrations.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3
+"""
+Pre-commit hook: Ensure no Alembic migration script imports ORM models.
+"""
+import re
+import sys
+from pathlib import Path
+
+MIGRATIONS_DIRS = [
+ Path("airflow-core/src/airflow/migrations/versions"),
+]
+ORM_IMPORT_PATTERNS = [
+ re.compile(r"from +airflow\.models\b"),
+ re.compile(r"import +airflow\.models\b"),
+]
+
+def main():
+ failed = False
+ for migrations_dir in MIGRATIONS_DIRS:
+ if not migrations_dir.exists():
+ continue
+ for pyfile in migrations_dir.glob("*.py"):
+ with pyfile.open("r", encoding="utf-8") as f:
+ for i, line in enumerate(f, 1):
+ for pat in ORM_IMPORT_PATTERNS:
+ if pat.search(line):
+ print(f"ORM import found in migration: {pyfile} (line {i}): {line.strip()}")
+ failed = True
+ if failed:
+ print("\nERROR: ORM model imports found in Alembic migration scripts. Use table reflection or raw SQL instead.")
+ sys.exit(1)
+ sys.exit(0)
+
+if __name__ == "__main__":
+ main()
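
The patterns can be exercised in isolation (the example lines below are made up), and the hook itself can be run with `pre-commit run check-no-orm-in-migrations --all-files` (or the prek equivalent):

```python
# Illustrative check of the import patterns; not part of the hook itself.
import re

pattern = re.compile(r"from +airflow\.models\b")
assert pattern.search("from airflow.models.trigger import Trigger")            # ORM import: flagged
assert pattern.search("from airflow.models import DagModel")                   # ORM import: flagged
assert not pattern.search("from airflow.migrations.db_types import StringID")  # non-ORM: ignored
```
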
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
index 71e88ff01210e..2c7fc9ad1a4fc 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -926,6 +926,9 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator):
"""
Delete a cluster in a project.
+ This operator is idempotent. If the cluster does not exist (e.g., it was already deleted
+ or never created), the operator will complete successfully without raising an error.
+
:param region: Required. The Cloud Dataproc region in which to handle the request (templated).
:param cluster_name: Required. The cluster name (templated).
:param project_id: Optional. The ID of the Google Cloud project that the cluster belongs to (templated).
@@ -995,7 +998,12 @@ def __init__(
def execute(self, context: Context) -> None:
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
- operation = self._delete_cluster(hook)
+ try:
+ operation = self._delete_cluster(hook)
+ except NotFound:
+ self.log.info("Cluster already deleted. Cluster %s not found.", self.cluster_name)
+ return
+
if not self.deferrable:
hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation)
self.log.info("Cluster deleted.")
diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index d525e97d5a5aa..14e021d9e31aa 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -1277,6 +1277,50 @@ def test_create_execute_call_finished_before_defer(self, mock_trigger_hook, mock
mock_hook.return_value.wait_for_operation.assert_not_called()
assert not mock_defer.called
+ @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+ def test_delete_cluster_not_found_idempotent(self, mock_hook):
+ """Test that operator succeeds when cluster doesn't exist (idempotent behavior)."""
+ mock_hook.return_value.delete_cluster.side_effect = NotFound("Cluster not found")
+
+ op = DataprocDeleteClusterOperator(
+ task_id=TASK_ID,
+ region=GCP_REGION,
+ project_id=GCP_PROJECT,
+ cluster_name=CLUSTER_NAME,
+ gcp_conn_id=GCP_CONN_ID,
+ )
+
+ # Should complete successfully without raising an exception
+ op.execute(context={})
+
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
+ mock_hook.return_value.delete_cluster.assert_called_once()
+ # wait_for_operation should not be called since cluster doesn't exist
+ mock_hook.return_value.wait_for_operation.assert_not_called()
+
+ @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+ def test_delete_cluster_not_found_deferrable(self, mock_hook):
+ """Test idempotent behavior in deferrable mode when cluster doesn't exist."""
+ mock_hook.return_value.delete_cluster.side_effect = NotFound("Cluster not found")
+
+ op = DataprocDeleteClusterOperator(
+ task_id=TASK_ID,
+ region=GCP_REGION,
+ project_id=GCP_PROJECT,
+ cluster_name=CLUSTER_NAME,
+ gcp_conn_id=GCP_CONN_ID,
+ deferrable=True,
+ )
+
+ # Should complete successfully without deferring
+ op.execute(context={})
+
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
+ mock_hook.return_value.delete_cluster.assert_called_once()
+ # Should not call get_cluster or wait_for_operation
+ mock_hook.return_value.get_cluster.assert_not_called()
+ mock_hook.return_value.wait_for_operation.assert_not_called()
+
class TestDataprocSubmitJobOperator(DataprocJobTestBase):
@mock.patch(DATAPROC_PATH.format("DataprocHook"))