Skip to content

Commit

Permalink
Add datasets to dag graph (#37604)
Browse files Browse the repository at this point in the history
* Add dataset nodes to dag graph

* Clean up dataset nodes
  • Loading branch information
bbovenzi committed Feb 22, 2024
1 parent b2f0279 commit 1c37891
Show file tree
Hide file tree
Showing 14 changed files with 490 additions and 251 deletions.
4 changes: 3 additions & 1 deletion airflow/www/static/js/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import useGraphData from "./useGraphData";
import useGridData from "./useGridData";
import useMappedInstances from "./useMappedInstances";
import useDatasets from "./useDatasets";
import useDatasetsSummary from "./useDatasetsSummary";
import useDataset from "./useDataset";
import useDatasetDependencies from "./useDatasetDependencies";
import useDatasetEvents from "./useDatasetEvents";
Expand Down Expand Up @@ -72,9 +73,10 @@ export {
useDagRuns,
useDags,
useDataset,
useDatasets,
useDatasetDependencies,
useDatasetEvents,
useDatasets,
useDatasetsSummary,
useExtraLinks,
useGraphData,
useGridData,
Expand Down
60 changes: 12 additions & 48 deletions airflow/www/static/js/api/useDatasets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,65 +21,29 @@ import axios, { AxiosResponse } from "axios";
import { useQuery } from "react-query";

import { getMetaValue } from "src/utils";
import type { DatasetListItem } from "src/types";
import type { unitOfTime } from "moment";

export interface DatasetsData {
datasets: DatasetListItem[];
totalEntries: number;
}

export interface DateOption {
count: number;
unit: unitOfTime.DurationConstructor;
}
import type { API } from "src/types";

interface Props {
limit?: number;
offset?: number;
order?: string;
uri?: string;
updatedAfter?: DateOption;
dagIds?: string[];
enabled?: boolean;
}

export default function useDatasets({
limit,
offset,
order,
uri,
updatedAfter,
}: Props) {
const query = useQuery(
["datasets", limit, offset, order, uri, updatedAfter],
export default function useDatasets({ dagIds, enabled = true }: Props) {
return useQuery(
["datasets", dagIds],
() => {
const datasetsUrl = getMetaValue("datasets_api");
const orderParam = order ? { order_by: order } : {};
const uriParam = uri ? { uri_pattern: uri } : {};
const updatedAfterParam =
updatedAfter && updatedAfter.count && updatedAfter.unit
? {
// @ts-ignore
updated_after: moment()
.subtract(updatedAfter.count, updatedAfter.unit)
.toISOString(),
}
: {};
return axios.get<AxiosResponse, DatasetsData>(datasetsUrl, {
const dagIdsParam =
dagIds && dagIds.length ? { dag_ids: dagIds.join(",") } : {};

return axios.get<AxiosResponse, API.DatasetCollection>(datasetsUrl, {
params: {
offset,
limit,
...orderParam,
...uriParam,
...updatedAfterParam,
...dagIdsParam,
},
});
},
{
keepPreviousData: true,
enabled,
}
);
return {
...query,
data: query.data ?? { datasets: [], totalEntries: 0 },
};
}
85 changes: 85 additions & 0 deletions airflow/www/static/js/api/useDatasetsSummary.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import axios, { AxiosResponse } from "axios";
import { useQuery } from "react-query";

import { getMetaValue } from "src/utils";
import type { DatasetListItem } from "src/types";
import type { unitOfTime } from "moment";

export interface DatasetsData {
datasets: DatasetListItem[];
totalEntries: number;
}

export interface DateOption {
count: number;
unit: unitOfTime.DurationConstructor;
}

interface Props {
limit?: number;
offset?: number;
order?: string;
uri?: string;
updatedAfter?: DateOption;
}

export default function useDatasetsSummary({
limit,
offset,
order,
uri,
updatedAfter,
}: Props) {
const query = useQuery(
["datasets_summary", limit, offset, order, uri, updatedAfter],
() => {
const datasetsUrl = getMetaValue("datasets_summary");
const orderParam = order ? { order_by: order } : {};
const uriParam = uri ? { uri_pattern: uri } : {};
const updatedAfterParam =
updatedAfter && updatedAfter.count && updatedAfter.unit
? {
// @ts-ignore
updated_after: moment()
.subtract(updatedAfter.count, updatedAfter.unit)
.toISOString(),
}
: {};
return axios.get<AxiosResponse, DatasetsData>(datasetsUrl, {
params: {
offset,
limit,
...orderParam,
...uriParam,
...updatedAfterParam,
},
});
},
{
keepPreviousData: true,
}
);
return {
...query,
data: query.data ?? { datasets: [], totalEntries: 0 },
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ import { Wrapper } from "src/utils/testUtils";

import type { NodeProps } from "reactflow";
import type { Task, TaskInstance } from "src/types";
import { CustomNodeProps, BaseNode as Node } from "./Node";
import type { CustomNodeProps } from "./Node";
import DagNode from "./DagNode";

const mockNode: NodeProps<CustomNodeProps> = {
id: "task_id",
data: {
label: "task_id",
height: 50,
width: 200,
class: "dag",
instance: {
state: "success",
runId: "run_id",
Expand Down Expand Up @@ -65,7 +67,7 @@ const mockNode: NodeProps<CustomNodeProps> = {

describe("Test Graph Node", () => {
test("Renders normal task correctly", async () => {
const { getByText, getByTestId } = render(<Node {...mockNode} />, {
const { getByText, getByTestId } = render(<DagNode {...mockNode} />, {
wrapper: Wrapper,
});

Expand All @@ -77,7 +79,7 @@ describe("Test Graph Node", () => {

test("Renders mapped task correctly", async () => {
const { getByText } = render(
<Node
<DagNode
{...mockNode}
data={{
...mockNode.data,
Expand All @@ -99,7 +101,7 @@ describe("Test Graph Node", () => {

test("Renders task group correctly", async () => {
const { getByText } = render(
<Node
<DagNode
{...mockNode}
data={{ ...mockNode.data, childCount: 5, isOpen: false }}
/>,
Expand All @@ -114,7 +116,7 @@ describe("Test Graph Node", () => {

test("Renders normal task correctly", async () => {
const { getByTestId } = render(
<Node {...mockNode} data={{ ...mockNode.data, isActive: false }} />,
<DagNode {...mockNode} data={{ ...mockNode.data, isActive: false }} />,
{
wrapper: Wrapper,
}
Expand Down

0 comments on commit 1c37891

Please sign in to comment.