Skip to content

Commit

Permalink
create get_records_for_run for event log storage, using an opaque string cursor (#7973)
Browse files Browse the repository at this point in the history

* create underlying get_records_for_run, which uses opaque string cursor

* base64 encode event log cursor strings

* fix docstring

* fix run observable

* fix

* fix more tests; use deserialize_as

* add cursor to the watch callback signature, observable payload

* fix tests

* change logs provider to use eventsConnection, pass through opaque cursor

* fix tests

* fix graphql test

* address comments
  • Loading branch information
prha committed May 23, 2022
1 parent d5d7068 commit 1bda005
Show file tree
Hide file tree
Showing 30 changed files with 904 additions and 704 deletions.
17 changes: 12 additions & 5 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 28 additions & 33 deletions js_modules/dagit/packages/core/src/runs/LogsProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@ const BATCH_INTERVAL = 100;

type State = {
nodes: Nodes;
cursor: number;
cursor: string | null;
loading: boolean;
};

type Action =
| {type: 'append'; queued: RunDagsterRunEventFragment[]; hasMore: boolean}
| {type: 'set-cursor'}
| {type: 'append'; queued: RunDagsterRunEventFragment[]; hasMore: boolean; cursor: string}
| {type: 'reset'};

const reducer = (state: State, action: Action) => {
Expand All @@ -79,25 +78,22 @@ const reducer = (state: State, action: Action) => {
...m,
clientsideKey: `csk${idx}`,
}));
return {...state, nodes, loading: action.hasMore};
case 'set-cursor':
return {...state, cursor: state.nodes.length - 1};
return {...state, nodes, loading: action.hasMore, cursor: action.cursor};
case 'reset':
return {nodes: [], cursor: -1, loading: true};
return {nodes: [], cursor: null, loading: true};
default:
return state;
}
};

const initialState = {
nodes: [],
cursor: -1,
cursor: null,
loading: true,
};

const useLogsProviderWithSubscription = (runId: string) => {
const client = useApolloClient();
const {websocketClient} = React.useContext(WebSocketContext);
const queue = React.useRef<RunDagsterRunEventFragment[]>([]);
const [state, dispatch] = React.useReducer(reducer, initialState);

Expand Down Expand Up @@ -132,13 +128,6 @@ const useLogsProviderWithSubscription = (runId: string) => {
[client, runId],
);

// If the WebSocket disconnects, move the cursor to the end to ensure that we don't
// incorrectly refetch logs that we already have.
React.useEffect(() => {
const unlisten = websocketClient?.onDisconnected(() => dispatch({type: 'set-cursor'}));
return () => unlisten && unlisten();
}, [websocketClient]);

React.useEffect(() => {
queue.current = [];
dispatch({type: 'reset'});
Expand All @@ -147,10 +136,10 @@ const useLogsProviderWithSubscription = (runId: string) => {
// Batch the nodes together so they don't overwhelm the animation of the Gantt,
// which depends on a bit of a timing delay to maintain smoothness.
const throttledSetNodes = React.useMemo(() => {
return throttle((hasMore: boolean) => {
return throttle((hasMore: boolean, cursor: string) => {
const queued = [...queue.current];
queue.current = [];
dispatch({type: 'append', queued, hasMore});
dispatch({type: 'append', queued, hasMore, cursor});
}, BATCH_INTERVAL);
}, []);

Expand All @@ -160,14 +149,14 @@ const useLogsProviderWithSubscription = (runId: string) => {
PIPELINE_RUN_LOGS_SUBSCRIPTION,
{
fetchPolicy: 'no-cache',
variables: {runId, after: cursor},
variables: {runId, cursor},
onSubscriptionData: ({subscriptionData}) => {
const logs = subscriptionData.data?.pipelineRunLogs;
if (!logs || logs.__typename === 'PipelineRunLogsSubscriptionFailure') {
return;
}

const {messages, hasMorePastEvents} = logs;
const {messages, hasMorePastEvents, cursor} = logs;
const nextPipelineStatus = pipelineStatusFromMessages(messages);

// If we're still loading past events, don't sync to the cache -- event chunks could
Expand All @@ -178,7 +167,7 @@ const useLogsProviderWithSubscription = (runId: string) => {

// Maintain a queue of messages as they arrive, and call the throttled setter.
queue.current = [...queue.current, ...messages];
throttledSetNodes(hasMorePastEvents);
throttledSetNodes(hasMorePastEvents, cursor);
},
},
);
Expand Down Expand Up @@ -209,13 +198,13 @@ const POLL_INTERVAL = 5000;
const LogsProviderWithQuery = (props: LogsProviderWithQueryProps) => {
const {children, runId} = props;
const [nodes, setNodes] = React.useState<LogNode[]>(() => []);
const [after, setAfter] = React.useState<number>(-1);
const [cursor, setCursor] = React.useState<string | null>(null);

const {stopPolling, startPolling} = useQuery<RunLogsQuery, RunLogsQueryVariables>(
RUN_LOGS_QUERY,
{
notifyOnNetworkStatusChange: true,
variables: {runId, after},
variables: {runId, cursor},
pollInterval: POLL_INTERVAL,
onCompleted: (data: RunLogsQuery) => {
      // We have to stop polling in order to update the `cursor` value.
Expand All @@ -224,7 +213,7 @@ const LogsProviderWithQuery = (props: LogsProviderWithQueryProps) => {
const slice = () => {
const count = nodes.length;
if (data?.pipelineRunOrError.__typename === 'Run') {
return data?.pipelineRunOrError.events.map((event, ii) => ({
return data?.pipelineRunOrError.eventConnection.events.map((event, ii) => ({
...event,
clientsideKey: `csk${count + ii}`,
}));
Expand All @@ -234,7 +223,9 @@ const LogsProviderWithQuery = (props: LogsProviderWithQueryProps) => {

const newSlice = slice();
setNodes((current) => [...current, ...newSlice]);
setAfter((current) => current + newSlice.length);
if (data?.pipelineRunOrError.__typename === 'Run') {
setCursor(data.pipelineRunOrError.eventConnection.cursor);
}

const status =
data?.pipelineRunOrError.__typename === 'Run' ? data?.pipelineRunOrError.status : null;
Expand Down Expand Up @@ -279,8 +270,8 @@ export const LogsProvider: React.FC<LogsProviderProps> = (props) => {
};

const PIPELINE_RUN_LOGS_SUBSCRIPTION = gql`
subscription PipelineRunLogsSubscription($runId: ID!, $after: Cursor) {
pipelineRunLogs(runId: $runId, after: $after) {
subscription PipelineRunLogsSubscription($runId: ID!, $cursor: String) {
pipelineRunLogs(runId: $runId, cursor: $cursor) {
__typename
... on PipelineRunLogsSubscriptionSuccess {
messages {
Expand All @@ -290,6 +281,7 @@ const PIPELINE_RUN_LOGS_SUBSCRIPTION = gql`
...RunDagsterRunEventFragment
}
hasMorePastEvents
cursor
}
... on PipelineRunLogsSubscriptionFailure {
missingRunId
Expand All @@ -311,19 +303,22 @@ const PIPELINE_RUN_LOGS_SUBSCRIPTION_STATUS_FRAGMENT = gql`
`;

const RUN_LOGS_QUERY = gql`
query RunLogsQuery($runId: ID!, $after: Cursor) {
query RunLogsQuery($runId: ID!, $cursor: String) {
pipelineRunOrError(runId: $runId) {
... on Run {
id
runId
status
canTerminate
events(after: $after) {
... on MessageEvent {
runId
eventConnection(afterCursor: $cursor) {
events {
__typename
... on MessageEvent {
runId
}
...RunDagsterRunEventFragment
}
...RunDagsterRunEventFragment
__typename
cursor
}
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1bda005

Please sign in to comment.