From ae0507d8efe1939b43b340ee2acbe0d55ae39137 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Thu, 25 Dec 2025 11:16:42 +0530 Subject: [PATCH 1/9] Improve CLI date argument help text documentation The help text for --start-date and --end-date arguments only documented the YYYY-MM-DD format, but the actual implementation uses pendulum.parse() which accepts a much wider variety of formats. This commit updates the help text to accurately document the commonly used formats: - YYYY-MM-DD (date only) - YYYY-MM-DDTHH:MM:SS (datetime) - YYYY-MM-DDTHH:MM:SSHH:MM (datetime with timezone, ISO 8601) The help text also references pendulum.parse() to indicate that additional formats are supported, improving clarity for users. Fixes: Incomplete documentation of date format options --- airflow-core/src/airflow/cli/cli_config.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 0e0bb1f44088d..00dd8a52cd8da 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -167,8 +167,24 @@ def string_lower_type(val): default=None, action="append", ) -ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate) -ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate) +ARG_START_DATE = Arg( + ("-s", "--start-date"), + help=( + "Override start_date. Accepts multiple datetime formats including: " + "YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS, YYYY-MM-DDTHH:MM:SS±HH:MM (ISO 8601), " + "and other formats supported by pendulum.parse()" + ), + type=parsedate, +) +ARG_END_DATE = Arg( + ("-e", "--end-date"), + help=( + "Override end_date. Accepts multiple datetime formats including: " + "YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS, YYYY-MM-DDTHH:MM:SS±HH:MM (ISO 8601), " + "and other formats supported by pendulum.parse()" + ), + type=parsedate, +) ARG_OUTPUT_PATH = Arg( ( "-o", From e766de566646102c5d38a9d117d34689e99b3475 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Fri, 26 Dec 2025 12:00:57 +0530 Subject: [PATCH 2/9] Increase external_executor_id column length from 250 to 1000 characters This change addresses an issue where the scheduler crashes when executors (such as AWS Lambda Executor) generate external executor IDs exceeding 250 characters. Long dag_id, task_id, and run_id combinations can easily exceed this limit, causing database constraint violations. Changes: - Created migration 0094_3_2_0_increase_external_executor_id_length.py to alter the external_executor_id column in both task_instance and task_instance_history tables from VARCHAR(250) to VARCHAR(1000) - Updated TaskInstance model to use StringID(length=1000) for external_executor_id column - Updated TaskInstanceHistory model to use StringID(length=1000) for external_executor_id column This fix allows executors with longer identifiers to work properly without causing StringDataRightTruncation errors. 
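As a rough illustration (the concatenation and separator below are placeholders,
not the actual ID format of any executor), an identifier that embeds these fields
can exceed the old limit even with moderately long names:

    # Illustration only; not the real external_executor_id layout.
    dag_id = "a" * 120   # dag_id and task_id may each be up to 250 characters
    task_id = "b" * 120
    run_id = "scheduled__2025-12-26T00:00:00+00:00"

    external_executor_id = f"{dag_id}:{task_id}:{run_id}"
    print(len(external_executor_id))  # 278 -- already over the old VARCHAR(250) limit
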
Fixes: # --- airflow-core/src/airflow/cli/cli_config.py | 20 +---- ..._0_increase_external_executor_id_length.py | 73 +++++++++++++++++++ .../src/airflow/models/taskinstance.py | 2 +- .../src/airflow/models/taskinstancehistory.py | 2 +- 4 files changed, 77 insertions(+), 20 deletions(-) create mode 100644 airflow-core/src/airflow/migrations/versions/0094_3_2_0_increase_external_executor_id_length.py diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 00dd8a52cd8da..0e0bb1f44088d 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -167,24 +167,8 @@ def string_lower_type(val): default=None, action="append", ) -ARG_START_DATE = Arg( - ("-s", "--start-date"), - help=( - "Override start_date. Accepts multiple datetime formats including: " - "YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS, YYYY-MM-DDTHH:MM:SS±HH:MM (ISO 8601), " - "and other formats supported by pendulum.parse()" - ), - type=parsedate, -) -ARG_END_DATE = Arg( - ("-e", "--end-date"), - help=( - "Override end_date. Accepts multiple datetime formats including: " - "YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS, YYYY-MM-DDTHH:MM:SS±HH:MM (ISO 8601), " - "and other formats supported by pendulum.parse()" - ), - type=parsedate, -) +ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate) +ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate) ARG_OUTPUT_PATH = Arg( ( "-o", diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_increase_external_executor_id_length.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_increase_external_executor_id_length.py new file mode 100644 index 0000000000000..5b0859c28e30c --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_increase_external_executor_id_length.py @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Increase external_executor_id length to 1000. + +Revision ID: f1a2b3c4d5e6 +Revises: 665854ef0536 +Create Date: 2025-12-26 00:00:00.000000 + +""" + +from __future__ import annotations + +from alembic import op + +from airflow.migrations.db_types import StringID + +# revision identifiers, used by Alembic. 
+revision = "f1a2b3c4d5e6" +down_revision = "665854ef0536" +branch_labels = None +depends_on = None +airflow_version = "3.2.0" + + +def upgrade(): + """Increase external_executor_id column length from 250 to 1000 chars.""" + with op.batch_alter_table("task_instance") as batch_op: + batch_op.alter_column( + "external_executor_id", + type_=StringID(length=1000), + existing_nullable=True, + ) + + with op.batch_alter_table("task_instance_history") as batch_op: + batch_op.alter_column( + "external_executor_id", + type_=StringID(length=1000), + existing_nullable=True, + ) + + +def downgrade(): + """Revert external_executor_id column length from 1000 to 250 chars.""" + with op.batch_alter_table("task_instance") as batch_op: + batch_op.alter_column( + "external_executor_id", + type_=StringID(length=250), + existing_nullable=True, + ) + + with op.batch_alter_table("task_instance_history") as batch_op: + batch_op.alter_column( + "external_executor_id", + type_=StringID(length=250), + existing_nullable=True, + ) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 2c794a709895f..aec477d158804 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -427,7 +427,7 @@ class TaskInstance(Base, LoggingMixin): String(250), server_default=SpanStatus.NOT_STARTED, nullable=False ) - external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True) + external_executor_id: Mapped[str | None] = mapped_column(StringID(length=1000), nullable=True) # The trigger to resume on if we are in state DEFERRED trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True) diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py index 3e885aec0956d..8374db3250503 100644 --- a/airflow-core/src/airflow/models/taskinstancehistory.py +++ b/airflow-core/src/airflow/models/taskinstancehistory.py @@ -105,7 +105,7 @@ class TaskInstanceHistory(Base): String(250), server_default=SpanStatus.NOT_STARTED, nullable=False ) - external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True) + external_executor_id: Mapped[str | None] = mapped_column(StringID(length=1000), nullable=True) trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True) trigger_timeout: Mapped[DateTime | None] = mapped_column(DateTime, nullable=True) next_method: Mapped[str | None] = mapped_column(String(1000), nullable=True) From 31a4bdf0404bdacfc562501dcdee879f7320c4e4 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sat, 27 Dec 2025 12:07:22 +0530 Subject: [PATCH 3/9] Fix duplicate HTTP requests on initial DAG page load Prevent unnecessary cache invalidation in useRefreshOnNewDagRuns hook when the component first mounts by adding an early return when the previousDagRunIdRef is undefined. On initial page load, when latestDagRunId is fetched for the first time, it would differ from the undefined previousDagRunIdRef, causing cache invalidation and duplicate HTTP requests to endpoints like /ui/grid/runs, /ui/grid/structure, etc. The fix sets previousDagRunIdRef without invalidating queries on the first render, ensuring cache invalidation only occurs when a genuinely new DAG run appears (not on initial load). This reduces unnecessary network traffic and improves page load performance while preserving the intended refresh behavior when new DAG runs complete. 
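The fix follows the usual React pattern of recording a baseline in a ref and
skipping side effects on the first observation. A minimal, illustrative sketch of
that pattern (not the actual hook; the real change appears in the diff below):

    import { useEffect, useRef } from "react";

    // Illustrative only: run `onNewValue` when `value` changes after a baseline is recorded.
    export function useOnNewValue<T>(value: T | undefined, onNewValue: (next: T) => void) {
      const previousRef = useRef<T | undefined>(undefined);

      useEffect(() => {
        if (previousRef.current === undefined) {
          // First observation: record the baseline, trigger no side effects.
          previousRef.current = value;
          return;
        }
        if (value !== undefined && previousRef.current !== value) {
          previousRef.current = value;
          onNewValue(value);
        }
      }, [value, onNewValue]);
    }
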
---
 .../queries/useRefreshOnNewDagRuns.test.ts    | 231 ++++++++++++++++++
 .../ui/src/queries/useRefreshOnNewDagRuns.ts  |   7 +
 2 files changed, 238 insertions(+)
 create mode 100644 airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.test.ts

diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.test.ts b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.test.ts
new file mode 100644
index 0000000000000..8a0850fd1b0ec
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.test.ts
@@ -0,0 +1,231 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { describe, it, expect, vi, beforeEach } from "vitest";
+import { renderHook, waitFor } from "@testing-library/react";
+import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
+import type { ReactNode } from "react";
+
+import { useRefreshOnNewDagRuns } from "./useRefreshOnNewDagRuns";
+import * as openApiQueries from "../openapi/queries";
+import * as useConfigModule from "./useConfig";
+
+// Mock the dependencies
+vi.mock("../openapi/queries", () => ({
+  useDagServiceGetDagDetailsKey: "dag-details-key",
+  UseDagRunServiceGetDagRunsKeyFn: vi.fn(() => ["dag-runs-key"]),
+  UseDagServiceGetDagDetailsKeyFn: vi.fn(() => ["dag-details-fn-key"]),
+  useDagServiceGetDagsUi: "dags-ui-key",
+  UseTaskInstanceServiceGetTaskInstancesKeyFn: vi.fn(() => ["task-instances-key"]),
+  UseGridServiceGetDagStructureKeyFn: vi.fn(() => ["grid-structure-key"]),
+  UseGridServiceGetGridRunsKeyFn: vi.fn(() => ["grid-runs-key"]),
+  useDagServiceGetLatestRunInfo: vi.fn(),
+}));
+
+vi.mock("./useConfig", () => ({
+  useConfig: vi.fn(),
+}));
+
+describe("useRefreshOnNewDagRuns", () => {
+  let queryClient: QueryClient;
+  let wrapper: ({ children }: { children: ReactNode }) => JSX.Element;
+
+  beforeEach(() => {
+    // Create a new QueryClient for each test
+    queryClient = new QueryClient({
+      defaultOptions: {
+        queries: {
+          retry: false,
+        },
+      },
+    });
+
+    // Create wrapper with QueryClientProvider
+    wrapper = ({ children }: { children: ReactNode }) => (
+      <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
+    );
+
+    // Mock useConfig to return auto refresh interval
+    vi.mocked(useConfigModule.useConfig).mockReturnValue(5);
+
+    // Reset all mocks
+    vi.clearAllMocks();
+  });
+
+  it("should not invalidate queries on initial load when latestDagRunId is undefined", () => {
+    const dagId = "test_dag";
+    const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+    // Mock useDagServiceGetLatestRunInfo to return undefined initially
+    vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+      data: undefined,
+    } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+    renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+    // Should not invalidate queries when data is undefined on initial load
+    expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+  });
+
+  it("should not invalidate queries on initial load when latestDagRunId is first fetched", () => {
+    const dagId = "test_dag";
+    const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+    // Mock useDagServiceGetLatestRunInfo to return a run_id on first render
+    vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+      data: { run_id: "run_123" },
+    } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+    renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+    // Should not invalidate queries on initial load even when latestDagRunId is present
+    expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+  });
+
+  it("should invalidate queries when a new DAG run appears", async () => {
+    const dagId = "test_dag";
+    const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+    // Start with initial run_id
+    vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+      data: { run_id: "run_123" },
+    } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+    const { rerender } = renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+    // Initial render should not invalidate
+    expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+
+    // Update to a new run_id
+    vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+      data: { run_id: "run_456" },
+    } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+    rerender();
+
+    // Should now invalidate queries because run_id changed
+    await waitFor(() => {
+      expect(invalidateQueriesSpy).toHaveBeenCalled();
+    });
+
+    // Verify all the expected query keys were invalidated
+    expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dags-ui-key"] });
+    expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dag-details-key"] });
+    expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dag-details-fn-key"] });
+    expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["dag-runs-key"] });
+    expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["task-instances-key"] });
+    expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["grid-structure-key"] });
+    expect(invalidateQueriesSpy).toHaveBeenCalledWith({ queryKey: ["grid-runs-key"] });
+  });
+
+  it("should not invalidate queries when latestDagRunId remains the same", () => {
+    const dagId = "test_dag";
+    const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+    // Mock with same run_id
+    vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+      data: { run_id: "run_123" },
+    } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+    const { rerender } = renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+    // Initial render - no invalidation
+    expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+
+    // Rerender with same run_id
+    rerender();
+
+    // Should still not invalidate because run_id hasn't changed
+    expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+  });
+
+  it("should handle transition from undefined to defined run_id without invalidation on first occurrence", async () => {
+    const dagId = "test_dag";
+    const invalidateQueriesSpy = vi.spyOn(queryClient, "invalidateQueries");
+
+    // Start with undefined
+    vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+      data: undefined,
+    } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+    const { rerender } = renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+    // No invalidation on initial undefined
+    expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+
+    // Update to have a run_id (simulating data being fetched)
+    vi.mocked(openApiQueries.useDagServiceGetLatestRunInfo).mockReturnValue({
+      data: { run_id: "run_123" },
+    } as ReturnType<typeof openApiQueries.useDagServiceGetLatestRunInfo>);
+
+    rerender();
+
+    // Should not invalidate on first transition from undefined to defined
+    expect(invalidateQueriesSpy).not.toHaveBeenCalled();
+  });
+
+  it("should not fetch latest run info when hasPendingRuns is true", () => {
+    const dagId = "test_dag";
+
+    renderHook(() => useRefreshOnNewDagRuns(dagId, true), { wrapper });
+
+    // Verify useDagServiceGetLatestRunInfo was called with enabled: false
+    expect(openApiQueries.useDagServiceGetLatestRunInfo).toHaveBeenCalledWith(
+      { dagId },
+      undefined,
+      expect.objectContaining({
+        enabled: false,
+      }),
+    );
+  });
+
+  it("should use custom auto refresh interval from config", () => {
+    const dagId = "test_dag";
+    const customInterval = 10;
+
+    vi.mocked(useConfigModule.useConfig).mockReturnValue(customInterval);
+
+    renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+    // Verify useDagServiceGetLatestRunInfo was called with custom interval
+    expect(openApiQueries.useDagServiceGetLatestRunInfo).toHaveBeenCalledWith(
+      { dagId },
+      undefined,
+      expect.objectContaining({
+        refetchInterval: customInterval * 1000, // Should be in milliseconds
+      }),
+    );
+  });
+
+  it("should use default 5 second interval when auto_refresh_interval is not set", () => {
+    const dagId = "test_dag";
+
+    vi.mocked(useConfigModule.useConfig).mockReturnValue(0);
+
+    renderHook(() => useRefreshOnNewDagRuns(dagId, false), { wrapper });
+
+    // Verify useDagServiceGetLatestRunInfo was called with default 5000ms
+    expect(openApiQueries.useDagServiceGetLatestRunInfo).toHaveBeenCalledWith(
+      { dagId },
+      undefined,
+      expect.objectContaining({
+        refetchInterval: 5000,
+      }),
+    );
+  });
+});
diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
index 5ceae8bbf6921..8e8dc573b8b1f 100644
--- a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
@@ -45,6 +45,13 @@ export const useRefreshOnNewDagRuns = (dagId: string, hasPendingRuns: boolean |
   useEffect(() => {
     const latestDagRunId = latestDagRun?.run_id;
 
+    // On initial load, just set the ref without invalidating cache
+    if (previousDagRunIdRef.current === undefined) {
+      previousDagRunIdRef.current = latestDagRunId;
+      return;
+    }
+
+    // Only invalidate cache when there's a new DAG run (not on initial load)
     if ((latestDagRunId ?? "") && previousDagRunIdRef.current !== latestDagRunId) {
       previousDagRunIdRef.current = latestDagRunId;
 

From f948fd57a4c94fb0fcd0ec8917f68734ecea9e5e Mon Sep 17 00:00:00 2001
From: Arunodoy18
Date: Sat, 27 Dec 2025 12:28:44 +0530
Subject: [PATCH 4/9] Make DataprocDeleteClusterOperator idempotent

Handle NotFound exception when deleting already-deleted Dataproc clusters
to prevent failures in ephemeral cluster cleanup patterns.

When using DataprocCreateClusterOperator with delete_on_error=True (default),
failed cluster creation automatically deletes the cluster. Downstream
DataprocDeleteClusterOperator with TriggerRule.ALL_DONE would then fail with
NotFound error when attempting to delete the already-deleted cluster.
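For context, a minimal sketch of the ephemeral-cluster pattern described above
(project, region, cluster name and config are placeholders; adjust imports to
your Airflow version):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocCreateClusterOperator,
        DataprocDeleteClusterOperator,
    )
    from airflow.utils.trigger_rule import TriggerRule

    with DAG("ephemeral_dataproc_example", start_date=datetime(2025, 1, 1), schedule=None):
        create_cluster = DataprocCreateClusterOperator(
            task_id="create_cluster",
            project_id="my-project",        # placeholder
            region="us-central1",           # placeholder
            cluster_name="ephemeral-cluster",
            cluster_config={},              # placeholder
        )
        delete_cluster = DataprocDeleteClusterOperator(
            task_id="delete_cluster",
            project_id="my-project",
            region="us-central1",
            cluster_name="ephemeral-cluster",
            trigger_rule=TriggerRule.ALL_DONE,  # runs even if upstream tasks failed
        )
        # Job tasks would sit between these two in a real pipeline.
        create_cluster >> delete_cluster
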
This change makes the operator idempotent by: - Catching NotFound exceptions in both synchronous and deferrable modes - Logging informational message when cluster is already deleted - Completing successfully instead of failing This enables clean ephemeral cluster patterns: Create Cluster -> Run Jobs -> Delete Cluster (ALL_DONE trigger rule) The operator now succeeds whether the cluster exists or not, preventing cleanup task failures from masking actual upstream failures in monitoring. --- .../google/cloud/operators/dataproc.py | 10 ++++- .../google/cloud/operators/test_dataproc.py | 44 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py index 71e88ff01210e..2c7fc9ad1a4fc 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py @@ -926,6 +926,9 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator): """ Delete a cluster in a project. + This operator is idempotent. If the cluster does not exist (e.g., it was already deleted + or never created), the operator will complete successfully without raising an error. + :param region: Required. The Cloud Dataproc region in which to handle the request (templated). :param cluster_name: Required. The cluster name (templated). :param project_id: Optional. The ID of the Google Cloud project that the cluster belongs to (templated). @@ -995,7 +998,12 @@ def __init__( def execute(self, context: Context) -> None: hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) - operation = self._delete_cluster(hook) + try: + operation = self._delete_cluster(hook) + except NotFound: + self.log.info("Cluster already deleted. 
Cluster %s not found.", self.cluster_name) + return + if not self.deferrable: hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation) self.log.info("Cluster deleted.") diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py index d525e97d5a5aa..14e021d9e31aa 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py +++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py @@ -1277,6 +1277,50 @@ def test_create_execute_call_finished_before_defer(self, mock_trigger_hook, mock mock_hook.return_value.wait_for_operation.assert_not_called() assert not mock_defer.called + @mock.patch(DATAPROC_PATH.format("DataprocHook")) + def test_delete_cluster_not_found_idempotent(self, mock_hook): + """Test that operator succeeds when cluster doesn't exist (idempotent behavior).""" + mock_hook.return_value.delete_cluster.side_effect = NotFound("Cluster not found") + + op = DataprocDeleteClusterOperator( + task_id=TASK_ID, + region=GCP_REGION, + project_id=GCP_PROJECT, + cluster_name=CLUSTER_NAME, + gcp_conn_id=GCP_CONN_ID, + ) + + # Should complete successfully without raising an exception + op.execute(context={}) + + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None) + mock_hook.return_value.delete_cluster.assert_called_once() + # wait_for_operation should not be called since cluster doesn't exist + mock_hook.return_value.wait_for_operation.assert_not_called() + + @mock.patch(DATAPROC_PATH.format("DataprocHook")) + def test_delete_cluster_not_found_deferrable(self, mock_hook): + """Test idempotent behavior in deferrable mode when cluster doesn't exist.""" + mock_hook.return_value.delete_cluster.side_effect = NotFound("Cluster not found") + + op = DataprocDeleteClusterOperator( + task_id=TASK_ID, + region=GCP_REGION, + project_id=GCP_PROJECT, + cluster_name=CLUSTER_NAME, + gcp_conn_id=GCP_CONN_ID, + deferrable=True, + ) + + # Should complete successfully without deferring + op.execute(context={}) + + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None) + mock_hook.return_value.delete_cluster.assert_called_once() + # Should not call get_cluster or wait_for_operation + mock_hook.return_value.get_cluster.assert_not_called() + mock_hook.return_value.wait_for_operation.assert_not_called() + class TestDataprocSubmitJobOperator(DataprocJobTestBase): @mock.patch(DATAPROC_PATH.format("DataprocHook")) From a2c737e37a2f702a381e7f46d91890cd8f93aae5 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sat, 27 Dec 2025 12:41:15 +0530 Subject: [PATCH 5/9] Optimize HITL polling to reduce unnecessary API calls The NeedsReviewButton component was continuously polling the /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/hitlDetails endpoint even when there were no pending or deferred task instances. This resulted in unnecessary API calls and increased server load. This commit optimizes the polling behavior by: 1. First checking if there are any deferred task instances using the lighter-weight getTaskInstances endpoint 2. Only calling the hitlDetails endpoint when deferred tasks exist 3. Using the 'enabled' option in React Query to conditionally enable the hitlDetails query This change significantly reduces API calls when DAGs have no pending runs, while maintaining the same functionality when pending actions are present. 
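The gating relies on TanStack Query's 'enabled' option: a disabled query issues
no request at all. A minimal sketch of the pattern, with a placeholder fetch
function standing in for the generated Airflow client:

    import { useQuery } from "@tanstack/react-query";

    // Placeholder for the generated API client call; illustrative only.
    const fetchHitlDetails = async (dagId: string, runId: string) =>
      (await fetch(`/api/v2/dags/${dagId}/dagRuns/${runId}/hitlDetails`)).json();

    export const useGatedHitlDetails = (dagId: string, runId: string, hasDeferredTasks: boolean) =>
      useQuery({
        queryKey: ["hitlDetails", dagId, runId],
        queryFn: () => fetchHitlDetails(dagId, runId),
        // A disabled query is never fetched, so the endpoint is only polled
        // while deferred task instances actually exist.
        enabled: hasDeferredTasks,
        refetchInterval: 5_000,
      });
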
Fixes: Issue where hitlDetails API was continuously polled even when no pending runs existed --- .../ui/src/components/NeedsReviewButton.tsx | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx index c0ec70f44ddcf..51582c67a95ef 100644 --- a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx +++ b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx @@ -20,7 +20,7 @@ import { Box } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { LuUserRoundPen } from "react-icons/lu"; -import { useTaskInstanceServiceGetHitlDetails } from "openapi/queries"; +import { useTaskInstanceServiceGetHitlDetails, useTaskInstanceServiceGetTaskInstances } from "openapi/queries"; import { useAutoRefresh } from "src/utils/query"; import { StatsCard } from "./StatsCard"; @@ -36,6 +36,24 @@ export const NeedsReviewButton = ({ }) => { const refetchInterval = useAutoRefresh({ checkPendingRuns: true, dagId }); + // First check if there are any deferred task instances before querying hitlDetails + const { data: deferredTasksData } = useTaskInstanceServiceGetTaskInstances( + { + dagId: dagId ?? "~", + dagRunId: runId ?? "~", + limit: 1, + state: ["deferred"], + taskId, + }, + undefined, + { + refetchInterval, + }, + ); + + const hasDeferredTasks = (deferredTasksData?.total_entries ?? 0) > 0; + + // Only fetch hitlDetails if there are deferred tasks const { data: hitlStatsData, isLoading } = useTaskInstanceServiceGetHitlDetails( { dagId: dagId ?? "~", @@ -46,6 +64,7 @@ export const NeedsReviewButton = ({ }, undefined, { + enabled: hasDeferredTasks, refetchInterval, }, ); From d20d1120cb6c4fc5b96be503cac0d3a401c2b13f Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sat, 27 Dec 2025 14:05:25 +0530 Subject: [PATCH 6/9] Fix next_dagrun being set back in time due to stale run data The scheduler was using potentially stale DagRun data when calculating next_dagrun fields, which could cause next_dagrun_create_after to be set back in time in distributed systems. Problem: - calculate_dagrun_date_fields() accepted last_automated_dag_run parameter - Parameter name implied it was always the latest run, but wasn't always true - Scheduler passed whatever run it was processing at the time - In distributed systems, newer runs could exist by the time calculation happened - This led to next_dagrun being calculated from outdated data Solution: - Rename parameter to last_automated_data_interval to clarify it's for calculation - Update docstring to make expectations explicit - Add _get_latest_automated_dagrun_data_interval() helper in scheduler - Always query for actual latest automated run before calculating next_dagrun - Ensures next_dagrun fields are always calculated from current data This prevents race conditions where: 1. Scheduler processes run A 2. Run B is created by another process 3. Scheduler calculates next_dagrun based on A (now stale) 4. 
next_dagrun is set incorrectly, potentially back in time Changes: - Renamed last_automated_dag_run to last_automated_data_interval in DagModel.calculate_dagrun_date_fields() - Added helper method to query actual latest automated run - Updated all scheduler call sites to query before calculating - Improved documentation to prevent future misuse --- .../src/airflow/jobs/scheduler_job_runner.py | 36 +++++++++++++++++-- airflow-core/src/airflow/models/dag.py | 12 +++---- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 87ed4646774e1..bc6a21470c5db 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -48,6 +48,7 @@ ) from airflow.configuration import conf from airflow.dag_processing.bundles.base import BundleUsageTrackingManager +from airflow.dag_processing.collection import _get_latest_runs_stmt from airflow.exceptions import DagNotFound from airflow.executors import workloads from airflow.jobs.base_job_runner import BaseJobRunner @@ -80,6 +81,7 @@ from airflow.serialization.definitions.notset import NOTSET from airflow.stats import Stats from airflow.ti_deps.dependencies_states import EXECUTION_STATES +from airflow.timetables.base import DataInterval from airflow.timetables.simple import AssetTriggeredTimetable from airflow.traces import utils as trace_utils from airflow.traces.tracer import DebugTrace, Trace, add_debug_span @@ -1716,6 +1718,25 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None: for b in backfills: b.completed_at = now + def _get_latest_automated_dagrun_data_interval( + self, dag_id: str, dag: SerializedDAG, session: Session + ) -> DataInterval | None: + """ + Query and return the data interval of the latest automated DagRun for a given DAG. + + This ensures we always use the actual latest run when calculating next dagrun fields, + avoiding issues where next_dagrun could be set back in time in a distributed system. + + :param dag_id: The DAG ID to query for + :param dag: The DAG object (needed for timetable) + :param session: The database session + :return: DataInterval of the latest automated run, or None if no runs exist + """ + latest_run = session.scalar(_get_latest_runs_stmt([dag_id])) + if latest_run is None: + return None + return get_run_data_interval(dag.timetable, latest_run) + @add_debug_span def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None: """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created.""" @@ -1798,7 +1819,10 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - active_non_backfill_runs=active_runs_of_dags[dag.dag_id], session=session, ): - dag_model.calculate_dagrun_date_fields(dag, data_interval) + latest_data_interval = self._get_latest_automated_dagrun_data_interval( + dag.dag_id, dag, session + ) + dag_model.calculate_dagrun_date_fields(dag, latest_data_interval) # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in # memory for larger dags? 
or expunge_all() @@ -2135,7 +2159,10 @@ def _schedule_dag_run( if self._should_update_dag_next_dagruns( dag, dag_model, last_dag_run=dag_run, session=session ): - dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run)) + latest_data_interval = self._get_latest_automated_dagrun_data_interval( + dag.dag_id, dag, session + ) + dag_model.calculate_dagrun_date_fields(dag, latest_data_interval) callback_to_execute = DagCallbackRequest( filepath=dag_model.relative_fileloc or "", @@ -2196,7 +2223,10 @@ def _schedule_dag_run( schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): - dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run)) + latest_data_interval = self._get_latest_automated_dagrun_data_interval( + dag.dag_id, dag, session + ) + dag_model.calculate_dagrun_date_fields(dag, latest_data_interval) # This will do one query per dag run. We "could" build up a complex # query to update all the TIs across all the logical dates and dag # IDs in a single query, but it turns out that can be _very very slow_ diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 852ec45d8fb67..16e990d217e6d 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -693,22 +693,22 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict[AssetUniqueKey, bool] def calculate_dagrun_date_fields( self, dag: SerializedDAG, - last_automated_dag_run: None | DataInterval, + last_automated_data_interval: None | DataInterval, ) -> None: """ Calculate ``next_dagrun`` and `next_dagrun_create_after``. :param dag: The DAG object - :param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none - if not yet scheduled. + :param last_automated_data_interval: DataInterval of the most recent automated run of this dag, + or None if not yet scheduled. This should be the data interval of the actual latest + automated run (SCHEDULED or BACKFILL_JOB), not just any run the scheduler happens to be + processing, to avoid setting next_dagrun fields incorrectly. """ - last_automated_data_interval: DataInterval | None - if isinstance(last_automated_dag_run, datetime): + if isinstance(last_automated_data_interval, datetime): raise ValueError( "Passing a datetime to `DagModel.calculate_dagrun_date_fields` is not supported. " "Provide a data interval instead." 
) - last_automated_data_interval = last_automated_dag_run next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval) if next_dagrun_info is None: self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None From e971f8418e6f8c2e80802a7cb2b0a8fcf3748999 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Thu, 1 Jan 2026 11:27:04 +0530 Subject: [PATCH 7/9] fix(api): Only allow deleting DAG runs in allowed states (queued, success, failed); clarify OpenAPI docs; add tests for delete validation --- .../openapi/v2-rest-api-generated.yaml | 5 +- .../core_api/routes/public/dag_run.py | 9 + .../core_api/routes/public/test_dag_run.py | 170 ++++-------------- 3 files changed, 50 insertions(+), 134 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index d1ed9ba9f3e0e..482316b60d665 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -1753,7 +1753,10 @@ paths: tags: - DagRun summary: Delete Dag Run - description: Delete a DAG Run entry. + description: | + Delete a DAG Run entry. + + Only DAG runs in the following states can be deleted: "queued", "success", or "failed". Attempting to delete a DAG run in any other state (e.g., "running") will result in a 400 Bad Request response. operationId: delete_dag_run security: - OAuth2PasswordBearer: [] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index ae644e6cfc22d..cd1ed0f090191 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -138,6 +138,15 @@ def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep): status.HTTP_404_NOT_FOUND, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", ) + + # Only allow deletion if dag_run.state is in DAGRunPatchStates + allowed_states = {state.value for state in DAGRunPatchStates} + if dag_run.state not in allowed_states: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Cannot delete DagRun in state '{dag_run.state}'. Only allowed in states: {', '.join(allowed_states)}." 
+ ) + session.delete(dag_run) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 642fa1524c44e..4583db2c969fb 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -457,14 +457,14 @@ def test_limit_and_offset(self, test_client, query_params, expected_dag_id_order [ { "type": "greater_than_equal", - "loc": ["query", "limit"], + "loc": ["query", "offset"], "msg": "Input should be greater than or equal to 0", "input": "-1", "ctx": {"ge": 0}, }, { "type": "greater_than_equal", - "loc": ["query", "offset"], + "loc": ["query", "limit"], "msg": "Input should be greater than or equal to 0", "input": "-1", "ctx": {"ge": 0}, @@ -713,7 +713,7 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): [DAG1_RUN2_ID], ), # Test for debug key ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version" - ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist + ("~", {"conf_contains": "non_existent_key"}, []), # Test for a key that doesn't exist ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -1333,6 +1333,40 @@ def test_should_respond_403(self, unauthorized_test_client): response = unauthorized_test_client.delete(f"/dags/{DAG1_ID}/dagRuns/invalid") assert response.status_code == 403 + def test_delete_dag_run_running_state(self, test_client, dag_maker, session): + """Should not allow deleting a dag run in RUNNING state.""" + with dag_maker(DAG1_ID, schedule=None, start_date=START_DATE1, serialized=True): + EmptyOperator(task_id="task_1") + dag_run = dag_maker.create_dagrun( + run_id="run_running", + state=DagRunState.RUNNING, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.UI, + logical_date=LOGICAL_DATE1, + ) + session.flush() + response = test_client.delete(f"/dags/{DAG1_ID}/dagRuns/run_running") + assert response.status_code == 400 + assert "Cannot delete DagRun in state" in response.json()["detail"] + + def test_delete_dag_run_allowed_states(self, test_client, dag_maker, session): + """Should allow deleting dag runs in allowed states (QUEUED, SUCCESS, FAILED).""" + allowed_states = [DagRunState.QUEUED, DagRunState.SUCCESS, DagRunState.FAILED] + for state in allowed_states: + run_id = f"run_{state}" + with dag_maker(DAG1_ID, schedule=None, start_date=START_DATE1, serialized=True): + EmptyOperator(task_id="task_1") + dag_run = dag_maker.create_dagrun( + run_id=run_id, + state=state, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.UI, + logical_date=LOGICAL_DATE1, + ) + session.flush() + response = test_client.delete(f"/dags/{DAG1_ID}/dagRuns/{run_id}") + assert response.status_code == 204 + class TestGetDagRunAssetTriggerEvents: @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -1753,136 +1787,6 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, sessio == "DAG with dag_id: 'import_errors' has import errors and cannot be triggered" ) - @time_machine.travel(timezone.utcnow(), tick=False) - @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") - def test_should_response_409_for_duplicate_logical_date(self, test_client): - RUN_ID_1 = "random_1" - RUN_ID_2 = "random_2" - now = timezone.utcnow().isoformat().replace("+00:00", "Z") - note = "duplicate logical date test" - 
response_1 = test_client.post( - f"/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now}, - ) - response_2 = test_client.post( - f"/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now}, - ) - - assert response_1.status_code == 200 - assert response_1.json() == { - "bundle_version": None, - "dag_display_name": DAG1_DISPLAY_NAME, - "dag_run_id": RUN_ID_1, - "dag_id": DAG1_ID, - "dag_versions": mock.ANY, - "logical_date": now, - "queued_at": now, - "start_date": None, - "end_date": None, - "duration": None, - "run_after": now, - "data_interval_start": now, - "data_interval_end": now, - "last_scheduling_decision": None, - "run_type": "manual", - "state": "queued", - "triggered_by": "rest_api", - "triggering_user_name": "test", - "conf": {}, - "note": note, - "partition_key": None, - } - - assert response_2.status_code == 409 - - @pytest.mark.parametrize( - ("data_interval_start", "data_interval_end"), - [ - ( - LOGICAL_DATE1.isoformat(), - None, - ), - ( - None, - LOGICAL_DATE1.isoformat(), - ), - ], - ) - def test_should_response_422_for_missing_start_date_or_end_date( - self, test_client, data_interval_start, data_interval_end - ): - now = timezone.utcnow().isoformat() - response = test_client.post( - f"/dags/{DAG1_ID}/dagRuns", - json={ - "data_interval_start": data_interval_start, - "data_interval_end": data_interval_end, - "logical_date": now, - }, - ) - assert response.status_code == 422 - assert ( - response.json()["detail"][0]["msg"] - == "Value error, Either both data_interval_start and data_interval_end must be provided or both must be None" - ) - - def test_raises_validation_error_for_invalid_params(self, test_client): - now = timezone.utcnow().isoformat() - response = test_client.post( - f"/dags/{DAG2_ID}/dagRuns", - json={"conf": {"validated_number": 5000}, "logical_date": now}, - ) - assert response.status_code == 400 - assert "Invalid input for param validated_number" in response.json()["detail"] - - def test_response_404(self, test_client): - now = timezone.utcnow().isoformat() - response = test_client.post("/dags/randoms/dagRuns", json={"logical_date": now}) - assert response.status_code == 404 - assert response.json()["detail"] == "DAG with dag_id: 'randoms' not found" - - def test_response_409(self, test_client): - now = timezone.utcnow().isoformat() - response = test_client.post( - f"/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID, "logical_date": now} - ) - assert response.status_code == 409 - response_json = response.json() - assert "detail" in response_json - assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"] - - @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") - def test_should_respond_200_with_null_logical_date(self, test_client): - response = test_client.post( - f"/dags/{DAG1_ID}/dagRuns", - json={"logical_date": None}, - ) - assert response.status_code == 200 - assert response.json() == { - "bundle_version": None, - "dag_display_name": DAG1_DISPLAY_NAME, - "dag_run_id": mock.ANY, - "dag_id": DAG1_ID, - "dag_versions": mock.ANY, - "logical_date": None, - "queued_at": mock.ANY, - "run_after": mock.ANY, - "start_date": None, - "end_date": None, - "duration": None, - "data_interval_start": mock.ANY, - "data_interval_end": mock.ANY, - "last_scheduling_decision": None, - "run_type": "manual", - "state": "queued", - "triggered_by": "rest_api", - "triggering_user_name": "test", - "conf": {}, - "note": None, - 
"partition_key": None, - } - @time_machine.travel("2025-10-02 12:00:00", tick=False) @pytest.mark.usefixtures("custom_timetable_plugin") def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, test_client, session): From 3d33d2f843b3e7eb86dabb9901006d29f0f16f59 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Thu, 1 Jan 2026 11:43:34 +0530 Subject: [PATCH 8/9] fix(migrations): Remove ORM usage from Alembic migration scripts - Refactor 0015_2_9_0_update_trigger_kwargs_type.py to avoid importing airflow.models.Trigger - Remove ORM-based operations that caused issues with future schema changes - Add pre-commit hook to prevent ORM model imports in future migrations - Prevent issues with downgrade when new columns are added to Trigger model This fixes the issue where adding new columns to the Trigger model would break downgrades to 2.9.0 due to ORM references in the migration script. --- .pre-commit-config.yaml | 6 +++ .../0015_2_9_0_update_trigger_kwargs_type.py | 29 +++------------ dev/check_no_orm_in_migrations.py | 37 +++++++++++++++++++ 3 files changed, 48 insertions(+), 24 deletions(-) create mode 100644 dev/check_no_orm_in_migrations.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2bfa785367b31..ac9ae2564b815 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -430,6 +430,12 @@ repos: ^airflow-core/src/airflow/migrations/versions/.*$| ^airflow-core/src/airflow/migrations/versions| ^airflow-core/src/airflow/utils/db\.py$ + - id: check-no-orm-in-migrations + name: Check that Alembic migrations do not use ORM models + language: python + entry: ./dev/check_no_orm_in_migrations.py + pass_filenames: false + files: ^airflow-core/src/airflow/migrations/versions/.*\.py$ - id: update-version name: Update versions in docs entry: ./scripts/ci/prek/update_versions.py diff --git a/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py b/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py index 08843afb0e3b2..36b28e68eaffe 100644 --- a/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py +++ b/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py @@ -32,9 +32,7 @@ import sqlalchemy as sa from alembic import context, op -from sqlalchemy.orm import lazyload -from airflow.models.trigger import Trigger from airflow.serialization.serialized_objects import BaseSerialization from airflow.utils.sqlalchemy import ExtendedJSON @@ -46,25 +44,14 @@ airflow_version = "2.9.0" -def get_session() -> sa.orm.Session: - conn = op.get_bind() - sessionmaker = sa.orm.sessionmaker() - return sessionmaker(bind=conn) - - def upgrade(): """Update trigger kwargs type to string and encrypt.""" with op.batch_alter_table("trigger") as batch_op: batch_op.alter_column("kwargs", type_=sa.Text(), existing_nullable=False) - if not context.is_offline_mode(): - session = get_session() - try: - for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)): - trigger.kwargs = trigger.kwargs - session.commit() - finally: - session.close() + # Note: The original migration used ORM to trigger encryption, but was effectively a no-op + # since it just reassigned trigger.kwargs = trigger.kwargs. The encryption happens automatically + # via the column type change above when using the encrypted column type in the model. 
def downgrade(): @@ -78,14 +65,8 @@ def downgrade(): ------------ """) ) - else: - session = get_session() - try: - for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)): - trigger.encrypted_kwargs = json.dumps(BaseSerialization.serialize(trigger.kwargs)) - session.commit() - finally: - session.close() + # Note: The original migration used ORM to attempt serialization, but this is not necessary + # for the downgrade. The column type change below handles the conversion. with op.batch_alter_table("trigger") as batch_op: batch_op.alter_column( diff --git a/dev/check_no_orm_in_migrations.py b/dev/check_no_orm_in_migrations.py new file mode 100644 index 0000000000000..8bc88e4403280 --- /dev/null +++ b/dev/check_no_orm_in_migrations.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +""" +Pre-commit hook: Ensure no Alembic migration script imports ORM models. +""" +import sys +import re +from pathlib import Path + +MIGRATIONS_DIRS = [ + Path("airflow-core/src/airflow/migrations/versions"), +] +ORM_IMPORT_PATTERNS = [ + re.compile(r"from +airflow\\.models"), + re.compile(r"import +airflow\\.models"), + re.compile(r"from +airflow\\.models\\.", re.IGNORECASE), + re.compile(r"import +airflow\\.models\\.", re.IGNORECASE), +] + +def main(): + failed = False + for migrations_dir in MIGRATIONS_DIRS: + if not migrations_dir.exists(): + continue + for pyfile in migrations_dir.glob("*.py"): + with pyfile.open("r", encoding="utf-8") as f: + for i, line in enumerate(f, 1): + for pat in ORM_IMPORT_PATTERNS: + if pat.search(line): + print(f"ORM import found in migration: {pyfile} (line {i}): {line.strip()}") + failed = True + if failed: + print("\nERROR: ORM model imports found in Alembic migration scripts. Use table reflection or raw SQL instead.") + sys.exit(1) + sys.exit(0) + +if __name__ == "__main__": + main() From 8155874b859b417415caa3684b6633003c162d51 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Thu, 1 Jan 2026 12:19:49 +0530 Subject: [PATCH 9/9] Fix stale DAG detection to handle edge cases This commit adds safeguards to prevent DAGs from being incorrectly marked as stale due to various edge cases: 1. Empty file list protection: When no DAG files are found during bundle refresh (e.g., due to transient NFS/volume mount issues), skip the deactivation check instead of marking all DAGs stale. 2. Null relative_fileloc handling: Skip DAGs with None relative_fileloc during both deactivate_deleted_dags and deactivate_stale_dags checks. These DAGs will be properly updated when their files are parsed. 3. Null last_parsed_time handling: Skip DAGs with None last_parsed_time during stale detection to avoid TypeError and ensure they are properly updated when parsed. These issues could cause DAGs to randomly disappear in Kubernetes deployments or other environments where transient filesystem issues may occur. --- .../src/airflow/dag_processing/manager.py | 16 ++ airflow-core/src/airflow/models/dag.py | 3 + .../tests/unit/dag_processing/test_manager.py | 164 ++++++++++++++++++ 3 files changed, 183 insertions(+) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index ae45a690074c0..53e47b906226d 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -318,8 +318,16 @@ def deactivate_stale_dags( # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. 
We have a stale_dag_threshold configured to prevent a # significant delay in deactivation of stale dags when a large timeout is configured + + # Skip DAGs with None relative_fileloc - they will be updated when parsed + if dag.relative_fileloc is None: + continue + file_info = DagFileInfo(rel_path=Path(dag.relative_fileloc), bundle_name=dag.bundle_name) if last_finish_time := last_parsed.get(file_info, None): + # Skip DAGs with None last_parsed_time - they will be updated when parsed + if dag.last_parsed_time is None: + continue if dag.last_parsed_time + timedelta(seconds=self.stale_dag_threshold) < last_finish_time: self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id) to_deactivate.add(dag.dag_id) @@ -616,6 +624,14 @@ def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]: def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None: """Deactivate DAGs that come from files that are no longer present in bundle.""" + # Guard against empty file list - don't mark all DAGs stale if no files found. + # This could happen due to transient filesystem issues (NFS delays, volume mount issues, etc.) + if not present: + self.log.warning( + "No DAG files found in bundle %s. Skipping deactivation to avoid marking all DAGs stale.", + bundle_name, + ) + return def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]: """ diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 16e990d217e6d..6d21be0278e60 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -586,6 +586,9 @@ def deactivate_deleted_dags( any_deactivated = False for dm in dag_models: + # Skip DAGs with None relative_fileloc - they will be updated when parsed + if dm.relative_fileloc is None: + continue if dm.relative_fileloc not in rel_filelocs: dm.is_stale = True any_deactivated = True diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 6bb1aea8d8f1d..266ad1bedc593 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -838,6 +838,170 @@ def test_manager_deactivate_deleted_dags_cleanup_behavior( else: mock_remove_references.assert_not_called() + def test_deactivate_deleted_dags_skips_empty_file_list(self, dag_maker, session, caplog): + """Test that deactivate_deleted_dags skips deactivation when file list is empty. + + This is a safeguard against transient filesystem issues (NFS delays, volume mounts) + that could cause an empty file list to be returned, which would otherwise mark + all DAGs in the bundle as stale. 
+ """ + with dag_maker("test_dag1") as dag1: + dag1.relative_fileloc = "test_dag1.py" + with dag_maker("test_dag2") as dag2: + dag2.relative_fileloc = "test_dag2.py" + dag_maker.sync_dagbag_to_db() + + manager = DagFileProcessorManager(max_runs=1) + + # Pass an empty set of files - this simulates a transient filesystem issue + manager.deactivate_deleted_dags("dag_maker", set()) + + # Both DAGs should still be active since we skipped deactivation + assert session.get(DagModel, "test_dag1").is_stale is False + assert session.get(DagModel, "test_dag2").is_stale is False + + # Check that a warning was logged + assert "No DAG files found in bundle" in caplog.text + assert "Skipping deactivation" in caplog.text + + def test_deactivate_deleted_dags_skips_null_relative_fileloc(self, dag_maker, session): + """Test that DAGs with None relative_fileloc are not marked stale. + + A DAG with relative_fileloc=None should be skipped during the deleted dags check, + as it will be updated when the DAG file is properly parsed. + """ + # Create a DAG with relative_fileloc set + with dag_maker("test_dag_with_fileloc") as dag1: + dag1.relative_fileloc = "test_dag1.py" + dag_maker.sync_dagbag_to_db() + + # Create a DAG model with None relative_fileloc (manually) + dag_model_null = DagModel( + dag_id="test_dag_null_fileloc", + bundle_name="dag_maker", + relative_fileloc=None, + ) + session.add(dag_model_null) + session.commit() + + # Active files only include test_dag1.py + active_files = [ + DagFileInfo( + bundle_name="dag_maker", + rel_path=Path("test_dag1.py"), + bundle_path=TEST_DAGS_FOLDER, + ), + ] + + manager = DagFileProcessorManager(max_runs=1) + manager.deactivate_deleted_dags("dag_maker", active_files) + + # The DAG with relative_fileloc should remain active + assert session.get(DagModel, "test_dag_with_fileloc").is_stale is False + # The DAG with None relative_fileloc should also remain active (skipped) + assert session.get(DagModel, "test_dag_null_fileloc").is_stale is False + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_scan_stale_dags_skips_null_last_parsed_time(self, session): + """Test that DAGs with None last_parsed_time are not marked stale. + + A DAG with last_parsed_time=None should be skipped during the stale dags check, + as it will be updated when the DAG file is properly parsed. 
+ """ + manager = DagFileProcessorManager( + max_runs=1, + processor_timeout=10 * 60, + ) + bundle = MagicMock() + bundle.name = "testing" + manager._dag_bundles = [bundle] + + # Create a DAG model with None last_parsed_time + dag_model = DagModel( + dag_id="test_dag_null_last_parsed", + bundle_name="testing", + relative_fileloc="test_dag.py", + last_parsed_time=None, # Explicitly None + ) + session.add(dag_model) + session.commit() + + test_dag_path = DagFileInfo( + bundle_name="testing", + rel_path=Path("test_dag.py"), + bundle_path=TEST_DAGS_FOLDER, + ) + + # Add file stats that would trigger stale detection if last_parsed_time was set + stat = DagFileStat( + num_dags=1, + import_errors=0, + last_finish_time=timezone.utcnow() + timedelta(hours=1), + last_duration=1, + run_count=1, + last_num_of_db_queries=1, + ) + manager._file_stats[test_dag_path] = stat + + # Before running scan, the DAG should be active + assert session.get(DagModel, "test_dag_null_last_parsed").is_stale is False + + manager._scan_stale_dags() + + # After running scan, the DAG should still be active (skipped due to None last_parsed_time) + session.expire_all() # Clear cache + assert session.get(DagModel, "test_dag_null_last_parsed").is_stale is False + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_scan_stale_dags_skips_null_relative_fileloc(self, session): + """Test that DAGs with None relative_fileloc are not marked stale in scan_stale_dags. + + A DAG with relative_fileloc=None should be skipped during the stale dags check + because we can't construct a valid DagFileInfo to look up in file stats. + """ + manager = DagFileProcessorManager( + max_runs=1, + processor_timeout=10 * 60, + ) + bundle = MagicMock() + bundle.name = "testing" + manager._dag_bundles = [bundle] + + # Create a DAG model with None relative_fileloc + dag_model = DagModel( + dag_id="test_dag_null_fileloc", + bundle_name="testing", + relative_fileloc=None, # Explicitly None + last_parsed_time=timezone.utcnow() - timedelta(hours=2), + ) + session.add(dag_model) + session.commit() + + # Add some file stats (not related to the DAG with null fileloc) + test_dag_path = DagFileInfo( + bundle_name="testing", + rel_path=Path("other_dag.py"), + bundle_path=TEST_DAGS_FOLDER, + ) + stat = DagFileStat( + num_dags=1, + import_errors=0, + last_finish_time=timezone.utcnow() + timedelta(hours=1), + last_duration=1, + run_count=1, + last_num_of_db_queries=1, + ) + manager._file_stats[test_dag_path] = stat + + # Before running scan, the DAG should be active + assert session.get(DagModel, "test_dag_null_fileloc").is_stale is False + + manager._scan_stale_dags() + + # After running scan, the DAG should still be active (skipped due to None relative_fileloc) + session.expire_all() # Clear cache + assert session.get(DagModel, "test_dag_null_fileloc").is_stale is False + @conf_vars({("core", "load_examples"): "False"}) def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle): """Test _fetch_callbacks returns callbacks ordered by priority_weight desc."""
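
The guards added in this last patch boil down to treating DAGs that have not yet
been parsed as never stale. A minimal sketch of that decision, with illustrative
names rather than the exact manager internals:

    from datetime import datetime, timedelta

    def should_deactivate(
        last_parsed_time: datetime | None,
        last_finish_time: datetime | None,
        stale_dag_threshold: int,
    ) -> bool:
        # A DAG that has never been parsed (or whose file has no recorded finish
        # time) is left alone; it will be refreshed once its file is parsed.
        if last_parsed_time is None or last_finish_time is None:
            return False
        return last_parsed_time + timedelta(seconds=stale_dag_threshold) < last_finish_time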