From 2cb78c4263ff06a70fee9f79c97e1702920b49d3 Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Thu, 22 Feb 2024 15:46:50 -0500 Subject: [PATCH 1/4] Filter datasets graph by dag_id (#37464) * Initial filter datasets graph by dag_id * Fix bug with merging dataset subgraphs * Fix index for dataset group merging * Add tooltip --- .../static/js/api/useDatasetDependencies.ts | 121 ++++++++++++------ airflow/www/static/js/datasets/DagFilter.tsx | 117 +++++++++++++++++ .../www/static/js/datasets/Graph/DagNode.tsx | 37 +++++- airflow/www/static/js/datasets/Graph/Node.tsx | 11 +- .../www/static/js/datasets/Graph/index.tsx | 100 +++++++++------ airflow/www/static/js/datasets/List.test.tsx | 18 ++- airflow/www/static/js/datasets/List.tsx | 29 ++++- airflow/www/static/js/datasets/Main.tsx | 49 +++++-- 8 files changed, 371 insertions(+), 111 deletions(-) create mode 100644 airflow/www/static/js/datasets/DagFilter.tsx diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts b/airflow/www/static/js/api/useDatasetDependencies.ts index 2680ee9a782d45..42308f87a463d9 100644 --- a/airflow/www/static/js/api/useDatasetDependencies.ts +++ b/airflow/www/static/js/api/useDatasetDependencies.ts @@ -26,7 +26,7 @@ import { getTextWidth } from "src/utils/graph"; import type { NodeType, DepEdge, DepNode } from "src/types"; -interface DatasetDependencies { +export interface DatasetDependencies { edges: DepEdge[]; nodes: DepNode[]; } @@ -41,16 +41,11 @@ interface GenerateProps { font: string; } -interface Graph extends ElkShape { +export interface DatasetGraph extends ElkShape { children: NodeType[]; edges: ElkExtendedEdge[]; } -interface Data { - fullGraph: Graph; - subGraphs: Graph[]; -} - const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({ id: "root", layoutOptions: { @@ -91,32 +86,43 @@ const findDownstreamGraph = ({ }: SeparateGraphsProps): EdgeGroup[] => { let unassignedEdges = [...edges]; + const otherIndexes: number[] = []; + const mergedGraphs = graphs .reduce((newGraphs, graph) => { - const otherGroupIndex = newGraphs.findIndex((otherGroup) => - otherGroup.edges.some((otherEdge) => - graph.edges.some((edge) => edge.target === otherEdge.target) - ) + // Find all overlapping graphs where at least one edge in each graph has the same target node + const otherGroups = newGraphs.filter((otherGroup, i) => + otherGroup.edges.some((otherEdge) => { + if (graph.edges.some((edge) => edge.target === otherEdge.target)) { + otherIndexes.push(i); + return true; + } + return false; + }) ); - if (otherGroupIndex === -1) { + if (!otherGroups.length) { return [...newGraphs, graph]; } - const mergedEdges = [ - ...newGraphs[otherGroupIndex].edges, - ...graph.edges, - ].filter( - (edge, edgeIndex, otherEdges) => - edgeIndex === - otherEdges.findIndex( - (otherEdge) => - otherEdge.source === edge.source && - otherEdge.target === edge.target - ) - ); + // Merge the edges of every overlapping group + const mergedEdges = otherGroups + .reduce( + (totalEdges, group) => [...totalEdges, ...group.edges], + [...graph.edges] + ) + .filter( + (edge, edgeIndex, otherEdges) => + edgeIndex === + otherEdges.findIndex( + (otherEdge) => + otherEdge.source === edge.source && + otherEdge.target === edge.target + ) + ); return [ + // filter out the merged graphs ...newGraphs.filter( - (_, newGraphIndex) => newGraphIndex !== otherGroupIndex + (_, newGraphIndex) => !otherIndexes.includes(newGraphIndex) ), { edges: mergedEdges }, ]; @@ -180,33 +186,64 @@ const separateGraphs = ({ const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => { const elk = new ELK(); - const graphs = separateGraphs({ edges, nodes }); - // get computed style to calculate how large each node should be const font = `bold ${16}px ${ window.getComputedStyle(document.body).fontFamily }`; - // Finally generate the graph data with elk - const subGraphs = await Promise.all( - graphs.map(async (g) => - elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font })) - ) - ); - const fullGraph = await elk.layout(generateGraph({ nodes, edges, font })); + const graph = await elk.layout(generateGraph({ nodes, edges, font })); - return { - fullGraph, - subGraphs, - } as Data; + return graph as DatasetGraph; }; export default function useDatasetDependencies() { return useQuery("datasetDependencies", async () => { const datasetDepsUrl = getMetaValue("dataset_dependencies_url"); - const rawData = await axios.get( - datasetDepsUrl - ); - return formatDependencies(rawData); + return axios.get(datasetDepsUrl); }); } + +interface GraphsProps { + dagIds?: string[]; + selectedUri: string | null; +} + +export const useDatasetGraphs = ({ dagIds, selectedUri }: GraphsProps) => { + const { data: datasetDependencies } = useDatasetDependencies(); + return useQuery( + ["datasetGraphs", datasetDependencies, dagIds, selectedUri], + () => { + if (datasetDependencies) { + let graph = datasetDependencies; + const subGraphs = datasetDependencies + ? separateGraphs(datasetDependencies) + : []; + + // Filter by dataset URI takes precedence + if (selectedUri) { + graph = + subGraphs.find((g) => + g.nodes.some((n) => n.value.label === selectedUri) + ) || graph; + } else if (dagIds?.length) { + const filteredSubGraphs = subGraphs.filter((sg) => + dagIds.some((dagId) => + sg.nodes.some((c) => c.value.label === dagId) + ) + ); + + graph = filteredSubGraphs.reduce( + (graphs, subGraph) => ({ + edges: [...graphs.edges, ...subGraph.edges], + nodes: [...graphs.nodes, ...subGraph.nodes], + }), + { edges: [], nodes: [] } + ); + } + + return formatDependencies(graph); + } + return undefined; + } + ); +}; diff --git a/airflow/www/static/js/datasets/DagFilter.tsx b/airflow/www/static/js/datasets/DagFilter.tsx new file mode 100644 index 00000000000000..71b923594d6d69 --- /dev/null +++ b/airflow/www/static/js/datasets/DagFilter.tsx @@ -0,0 +1,117 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from "react"; +import { HStack } from "@chakra-ui/react"; +import { Size, useChakraSelectProps } from "chakra-react-select"; + +import type { DatasetDependencies } from "src/api/useDatasetDependencies"; +import MultiSelect from "src/components/MultiSelect"; +import InfoTooltip from "src/components/InfoTooltip"; + +interface Props { + datasetDependencies?: DatasetDependencies; + filteredDagIds: string[]; + onFilterDags: (dagIds: string[]) => void; +} + +const transformArrayToMultiSelectOptions = ( + options: string[] | null +): { label: string; value: string }[] => + options === null + ? [] + : options.map((option) => ({ label: option, value: option })); + +const DagFilter = ({ + datasetDependencies, + filteredDagIds, + onFilterDags, +}: Props) => { + const dagIds = (datasetDependencies?.nodes || []) + .filter((node) => node.value.class === "dag") + .map((dag) => dag.value.label); + const options = dagIds.map((dagId) => ({ label: dagId, value: dagId })); + + const inputStyles: { backgroundColor: string; size: Size } = { + backgroundColor: "white", + size: "lg", + }; + const multiSelectStyles = useChakraSelectProps({ + ...inputStyles, + isMulti: true, + tagVariant: "solid", + hideSelectedOptions: false, + isClearable: false, + selectedOptionStyle: "check", + chakraStyles: { + container: (p) => ({ + ...p, + width: "100%", + }), + placeholder: (p) => ({ + ...p, + color: "gray.700", + fontSize: "md", + }), + inputContainer: (p) => ({ + ...p, + color: "gray.700", + fontSize: "md", + }), + downChevron: (p) => ({ + ...p, + fontSize: "lg", + }), + control: (p) => ({ + ...p, + cursor: "pointer", + }), + option: (p) => ({ + ...p, + transition: "background-color 0.2s", + _hover: { + bg: "gray.100", + }, + }), + }, + }); + + return ( + + { + if ( + Array.isArray(dagOptions) && + dagOptions.every((dagOption) => "value" in dagOption) + ) { + onFilterDags(dagOptions.map((option) => option.value)); + } + }} + options={options} + placeholder="Filter graph by DAG ID" + /> + + + ); +}; + +export default DagFilter; diff --git a/airflow/www/static/js/datasets/Graph/DagNode.tsx b/airflow/www/static/js/datasets/Graph/DagNode.tsx index aa12c575221f11..8a686f8acc78dc 100644 --- a/airflow/www/static/js/datasets/Graph/DagNode.tsx +++ b/airflow/www/static/js/datasets/Graph/DagNode.tsx @@ -19,13 +19,14 @@ import React from "react"; import { + Button, Flex, Link, Popover, PopoverArrow, - PopoverBody, PopoverCloseButton, PopoverContent, + PopoverFooter, PopoverHeader, PopoverTrigger, Portal, @@ -40,9 +41,13 @@ import { getMetaValue } from "src/utils"; const DagNode = ({ dagId, isHighlighted, + isSelected, + onSelect, }: { dagId: string; isHighlighted?: boolean; + isSelected?: boolean; + onSelect?: (dagId: string, type: string) => void; }) => { const { colors } = useTheme(); const containerRef = useContainerRef(); @@ -52,9 +57,12 @@ const DagNode = ({ {dagId} - - + + + + diff --git a/airflow/www/static/js/datasets/Graph/Node.tsx b/airflow/www/static/js/datasets/Graph/Node.tsx index ed02653139c4ec..425f25e3c9e3f3 100644 --- a/airflow/www/static/js/datasets/Graph/Node.tsx +++ b/airflow/www/static/js/datasets/Graph/Node.tsx @@ -32,7 +32,7 @@ export interface CustomNodeProps { width?: number; isSelected?: boolean; isHighlighted?: boolean; - onSelect: (datasetUri: string) => void; + onSelect: (datasetUri: string, type: string) => void; isOpen?: boolean; isActive?: boolean; } @@ -45,7 +45,12 @@ const BaseNode = ({ return ( {type === "dag" && ( - + )} {type !== "dag" && ( { e.preventDefault(); e.stopPropagation(); - if (type === "dataset") onSelect(label); + onSelect(label, "dataset"); }} cursor="pointer" fontSize={16} diff --git a/airflow/www/static/js/datasets/Graph/index.tsx b/airflow/www/static/js/datasets/Graph/index.tsx index f137be36d10736..d40c53bc6ac27e 100644 --- a/airflow/www/static/js/datasets/Graph/index.tsx +++ b/airflow/www/static/js/datasets/Graph/index.tsx @@ -31,9 +31,9 @@ import ReactFlow, { import { Box, Tooltip, useTheme } from "@chakra-ui/react"; import { RiFocus3Line } from "react-icons/ri"; -import { useDatasetDependencies } from "src/api"; import Edge from "src/components/Graph/Edge"; import { useContainerRef } from "src/context/containerRef"; +import { useDatasetGraphs } from "src/api/useDatasetDependencies"; import Node, { CustomNodeProps } from "./Node"; import Legend from "./Legend"; @@ -41,17 +41,28 @@ import Legend from "./Legend"; interface Props { onSelect: (datasetId: string) => void; selectedUri: string | null; + filteredDagIds: string[]; + onFilterDags: (dagIds: string[]) => void; } const nodeTypes = { custom: Node }; const edgeTypes = { custom: Edge }; -const Graph = ({ onSelect, selectedUri }: Props) => { - const { data } = useDatasetDependencies(); +const Graph = ({ + onSelect, + selectedUri, + filteredDagIds, + onFilterDags, +}: Props) => { const { colors } = useTheme(); const { setCenter, setViewport } = useReactFlow(); const containerRef = useContainerRef(); + const { data: graph } = useDatasetGraphs({ + dagIds: filteredDagIds, + selectedUri, + }); + useEffect(() => { setViewport({ x: 0, y: 0, zoom: 1 }); }, [selectedUri, setViewport]); @@ -61,46 +72,51 @@ const Graph = ({ onSelect, selectedUri }: Props) => { }: ReactFlowNode) => isSelected ? colors.blue["300"] : colors.gray["300"]; - if (!data || !data.fullGraph || !data.subGraphs) return null; - const graph = selectedUri - ? data.subGraphs.find((g) => - g.children.some((n) => n.id === `dataset:${selectedUri}`) - ) - : data.fullGraph; - if (!graph) return null; - - const edges = graph.edges.map((e) => ({ - id: e.id, - source: e.sources[0], - target: e.targets[0], - type: "custom", - data: { - rest: { - ...e, - isSelected: selectedUri && e.id.includes(selectedUri), + const edges = + graph?.edges?.map((e) => ({ + id: e.id, + source: e.sources[0], + target: e.targets[0], + type: "custom", + data: { + rest: { + ...e, + isSelected: selectedUri && e.id.includes(selectedUri), + }, + }, + })) || []; + + const handleSelect = (id: string, type: string) => { + if (type === "dataset") onSelect(id); + if (type === "dag") { + if (filteredDagIds.includes(id)) + onFilterDags(filteredDagIds.filter((dagId) => dagId !== id)); + else onFilterDags([...filteredDagIds, id]); + } + }; + + const nodes: ReactFlowNode[] = + graph?.children?.map((c) => ({ + id: c.id, + data: { + label: c.value.label, + type: c.value.class, + width: c.width, + height: c.height, + onSelect: handleSelect, + isSelected: + selectedUri === c.value.label || + (c.value.class === "dag" && filteredDagIds.includes(c.value.label)), + isHighlighted: edges.some( + (e) => e.data.rest.isSelected && e.id.includes(c.id) + ), + }, + type: "custom", + position: { + x: c.x || 0, + y: c.y || 0, }, - }, - })); - - const nodes: ReactFlowNode[] = graph.children.map((c) => ({ - id: c.id, - data: { - label: c.value.label, - type: c.value.class, - width: c.width, - height: c.height, - onSelect, - isSelected: selectedUri === c.value.label, - isHighlighted: edges.some( - (e) => e.data.rest.isSelected && e.id.includes(c.id) - ), - }, - type: "custom", - position: { - x: c.x || 0, - y: c.y || 0, - }, - })); + })) || []; const focusNode = () => { if (selectedUri) { diff --git a/airflow/www/static/js/datasets/List.test.tsx b/airflow/www/static/js/datasets/List.test.tsx index f0c15230292200..64a5e3a4493354 100644 --- a/airflow/www/static/js/datasets/List.test.tsx +++ b/airflow/www/static/js/datasets/List.test.tsx @@ -87,7 +87,11 @@ describe("Test Datasets List", () => { .mockImplementation(() => returnValue); const { getByText, queryAllByTestId } = render( - {}} />, + {}} + filteredDagIds={[]} + onFilterDags={() => {}} + />, { wrapper: Wrapper } ); @@ -111,7 +115,11 @@ describe("Test Datasets List", () => { .mockImplementation(() => emptyReturnValue); const { getByText, queryAllByTestId, getByTestId } = render( - {}} />, + {}} + filteredDagIds={[]} + onFilterDags={() => {}} + />, { wrapper: Wrapper } ); @@ -130,7 +138,11 @@ describe("Test Datasets List", () => { const { getByDisplayValue } = render( - {}} /> + {}} + filteredDagIds={[]} + onFilterDags={() => {}} + /> ); diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx index 9d83406d7f4f6b..69b945d192cad3 100644 --- a/airflow/www/static/js/datasets/List.tsx +++ b/airflow/www/static/js/datasets/List.tsx @@ -42,9 +42,14 @@ import { Table, TimeCell } from "src/components/Table"; import type { API } from "src/types"; import { getMetaValue } from "src/utils"; import type { DateOption } from "src/api/useDatasetsSummary"; +import type { DatasetDependencies } from "src/api/useDatasetDependencies"; +import DagFilter from "./DagFilter"; interface Props { onSelect: (datasetId: string) => void; + datasetDependencies?: DatasetDependencies; + filteredDagIds: string[]; + onFilterDags: (dagIds: string[]) => void; } interface CellProps { @@ -80,7 +85,12 @@ const dateOptions: Record = { hour: { count: 1, unit: "hour" }, }; -const DatasetsList = ({ onSelect }: Props) => { +const DatasetsList = ({ + onSelect, + datasetDependencies, + onFilterDags, + filteredDagIds, +}: Props) => { const limit = 25; const [offset, setOffset] = useState(0); @@ -133,7 +143,11 @@ const DatasetsList = ({ onSelect }: Props) => { const docsUrl = getMetaValue("datasets_docs"); const onSearch = (e: React.ChangeEvent) => { - searchParams.set(SEARCH_PARAM, encodeURIComponent(e.target.value)); + if (e.target.value) { + searchParams.set(SEARCH_PARAM, encodeURIComponent(e.target.value)); + } else { + searchParams.delete(SEARCH_PARAM); + } setSearchParams(searchParams); }; @@ -158,7 +172,7 @@ const DatasetsList = ({ onSelect }: Props) => { to learn how to create a dataset. )} - + Filter datasets with updates in the past: