Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve datasets graph UX #38476

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
161 changes: 6 additions & 155 deletions airflow/www/static/js/api/useDatasetDependencies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ export interface DatasetDependencies {
edges: DepEdge[];
nodes: DepNode[];
}

interface EdgeGroup {
edges: DepEdge[];
}

interface GenerateProps {
nodes: DepNode[];
edges: DepEdge[];
Expand Down Expand Up @@ -74,115 +69,6 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
})),
});

interface SeparateGraphsProps {
edges: DepEdge[];
graphs: EdgeGroup[];
}

// find the downstream graph of each upstream edge
const findDownstreamGraph = ({
edges,
graphs = [],
}: SeparateGraphsProps): EdgeGroup[] => {
let unassignedEdges = [...edges];

const otherIndexes: number[] = [];

const mergedGraphs = graphs
.reduce((newGraphs, graph) => {
// Find all overlapping graphs where at least one edge in each graph has the same target node
const otherGroups = newGraphs.filter((otherGroup, i) =>
otherGroup.edges.some((otherEdge) => {
if (graph.edges.some((edge) => edge.target === otherEdge.target)) {
otherIndexes.push(i);
return true;
}
return false;
})
);
if (!otherGroups.length) {
return [...newGraphs, graph];
}

// Merge the edges of every overlapping group
const mergedEdges = otherGroups
.reduce(
(totalEdges, group) => [...totalEdges, ...group.edges],
[...graph.edges]
)
.filter(
(edge, edgeIndex, otherEdges) =>
edgeIndex ===
otherEdges.findIndex(
(otherEdge) =>
otherEdge.source === edge.source &&
otherEdge.target === edge.target
)
);
return [
// filter out the merged graphs
...newGraphs.filter(
(_, newGraphIndex) => !otherIndexes.includes(newGraphIndex)
),
{ edges: mergedEdges },
];
}, [] as EdgeGroup[])
.map((graph) => {
// find the next set of downstream edges and filter them out of the unassigned edges list
const downstreamEdges: DepEdge[] = [];
unassignedEdges = unassignedEdges.filter((edge) => {
const isDownstream = graph.edges.some(
(graphEdge) => graphEdge.target === edge.source
);
if (isDownstream) downstreamEdges.push(edge);
return !isDownstream;
});

return {
edges: [...graph.edges, ...downstreamEdges],
};
});

// recursively find downstream edges until there are no unassigned edges
return unassignedEdges.length
? findDownstreamGraph({ edges: unassignedEdges, graphs: mergedGraphs })
: mergedGraphs;
};

// separate the list of nodes/edges into distinct dataset pipeline graphs
const separateGraphs = ({
edges,
nodes,
}: DatasetDependencies): DatasetDependencies[] => {
const separatedGraphs: EdgeGroup[] = [];
const remainingEdges: DepEdge[] = [];

edges.forEach((e) => {
// add a separate graph for each edge without an upstream
if (!edges.some((ee) => e.source === ee.target)) {
separatedGraphs.push({ edges: [e] });
} else {
remainingEdges.push(e);
}
});

const edgeGraphs = findDownstreamGraph({
edges: remainingEdges,
graphs: separatedGraphs,
});

// once all the edges are found, add the nodes
return edgeGraphs.map((eg) => {
const graphNodes = nodes.filter((n) =>
eg.edges.some((e) => e.target === n.id || e.source === n.id)
);
return {
edges: eg.edges,
nodes: graphNodes,
};
});
};

const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
const elk = new ELK();

Expand All @@ -203,47 +89,12 @@ export default function useDatasetDependencies() {
});
}

interface GraphsProps {
dagIds?: string[];
selectedUri: string | null;
}

export const useDatasetGraphs = ({ dagIds, selectedUri }: GraphsProps) => {
export const useDatasetGraphs = () => {
const { data: datasetDependencies } = useDatasetDependencies();
return useQuery(
["datasetGraphs", datasetDependencies, dagIds, selectedUri],
() => {
if (datasetDependencies) {
let graph = datasetDependencies;
const subGraphs = datasetDependencies
? separateGraphs(datasetDependencies)
: [];

// Filter by dataset URI takes precedence
if (selectedUri) {
graph =
subGraphs.find((g) =>
g.nodes.some((n) => n.value.label === selectedUri)
) || graph;
} else if (dagIds?.length) {
const filteredSubGraphs = subGraphs.filter((sg) =>
dagIds.some((dagId) =>
sg.nodes.some((c) => c.value.label === dagId)
)
);

graph = filteredSubGraphs.reduce(
(graphs, subGraph) => ({
edges: [...graphs.edges, ...subGraph.edges],
nodes: [...graphs.nodes, ...subGraph.nodes],
}),
{ edges: [], nodes: [] }
);
}

return formatDependencies(graph);
}
return undefined;
return useQuery(["datasetGraphs", datasetDependencies], () => {
if (datasetDependencies) {
return formatDependencies(datasetDependencies);
}
);
return undefined;
});
};
17 changes: 6 additions & 11 deletions airflow/www/static/js/datasets/Graph/DagNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const DagNode = ({

const gridUrl = getMetaValue("grid_url").replace("__DAG_ID__", dagId);
return (
<Popover>
<Popover trigger="hover">
<PopoverTrigger>
<Flex
borderColor={
Expand All @@ -70,6 +70,11 @@ const DagNode = ({
fontSize={16}
justifyContent="space-between"
alignItems="center"
onClick={(e) => {
e.preventDefault();
e.stopPropagation();
if (onSelect) onSelect(dagId, "dag");
}}
>
<MdOutlineAccountTree size="16px" />
<Text ml={2}>{dagId}</Text>
Expand All @@ -81,16 +86,6 @@ const DagNode = ({
<PopoverCloseButton />
<PopoverHeader>{dagId}</PopoverHeader>
<PopoverFooter as={Flex} justifyContent="space-between">
<Button
variant="outline"
onClick={(e) => {
e.preventDefault();
e.stopPropagation();
if (onSelect) onSelect(dagId, "dag");
}}
>
Filter by DAG
</Button>
<Button
as={Link}
href={gridUrl}
Expand Down
65 changes: 25 additions & 40 deletions airflow/www/static/js/datasets/Graph/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import React, { useEffect } from "react";
import React, { useCallback, useEffect } from "react";
import ReactFlow, {
ReactFlowProvider,
Controls,
Expand All @@ -27,6 +27,7 @@ import ReactFlow, {
useReactFlow,
ControlButton,
Panel,
useNodesInitialized,
} from "reactflow";
import { Box, Tooltip, useTheme } from "@chakra-ui/react";
import { RiFocus3Line } from "react-icons/ri";
Expand All @@ -39,33 +40,19 @@ import Node, { CustomNodeProps } from "./Node";
import Legend from "./Legend";

interface Props {
onSelect: (datasetId: string) => void;
selectedUri: string | null;
filteredDagIds: string[];
onFilterDags: (dagIds: string[]) => void;
selectedNodeId: string | null;
onSelectNode: (id: string, type: string) => void;
}

const nodeTypes = { custom: Node };
const edgeTypes = { custom: Edge };

const Graph = ({
onSelect,
selectedUri,
filteredDagIds,
onFilterDags,
}: Props) => {
const Graph = ({ selectedNodeId, onSelectNode }: Props) => {
const { colors } = useTheme();
const { setCenter, setViewport } = useReactFlow();
const { setCenter } = useReactFlow();
const containerRef = useContainerRef();

const { data: graph } = useDatasetGraphs({
dagIds: filteredDagIds,
selectedUri,
});

useEffect(() => {
setViewport({ x: 0, y: 0, zoom: 1 });
}, [selectedUri, setViewport]);
const { data: graph } = useDatasetGraphs();

const nodeColor = ({
data: { isSelected },
Expand All @@ -81,20 +68,14 @@ const Graph = ({
data: {
rest: {
...e,
isSelected: selectedUri && e.id.includes(selectedUri),
isSelected:
selectedNodeId &&
(e.id.includes(`dataset:${selectedNodeId}`) ||
e.id.includes(`dag:${selectedNodeId}`)),
},
},
})) || [];

const handleSelect = (id: string, type: string) => {
if (type === "dataset") onSelect(id);
if (type === "dag") {
if (filteredDagIds.includes(id))
onFilterDags(filteredDagIds.filter((dagId) => dagId !== id));
else onFilterDags([...filteredDagIds, id]);
}
};

const nodes: ReactFlowNode<CustomNodeProps>[] =
graph?.children?.map((c) => ({
id: c.id,
Expand All @@ -103,10 +84,8 @@ const Graph = ({
type: c.value.class,
width: c.width,
height: c.height,
onSelect: handleSelect,
isSelected:
selectedUri === c.value.label ||
(c.value.class === "dag" && filteredDagIds.includes(c.value.label)),
onSelect: onSelectNode,
isSelected: selectedNodeId === c.value.label,
isHighlighted: edges.some(
(e) => e.data.rest.isSelected && e.id.includes(c.id)
),
Expand All @@ -118,10 +97,10 @@ const Graph = ({
},
})) || [];

const focusNode = () => {
if (selectedUri) {
const node = nodes.find((n) => n.data.label === selectedUri);
if (!node || !node.position) return;
const node = nodes.find((n) => n.data.label === selectedNodeId);

const focusNode = useCallback(() => {
if (node && node.position) {
const { x, y } = node.position;
setCenter(
x + (node.data.width || 0) / 2,
Expand All @@ -131,7 +110,13 @@ const Graph = ({
}
);
}
};
}, [setCenter, node]);

const nodesInitialized = useNodesInitialized();

useEffect(() => {
if (nodesInitialized) focusNode();
}, [selectedNodeId, nodesInitialized, focusNode]);

return (
<ReactFlow
Expand All @@ -147,7 +132,7 @@ const Graph = ({
>
<Background />
<Controls showInteractive={false}>
<ControlButton onClick={focusNode} disabled={!selectedUri}>
<ControlButton onClick={focusNode} disabled={!selectedNodeId}>
<Tooltip
portalProps={{ containerRef }}
label="Center selected dataset"
Expand Down