Skip to content

Commit

Permalink
Add TaskFail entries to Gantt chart (apache#37918)
Browse files Browse the repository at this point in the history
* Add TaskFail entries to Gantt chart

* Fix some autorefresh
  • Loading branch information
bbovenzi authored and howardyoo committed Mar 18, 2024
1 parent ba8c804 commit 2393ca1
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 7 deletions.
2 changes: 2 additions & 0 deletions airflow/www/static/js/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import useHistoricalMetricsData from "./useHistoricalMetricsData";
import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom";
import useEventLogs from "./useEventLogs";
import useCalendarData from "./useCalendarData";
import useTaskFails from "./useTaskFails";

axios.interceptors.request.use((config) => {
config.paramsSerializer = {
Expand Down Expand Up @@ -100,4 +101,5 @@ export {
useTaskXcomCollection,
useEventLogs,
useCalendarData,
useTaskFails,
};
67 changes: 67 additions & 0 deletions airflow/www/static/js/api/useTaskFails.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { useQuery } from "react-query";
import axios, { AxiosResponse } from "axios";

import { getMetaValue } from "src/utils";
import { useAutoRefresh } from "src/context/autorefresh";

const DAG_ID_PARAM = "dag_id";
const RUN_ID_PARAM = "run_id";
const TASK_ID_PARAM = "task_id";

const dagId = getMetaValue(DAG_ID_PARAM);
const taskFailsUrl = getMetaValue("task_fails_url");

export interface TaskFail {
runId: string;
taskId: string;
mapIndex?: number;
startDate?: string;
endDate?: string;
}

interface Props {
runId?: string;
taskId?: string;
enabled?: boolean;
}

const useTaskFails = ({ runId, taskId, enabled = true }: Props) => {
const { isRefreshOn } = useAutoRefresh();

return useQuery(
["taskFails", runId, taskId],
async () => {
const params = {
[DAG_ID_PARAM]: dagId,
[RUN_ID_PARAM]: runId,
[TASK_ID_PARAM]: taskId,
};
return axios.get<AxiosResponse, TaskFail[]>(taskFailsUrl, { params });
},
{
enabled,
refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000,
}
);
};

export default useTaskFails;
41 changes: 35 additions & 6 deletions airflow/www/static/js/dag/details/gantt/Row.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

import React from "react";
import { Box, Tooltip, Flex } from "@chakra-ui/react";

import useSelection from "src/dag/useSelection";
import { getDuration } from "src/datetime_utils";
import { SimpleStatus } from "src/dag/StatusBox";
import { SimpleStatus, boxSize } from "src/dag/StatusBox";
import { useContainerRef } from "src/context/containerRef";
import { hoverDelay } from "src/utils";
import type { Task } from "src/types";
import { useTaskFails } from "src/api";

import GanttTooltip from "./GanttTooltip";
import TaskFail from "./TaskFail";

interface Props {
ganttWidth?: number;
Expand Down Expand Up @@ -59,6 +63,12 @@ const Row = ({
: true);
const isOpen = openGroupIds.includes(task.id || "");

const { data: taskFails } = useTaskFails({
taskId: task.id || undefined,
runId: runId || undefined,
enabled: !!(instance?.tryNumber && instance?.tryNumber > 1) && !!task.id, // Only try to look up task fails if it even has a try number > 1
});

// Calculate durations in ms
const taskDuration = getDuration(instance?.startDate, instance?.endDate);
const queuedDuration = hasValidQueuedDttm
Expand All @@ -84,12 +94,14 @@ const Row = ({
return (
<div>
<Box
py="4px"
borderBottomWidth={1}
borderBottomColor={!!task.children && isOpen ? "gray.400" : "gray.200"}
bg={isSelected ? "blue.100" : "inherit"}
position="relative"
width={ganttWidth}
height={`${boxSize + 9}px`}
>
{instance ? (
{instance && (
<Tooltip
label={<GanttTooltip task={task} instance={instance} />}
hasArrow
Expand All @@ -99,9 +111,11 @@ const Row = ({
>
<Flex
width={`${width + queuedWidth}px`}
position="absolute"
cursor="pointer"
pointerEvents="auto"
marginLeft={`${offsetMargin}px`}
top="4px"
left={`${offsetMargin}px`}
onClick={() => {
onSelect({
runId: instance.runId,
Expand Down Expand Up @@ -129,9 +143,24 @@ const Row = ({
/>
</Flex>
</Tooltip>
) : (
<Box height="10px" />
)}
{/* Only show fails before the most recent task instance */}
{(taskFails || [])
.filter(
(tf) =>
tf.startDate !== instance?.startDate &&
// @ts-ignore
moment(tf.startDate).isAfter(ganttStartDate)
)
.map((taskFail) => (
<TaskFail
key={`${taskFail.taskId}-${taskFail.startDate}`}
taskFail={taskFail}
ganttStartDate={ganttStartDate}
ganttWidth={ganttWidth}
runDuration={runDuration}
/>
))}
</Box>
{isOpen &&
!!task.children &&
Expand Down
91 changes: 91 additions & 0 deletions airflow/www/static/js/dag/details/gantt/TaskFail.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import React from "react";
import { Box, Tooltip, Text } from "@chakra-ui/react";

import { getDuration } from "src/datetime_utils";
import { SimpleStatus } from "src/dag/StatusBox";
import { useContainerRef } from "src/context/containerRef";
import { hoverDelay } from "src/utils";
import Time from "src/components/Time";

import type { TaskFail as TaskFailType } from "src/api/useTaskFails";

interface Props {
taskFail: TaskFailType;
runDuration: number;
ganttWidth: number;
ganttStartDate?: string | null;
}

const TaskFail = ({
taskFail,
runDuration,
ganttWidth,
ganttStartDate,
}: Props) => {
const containerRef = useContainerRef();

const duration = getDuration(taskFail?.startDate, taskFail?.endDate);
const percent = duration / runDuration;
const failWidth = ganttWidth * percent;

const startOffset = getDuration(ganttStartDate, taskFail?.startDate);
const offsetLeft = (startOffset / runDuration) * ganttWidth;

return (
<Tooltip
label={
<Box>
<Text mb={2}>Task Fail</Text>
{taskFail?.startDate && (
<Text>
Start: <Time dateTime={taskFail?.startDate} />
</Text>
)}
{taskFail?.endDate && (
<Text>
End: <Time dateTime={taskFail?.endDate} />
</Text>
)}
<Text mt={2} fontSize="sm">
Can only show previous Task Fails, other tries are not yet saved.
</Text>
</Box>
}
hasArrow
portalProps={{ containerRef }}
placement="top"
openDelay={hoverDelay}
top="4px"
>
<Box
position="absolute"
left={`${offsetLeft}px`}
cursor="pointer"
top="4px"
>
<SimpleStatus state="failed" width={`${failWidth}px`} />
</Box>
</Tooltip>
);
};

export default TaskFail;
6 changes: 5 additions & 1 deletion airflow/www/static/js/dag/details/task/TaskDuration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import React from "react";

import useSelection from "src/dag/useSelection";
import { useGridData } from "src/api";
import { useGridData, useTaskFails } from "src/api";
import { startCase } from "lodash";
import { getDuration, formatDateTime, defaultFormat } from "src/datetime_utils";
import ReactECharts, { ReactEChartsProps } from "src/components/ReactECharts";
Expand All @@ -45,6 +45,10 @@ const TaskDuration = () => {
onSelect,
} = useSelection();

const { data: taskFails } = useTaskFails({ taskId: taskId || undefined });

console.log(taskFails);

const {
data: { dagRuns, groups, ordering },
} = useGridData();
Expand Down
1 change: 1 addition & 0 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}">
<meta name="graph_data_url" content="{{ url_for('Airflow.graph_data') }}">
<meta name="calendar_data_url" content="{{ url_for('Airflow.calendar_data') }}">
<meta name="task_fails_url" content="{{ url_for('Airflow.task_fails') }}">
<meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}">
<meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}">
<meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}">
Expand Down
25 changes: 25 additions & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3505,6 +3505,31 @@ def graph_data(self):
{"Content-Type": "application/json; charset=utf-8"},
)

@expose("/object/task_fails")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def task_fails(self, session):
"""Return task fails."""
dag_id = request.args.get("dag_id")
task_id = request.args.get("task_id")
run_id = request.args.get("run_id")

query = select(
TaskFail.task_id, TaskFail.run_id, TaskFail.map_index, TaskFail.start_date, TaskFail.end_date
).where(TaskFail.dag_id == dag_id)

if run_id:
query = query.where(TaskFail.run_id == run_id)
if task_id:
query = query.where(TaskFail.task_id == task_id)

task_fails = [dict(tf) for tf in session.execute(query).all()]

return (
htmlsafe_json_dumps(task_fails, separators=(",", ":"), dumps=flask.json.dumps),
{"Content-Type": "application/json; charset=utf-8"},
)

@expose("/object/task_instances")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
def task_instances(self):
Expand Down

0 comments on commit 2393ca1

Please sign in to comment.