Skip to content

Commit

Permalink
Add XCom tab to Grid (#35719)
Browse files Browse the repository at this point in the history
* Add XCom tab to Grid

* Combine showLogs and showXcom logic evaluation to isIndividualTaskInstance

* Remove link to /xcom page from UI grid view

* Use consistent naming to distinguish XcomCollection and XcomEntry

* Refactor boolean vars

(cherry picked from commit 77c0103)
  • Loading branch information
hduong-mwam authored and ephraimbuddy committed Dec 5, 2023
1 parent beba3b8 commit be86dd3
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 20 deletions.
3 changes: 3 additions & 0 deletions airflow/www/static/js/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import usePools from "./usePools";
import useDags from "./useDags";
import useDagRuns from "./useDagRuns";
import useHistoricalMetricsData from "./useHistoricalMetricsData";
import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom";

axios.interceptors.request.use((config) => {
config.paramsSerializer = {
Expand Down Expand Up @@ -91,4 +92,6 @@ export {
useTaskInstance,
useUpstreamDatasetEvents,
useHistoricalMetricsData,
useTaskXcomEntry,
useTaskXcomCollection,
};
71 changes: 71 additions & 0 deletions airflow/www/static/js/api/useTaskXcom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import type { API } from "src/types";
import { getMetaValue } from "src/utils";
import { useQuery } from "react-query";
import axios, { AxiosResponse } from "axios";

// tryNumber is not required to get XCom keys or values but is used
// in query key so refetch will occur if new tries are available
interface TaskXcomCollectionProps extends API.GetXcomEntriesVariables {
tryNumber: number;
}
interface TaskXcomProps extends API.GetXcomEntryVariables {
tryNumber: number;
}

export const useTaskXcomCollection = ({
dagId,
dagRunId,
taskId,
mapIndex,
tryNumber,
}: TaskXcomCollectionProps) =>
useQuery(["taskXcoms", dagId, dagRunId, taskId, mapIndex, tryNumber], () =>
axios.get<AxiosResponse, API.XComCollection>(
getMetaValue("task_xcom_entries_api")
.replace("_DAG_RUN_ID_", dagRunId)
.replace("_TASK_ID_", taskId),
{ params: { map_index: mapIndex } }
)
);

export const useTaskXcomEntry = ({
dagId,
dagRunId,
taskId,
mapIndex,
xcomKey,
tryNumber,
}: TaskXcomProps) =>
useQuery(
["taskXcom", dagId, dagRunId, taskId, mapIndex, xcomKey, tryNumber],
() =>
axios.get<AxiosResponse, API.XCom>(
getMetaValue("task_xcom_entry_api")
.replace("_DAG_RUN_ID_", dagRunId)
.replace("_TASK_ID_", taskId)
.replace("_XCOM_KEY_", xcomKey),
{ params: { map_index: mapIndex } }
),
{
enabled: !!xcomKey,
}
);
71 changes: 55 additions & 16 deletions airflow/www/static/js/dag/details/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
MdReorder,
MdCode,
MdOutlineViewTimeline,
MdSyncAlt,
} from "react-icons/md";
import { BiBracket } from "react-icons/bi";
import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper";
Expand All @@ -58,6 +59,7 @@ import ClearRun from "./dagRun/ClearRun";
import MarkRunAs from "./dagRun/MarkRunAs";
import ClearInstance from "./taskInstance/taskActions/ClearInstance";
import MarkInstanceAs from "./taskInstance/taskActions/MarkInstanceAs";
import XcomCollection from "./taskInstance/Xcom";

const dagId = getMetaValue("dag_id")!;

Expand All @@ -80,6 +82,8 @@ const tabToIndex = (tab?: string) => {
case "logs":
case "mapped_tasks":
return 4;
case "xcom":
return 5;
case "details":
default:
return 0;
Expand All @@ -89,8 +93,8 @@ const tabToIndex = (tab?: string) => {
const indexToTab = (
index: number,
taskId: string | null,
showLogs: boolean,
showMappedTasks: boolean
isTaskInstance: boolean,
isMappedTaskSummary: boolean
) => {
switch (index) {
case 1:
Expand All @@ -100,8 +104,11 @@ const indexToTab = (
case 3:
return "code";
case 4:
if (showMappedTasks) return "mapped_tasks";
if (showLogs) return "logs";
if (isMappedTaskSummary) return "mapped_tasks";
if (isTaskInstance) return "logs";
return undefined;
case 5:
if (isTaskInstance) return "xcom";
return undefined;
case 0:
default:
Expand All @@ -124,20 +131,28 @@ const Details = ({
} = useSelection();
const isDag = !runId && !taskId;
const isDagRun = runId && !taskId;
const isTaskInstance = taskId && runId;

const {
data: { dagRuns, groups },
} = useGridData();
const group = getTask({ taskId, task: groups });
const children = group?.children;
const isMapped = group?.isMapped;

const isMappedTaskSummary = isMapped && mapIndex === undefined && taskId;
const isGroup = !!children;
const isGroupOrMappedTaskSummary = isGroup || isMappedTaskSummary;
const showLogs = !!(isTaskInstance && !isGroupOrMappedTaskSummary);
const showMappedTasks = !!(isTaskInstance && isMappedTaskSummary && !isGroup);

const isMappedTaskSummary = !!(
taskId &&
runId &&
!isGroup &&
isMapped &&
mapIndex === undefined
);
const isTaskInstance = !!(
taskId &&
runId &&
!isGroup &&
!isMappedTaskSummary
);

const [searchParams, setSearchParams] = useSearchParams();
const tab = searchParams.get(TAB_PARAM) || undefined;
Expand All @@ -146,12 +161,17 @@ const Details = ({
const onChangeTab = useCallback(
(index: number) => {
const params = new URLSearchParamsWrapper(searchParams);
const newTab = indexToTab(index, taskId, showLogs, showMappedTasks);
const newTab = indexToTab(
index,
taskId,
isTaskInstance,
isMappedTaskSummary
);
if (newTab) params.set(TAB_PARAM, newTab);
else params.delete(TAB_PARAM);
setSearchParams(params);
},
[setSearchParams, searchParams, showLogs, showMappedTasks, taskId]
[setSearchParams, searchParams, isTaskInstance, isMappedTaskSummary, taskId]
);

useEffect(() => {
Expand Down Expand Up @@ -252,22 +272,30 @@ const Details = ({
Code
</Text>
</Tab>
{showLogs && (
{isTaskInstance && (
<Tab>
<MdReorder size={16} />
<Text as="strong" ml={1}>
Logs
</Text>
</Tab>
)}
{showMappedTasks && (
{isMappedTaskSummary && (
<Tab>
<BiBracket size={16} />
<Text as="strong" ml={1}>
Mapped Tasks
</Text>
</Tab>
)}
{isTaskInstance && (
<Tab>
<MdSyncAlt size={16} />
<Text as="strong" ml={1}>
XCom
</Text>
</Tab>
)}
</TabList>
<TabPanels height="100%">
<TabPanel height="100%">
Expand Down Expand Up @@ -304,7 +332,7 @@ const Details = ({
<TabPanel height="100%">
<DagCode />
</TabPanel>
{showLogs && run && (
{isTaskInstance && run && (
<TabPanel
pt={mapIndex !== undefined ? "0px" : undefined}
height="100%"
Expand All @@ -324,7 +352,7 @@ const Details = ({
/>
</TabPanel>
)}
{showMappedTasks && (
{isMappedTaskSummary && (
<TabPanel height="100%">
<MappedInstances
dagId={dagId}
Expand All @@ -336,6 +364,17 @@ const Details = ({
/>
</TabPanel>
)}
{isTaskInstance && (
<TabPanel height="100%">
<XcomCollection
dagId={dagId}
dagRunId={runId}
taskId={taskId}
mapIndex={mapIndex}
tryNumber={instance?.tryNumber}
/>
</TabPanel>
)}
</TabPanels>
</Tabs>
</Flex>
Expand Down
3 changes: 0 additions & 3 deletions airflow/www/static/js/dag/details/taskInstance/Nav.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const isK8sExecutor = getMetaValue("k8s_or_k8scelery_executor") === "True";
const taskInstancesUrl = getMetaValue("task_instances_list_url");
const renderedK8sUrl = getMetaValue("rendered_k8s_url");
const renderedTemplatesUrl = getMetaValue("rendered_templates_url");
const xcomUrl = getMetaValue("xcom_url");
const taskUrl = getMetaValue("task_url");
const gridUrl = getMetaValue("grid_url");

Expand All @@ -52,7 +51,6 @@ const Nav = forwardRef<HTMLDivElement, Props>(
});
const detailsLink = `${taskUrl}&${params}`;
const renderedLink = `${renderedTemplatesUrl}&${params}`;
const xcomLink = `${xcomUrl}&${params}`;
const k8sLink = `${renderedK8sUrl}&${params}`;
const listParams = new URLSearchParamsWrapper({
_flt_3_dag_id: dagId,
Expand Down Expand Up @@ -88,7 +86,6 @@ const Nav = forwardRef<HTMLDivElement, Props>(
{isSubDag && (
<LinkButton href={subDagLink}>Zoom into SubDag</LinkButton>
)}
<LinkButton href={xcomLink}>XCom</LinkButton>
</>
)}
<LinkButton
Expand Down
82 changes: 82 additions & 0 deletions airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react";
import React from "react";
import { useTaskXcomEntry } from "src/api";
import type { Dag, DagRun, TaskInstance } from "src/types";

interface Props {
dagId: Dag["id"];
dagRunId: DagRun["runId"];
taskId: TaskInstance["taskId"];
mapIndex?: TaskInstance["mapIndex"];
xcomKey: string;
tryNumber: TaskInstance["tryNumber"];
}

const XcomEntry = ({
dagId,
dagRunId,
taskId,
mapIndex,
xcomKey,
tryNumber,
}: Props) => {
const {
data: xcom,
isLoading,
error,
} = useTaskXcomEntry({
dagId,
dagRunId,
taskId,
mapIndex,
xcomKey,
tryNumber: tryNumber || 1,
});

let content = <Text fontFamily="monospace">{xcom?.value}</Text>;
if (isLoading) {
content = <Spinner />;
} else if (error) {
content = (
<Alert status="error">
<AlertIcon />
Error loading XCom entry
</Alert>
);
} else if (!xcom) {
content = (
<Alert status="info">
<AlertIcon />
No value found for XCom key
</Alert>
);
}

return (
<Tr>
<Td>{xcomKey}</Td>
<Td>{content}</Td>
</Tr>
);
};

export default XcomEntry;

0 comments on commit be86dd3

Please sign in to comment.