diff --git a/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts b/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts index 42060cac610f3..2c43ff47eae95 100644 --- a/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts +++ b/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts @@ -21,7 +21,10 @@ import { UseDagServiceGetDagDetailsKeyFn, UseDagServiceGetLatestRunInfoKeyFn, UseGridServiceGetGridRunsKeyFn, + useTaskInstanceServiceGetExtraLinksKey, + useTaskInstanceServiceGetLogKey, UseTaskInstanceServiceGetTaskInstancesKeyFn, + useTaskInstanceServiceGetTaskInstanceTryDetailsKey, } from "openapi/queries"; export const gridQueryKeys = (dagId: string) => @@ -32,3 +35,10 @@ export const gridQueryKeys = (dagId: string) => UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]), UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]), ] as const; + +/** Prefix keys for per-attempt TI caches that become stale after any mutation. */ +export const tiPerAttemptQueryKeys = [ + [useTaskInstanceServiceGetLogKey], + [useTaskInstanceServiceGetExtraLinksKey], + [useTaskInstanceServiceGetTaskInstanceTryDetailsKey], +] as const; diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts index 063b1c6d77d86..8c14e36664c52 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts @@ -20,11 +20,17 @@ import { useQueryClient } from "@tanstack/react-query"; import { useState } from "react"; import { useTranslation } from "react-i18next"; -import { useDagRunServiceGetDagRunsKey, useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries"; +import { + useDagRunServiceGetDagRunsKey, + useTaskInstanceServiceGetMappedTaskInstanceKey, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; import { TaskInstanceService } from "openapi/requests/services.gen"; import type { TaskInstanceResponse } from "openapi/requests/types.gen"; import { toaster } from "src/components/ui"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; + type Props = { readonly clearSelections: VoidFunction; readonly onSuccessConfirm: VoidFunction; @@ -46,10 +52,15 @@ export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }: const [isPending, setIsPending] = useState(false); const { t: translate } = useTranslation(["common", "dags"]); - const invalidateQueries = async () => { + const invalidateQueries = async (dagIds: ReadonlySet) => { await Promise.all([ queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetMappedTaskInstanceKey] }), + ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })), + ...[...dagIds].flatMap((dagId) => + gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })), + ), ]); }; @@ -92,7 +103,7 @@ export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }: ), ); - await invalidateQueries(); + await invalidateQueries(new Set([...byDagRun.values()].map(({ dagId }) => dagId))); toaster.create({ description: translate("toaster.bulkClear.success.description", { diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts index 8ad4d616197f5..ac47c550b3724 100644 --- a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts @@ -32,6 +32,8 @@ import type { } from "openapi/requests/types.gen"; import { toaster } from "src/components/ui"; +import { tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; + type Props = { readonly clearSelections: VoidFunction; readonly onSuccessConfirm: VoidFunction; @@ -62,6 +64,7 @@ export const useBulkTaskInstances = ({ clearSelections, onSuccessConfirm }: Prop await Promise.all([ queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }), queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }), + ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })), ]); const isDelete = Boolean(responseData.delete); diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts index f4b23c90bd47a..18629915a7517 100644 --- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts +++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts @@ -28,7 +28,7 @@ import { } from "openapi/queries"; import { createErrorToaster } from "src/utils"; -import { gridQueryKeys } from "./gridViewQueryKeys"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun"; export const useClearDagRun = ({ @@ -61,6 +61,7 @@ export const useClearDagRun = ({ [useDagRunServiceGetDagRunsKey], [useClearDagRunDryRunKey, dagId], UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }), + ...tiPerAttemptQueryKeys, ]; await Promise.all([ diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts index 2d70c49bc5bd5..33103445472de 100644 --- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts @@ -31,7 +31,7 @@ import type { ApiError } from "openapi/requests"; import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from "openapi/requests/types.gen"; import { toaster } from "src/components/ui"; -import { gridQueryKeys } from "./gridViewQueryKeys"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun"; import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun"; @@ -119,6 +119,7 @@ export const useClearTaskInstances = ({ [useClearTaskInstancesDryRunKey, dagId], [usePatchTaskInstanceDryRunKey, dagId, dagRunId], UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }), + ...tiPerAttemptQueryKeys, ]; await Promise.all([ diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts index fc45986ac268c..c92dcdd3a4e62 100644 --- a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts +++ b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts @@ -23,11 +23,13 @@ import { useDagRunServiceDeleteDagRun, useDagRunServiceGetDagRunsKey, UseDagRunServiceGetDagRunKeyFn, - useTaskInstanceServiceGetTaskInstancesKey, + UseGanttServiceGetGanttDataKeyFn, useTaskInstanceServiceGetHitlDetailsKey, + useTaskInstanceServiceGetMappedTaskInstanceKey, + useTaskInstanceServiceGetTaskInstancesKey, } from "openapi/queries"; import { toaster } from "src/components/ui"; -import { gridQueryKeys } from "src/queries/gridViewQueryKeys"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "src/queries/gridViewQueryKeys"; import { createErrorToaster } from "src/utils"; type DeleteDagRunParams = { @@ -57,6 +59,9 @@ export const useDeleteDagRun = ({ dagId, dagRunId, onSuccessConfirm }: DeleteDag [useDagRunServiceGetDagRunsKey], [useTaskInstanceServiceGetTaskInstancesKey], [useTaskInstanceServiceGetHitlDetailsKey], + UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }), + [useTaskInstanceServiceGetMappedTaskInstanceKey], + ...tiPerAttemptQueryKeys, ]; await Promise.all([ diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts index cdf07e4905646..4a387d86cd7c2 100644 --- a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts +++ b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts @@ -25,11 +25,15 @@ import { useTaskInstanceServiceGetTaskInstancesKey, useDagRunServiceGetDagRunsKey, UseDagRunServiceGetDagRunKeyFn, + UseGanttServiceGetGanttDataKeyFn, useTaskInstanceServiceGetHitlDetailsKey, + useTaskInstanceServiceGetMappedTaskInstanceKey, } from "openapi/queries"; import { toaster } from "src/components/ui"; import { createErrorToaster } from "src/utils"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; + type DeleteTaskInstanceParams = { dagId: string; dagRunId: string; @@ -66,9 +70,15 @@ export const useDeleteTaskInstance = ({ [useTaskInstanceServiceGetTaskInstancesKey], [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, taskId }], [useTaskInstanceServiceGetHitlDetailsKey], + UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }), + [useTaskInstanceServiceGetMappedTaskInstanceKey], + ...tiPerAttemptQueryKeys, ]; - await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); + await Promise.all([ + ...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })), + ...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })), + ]); toaster.create({ description: translate("dags:runAndTaskActions.delete.success.description", { diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts index 35ff56988fcb1..953f1d149339b 100644 --- a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts +++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts @@ -16,12 +16,24 @@ * specific language governing permissions and limitations * under the License. */ +import { useQueryClient } from "@tanstack/react-query"; import { useEffect, useState } from "react"; +import { + useDagRunServiceGetDagRunsKey, + useGridServiceGetGridRunsKey, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; import type { GridTISummaries, TaskInstanceState } from "openapi/requests"; import { OpenAPI } from "openapi/requests/core/OpenAPI"; import { isStatePending, useAutoRefresh } from "src/utils"; +const GRID_MUTATION_WATCHED_KEYS = new Set([ + useTaskInstanceServiceGetTaskInstancesKey, + useGridServiceGetGridRunsKey, + useDagRunServiceGetDagRunsKey, +]); + /** * Streams TI summaries for all grid runs over a single HTTP connection (NDJSON). * @@ -41,6 +53,7 @@ export const useGridTiSummariesStream = ({ runIds: Array; states?: Array; }) => { + const queryClient = useQueryClient(); const [summariesByRunId, setSummariesByRunId] = useState>(new Map()); const [refreshTick, setRefreshTick] = useState(0); @@ -124,5 +137,34 @@ export const useGridTiSummariesStream = ({ return () => clearInterval(timer); }, [hasActiveRuns, baseRefetchInterval]); + // Re-stream whenever a mutation invalidates a grid-related query (TI states, + // run states, or grid structure). Invalidation events only fire from explicit + // invalidateQueries() calls — never from polling intervals — so this never + // double-fires with the interval-based refresh above. + useEffect(() => { + let timeoutId: ReturnType | undefined; + + const unsubscribe = queryClient.getQueryCache().subscribe((event) => { + const [firstKey] = event.query.queryKey as Array; + + if ( + event.type === "updated" && + event.action.type === "invalidate" && + typeof firstKey === "string" && + GRID_MUTATION_WATCHED_KEYS.has(firstKey) + ) { + // Debounce: a single mutation invalidates several matching queries in one tick. + clearTimeout(timeoutId); + // eslint-disable-next-line max-nested-callbacks + timeoutId = setTimeout(() => setRefreshTick((tick) => tick + 1), 0); + } + }); + + return () => { + unsubscribe(); + clearTimeout(timeoutId); + }; + }, [queryClient]); + return { summariesByRunId }; }; diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts index 6b78396f89173..c032926dc95f0 100644 --- a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts +++ b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts @@ -23,11 +23,14 @@ import { UseDagRunServiceGetDagRunKeyFn, useDagRunServiceGetDagRunsKey, useDagRunServicePatchDagRun, + UseGanttServiceGetGanttDataKeyFn, + useTaskInstanceServiceGetMappedTaskInstanceKey, + useTaskInstanceServiceGetTaskInstanceKey, useTaskInstanceServiceGetTaskInstancesKey, } from "openapi/queries"; import { createErrorToaster } from "src/utils"; -import { gridQueryKeys } from "./gridViewQueryKeys"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun"; export const usePatchDagRun = ({ @@ -58,7 +61,11 @@ export const usePatchDagRun = ({ UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), [useDagRunServiceGetDagRunsKey], [useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }], + [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId }], + [useTaskInstanceServiceGetMappedTaskInstanceKey, { dagId, dagRunId }], [useClearDagRunDryRunKey, dagId], + UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }), + ...tiPerAttemptQueryKeys, ]; await Promise.all([ diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts b/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts index f3e95c5dd8d0c..dcdcaca8c521a 100644 --- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts +++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts @@ -25,7 +25,7 @@ import { } from "openapi/queries"; import { createErrorToaster } from "src/utils"; -import { gridQueryKeys } from "./gridViewQueryKeys"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun"; import { usePatchTaskGroupDryRunKey } from "./usePatchTaskGroupDryRun"; @@ -59,6 +59,7 @@ export const usePatchTaskGroup = ({ [useTaskInstanceServiceGetTaskInstancesKey], [usePatchTaskGroupDryRunKey, dagId, dagRunId, groupId], [useClearTaskInstancesDryRunKey, dagId], + ...tiPerAttemptQueryKeys, ]; await Promise.all([ diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts index 33e1f689e4bfa..0f0a5988ed8d3 100644 --- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts +++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts @@ -28,7 +28,7 @@ import { } from "openapi/queries"; import { createErrorToaster } from "src/utils"; -import { gridQueryKeys } from "./gridViewQueryKeys"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun"; import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun"; @@ -65,6 +65,7 @@ export const usePatchTaskInstance = ({ [useTaskInstanceServiceGetTaskInstancesKey], [usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }], [useClearTaskInstancesDryRunKey, dagId], + ...tiPerAttemptQueryKeys, ]; if (mapIndex !== undefined) { diff --git a/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts b/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts index 656ba03f82ede..358b8b87bbc38 100644 --- a/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts +++ b/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts @@ -23,6 +23,7 @@ import { useTranslation } from "react-i18next"; import { UseDagRunServiceGetDagRunKeyFn, useDagRunServiceGetDagRunsKey, + UseGanttServiceGetGanttDataKeyFn, useTaskInstanceServiceGetHitlDetailsKey, useTaskInstanceServiceGetHitlDetailKey, useTaskInstanceServiceUpdateHitlDetail, @@ -33,6 +34,8 @@ import { toaster } from "src/components/ui/Toaster"; import { createErrorToaster } from "src/utils"; import type { HITLResponseParams } from "src/utils/hitl"; +import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys"; + export const useUpdateHITLDetail = ({ dagId, dagRunId, @@ -55,9 +58,14 @@ export const useUpdateHITLDetail = ({ [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, taskId }], [useTaskInstanceServiceGetHitlDetailsKey, { dagIdPrefixPattern: dagId, dagRunId }], [useTaskInstanceServiceGetHitlDetailKey, { dagId, dagRunId }], + UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }), + ...tiPerAttemptQueryKeys, ]; - await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); + await Promise.all([ + ...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })), + ...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })), + ]); toaster.create({ title: translate("response.success", { taskId }),