Skip to content

Commit

Permalink
Merge branch 'main' into fix_virtualenv_dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
xionams committed Feb 22, 2024
2 parents 9fc6de6 + 254d7eb commit e26972c
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 126 deletions.
2 changes: 2 additions & 0 deletions airflow/providers/google/cloud/sensors/gcs.py
Expand Up @@ -363,6 +363,8 @@ def execute(self, context: Context):
),
method_name="execute_complete",
)
else:
return self._matches

def execute_complete(self, context: dict[str, Any], event: dict[str, str | list[str]]) -> str | list[str]:
"""Return immediately and rely on trigger to throw a success event. Callback for the trigger."""
Expand Down
121 changes: 79 additions & 42 deletions airflow/www/static/js/api/useDatasetDependencies.ts
Expand Up @@ -26,7 +26,7 @@ import { getTextWidth } from "src/utils/graph";

import type { NodeType, DepEdge, DepNode } from "src/types";

interface DatasetDependencies {
export interface DatasetDependencies {
edges: DepEdge[];
nodes: DepNode[];
}
Expand All @@ -41,16 +41,11 @@ interface GenerateProps {
font: string;
}

interface Graph extends ElkShape {
export interface DatasetGraph extends ElkShape {
children: NodeType[];
edges: ElkExtendedEdge[];
}

interface Data {
fullGraph: Graph;
subGraphs: Graph[];
}

const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
id: "root",
layoutOptions: {
Expand Down Expand Up @@ -91,32 +86,43 @@ const findDownstreamGraph = ({
}: SeparateGraphsProps): EdgeGroup[] => {
let unassignedEdges = [...edges];

const otherIndexes: number[] = [];

const mergedGraphs = graphs
.reduce((newGraphs, graph) => {
const otherGroupIndex = newGraphs.findIndex((otherGroup) =>
otherGroup.edges.some((otherEdge) =>
graph.edges.some((edge) => edge.target === otherEdge.target)
)
// Find all overlapping graphs where at least one edge in each graph has the same target node
const otherGroups = newGraphs.filter((otherGroup, i) =>
otherGroup.edges.some((otherEdge) => {
if (graph.edges.some((edge) => edge.target === otherEdge.target)) {
otherIndexes.push(i);
return true;
}
return false;
})
);
if (otherGroupIndex === -1) {
if (!otherGroups.length) {
return [...newGraphs, graph];
}

const mergedEdges = [
...newGraphs[otherGroupIndex].edges,
...graph.edges,
].filter(
(edge, edgeIndex, otherEdges) =>
edgeIndex ===
otherEdges.findIndex(
(otherEdge) =>
otherEdge.source === edge.source &&
otherEdge.target === edge.target
)
);
// Merge the edges of every overlapping group
const mergedEdges = otherGroups
.reduce(
(totalEdges, group) => [...totalEdges, ...group.edges],
[...graph.edges]
)
.filter(
(edge, edgeIndex, otherEdges) =>
edgeIndex ===
otherEdges.findIndex(
(otherEdge) =>
otherEdge.source === edge.source &&
otherEdge.target === edge.target
)
);
return [
// filter out the merged graphs
...newGraphs.filter(
(_, newGraphIndex) => newGraphIndex !== otherGroupIndex
(_, newGraphIndex) => !otherIndexes.includes(newGraphIndex)
),
{ edges: mergedEdges },
];
Expand Down Expand Up @@ -180,33 +186,64 @@ const separateGraphs = ({
const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
const elk = new ELK();

const graphs = separateGraphs({ edges, nodes });

// get computed style to calculate how large each node should be
const font = `bold ${16}px ${
window.getComputedStyle(document.body).fontFamily
}`;

// Finally generate the graph data with elk
const subGraphs = await Promise.all(
graphs.map(async (g) =>
elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font }))
)
);
const fullGraph = await elk.layout(generateGraph({ nodes, edges, font }));
const graph = await elk.layout(generateGraph({ nodes, edges, font }));

return {
fullGraph,
subGraphs,
} as Data;
return graph as DatasetGraph;
};

export default function useDatasetDependencies() {
return useQuery("datasetDependencies", async () => {
const datasetDepsUrl = getMetaValue("dataset_dependencies_url");
const rawData = await axios.get<AxiosResponse, DatasetDependencies>(
datasetDepsUrl
);
return formatDependencies(rawData);
return axios.get<AxiosResponse, DatasetDependencies>(datasetDepsUrl);
});
}

interface GraphsProps {
dagIds?: string[];
selectedUri: string | null;
}

export const useDatasetGraphs = ({ dagIds, selectedUri }: GraphsProps) => {
const { data: datasetDependencies } = useDatasetDependencies();
return useQuery(
["datasetGraphs", datasetDependencies, dagIds, selectedUri],
() => {
if (datasetDependencies) {
let graph = datasetDependencies;
const subGraphs = datasetDependencies
? separateGraphs(datasetDependencies)
: [];

// Filter by dataset URI takes precedence
if (selectedUri) {
graph =
subGraphs.find((g) =>
g.nodes.some((n) => n.value.label === selectedUri)
) || graph;
} else if (dagIds?.length) {
const filteredSubGraphs = subGraphs.filter((sg) =>
dagIds.some((dagId) =>
sg.nodes.some((c) => c.value.label === dagId)
)
);

graph = filteredSubGraphs.reduce(
(graphs, subGraph) => ({
edges: [...graphs.edges, ...subGraph.edges],
nodes: [...graphs.nodes, ...subGraph.nodes],
}),
{ edges: [], nodes: [] }
);
}

return formatDependencies(graph);
}
return undefined;
}
);
};
1 change: 1 addition & 0 deletions airflow/www/static/js/dag/details/index.tsx
Expand Up @@ -312,6 +312,7 @@ const Details = ({
pr={4}
mt="4px"
onClick={() => {
onChangeTab(0);
onSelect({ taskId });
}}
>
Expand Down
117 changes: 117 additions & 0 deletions airflow/www/static/js/datasets/DagFilter.tsx
@@ -0,0 +1,117 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import React from "react";
import { HStack } from "@chakra-ui/react";
import { Size, useChakraSelectProps } from "chakra-react-select";

import type { DatasetDependencies } from "src/api/useDatasetDependencies";
import MultiSelect from "src/components/MultiSelect";
import InfoTooltip from "src/components/InfoTooltip";

interface Props {
datasetDependencies?: DatasetDependencies;
filteredDagIds: string[];
onFilterDags: (dagIds: string[]) => void;
}

const transformArrayToMultiSelectOptions = (
options: string[] | null
): { label: string; value: string }[] =>
options === null
? []
: options.map((option) => ({ label: option, value: option }));

const DagFilter = ({
datasetDependencies,
filteredDagIds,
onFilterDags,
}: Props) => {
const dagIds = (datasetDependencies?.nodes || [])
.filter((node) => node.value.class === "dag")
.map((dag) => dag.value.label);
const options = dagIds.map((dagId) => ({ label: dagId, value: dagId }));

const inputStyles: { backgroundColor: string; size: Size } = {
backgroundColor: "white",
size: "lg",
};
const multiSelectStyles = useChakraSelectProps({
...inputStyles,
isMulti: true,
tagVariant: "solid",
hideSelectedOptions: false,
isClearable: false,
selectedOptionStyle: "check",
chakraStyles: {
container: (p) => ({
...p,
width: "100%",
}),
placeholder: (p) => ({
...p,
color: "gray.700",
fontSize: "md",
}),
inputContainer: (p) => ({
...p,
color: "gray.700",
fontSize: "md",
}),
downChevron: (p) => ({
...p,
fontSize: "lg",
}),
control: (p) => ({
...p,
cursor: "pointer",
}),
option: (p) => ({
...p,
transition: "background-color 0.2s",
_hover: {
bg: "gray.100",
},
}),
},
});

return (
<HStack width="100%">
<MultiSelect
{...multiSelectStyles}
isDisabled={!datasetDependencies}
value={transformArrayToMultiSelectOptions(filteredDagIds)}
onChange={(dagOptions) => {
if (
Array.isArray(dagOptions) &&
dagOptions.every((dagOption) => "value" in dagOption)
) {
onFilterDags(dagOptions.map((option) => option.value));
}
}}
options={options}
placeholder="Filter graph by DAG ID"
/>
<InfoTooltip label="Filter Datasets graph by anything that may be connected to one or more DAGs. Does not filter the datasets list." />
</HStack>
);
};

export default DagFilter;
37 changes: 30 additions & 7 deletions airflow/www/static/js/datasets/Graph/DagNode.tsx
Expand Up @@ -19,13 +19,14 @@

import React from "react";
import {
Button,
Flex,
Link,
Popover,
PopoverArrow,
PopoverBody,
PopoverCloseButton,
PopoverContent,
PopoverFooter,
PopoverHeader,
PopoverTrigger,
Portal,
Expand All @@ -40,9 +41,13 @@ import { getMetaValue } from "src/utils";
const DagNode = ({
dagId,
isHighlighted,
isSelected,
onSelect,
}: {
dagId: string;
isHighlighted?: boolean;
isSelected?: boolean;
onSelect?: (dagId: string, type: string) => void;
}) => {
const { colors } = useTheme();
const containerRef = useContainerRef();
Expand All @@ -52,9 +57,12 @@ const DagNode = ({
<Popover>
<PopoverTrigger>
<Flex
borderWidth={2}
borderColor={isHighlighted ? colors.blue[400] : undefined}
borderColor={
isHighlighted || isSelected ? colors.blue[400] : undefined
}
borderRadius={5}
borderWidth={isSelected ? 4 : 2}
fontWeight={isSelected ? "bold" : "normal"}
p={2}
height="100%"
width="100%"
Expand All @@ -72,11 +80,26 @@ const DagNode = ({
<PopoverArrow bg="gray.100" />
<PopoverCloseButton />
<PopoverHeader>{dagId}</PopoverHeader>
<PopoverBody>
<Link color="blue" href={gridUrl}>
<PopoverFooter as={Flex} justifyContent="space-between">
<Button
variant="outline"
onClick={(e) => {
e.preventDefault();
e.stopPropagation();
if (onSelect) onSelect(dagId, "dag");
}}
>
Filter by DAG
</Button>
<Button
as={Link}
href={gridUrl}
variant="outline"
colorScheme="blue"
>
View DAG
</Link>
</PopoverBody>
</Button>
</PopoverFooter>
</PopoverContent>
</Portal>
</Popover>
Expand Down

0 comments on commit e26972c

Please sign in to comment.