Skip to content

Commit

Permalink
Update AssetKeyTable.last_run_id upon materialization planned (#7319)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Apr 12, 2022
1 parent 925ddbc commit f23a00b
Show file tree
Hide file tree
Showing 10 changed files with 541 additions and 271 deletions.
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,10 @@ def is_expectation_result(self) -> bool:
def is_asset_observation(self) -> bool:
return self.event_type == DagsterEventType.ASSET_OBSERVATION

@property
def is_asset_materialization_planned(self) -> bool:
    """Whether this event has type ``ASSET_MATERIALIZATION_PLANNED``."""
    planned_type = DagsterEventType.ASSET_MATERIALIZATION_PLANNED
    return self.event_type == planned_type

@property
def asset_key(self) -> Optional[AssetKey]:
if self.event_type == DagsterEventType.ASSET_MATERIALIZATION:
Expand Down
8 changes: 7 additions & 1 deletion python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
from dagster.core.snap import ExecutionPlanSnapshot, PipelineSnapshot
from dagster.core.storage.compute_log_manager import ComputeLogManager
from dagster.core.storage.event_log import EventLogStorage
from dagster.core.storage.event_log.base import EventLogRecord, EventRecordsFilter
from dagster.core.storage.event_log.base import AssetRecord, EventLogRecord, EventRecordsFilter
from dagster.core.storage.root import LocalArtifactStorage
from dagster.core.storage.runs import RunStorage
from dagster.core.storage.schedules import ScheduleStorage
Expand Down Expand Up @@ -1227,6 +1227,12 @@ def get_event_records(
"""
return self._event_storage.get_event_records(event_records_filter, limit, ascending)

@traced
def get_asset_records(
    self, asset_keys: Optional[Sequence[AssetKey]] = None
) -> Iterable["AssetRecord"]:
    """Fetch asset records from the configured event log storage.

    Args:
        asset_keys (Optional[Sequence[AssetKey]]): If provided, restrict the
            results to these asset keys; otherwise return records for all assets.

    Returns:
        Iterable[AssetRecord]: The matching asset records.
    """
    storage = self._event_storage
    return storage.get_asset_records(asset_keys)

@traced
def events_for_asset_key(
self,
Expand Down
41 changes: 41 additions & 0 deletions python_modules/dagster/dagster/core/storage/event_log/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)

from dagster import check
from dagster.core.assets import AssetDetails
from dagster.core.definitions.events import AssetKey
from dagster.core.events import DagsterEventType
from dagster.core.events.log import EventLogEntry
Expand Down Expand Up @@ -47,6 +48,40 @@ class EventLogRecord(NamedTuple):
event_log_entry: EventLogEntry


class AssetEntry(
    NamedTuple(
        "_AssetEntry",
        [
            ("asset_key", AssetKey),
            ("last_materialization", Optional[EventLogEntry]),
            ("last_run_id", Optional[str]),
            ("asset_details", Optional[AssetDetails]),
        ],
    )
):
    """Stored state for a single asset key.

    Bundles the key with its most recent materialization event (if any), the id
    of the run that last materialized or planned to materialize it, and any
    wipe details.
    """

    def __new__(
        cls,
        asset_key: AssetKey,
        last_materialization: Optional[EventLogEntry] = None,
        last_run_id: Optional[str] = None,
        asset_details: Optional[AssetDetails] = None,
    ):
        # Validate every argument up front so a malformed entry is never created.
        checked_key = check.inst_param(asset_key, "asset_key", AssetKey)
        checked_materialization = check.opt_inst_param(
            last_materialization, "last_materialization", EventLogEntry
        )
        checked_run_id = check.opt_str_param(last_run_id, "last_run_id")
        checked_details = check.opt_inst_param(asset_details, "asset_details", AssetDetails)
        return super(AssetEntry, cls).__new__(
            cls,
            asset_key=checked_key,
            last_materialization=checked_materialization,
            last_run_id=checked_run_id,
            asset_details=checked_details,
        )


class AssetRecord(NamedTuple):
    """Pairs a storage-assigned id with the stored state of an asset."""

    # Integer id assigned by the storage layer for this asset entry.
    storage_id: int
    # The asset's stored state (key, last materialization, last run id, wipe details).
    asset_entry: AssetEntry


@whitelist_for_serdes
class EventRecordsFilter(
NamedTuple(
Expand Down Expand Up @@ -215,6 +250,12 @@ def get_event_records(
) -> Iterable[EventLogRecord]:
pass

@abstractmethod
def get_asset_records(
    self, asset_keys: Optional[Sequence[AssetKey]] = None
) -> Iterable[AssetRecord]:
    """Return stored asset records, optionally restricted to the given keys.

    Args:
        asset_keys (Optional[Sequence[AssetKey]]): If provided, only records
            for these asset keys are returned; otherwise all records.

    Returns:
        Iterable[AssetRecord]: One record per stored asset.
    """
    pass

@abstractmethod
def has_asset_key(self, asset_key: AssetKey) -> bool:
    """Return whether this storage has an entry for ``asset_key``."""
    pass
Expand Down
60 changes: 60 additions & 0 deletions python_modules/dagster/dagster/core/storage/event_log/in_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
from typing import Dict, Iterable, Mapping, Optional, Sequence

from dagster import check
from dagster.core.assets import AssetDetails
from dagster.core.definitions.events import AssetKey
from dagster.core.events import DagsterEventType
from dagster.core.events.log import EventLogEntry
from dagster.core.storage.event_log.base import AssetEntry, AssetRecord
from dagster.serdes import ConfigurableClass
from dagster.utils import utc_datetime_from_timestamp

from .base import (
EventLogRecord,
Expand All @@ -33,6 +36,7 @@ def __init__(self, inst_data=None, preload=None):
if preload:
for payload in preload:
self._logs[payload.pipeline_run.run_id] = payload.event_list
self._assets = defaultdict(dict)

super().__init__()

Expand Down Expand Up @@ -100,6 +104,17 @@ def store_event(self, event):
run_id = event.run_id
self._logs[run_id].append(event)

if (
event.is_dagster_event
and (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_observation
or event.dagster_event.is_asset_materialization_planned
)
and event.dagster_event.asset_key
):
self.store_asset_event(event)

# snapshot handlers
handlers = list(self._handlers[run_id])

Expand All @@ -109,6 +124,23 @@ def store_event(self, event):
except Exception:
logging.exception("Exception in callback for event watch on run %s.", run_id)

def store_asset_event(self, event):
    """Update the in-memory snapshot for the asset referenced by ``event``."""
    dagster_event = event.dagster_event
    asset_key = dagster_event.asset_key

    # Reuse the existing entry if present; otherwise assign the next sequential
    # storage id. (self._assets is a defaultdict, so use an explicit membership
    # test to avoid creating an empty entry.)
    if asset_key in self._assets:
        entry = self._assets[asset_key]
    else:
        entry = {"id": len(self._assets)}

    entry["last_materialization_timestamp"] = utc_datetime_from_timestamp(event.timestamp)

    is_materialization = dagster_event.is_step_materialization
    if is_materialization:
        materialization = dagster_event.step_materialization_data.materialization
        entry["last_materialization"] = event
        entry["tags"] = materialization.tags if materialization.tags else None
    # Both actual and planned materializations update the last run id.
    if is_materialization or dagster_event.is_asset_materialization_planned:
        entry["last_run_id"] = event.run_id

    self._assets[asset_key] = entry

def delete_events(self, run_id):
del self._logs[run_id]

Expand Down Expand Up @@ -220,6 +252,32 @@ def _apply_filters(record):

return event_records

def get_asset_records(
    self, asset_keys: Optional[Sequence[AssetKey]] = None
) -> Iterable[AssetRecord]:
    """Return a record for each tracked asset, optionally filtered by key.

    Assets wiped at or after their last stored event are omitted.
    """
    records = []
    for key, entry in self._assets.items():
        if asset_keys and key not in asset_keys:
            continue
        wipe_timestamp = self._wiped_asset_keys.get(key)
        if (
            wipe_timestamp
            and wipe_timestamp >= entry.get("last_materialization_timestamp").timestamp()
        ):
            # Wiped after its last event — treat the asset as nonexistent.
            continue
        details = AssetDetails(last_wipe_timestamp=wipe_timestamp) if wipe_timestamp else None
        records.append(
            AssetRecord(
                storage_id=entry["id"],
                asset_entry=AssetEntry(
                    asset_key=key,
                    last_materialization=entry.get("last_materialization"),
                    last_run_id=entry.get("last_run_id"),
                    asset_details=details,
                ),
            )
        )
    return records

def has_asset_key(self, asset_key: AssetKey) -> bool:
for records in self._logs.values():
for record in records:
Expand Down Expand Up @@ -325,6 +383,8 @@ def get_asset_run_ids(self, asset_key):
def wipe_asset(self, asset_key):
    """Record a wipe time for ``asset_key`` and clear its last run id."""
    check.inst_param(asset_key, "asset_key", AssetKey)
    self._wiped_asset_keys[asset_key] = time.time()
    # .get avoids creating an entry in the defaultdict for unknown keys.
    entry = self._assets.get(asset_key)
    if entry is not None:
        entry["last_run_id"] = None

def get_materialization_count_by_partition(
self, asset_keys: Sequence[AssetKey]
Expand Down

0 comments on commit f23a00b

Please sign in to comment.