Skip to content

Commit

Permalink
Filter out asset_materialization_planned event logs client-side (#7397)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Apr 13, 2022
1 parent 9f9db4e commit 6d1cf77
Show file tree
Hide file tree
Showing 16 changed files with 54 additions and 19 deletions.

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ export const LogsRowStructuredContent: React.FC<IStructuredContentProps> = ({nod
timestamp={node.timestamp}
/>
);
case 'AssetMaterializationPlannedEvent':
return <DefaultContent eventType={eventType} message={node.message} />;
case 'ObjectStoreOperationEvent':
return (
<DefaultContent message={node.message} eventType={eventType}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ interface ILogsScrollingTableSizedProps {

function filterLogs(logs: LogsProviderLogs, filter: LogFilter, filterStepKeys: string[]) {
const filteredNodes = logs.allNodes.filter((node) => {
// These events are used to determine which assets a run will materialize and are not intended
// to be displayed in Dagit. Pagination is offset based, so we remove these logs client-side.
if (node.__typename === 'AssetMaterializationPlannedEvent') {
return false;
}
const l = node.__typename === 'LogMessageEvent' ? node.level : 'EVENT';
if (!filter.levels[l]) {
return false;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def from_dagster_event_record(event_record, pipeline_name):
GrapheneAlertFailureEvent,
GrapheneAlertStartEvent,
GrapheneAlertSuccessEvent,
GrapheneAssetMaterializationPlannedEvent,
GrapheneEngineEvent,
GrapheneExecutionStepFailureEvent,
GrapheneExecutionStepInputEvent,
Expand Down Expand Up @@ -227,6 +228,8 @@ def from_dagster_event_record(event_record, pipeline_name):
return GrapheneObservationEvent(
event=event_record,
)
elif dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED:
return GrapheneAssetMaterializationPlannedEvent(event=event_record)
elif dagster_event.event_type == DagsterEventType.STEP_EXPECTATION_RESULT:
expectation_result = dagster_event.event_specific_data.expectation_result
return GrapheneStepExpectationResultEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,6 @@ def _get_error_observable(observer):

def _handle_events(payload):
events, loading_past = payload
events = [
event
for event in events
if event.dagster_event_type != DagsterEventType.ASSET_MATERIALIZATION_PLANNED
]
return GraphenePipelineRunLogsSubscriptionSuccess(
run=GrapheneRun(record),
messages=[from_event_record(event, run.pipeline_name) for event in events],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
def types():
from .compute_logs import GrapheneComputeLogFile, GrapheneComputeLogs
from .events import (
GrapheneAssetMaterializationPlannedEvent,
GrapheneDisplayableEvent,
GrapheneEngineEvent,
GrapheneExecutionStepFailureEvent,
Expand Down Expand Up @@ -87,4 +88,5 @@ def types():
GrapheneMaterializationEvent,
GrapheneObservationEvent,
GrapheneTypeCheck,
GrapheneAssetMaterializationPlannedEvent,
]
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,25 @@ def __init__(self, event):
)


class GrapheneAssetMaterializationPlannedEvent(graphene.ObjectType):
assetKey = graphene.Field(GrapheneAssetKey)
runOrError = graphene.NonNull("dagster_graphql.schema.pipelines.pipeline.GrapheneRunOrError")

class Meta:
name = "AssetMaterializationPlannedEvent"
interfaces = (GrapheneMessageEvent, GrapheneRunEvent)

def __init__(self, event):
self._event = event
super().__init__(**construct_basic_params(event))

def resolve_assetKey(self, _graphene_info):
return self._event.dagster_event.asset_materialization_planned_data

def resolve_runOrError(self, graphene_info):
return get_run_by_id(graphene_info, self._event.run_id)


class GrapheneHandledOutputEvent(graphene.ObjectType):
class Meta:
interfaces = (GrapheneMessageEvent, GrapheneStepEvent, GrapheneDisplayableEvent)
Expand Down Expand Up @@ -464,6 +483,7 @@ class Meta:
GrapheneAlertStartEvent,
GrapheneAlertSuccessEvent,
GrapheneAlertFailureEvent,
GrapheneAssetMaterializationPlannedEvent,
)
name = "DagsterRunEvent"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import yaml

from dagster import check
from dagster.core.events import DagsterEventType
from dagster.core.host_representation.external import ExternalExecutionPlan, ExternalPipeline
from dagster.core.host_representation.external_data import ExternalPresetData
from dagster.core.storage.pipeline_run import PipelineRunStatus, RunRecord, RunsFilter
Expand Down Expand Up @@ -369,11 +368,6 @@ def resolve_assetMaterializations(self, graphene_info):

def resolve_events(self, graphene_info, after=-1):
events = graphene_info.context.instance.logs_after(self.run_id, cursor=after)
events = [
event
for event in events
if event.dagster_event_type != DagsterEventType.ASSET_MATERIALIZATION_PLANNED
]
return [from_event_record(event, self._pipeline_run.pipeline_name) for event in events]

def _get_run_record(self, instance):
Expand Down

0 comments on commit 6d1cf77

Please sign in to comment.