Skip to content

Commit

Permalink
Fetch asset materialization planned event from index shard in Sqlite (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Oct 31, 2022
1 parent 36f7c38 commit 69bfd0c
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dagster._core.definitions.events import AssetKey, AssetMaterialization
from dagster._core.errors import DagsterEventLogInvalidForRun
from dagster._core.event_api import RunShardedEventsCursor
from dagster._core.events import MARKER_EVENTS, DagsterEventType
from dagster._core.events import ASSET_EVENTS, MARKER_EVENTS, DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.stats import build_run_step_stats_from_events
from dagster._serdes import (
Expand Down Expand Up @@ -217,11 +217,7 @@ def store_event(self, event):

if (
event.is_dagster_event
and (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_observation
or event.dagster_event.is_asset_materialization_planned
)
and event.dagster_event_type in ASSET_EVENTS
and event.dagster_event.asset_key
):
self.store_asset_event(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._config import StringSource
from dagster._core.events import DagsterEventType
from dagster._core.events import ASSET_EVENTS
from dagster._core.events.log import EventLogEntry
from dagster._core.storage.event_log.base import EventLogCursor, EventLogRecord, EventRecordsFilter
from dagster._core.storage.pipeline_run import PipelineRunStatus, RunsFilter
Expand Down Expand Up @@ -227,22 +227,15 @@ def store_event(self, event):

if event.is_dagster_event and event.dagster_event.asset_key:
check.invariant(
event.dagster_event_type == DagsterEventType.ASSET_MATERIALIZATION
or event.dagster_event_type == DagsterEventType.ASSET_OBSERVATION
or event.dagster_event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
event.dagster_event_type in ASSET_EVENTS,
"Can only store asset materializations, materialization_planned, and observations in index database",
)

# mirror the event in the cross-run index database
with self.index_connection() as conn:
conn.execute(insert_event_statement)

if (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_observation
or event.dagster_event.is_asset_materialization_planned
):
self.store_asset_event(event)
self.store_asset_event(event)

def get_event_records(
self,
Expand All @@ -259,13 +252,10 @@ def get_event_records(
check.opt_int_param(limit, "limit")
check.bool_param(ascending, "ascending")

is_asset_query = event_records_filter and (
event_records_filter.event_type == DagsterEventType.ASSET_MATERIALIZATION
or event_records_filter.event_type == DagsterEventType.ASSET_OBSERVATION
)
is_asset_query = event_records_filter and event_records_filter.event_type in ASSET_EVENTS
if is_asset_query:
# asset materializations and observations get mirrored into the index shard, so no
# custom run shard-aware cursor logic needed
# asset materializations, observations and materialization planned events
# get mirrored into the index shard, so no custom run shard-aware cursor logic needed
return super(SqliteEventLogStorage, self).get_event_records(
event_records_filter=event_records_filter, limit=limit, ascending=ascending
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from collections import Counter
from contextlib import ExitStack, contextmanager
from typing import List, Optional, cast
from typing import List, Optional, Sequence, cast

import mock
import pendulum
Expand All @@ -19,6 +19,7 @@
EventLogRecord,
EventRecordsFilter,
Field,
JobDefinition,
Output,
RetryRequested,
RunShardedEventsCursor,
Expand Down Expand Up @@ -48,6 +49,7 @@
InProcessRepositoryLocationOrigin,
)
from dagster._core.storage.event_log import InMemoryEventLogStorage, SqlEventLogStorage
from dagster._core.storage.event_log.base import EventLogStorage
from dagster._core.storage.event_log.migration import (
EVENT_LOG_DATA_MIGRATIONS,
migrate_asset_key_data,
Expand Down Expand Up @@ -314,8 +316,16 @@ def cursor_datetime_args():
yield datetime.datetime.now()


def _execute_job_and_store_events(instance, storage, job, run_id):
result = job.execute_in_process(instance=instance, raise_on_error=False, run_id=run_id)
def _execute_job_and_store_events(
instance: DagsterInstance,
storage: EventLogStorage,
job: JobDefinition,
run_id: Optional[str] = None,
asset_selection: Optional[Sequence[AssetKey]] = None,
):
result = job.execute_in_process(
instance=instance, raise_on_error=False, run_id=run_id, asset_selection=asset_selection
)
events = instance.all_logs(run_id=result.run_id)
for event in events:
storage.store_event(event)
Expand Down Expand Up @@ -2129,6 +2139,56 @@ def observe_asset():
asset_entry = storage.get_asset_records([asset_key])[0].asset_entry
assert asset_entry.last_run_id is None

def test_fetch_asset_materialization_planned(self, storage, instance):
    """Fetch ASSET_MATERIALIZATION_PLANNED records that come after a cursor.

    Executes the same two-asset job twice, selecting a single asset per run:
    first the asset that returns a value, then the asset that raises. The
    storage id of the one materialization record from the first run is used
    as ``after_cursor`` when querying for planned-materialization records,
    and exactly one such record is expected back.
    """

    @asset
    def materializes_asset():
        return 1

    @asset
    def never_materializes_asset():
        raise Exception("foo")

    asset_job = build_assets_job("asset_job", [never_materializes_asset, materializes_asset])

    succeeding_run_id = make_new_run_id()
    failing_run_id = make_new_run_id()

    with create_and_delete_test_runs(
        instance, [succeeding_run_id, failing_run_id]
    ), instance_for_test() as created_instance:
        # Only register an instance when the storage does not already have one.
        if not storage._instance:  # pylint: disable=protected-access
            storage.register_instance(created_instance)

        # First run: select only the asset that returns normally.
        first_result = _execute_job_and_store_events(
            created_instance,
            storage,
            asset_job,
            run_id=succeeding_run_id,
            asset_selection=[AssetKey("materializes_asset")],
        )
        assert first_result.success

        materialization_filter = EventRecordsFilter(DagsterEventType.ASSET_MATERIALIZATION)
        materialization_records = created_instance.get_event_records(materialization_filter)
        assert len(materialization_records) == 1
        # This record's storage id is the cursor for the planned-event query below.
        cursor_storage_id = materialization_records[0].storage_id

        # Second run: select only the asset whose body raises.
        _execute_job_and_store_events(
            created_instance,
            storage,
            asset_job,
            run_id=failing_run_id,
            asset_selection=[AssetKey("never_materializes_asset")],
        )

        planned_filter = EventRecordsFilter(
            DagsterEventType.ASSET_MATERIALIZATION_PLANNED, after_cursor=cursor_storage_id
        )
        planned_records = created_instance.get_event_records(planned_filter)

        assert len(planned_records) == 1

def test_last_run_id_updates_on_materialization_planned(self, storage, instance):
@asset
def never_materializes_asset():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sqlalchemy as db

import dagster._check as check
from dagster._core.events import ASSET_EVENTS
from dagster._core.events.log import EventLogEntry
from dagster._core.storage.config import pg_config
from dagster._core.storage.event_log import (
Expand Down Expand Up @@ -163,11 +164,7 @@ def store_event(self, event):

if (
event.is_dagster_event
and (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_observation
or event.dagster_event.is_asset_materialization_planned
)
and event.dagster_event_type in ASSET_EVENTS
and event.dagster_event.asset_key
):
self.store_asset_event(event)
Expand Down

0 comments on commit 69bfd0c

Please sign in to comment.