Skip to content

Commit

Permalink
[dagit] Show job failures on asset graph, fix in-progress runs on glo…
Browse files Browse the repository at this point in the history
…bal graph (#7067)
  • Loading branch information
bengotow committed Mar 16, 2022
1 parent f32802e commit d07a108
Show file tree
Hide file tree
Showing 19 changed files with 739 additions and 566 deletions.
339 changes: 6 additions & 333 deletions js_modules/dagit/packages/core/src/assets/AssetEvents.tsx

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import {Box, ColorsWIP, NonIdealState, Caption, Subheading, Checkbox} from '@dagster-io/ui';
import flatMap from 'lodash/flatMap';
import uniq from 'lodash/uniq';
import * as React from 'react';

import {useStateWithStorage} from '../hooks/useStateWithStorage';

import {AssetValueGraph, AssetValueGraphData} from './AssetValueGraph';
import {AssetEventGroup} from './groupByPartition';

export const AssetMaterializationGraphs: React.FC<{
groups: AssetEventGroup[];
xAxis: 'partition' | 'time';
asSidebarSection?: boolean;
}> = (props) => {
const [xHover, setXHover] = React.useState<string | number | null>(null);

const reversed = React.useMemo(() => {
return [...props.groups].reverse();
}, [props.groups]);

const graphDataByMetadataLabel = extractNumericData(reversed, props.xAxis);
const graphLabels = Object.keys(graphDataByMetadataLabel).slice(0, 20).sort();

const [collapsedLabels, setCollapsedLabels] = useStateWithStorage(
'hidden-graphs',
validateHiddenGraphsState,
);

const toggleCollapsed = React.useCallback(
(label: string) => {
setCollapsedLabels((current = []) =>
current.includes(label) ? current.filter((c) => c !== label) : [...current, label],
);
},
[setCollapsedLabels],
);

return (
<>
<div
style={{
display: 'flex',
flexWrap: 'wrap',
justifyContent: 'stretch',
flexDirection: 'column',
}}
>
{graphLabels.map((label) => (
<Box
key={label}
style={{width: '100%'}}
border={{side: 'bottom', width: 1, color: ColorsWIP.KeylineGray}}
>
{props.asSidebarSection ? (
<Box padding={{horizontal: 24, top: 8}} flex={{justifyContent: 'space-between'}}>
<Caption style={{fontWeight: 700}}>{label}</Caption>
<Checkbox
format="switch"
checked={!collapsedLabels.includes(label)}
onChange={() => toggleCollapsed(label)}
size="small"
/>
</Box>
) : (
<Box
padding={{horizontal: 24, vertical: 16}}
border={{side: 'bottom', width: 1, color: ColorsWIP.KeylineGray}}
flex={{justifyContent: 'space-between'}}
>
<Subheading>{label}</Subheading>
<Checkbox
format="switch"
checked={!collapsedLabels.includes(label)}
onChange={() => toggleCollapsed(label)}
size="small"
/>
</Box>
)}
{!collapsedLabels.includes(label) ? (
<Box padding={{horizontal: 24, vertical: 16}}>
<AssetValueGraph
label={label}
width="100%"
data={graphDataByMetadataLabel[label]}
xHover={xHover}
onHoverX={(x) => x !== xHover && setXHover(x)}
/>
</Box>
) : undefined}
</Box>
))}
</div>
{graphLabels.length === 0 ? (
props.asSidebarSection ? (
<Box
margin={{horizontal: 24, vertical: 12}}
style={{color: ColorsWIP.Gray500, fontSize: '0.8rem'}}
>
No numeric metadata entries available to be graphed.
</Box>
) : (
<Box padding={{horizontal: 24, top: 64}}>
<NonIdealState
shrinkable
icon="linear_scale"
title="No numeric metadata"
description={`Include numeric metadata entries in your materializations and observations to see data graphed by ${props.xAxis}.`}
/>
</Box>
)
) : (
props.xAxis === 'partition' && (
<Box padding={{vertical: 16, horizontal: 24}} style={{color: ColorsWIP.Gray400}}>
When graphing values by partition, the highest data point for each materialized event
label is displayed.
</Box>
)
)}
</>
);
};

const validateHiddenGraphsState = (json: string[]) => (Array.isArray(json) ? json : []);

/**
* Helper function that iterates over the asset materializations and assembles time series data
* and stats for all numeric metadata entries. This function makes the following guaruntees:
*
* - If a metadata entry is sparsely emitted, points are still included for missing x values
* with y = NaN. (For compatiblity with react-chartjs-2)
* - If a metadata entry is generated many times for the same partition, and xAxis = partition,
* the MAX value emitted is used as the data point.
*
* Assumes that the data is pre-sorted in ascending partition order if using xAxis = partition.
*/
const extractNumericData = (datapoints: AssetEventGroup[], xAxis: 'time' | 'partition') => {
const series: {
[metadataEntryLabel: string]: AssetValueGraphData;
} = {};

// Build a set of the numeric metadata entry labels (note they may be sparsely emitted)
const numericMetadataLabels = uniq(
flatMap(datapoints, (e) =>
(e.latest?.metadataEntries || [])
.filter((k) => ['IntMetadataEntry', 'FloatMetadataEntry'].includes(k.__typename))
.map((k) => k.label),
),
);

const append = (label: string, {x, y}: {x: number | string; y: number}) => {
series[label] = series[label] || {minX: 0, maxX: 0, minY: 0, maxY: 0, values: [], xAxis};

if (xAxis === 'partition') {
// If the xAxis is partition keys, the graph may only contain one value for each partition.
// If the existing sample for the partition was null, replace it. Otherwise take the
// most recent value.
const existingForPartition = series[label].values.find((v) => v.x === x);
if (existingForPartition) {
if (!isNaN(y)) {
existingForPartition.y = y;
}
return;
}
}
series[label].values.push({
xNumeric: typeof x === 'number' ? x : series[label].values.length,
x,
y,
});
};

for (const {partition, latest} of datapoints) {
const x = (xAxis === 'partition' ? partition : Number(latest?.timestamp)) || null;

if (x === null) {
// exclude materializations where partition = null from partitioned graphs
continue;
}

// Add an entry for every numeric metadata label
for (const label of numericMetadataLabels) {
const entry = latest?.metadataEntries.find((l) => l.label === label);
if (!entry) {
append(label, {x, y: NaN});
continue;
}

let y = NaN;
if (entry.__typename === 'IntMetadataEntry') {
if (entry.intValue !== null) {
y = entry.intValue;
} else {
// will incur precision loss here
y = parseInt(entry.intRepr);
}
}
if (entry.__typename === 'FloatMetadataEntry' && entry.floatValue !== null) {
y = entry.floatValue;
}

append(label, {x, y});
}
}

for (const serie of Object.values(series)) {
const xs = serie.values.map((v) => v.xNumeric);
const ys = serie.values.map((v) => v.y).filter((v) => !isNaN(v));
serie.minXNumeric = Math.min(...xs);
serie.maxXNumeric = Math.max(...xs);
serie.minY = Math.min(...ys);
serie.maxY = Math.max(...ys);
}
return series;
};
4 changes: 2 additions & 2 deletions js_modules/dagit/packages/core/src/assets/AssetNodeList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {useHistory} from 'react-router-dom';

import {AssetNode} from '../workspace/asset-graph/AssetNode';
import {ForeignNode} from '../workspace/asset-graph/ForeignNode';
import {LiveData} from '../workspace/asset-graph/Utils';
import {LiveData, toGraphId} from '../workspace/asset-graph/Utils';

import {AssetNodeDefinitionFragment_dependencies} from './types/AssetNodeDefinitionFragment';

Expand Down Expand Up @@ -42,7 +42,7 @@ export const AssetNodeList: React.FC<{
inAssetCatalog
jobName={asset.jobNames[0]}
selected={false}
liveData={liveDataByNode[asset.id]}
liveData={liveDataByNode[toGraphId(asset.assetKey)]}
/>
) : (
<ForeignNode assetKey={asset.assetKey} />
Expand Down
59 changes: 25 additions & 34 deletions js_modules/dagit/packages/core/src/assets/AssetView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import {LaunchAssetExecutionButton} from '../workspace/asset-graph/LaunchAssetEx
import {
buildGraphDataFromSingleNode,
buildLiveData,
IN_PROGRESS_RUNS_FRAGMENT,
REPOSITORY_LIVE_FRAGMENT,
LiveData,
toGraphId,
} from '../workspace/asset-graph/Utils';
import {buildRepoAddress} from '../workspace/buildRepoAddress';

Expand All @@ -38,9 +39,9 @@ import {AssetNodeInstigatorTag, ASSET_NODE_INSTIGATORS_FRAGMENT} from './AssetNo
import {AssetPageHeader} from './AssetPageHeader';
import {AssetKey} from './types';
import {
AssetNodeDefinitionRunsQuery,
AssetNodeDefinitionRunsQueryVariables,
} from './types/AssetNodeDefinitionRunsQuery';
AssetNodeDefinitionLiveQuery,
AssetNodeDefinitionLiveQueryVariables,
} from './types/AssetNodeDefinitionLiveQuery';
import {AssetQuery, AssetQueryVariables} from './types/AssetQuery';

interface Props {
Expand All @@ -64,9 +65,6 @@ export const AssetView: React.FC<Props> = ({assetKey}) => {
notifyOnNetworkStatusChange: true,
});

// Refresh immediately when a run is launched from this page
useDidLaunchEvent(queryResult.refetch);

const {assetOrError} = queryResult.data || queryResult.previousData || {};
const asset = assetOrError && assetOrError.__typename === 'Asset' ? assetOrError : null;
const lastMaterializedAt = asset?.assetMaterializations[0]?.timestamp;
Expand All @@ -76,10 +74,10 @@ export const AssetView: React.FC<Props> = ({assetKey}) => {
? buildRepoAddress(definition.repository.name, definition.repository.location.name)
: null;

const inProgressRunsQuery = useQuery<
AssetNodeDefinitionRunsQuery,
AssetNodeDefinitionRunsQueryVariables
>(ASSET_NODE_DEFINITION_RUNS_QUERY, {
const liveQueryResult = useQuery<
AssetNodeDefinitionLiveQuery,
AssetNodeDefinitionLiveQueryVariables
>(ASSET_NODE_DEFINITION_LIVE_QUERY, {
skip: !repoAddress,
variables: {
repositorySelector: {
Expand All @@ -91,16 +89,20 @@ export const AssetView: React.FC<Props> = ({assetKey}) => {
});

const refreshState = useQueryRefreshAtInterval(queryResult, FIFTEEN_SECONDS);
useQueryRefreshAtInterval(inProgressRunsQuery, FIFTEEN_SECONDS);
useQueryRefreshAtInterval(liveQueryResult, FIFTEEN_SECONDS);

// Refresh immediately when a run is launched from this page
useDidLaunchEvent(queryResult.refetch);
useDidLaunchEvent(liveQueryResult.refetch);

let liveDataByNode: LiveData = {};

if (definition) {
const inProgressRuns =
inProgressRunsQuery.data?.repositoryOrError.__typename === 'Repository'
? inProgressRunsQuery.data.repositoryOrError.inProgressRunsByStep
: [];
const repo =
liveQueryResult.data?.repositoryOrError.__typename === 'Repository'
? liveQueryResult.data.repositoryOrError
: null;

if (definition && repo) {
const nodesWithLatestMaterialization = [
definition,
...definition.dependencies.map((d) => d.asset),
Expand All @@ -109,7 +111,7 @@ export const AssetView: React.FC<Props> = ({assetKey}) => {
liveDataByNode = buildLiveData(
buildGraphDataFromSingleNode(definition),
nodesWithLatestMaterialization,
inProgressRuns,
[repo],
);
}

Expand Down Expand Up @@ -209,16 +211,7 @@ export const AssetView: React.FC<Props> = ({assetKey}) => {
params={params}
paramsTimeWindowOnly={!!params.asOf}
setParams={setParams}
liveData={definition ? liveDataByNode[definition.id] : undefined}
repository={
definition?.repository
? {
repositoryLocationName: definition?.repository.location.name,
repositoryName: definition.repository.name,
}
: undefined
}
opName={definition?.opName}
liveData={definition ? liveDataByNode[toGraphId(definition.assetKey)] : undefined}
/>
))}
</div>
Expand Down Expand Up @@ -260,20 +253,18 @@ const ASSET_QUERY = gql`
${ASSET_NODE_DEFINITION_FRAGMENT}
`;

const ASSET_NODE_DEFINITION_RUNS_QUERY = gql`
query AssetNodeDefinitionRunsQuery($repositorySelector: RepositorySelector!) {
const ASSET_NODE_DEFINITION_LIVE_QUERY = gql`
query AssetNodeDefinitionLiveQuery($repositorySelector: RepositorySelector!) {
repositoryOrError(repositorySelector: $repositorySelector) {
__typename
... on Repository {
id
name
inProgressRunsByStep {
...InProgressRunsFragment
}
...RepositoryLiveFragment
}
}
}
${IN_PROGRESS_RUNS_FRAGMENT}
${REPOSITORY_LIVE_FRAGMENT}
`;

const HistoricalViewAlert: React.FC<{
Expand Down

0 comments on commit d07a108

Please sign in to comment.