Skip to content

Commit

Permalink
Improve datasets graph UX (#38476)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbovenzi committed Mar 26, 2024
1 parent a697bb4 commit 2218311
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 368 deletions.
161 changes: 6 additions & 155 deletions airflow/www/static/js/api/useDatasetDependencies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ export interface DatasetDependencies {
edges: DepEdge[];
nodes: DepNode[];
}

interface EdgeGroup {
edges: DepEdge[];
}

interface GenerateProps {
nodes: DepNode[];
edges: DepEdge[];
Expand Down Expand Up @@ -74,115 +69,6 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
})),
});

interface SeparateGraphsProps {
edges: DepEdge[];
graphs: EdgeGroup[];
}

// find the downstream graph of each upstream edge
const findDownstreamGraph = ({
edges,
graphs = [],
}: SeparateGraphsProps): EdgeGroup[] => {
let unassignedEdges = [...edges];

const otherIndexes: number[] = [];

const mergedGraphs = graphs
.reduce((newGraphs, graph) => {
// Find all overlapping graphs where at least one edge in each graph has the same target node
const otherGroups = newGraphs.filter((otherGroup, i) =>
otherGroup.edges.some((otherEdge) => {
if (graph.edges.some((edge) => edge.target === otherEdge.target)) {
otherIndexes.push(i);
return true;
}
return false;
})
);
if (!otherGroups.length) {
return [...newGraphs, graph];
}

// Merge the edges of every overlapping group
const mergedEdges = otherGroups
.reduce(
(totalEdges, group) => [...totalEdges, ...group.edges],
[...graph.edges]
)
.filter(
(edge, edgeIndex, otherEdges) =>
edgeIndex ===
otherEdges.findIndex(
(otherEdge) =>
otherEdge.source === edge.source &&
otherEdge.target === edge.target
)
);
return [
// filter out the merged graphs
...newGraphs.filter(
(_, newGraphIndex) => !otherIndexes.includes(newGraphIndex)
),
{ edges: mergedEdges },
];
}, [] as EdgeGroup[])
.map((graph) => {
// find the next set of downstream edges and filter them out of the unassigned edges list
const downstreamEdges: DepEdge[] = [];
unassignedEdges = unassignedEdges.filter((edge) => {
const isDownstream = graph.edges.some(
(graphEdge) => graphEdge.target === edge.source
);
if (isDownstream) downstreamEdges.push(edge);
return !isDownstream;
});

return {
edges: [...graph.edges, ...downstreamEdges],
};
});

// recursively find downstream edges until there are no unassigned edges
return unassignedEdges.length
? findDownstreamGraph({ edges: unassignedEdges, graphs: mergedGraphs })
: mergedGraphs;
};

// separate the list of nodes/edges into distinct dataset pipeline graphs
const separateGraphs = ({
edges,
nodes,
}: DatasetDependencies): DatasetDependencies[] => {
const separatedGraphs: EdgeGroup[] = [];
const remainingEdges: DepEdge[] = [];

edges.forEach((e) => {
// add a separate graph for each edge without an upstream
if (!edges.some((ee) => e.source === ee.target)) {
separatedGraphs.push({ edges: [e] });
} else {
remainingEdges.push(e);
}
});

const edgeGraphs = findDownstreamGraph({
edges: remainingEdges,
graphs: separatedGraphs,
});

// once all the edges are found, add the nodes
return edgeGraphs.map((eg) => {
const graphNodes = nodes.filter((n) =>
eg.edges.some((e) => e.target === n.id || e.source === n.id)
);
return {
edges: eg.edges,
nodes: graphNodes,
};
});
};

const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
const elk = new ELK();

Expand All @@ -203,47 +89,12 @@ export default function useDatasetDependencies() {
});
}

interface GraphsProps {
dagIds?: string[];
selectedUri: string | null;
}

export const useDatasetGraphs = ({ dagIds, selectedUri }: GraphsProps) => {
export const useDatasetGraphs = () => {
const { data: datasetDependencies } = useDatasetDependencies();
return useQuery(
["datasetGraphs", datasetDependencies, dagIds, selectedUri],
() => {
if (datasetDependencies) {
let graph = datasetDependencies;
const subGraphs = datasetDependencies
? separateGraphs(datasetDependencies)
: [];

// Filter by dataset URI takes precedence
if (selectedUri) {
graph =
subGraphs.find((g) =>
g.nodes.some((n) => n.value.label === selectedUri)
) || graph;
} else if (dagIds?.length) {
const filteredSubGraphs = subGraphs.filter((sg) =>
dagIds.some((dagId) =>
sg.nodes.some((c) => c.value.label === dagId)
)
);

graph = filteredSubGraphs.reduce(
(graphs, subGraph) => ({
edges: [...graphs.edges, ...subGraph.edges],
nodes: [...graphs.nodes, ...subGraph.nodes],
}),
{ edges: [], nodes: [] }
);
}

return formatDependencies(graph);
}
return undefined;
return useQuery(["datasetGraphs", datasetDependencies], () => {
if (datasetDependencies) {
return formatDependencies(datasetDependencies);
}
);
return undefined;
});
};
17 changes: 6 additions & 11 deletions airflow/www/static/js/datasets/Graph/DagNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const DagNode = ({

const gridUrl = getMetaValue("grid_url").replace("__DAG_ID__", dagId);
return (
<Popover>
<Popover trigger="hover">
<PopoverTrigger>
<Flex
borderColor={
Expand All @@ -70,6 +70,11 @@ const DagNode = ({
fontSize={16}
justifyContent="space-between"
alignItems="center"
onClick={(e) => {
e.preventDefault();
e.stopPropagation();
if (onSelect) onSelect(dagId, "dag");
}}
>
<MdOutlineAccountTree size="16px" />
<Text ml={2}>{dagId}</Text>
Expand All @@ -81,16 +86,6 @@ const DagNode = ({
<PopoverCloseButton />
<PopoverHeader>{dagId}</PopoverHeader>
<PopoverFooter as={Flex} justifyContent="space-between">
<Button
variant="outline"
onClick={(e) => {
e.preventDefault();
e.stopPropagation();
if (onSelect) onSelect(dagId, "dag");
}}
>
Filter by DAG
</Button>
<Button
as={Link}
href={gridUrl}
Expand Down
65 changes: 25 additions & 40 deletions airflow/www/static/js/datasets/Graph/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import React, { useEffect } from "react";
import React, { useCallback, useEffect } from "react";
import ReactFlow, {
ReactFlowProvider,
Controls,
Expand All @@ -27,6 +27,7 @@ import ReactFlow, {
useReactFlow,
ControlButton,
Panel,
useNodesInitialized,
} from "reactflow";
import { Box, Tooltip, useTheme } from "@chakra-ui/react";
import { RiFocus3Line } from "react-icons/ri";
Expand All @@ -39,33 +40,19 @@ import Node, { CustomNodeProps } from "./Node";
import Legend from "./Legend";

interface Props {
onSelect: (datasetId: string) => void;
selectedUri: string | null;
filteredDagIds: string[];
onFilterDags: (dagIds: string[]) => void;
selectedNodeId: string | null;
onSelectNode: (id: string, type: string) => void;
}

const nodeTypes = { custom: Node };
const edgeTypes = { custom: Edge };

const Graph = ({
onSelect,
selectedUri,
filteredDagIds,
onFilterDags,
}: Props) => {
const Graph = ({ selectedNodeId, onSelectNode }: Props) => {
const { colors } = useTheme();
const { setCenter, setViewport } = useReactFlow();
const { setCenter } = useReactFlow();
const containerRef = useContainerRef();

const { data: graph } = useDatasetGraphs({
dagIds: filteredDagIds,
selectedUri,
});

useEffect(() => {
setViewport({ x: 0, y: 0, zoom: 1 });
}, [selectedUri, setViewport]);
const { data: graph } = useDatasetGraphs();

const nodeColor = ({
data: { isSelected },
Expand All @@ -81,20 +68,14 @@ const Graph = ({
data: {
rest: {
...e,
isSelected: selectedUri && e.id.includes(selectedUri),
isSelected:
selectedNodeId &&
(e.id.includes(`dataset:${selectedNodeId}`) ||
e.id.includes(`dag:${selectedNodeId}`)),
},
},
})) || [];

const handleSelect = (id: string, type: string) => {
if (type === "dataset") onSelect(id);
if (type === "dag") {
if (filteredDagIds.includes(id))
onFilterDags(filteredDagIds.filter((dagId) => dagId !== id));
else onFilterDags([...filteredDagIds, id]);
}
};

const nodes: ReactFlowNode<CustomNodeProps>[] =
graph?.children?.map((c) => ({
id: c.id,
Expand All @@ -103,10 +84,8 @@ const Graph = ({
type: c.value.class,
width: c.width,
height: c.height,
onSelect: handleSelect,
isSelected:
selectedUri === c.value.label ||
(c.value.class === "dag" && filteredDagIds.includes(c.value.label)),
onSelect: onSelectNode,
isSelected: selectedNodeId === c.value.label,
isHighlighted: edges.some(
(e) => e.data.rest.isSelected && e.id.includes(c.id)
),
Expand All @@ -118,10 +97,10 @@ const Graph = ({
},
})) || [];

const focusNode = () => {
if (selectedUri) {
const node = nodes.find((n) => n.data.label === selectedUri);
if (!node || !node.position) return;
const node = nodes.find((n) => n.data.label === selectedNodeId);

const focusNode = useCallback(() => {
if (node && node.position) {
const { x, y } = node.position;
setCenter(
x + (node.data.width || 0) / 2,
Expand All @@ -131,7 +110,13 @@ const Graph = ({
}
);
}
};
}, [setCenter, node]);

const nodesInitialized = useNodesInitialized();

useEffect(() => {
if (nodesInitialized) focusNode();
}, [selectedNodeId, nodesInitialized, focusNode]);

return (
<ReactFlow
Expand All @@ -147,7 +132,7 @@ const Graph = ({
>
<Background />
<Controls showInteractive={false}>
<ControlButton onClick={focusNode} disabled={!selectedUri}>
<ControlButton onClick={focusNode} disabled={!selectedNodeId}>
<Tooltip
portalProps={{ containerRef }}
label="Center selected dataset"
Expand Down

0 comments on commit 2218311

Please sign in to comment.