Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/n] Move subscription to web worker #9795

Closed
79 changes: 79 additions & 0 deletions js_modules/dagit/packages/core/src/app/MetadataEntryFragment.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {gql} from '@apollo/client';

export const TABLE_SCHEMA_FRAGMENT = gql`
fragment TableSchemaFragment on TableSchema {
__typename
columns {
name
description
type
constraints {
nullable
unique
other
}
}
constraints {
other
}
}
`;

export const METADATA_ENTRY_FRAGMENT = gql`
fragment MetadataEntryFragment on MetadataEntry {
__typename
label
description
... on PathMetadataEntry {
path
}
... on JsonMetadataEntry {
jsonString
}
... on UrlMetadataEntry {
url
}
... on TextMetadataEntry {
text
}
... on MarkdownMetadataEntry {
mdStr
}
... on PythonArtifactMetadataEntry {
module
name
}
... on FloatMetadataEntry {
floatValue
}
... on IntMetadataEntry {
intValue
intRepr
}
... on BoolMetadataEntry {
boolValue
}
... on PipelineRunMetadataEntry {
runId
}
... on AssetMetadataEntry {
assetKey {
path
}
}
... on TableMetadataEntry {
table {
records
schema {
...TableSchemaFragment
}
}
}
... on TableSchemaMetadataEntry {
schema {
...TableSchemaFragment
}
}
}
${TABLE_SCHEMA_FRAGMENT}
`;
13 changes: 13 additions & 0 deletions js_modules/dagit/packages/core/src/app/PythonErrorFragment.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {gql} from '@apollo/client';

export const PYTHON_ERROR_FRAGMENT = gql`
fragment PythonErrorFragment on PythonError {
__typename
message
stack
cause {
message
stack
}
}
`;
Comment on lines +1 to +13
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fragment is already defined in PythonErrorInfo, can you reuse that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I didn't want the initial PR to have a ton of changes so that its easier to review but I'll do what I did in the original PR where I moved the dependencies around and changed all the imports so that theres only 1 definition

67 changes: 23 additions & 44 deletions js_modules/dagit/packages/core/src/runs/LogsProvider.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {gql, useApolloClient, useQuery, useSubscription} from '@apollo/client';
import {gql, useApolloClient, useQuery} from '@apollo/client';
import {TokenizingFieldValue} from '@dagster-io/ui';
import throttle from 'lodash/throttle';
import * as React from 'react';
Expand All @@ -10,14 +10,14 @@ import {LogLevelCounts} from './LogsToolbar';
import {RunFragments} from './RunFragments';
import {logNodeLevel} from './logNodeLevel';
import {LogNode} from './types';
import {
PipelineRunLogsSubscription,
PipelineRunLogsSubscriptionVariables,
PipelineRunLogsSubscription_pipelineRunLogs_PipelineRunLogsSubscriptionSuccess,
} from './types/PipelineRunLogsSubscription';
import {PipelineRunLogsSubscription_pipelineRunLogs_PipelineRunLogsSubscriptionSuccess} from './types/PipelineRunLogsSubscription';
import {PipelineRunLogsSubscriptionStatusFragment} from './types/PipelineRunLogsSubscriptionStatusFragment';
import {RunDagsterRunEventFragment} from './types/RunDagsterRunEventFragment';
import {RunLogsQuery, RunLogsQueryVariables} from './types/RunLogsQuery';
import {
usePipelineRunLogsSubscription,
usePipelineRunLogsSubscriptionWorker,
} from './usePipelineRunLogsSubscription';

export interface LogFilterValue extends TokenizingFieldValue {
token?: 'step' | 'type' | 'query';
Expand Down Expand Up @@ -174,36 +174,39 @@ const useLogsProviderWithSubscription = (runId: string) => {
const cursor = lastLog.cursor;

dispatch({type: 'append', queued: queuedMessages, hasMore, cursor});
const nextPipelineStatus = pipelineStatusFromMessages(queuedMessages);

// If we're still loading past events, don't sync to the cache -- event chunks could
// give us `status` values that don't match the actual state of the run.
if (nextPipelineStatus && !hasMore) {
syncPipelineStatusToApolloCache(nextPipelineStatus);
if (!hasMore) {
const nextPipelineStatus = pipelineStatusFromMessages(queuedMessages);
if (nextPipelineStatus) {
syncPipelineStatusToApolloCache(nextPipelineStatus);
}
}
}, BATCH_INTERVAL);
}, [syncPipelineStatusToApolloCache]);

const {nodes, counts, cursor, loading} = state;

useSubscription<PipelineRunLogsSubscription, PipelineRunLogsSubscriptionVariables>(
PIPELINE_RUN_LOGS_SUBSCRIPTION,
{
fetchPolicy: 'no-cache',
variables: {runId, cursor},
onSubscriptionData: ({subscriptionData}) => {
const logs = subscriptionData.data?.pipelineRunLogs;
if (!logs || logs.__typename === 'PipelineRunLogsSubscriptionFailure') {
return;
}
const useHook = new URLSearchParams(window.location.search).get('worker')
? usePipelineRunLogsSubscriptionWorker
: usePipelineRunLogsSubscription;
salazarm marked this conversation as resolved.
Show resolved Hide resolved

console.log({useHook});

useHook(
{runId, cursor},
React.useCallback(
(logs: PipelineRunLogsSubscription_pipelineRunLogs_PipelineRunLogsSubscriptionSuccess) => {
// Maintain a queue of messages as they arrive, and call the throttled setter.
queue.current = [...queue.current, logs];
// Wait until end of animation frame to call throttled set nodes
// otherwise we wont end up batching anything if rendering takes
// longer than the BATCH_INTERVAL
requestAnimationFrame(throttledSetNodes);
},
},
[throttledSetNodes],
),
);

return React.useMemo(
Expand Down Expand Up @@ -298,30 +301,6 @@ export const LogsProvider: React.FC<LogsProviderProps> = (props) => {
return <LogsProviderWithSubscription runId={runId}>{children}</LogsProviderWithSubscription>;
};

const PIPELINE_RUN_LOGS_SUBSCRIPTION = gql`
subscription PipelineRunLogsSubscription($runId: ID!, $cursor: String) {
pipelineRunLogs(runId: $runId, cursor: $cursor) {
__typename
... on PipelineRunLogsSubscriptionSuccess {
messages {
... on MessageEvent {
runId
}
...RunDagsterRunEventFragment
}
hasMorePastEvents
cursor
}
... on PipelineRunLogsSubscriptionFailure {
missingRunId
message
}
}
}

${RunFragments.RunDagsterRunEventFragment}
`;

const PIPELINE_RUN_LOGS_SUBSCRIPTION_STATUS_FRAGMENT = gql`
fragment PipelineRunLogsSubscriptionStatusFragment on Run {
id
Expand Down