Skip to content

Commit

Permalink
extract event connection to top-level graphql query (#8077)
Browse files Browse the repository at this point in the history
* extract event connection to top-level graphql query

* fix graphql tests

* leave old field in place to avoid push errors

* add meta name
  • Loading branch information
prha committed Jun 9, 2022
1 parent 455fb97 commit d1319e2
Show file tree
Hide file tree
Showing 9 changed files with 545 additions and 478 deletions.

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 27 additions & 15 deletions js_modules/dagit/packages/core/src/runs/LogsProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const pipelineStatusFromMessages = (messages: RunDagsterRunEventFragment[]) => {
};

const BATCH_INTERVAL = 100;
const QUERY_LOG_LIMIT = 10000;

type State = {
nodes: Nodes;
Expand Down Expand Up @@ -204,16 +205,23 @@ const LogsProviderWithQuery = (props: LogsProviderWithQueryProps) => {
RUN_LOGS_QUERY,
{
notifyOnNetworkStatusChange: true,
variables: {runId, cursor},
variables: {runId, cursor, limit: QUERY_LOG_LIMIT},
pollInterval: POLL_INTERVAL,
onCompleted: (data: RunLogsQuery) => {
// We have to stop polling in order to update the `after` value.
stopPolling();

if (
data?.pipelineRunOrError.__typename !== 'Run' ||
data?.logsForRun.__typename !== 'EventConnection'
) {
return;
}

const slice = () => {
const count = nodes.length;
if (data?.pipelineRunOrError.__typename === 'Run') {
return data?.pipelineRunOrError.eventConnection.events.map((event, ii) => ({
if (data?.logsForRun.__typename === 'EventConnection') {
return data?.logsForRun.events.map((event, ii) => ({
...event,
clientsideKey: `csk${count + ii}`,
}));
Expand All @@ -223,9 +231,7 @@ const LogsProviderWithQuery = (props: LogsProviderWithQueryProps) => {

const newSlice = slice();
setNodes((current) => [...current, ...newSlice]);
if (data?.pipelineRunOrError.__typename === 'Run') {
setCursor(data.pipelineRunOrError.eventConnection.cursor);
}
setCursor(data.logsForRun.cursor);

const status =
data?.pipelineRunOrError.__typename === 'Run' ? data?.pipelineRunOrError.status : null;
Expand Down Expand Up @@ -303,23 +309,29 @@ const PIPELINE_RUN_LOGS_SUBSCRIPTION_STATUS_FRAGMENT = gql`
`;

const RUN_LOGS_QUERY = gql`
query RunLogsQuery($runId: ID!, $cursor: String) {
query RunLogsQuery($runId: ID!, $cursor: String, $limit: Int) {
pipelineRunOrError(runId: $runId) {
... on Run {
id
runId
status
canTerminate
eventConnection(afterCursor: $cursor) {
events {
__typename
... on MessageEvent {
runId
}
...RunDagsterRunEventFragment
}
}
logsForRun(runId: $runId, afterCursor: $cursor, limit: $limit) {
... on EventConnection {
events {
__typename
... on MessageEvent {
runId
}
cursor
...RunDagsterRunEventFragment
}
cursor
}
... on PythonError {
message
stack
}
}
}
Expand Down
905 changes: 459 additions & 446 deletions js_modules/dagit/packages/core/src/runs/types/RunLogsQuery.ts

Large diffs are not rendered by default.

12 changes: 5 additions & 7 deletions python_modules/dagster-graphql/dagster_graphql/client/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,13 @@
MESSAGE_EVENT_FRAGMENTS
+ """
query pipelineRunEvents($runId: ID!, $cursor: String) {
pipelineRunOrError(runId: $runId) {
logsForRun(runId: $runId, afterCursor: $cursor) {
__typename
... on PipelineRun {
eventConnection(afterCursor: $cursor) {
events {
...messageEventFragment
}
cursor
... on EventConnection {
events {
...messageEventFragment
}
cursor
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dagster.core.storage.pipeline_run import RunRecord, RunsFilter
from dagster.core.storage.tags import TagType, get_tag_type

from .events import from_event_record
from .external import ensure_valid_config, get_external_pipeline_or_raise
from .utils import UserFacingGraphQLError, capture_error

Expand Down Expand Up @@ -314,3 +315,23 @@ def get_step_stats(graphene_info, run_id, step_keys=None):

step_stats = graphene_info.context.instance.get_run_step_stats(run_id, step_keys)
return [GrapheneRunStepStats(stats) for stats in step_stats]


@capture_error
def get_logs_for_run(graphene_info, run_id, cursor=None, limit=None):
from ..schema.errors import GrapheneRunNotFoundError
from ..schema.pipelines.pipeline import GrapheneEventConnection

instance = graphene_info.context.instance
run = instance.get_run_by_id(run_id)
if not run:
return GrapheneRunNotFoundError(run_id)

conn = instance.get_records_for_run(run_id, cursor=cursor, limit=limit)
return GrapheneEventConnection(
events=[
from_event_record(record.event_log_entry, run.pipeline_name) for record in conn.records
],
cursor=conn.cursor,
hasMore=conn.has_more,
)
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,20 @@ def resolve_assetObservations(self, graphene_info, **kwargs):


class GrapheneEventConnection(graphene.ObjectType):
class Meta:
name = "EventConnection"

events = non_null_list(GrapheneDagsterRunEvent)
cursor = graphene.NonNull(graphene.String)
hasMore = graphene.NonNull(graphene.Boolean)


class GrapheneEventConnectionOrError(graphene.Union):
class Meta:
types = (GrapheneEventConnection, GrapheneRunNotFoundError, GraphenePythonError)
name = "EventConnectionOrError"


class GraphenePipelineRun(graphene.Interface):
id = graphene.NonNull(graphene.ID)
runId = graphene.NonNull(graphene.String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from ...implementation.fetch_runs import (
get_execution_plan,
get_logs_for_run,
get_run_by_id,
get_run_group,
get_run_groups,
Expand Down Expand Up @@ -77,7 +78,7 @@
from ..partition_sets import GraphenePartitionSetOrError, GraphenePartitionSetsOrError
from ..permissions import GraphenePermission
from ..pipelines.config_result import GraphenePipelineConfigValidationResult
from ..pipelines.pipeline import GrapheneRunOrError
from ..pipelines.pipeline import GrapheneEventConnectionOrError, GrapheneRunOrError
from ..pipelines.snapshot import GraphenePipelineSnapshotOrError
from ..run_config import GrapheneRunConfigSchemaOrError
from ..runs import (
Expand Down Expand Up @@ -276,6 +277,13 @@ class Meta:
assetKeys=graphene.Argument(graphene.List(graphene.NonNull(GrapheneAssetKeyInput))),
)

logsForRun = graphene.Field(
graphene.NonNull(GrapheneEventConnectionOrError),
runId=graphene.NonNull(graphene.ID),
afterCursor=graphene.String(),
limit=graphene.Int(),
)

def resolve_repositoriesOrError(self, graphene_info, **kwargs):
if kwargs.get("repositorySelector"):
return GrapheneRepositoryConnection(
Expand Down Expand Up @@ -539,3 +547,6 @@ def resolve_assetsLatestInfo(self, graphene_info, **kwargs):
}

return get_assets_live_info(graphene_info, step_keys_by_asset)

def resolve_logsForRun(self, graphene_info, runId, afterCursor=None, limit=None):
return get_logs_for_run(graphene_info, runId, afterCursor, limit)
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,11 @@ def test_basic_start_pipeline_and_fetch(self, graphql_context):

assert not events_result.errors
assert events_result.data
assert events_result.data["pipelineRunOrError"]["__typename"] == "Run"
assert events_result.data["logsForRun"]["__typename"] == "EventConnection"

non_engine_event_types = [
message["__typename"]
for message in events_result.data["pipelineRunOrError"]["eventConnection"]["events"]
for message in events_result.data["logsForRun"]["events"]
if message["__typename"] not in ("EngineEvent", "RunEnqueuedEvent", "RunDequeuedEvent")
]
assert non_engine_event_types == self._csv_hello_world_event_sequence()
Expand Down Expand Up @@ -433,10 +433,10 @@ def _fetch_events(cursor):
)
assert not events_result.errors
assert events_result.data
assert events_result.data["pipelineRunOrError"]["__typename"] == "Run"
assert events_result.data["logsForRun"]["__typename"] == "EventConnection"
return (
events_result.data["pipelineRunOrError"]["eventConnection"]["events"],
events_result.data["pipelineRunOrError"]["eventConnection"]["cursor"],
events_result.data["logsForRun"]["events"],
events_result.data["logsForRun"]["cursor"],
)

full_logs = []
Expand Down

0 comments on commit d1319e2

Please sign in to comment.