Skip to content

Commit

Permalink
Update last_materialization_timestamp to also update on observation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Mar 15, 2022
1 parent 07e3403 commit 6bbfce2
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 22 deletions.
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/core/storage/event_log/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
db.Column("migration_completed", db.DateTime),
)

# The AssetKeyTable contains a `last_materialization_timestamp` column that is exclusively
# used to determine if an asset exists (last materialization timestamp > wipe timestamp).
# This column is used nowhere else, and as of AssetObservation creation, we want to extend
# this functionality to ensure that assets with observation OR materialization timestamp
# > wipe timestamp display in Dagit.

# As of the following PR, we update last_materialization_timestamp to store the timestamp
# of the latest asset observation or materialization that has occurred.
# https://github.com/dagster-io/dagster/pull/6885
AssetKeyTable = db.Table(
"asset_keys",
SqlEventLogStorageMetadata,
Expand All @@ -35,6 +44,7 @@
db.Column("last_run_id", db.String(255)),
db.Column("asset_details", db.Text),
db.Column("wipe_timestamp", db.types.TIMESTAMP), # guarded by secondary index check
# last_materialization_timestamp contains timestamp for latest materialization or observation
db.Column(
"last_materialization_timestamp", db.types.TIMESTAMP
), # guarded by secondary index check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,39 @@ def store_asset(self, event):
if not event.is_dagster_event or not event.dagster_event.asset_key:
return

materialization = event.dagster_event.step_materialization_data.materialization
# The AssetKeyTable contains a `last_materialization_timestamp` column that is exclusively
# used to determine if an asset exists (last materialization timestamp > wipe timestamp).
# This column is used nowhere else, and as of AssetObservation creation, we want to extend
# this functionality to ensure that assets with observation OR materialization timestamp
# > wipe timestamp display in Dagit.

# As of the following PR, we update last_materialization_timestamp to store the timestamp
# of the latest asset observation or materialization that has occurred.
# https://github.com/dagster-io/dagster/pull/6885
if event.dagster_event.is_asset_observation:
self.store_asset_observation(event)
elif event.dagster_event.is_step_materialization:
self.store_asset_materialization(event)

def store_asset_observation(self, event):
# last_materialization_timestamp is updated upon observation or materialization
# See store_asset method above for more details
if self.has_asset_key_index_cols():
insert_statement = AssetKeyTable.insert().values(
asset_key=event.dagster_event.asset_key.to_string(),
last_materialization_timestamp=utc_datetime_from_timestamp(event.timestamp),
)
update_statement = AssetKeyTable.update().values(
last_materialization_timestamp=utc_datetime_from_timestamp(event.timestamp),
)

with self.index_connection() as conn:
try:
conn.execute(insert_statement)
except db.exc.IntegrityError:
conn.execute(update_statement)

def store_asset_materialization(self, event):
# We switched to storing the entire event record of the last materialization instead of just
# the AssetMaterialization object, so that we have access to metadata like timestamp,
# pipeline, run_id, etc.
Expand All @@ -120,7 +152,11 @@ def store_asset(self, event):
# to `last_materialization_event`, for clarity. For now, we should do some back-compat.
#
# https://github.com/dagster-io/dagster/issues/3945

# last_materialization_timestamp is updated upon observation or materialization
# See store_asset method above for more details
if self.has_asset_key_index_cols():
materialization = event.dagster_event.step_materialization_data.materialization
insert_statement = (
AssetKeyTable.insert().values( # pylint: disable=no-value-for-parameter
asset_key=event.dagster_event.asset_key.to_string(),
Expand Down Expand Up @@ -182,12 +218,12 @@ def store_event(self, event):

if (
event.is_dagster_event
and event.dagster_event.is_step_materialization
and (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_observation
)
and event.dagster_event.asset_key
):
# Currently, only materializations are stored in the asset catalog.
# We will store observations after adding a column migration to
# store latest asset observation timestamp in the asset key table.
self.store_asset(event)

def get_logs_for_run_by_log_id(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ def store_event(self, event):
with self.index_connection() as conn:
conn.execute(insert_event_statement)

if event.dagster_event.is_step_materialization:
# Currently, only materializations are stored in the asset catalog.
# We will store observations after adding a column migration to
# store latest asset observation timestamp in the asset key table.
if (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_observation
):
self.store_asset(event)

def get_event_records(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Output,
OutputDefinition,
RetryRequested,
job,
op,
pipeline,
resource,
Expand Down Expand Up @@ -1961,3 +1962,33 @@ def gen_op():
)

assert len(records) == 1

def test_asset_key_exists_on_observation(self, storage):

key = AssetKey("hello")

@op
def my_op():
yield AssetObservation(key)
yield Output(5)

with instance_for_test() as instance:
if not storage._instance: # pylint: disable=protected-access
storage.register_instance(instance)

events, _ = _synthesize_events(lambda: my_op(), instance=instance)
for event in events:
storage.store_event(event)

assert [key] == storage.all_asset_keys()

if self.can_wipe():
storage.wipe_asset(key)

assert len(storage.all_asset_keys()) == 0

events, _ = _synthesize_events(lambda: my_op(), instance=instance)
for event in events:
storage.store_event(event)

assert [key] == storage.all_asset_keys()
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import sqlalchemy as db

from dagster import check, seven
from dagster.core.events.log import EventLogEntry
from dagster.core.storage.event_log import (
AssetKeyTable,
SqlEventLogStorage,
Expand Down Expand Up @@ -122,11 +121,25 @@ def create_clean_storage(conn_string):
MySQLEventLogStorage.wipe_storage(conn_string)
return MySQLEventLogStorage(conn_string)

def store_asset(self, event):
check.inst_param(event, "event", EventLogEntry)
if not event.is_dagster_event or not event.dagster_event.asset_key:
return
def store_asset_observation(self, event):
# last_materialization_timestamp is updated upon observation or materialization
# See store_asset method in SqlEventLogStorage for more details
if self.has_secondary_index(ASSET_KEY_INDEX_COLS):
with self.index_connection() as conn:
conn.execute(
db.dialects.mysql.insert(AssetKeyTable)
.values(
asset_key=event.dagster_event.asset_key.to_string(),
last_materialization_timestamp=utc_datetime_from_timestamp(event.timestamp),
)
.on_duplicate_key_update(
last_materialization_timestamp=utc_datetime_from_timestamp(event.timestamp),
)
)

def store_asset_materialization(self, event):
# last_materialization_timestamp is updated upon observation or materialization
# See store_asset method in SqlEventLogStorage for more details
materialization = event.dagster_event.step_materialization_data.materialization

if self.has_secondary_index(ASSET_KEY_INDEX_COLS):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,38 @@ def store_event(self, event):

if (
event.is_dagster_event
and event.dagster_event.is_step_materialization
and (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_observation
)
and event.dagster_event.asset_key
):
# Currently, only materializations are stored in the asset catalog.
# We will store observations after adding a column migration to
# store latest asset observation timestamp in the asset key table.
self.store_asset(event)

def store_asset(self, event):
check.inst_param(event, "event", EventLogEntry)
if not event.is_dagster_event or not event.dagster_event.asset_key:
return
def store_asset_observation(self, event):
# last_materialization_timestamp is updated upon observation or materialization
# See store_asset method in SqlEventLogStorage for more details
if self.has_secondary_index(ASSET_KEY_INDEX_COLS):
with self.index_connection() as conn:
conn.execute(
db.dialects.postgresql.insert(AssetKeyTable)
.values(
asset_key=event.dagster_event.asset_key.to_string(),
last_materialization_timestamp=utc_datetime_from_timestamp(event.timestamp),
)
.on_conflict_do_update(
index_elements=[AssetKeyTable.c.asset_key],
set_=dict(
last_materialization_timestamp=utc_datetime_from_timestamp(
event.timestamp
),
),
)
)

def store_asset_materialization(self, event):
# last_materialization_timestamp is updated upon observation or materialization
# See store_asset method in SqlEventLogStorage for more details
materialization = event.dagster_event.step_materialization_data.materialization
if self.has_secondary_index(ASSET_KEY_INDEX_COLS):
with self.index_connection() as conn:
Expand Down

0 comments on commit 6bbfce2

Please sign in to comment.