Handle backcompat asset observation (#7831)
clairelin135 committed May 11, 2022
1 parent 77dff37 commit 7207a6e
Showing 5 changed files with 449 additions and 18 deletions.
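
For context on the change: store_asset_event upserts a row into the asset key table using whatever column values _get_asset_entry_values returns. On a database that predates the ASSET_KEY_INDEX_COLS migration, that dict is empty for an asset observation (the new tests below pin exactly that state), and SQLAlchemy rejects an upsert clause built from an empty update set. The hunks below therefore branch on values. A minimal sketch of the failure mode being guarded against (assumptions: SQLAlchemy 1.x behavior; asset_keys is a hypothetical stand-in for Dagster's AssetKeyTable):

    import sqlalchemy as sa
    from sqlalchemy.dialects import mysql, postgresql

    metadata = sa.MetaData()
    asset_keys = sa.Table(  # hypothetical stand-in; only the key column matters here
        "asset_keys", metadata, sa.Column("asset_key", sa.String(512), primary_key=True)
    )

    values = {}  # what an asset observation yields on a pre-migration schema (assumption)

    try:
        mysql.insert(asset_keys).values(asset_key='["a"]').on_duplicate_key_update(**values)
    except ValueError as err:
        print(err)  # expected: SQLAlchemy rejects an empty update dict (assumption)

    try:
        postgresql.insert(asset_keys).values(asset_key='["a"]').on_conflict_do_update(
            index_elements=[asset_keys.c.asset_key], set_=dict(**values)
        )
    except ValueError as err:
        print(err)  # expected: SQLAlchemy rejects an empty set_ dict (assumption)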
MySQL event log storage:

@@ -121,18 +121,27 @@ def store_asset_event(self, event):
         # See SqlEventLogStorage.store_asset_event method for more details

         values = self._get_asset_entry_values(event, self.has_secondary_index(ASSET_KEY_INDEX_COLS))

         with self.index_connection() as conn:
-            conn.execute(
-                db.dialects.mysql.insert(AssetKeyTable)
-                .values(
-                    asset_key=event.dagster_event.asset_key.to_string(),
-                    **values,
-                )
-                .on_duplicate_key_update(
-                    **values,
+            if values:
+                conn.execute(
+                    db.dialects.mysql.insert(AssetKeyTable)
+                    .values(
+                        asset_key=event.dagster_event.asset_key.to_string(),
+                        **values,
+                    )
+                    .on_duplicate_key_update(
+                        **values,
+                    )
                 )
-            )
+            else:
+                try:
+                    conn.execute(
+                        db.dialects.mysql.insert(AssetKeyTable).values(
+                            asset_key=event.dagster_event.asset_key.to_string(),
+                        )
+                    )
+                except db.exc.IntegrityError:
+                    pass

     def _connect(self):
         return create_mysql_connection(self._engine, __file__, "event log")
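
To make the two MySQL branches concrete, here is a self-contained sketch that compiles the statement each branch issues; no database is required, and the table shape is an assumption:

    import sqlalchemy as sa
    from sqlalchemy.dialects import mysql

    metadata = sa.MetaData()
    asset_keys = sa.Table(  # assumed shape; the real AssetKeyTable has more columns
        "asset_keys",
        metadata,
        sa.Column("asset_key", sa.String(512), primary_key=True),
        sa.Column("last_run_id", sa.String(255)),
    )

    # Migrated schema: values is non-empty, so the upsert branch runs.
    values = {"last_run_id": "some-run-id"}
    upsert = (
        mysql.insert(asset_keys)
        .values(asset_key='["a"]', **values)
        .on_duplicate_key_update(**values)
    )
    print(upsert.compile(dialect=mysql.dialect()))
    # INSERT INTO asset_keys (asset_key, last_run_id) VALUES (%s, %s)
    # ON DUPLICATE KEY UPDATE last_run_id = %s

    # Pre-migration schema: values is empty, so a plain insert is issued and a
    # duplicate-key IntegrityError is swallowed, because the row already exists.
    plain = mysql.insert(asset_keys).values(asset_key='["a"]')
    print(plain.compile(dialect=mysql.dialect()))
    # INSERT INTO asset_keys (asset_key) VALUES (%s)

The IntegrityError catch keeps the pre-migration path idempotent: a second observation for the same key simply leaves the existing row untouched.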

One changed file is not rendered by default (a large diff, likely the SQL snapshot fixture referenced by the new MySQL back-compat test below).

MySQL back-compat tests:

@@ -6,7 +6,9 @@

 from sqlalchemy import create_engine

 from dagster import AssetKey, AssetObservation, Output, job, op
+from dagster.core.instance import DagsterInstance
+from dagster.core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
 from dagster.utils import file_relative_path

@@ -76,6 +78,39 @@ def test_instigators_table_backcompat(hostname, conn_string):
     assert instance.schedule_storage.has_instigators_table()


+def test_asset_observation_backcompat(hostname, conn_string):
+    _reconstruct_from_file(
+        hostname,
+        conn_string,
+        file_relative_path(__file__, "snapshot_0_11_16_pre_add_asset_key_index_cols.sql"),
+    )
+
+    @op
+    def asset_op(_):
+        yield AssetObservation(asset_key=AssetKey(["a"]))
+        yield Output(1)
+
+    @job
+    def asset_job():
+        asset_op()
+
+    with tempfile.TemporaryDirectory() as tempdir:
+        with open(
+            file_relative_path(__file__, "dagster.yaml"), "r", encoding="utf8"
+        ) as template_fd:
+            with open(os.path.join(tempdir, "dagster.yaml"), "w", encoding="utf8") as target_fd:
+                template = template_fd.read().format(hostname=hostname)
+                target_fd.write(template)
+
+        with DagsterInstance.from_config(tempdir) as instance:
+            storage = instance._event_storage
+
+            assert not instance.event_log_storage.has_secondary_index(ASSET_KEY_INDEX_COLS)
+
+            asset_job.execute_in_process(instance=instance)
+            assert storage.has_asset_key(AssetKey(["a"]))
+
+
 def test_jobs_selector_id_migration(hostname, conn_string):
     import sqlalchemy as db
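
A side note tying the test to the storage hunks: the asset_key column is keyed by AssetKey.to_string(), which is where the '["a"]' literal in the sketches above comes from:

    from dagster import AssetKey

    # AssetKey.to_string() serializes the key path as a JSON list string;
    # store_asset_event writes this string to the asset_key column.
    print(AssetKey(["a"]).to_string())                # ["a"]
    print(AssetKey(["schema", "table"]).to_string())  # ["schema", "table"]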
Postgres event log storage:

@@ -194,17 +194,18 @@ def store_asset_event(self, event):

         values = self._get_asset_entry_values(event, self.has_secondary_index(ASSET_KEY_INDEX_COLS))
         with self.index_connection() as conn:
-            conn.execute(
-                db.dialects.postgresql.insert(AssetKeyTable)
-                .values(
-                    asset_key=event.dagster_event.asset_key.to_string(),
-                    **values,
-                )
-                .on_conflict_do_update(
+            query = db.dialects.postgresql.insert(AssetKeyTable).values(
+                asset_key=event.dagster_event.asset_key.to_string(),
+                **values,
+            )
+            if values:
+                query = query.on_conflict_do_update(
                     index_elements=[AssetKeyTable.c.asset_key],
                     set_=dict(**values),
                 )
-            )
+            else:
+                query = query.on_conflict_do_nothing()
+            conn.execute(query)

     def _connect(self):
         return create_pg_connection(self._engine, pg_alembic_config(__file__), "event log")
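
The Postgres branch is the same idea expressed through ON CONFLICT. A self-contained sketch (assumed table shape, no database required) compiling the statement for both schema states:

    import sqlalchemy as sa
    from sqlalchemy.dialects import postgresql

    metadata = sa.MetaData()
    asset_keys = sa.Table(  # assumed shape; the real AssetKeyTable has more columns
        "asset_keys",
        metadata,
        sa.Column("asset_key", sa.String(512), primary_key=True),
        sa.Column("last_run_id", sa.String(255)),
    )

    for values in ({"last_run_id": "some-run-id"}, {}):
        query = postgresql.insert(asset_keys).values(asset_key='["a"]', **values)
        if values:
            # Migrated schema: update the existing row on conflict.
            query = query.on_conflict_do_update(
                index_elements=[asset_keys.c.asset_key], set_=dict(**values)
            )
        else:
            # Pre-migration schema: nothing to update, so keep the existing row.
            query = query.on_conflict_do_nothing()
        print(query.compile(dialect=postgresql.dialect()))
    # First iteration:  INSERT ... ON CONFLICT (asset_key) DO UPDATE SET last_run_id = ...
    # Second iteration: INSERT ... ON CONFLICT DO NOTHING

Unlike the MySQL fallback, no exception handling is needed here: ON CONFLICT DO NOTHING makes the duplicate insert a no-op server-side.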
Postgres back-compat tests:

@@ -11,15 +11,18 @@

 from dagster import (
     AssetKey,
     AssetMaterialization,
+    AssetObservation,
     Output,
     execute_pipeline,
+    job,
+    op,
     pipeline,
     reconstructable,
     solid,
 )
 from dagster.core.errors import DagsterInstanceSchemaOutdated
 from dagster.core.instance import DagsterInstance
 from dagster.core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
 from dagster.core.storage.pipeline_run import RunsFilter
 from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG
 from dagster.utils import file_relative_path

@@ -342,6 +345,40 @@ def asset_pipeline():
     assert not storage.has_asset_key(AssetKey(["a"]))


+def test_0_12_0_asset_observation_backcompat(hostname, conn_string):
+    _reconstruct_from_file(
+        hostname,
+        conn_string,
+        file_relative_path(__file__, "snapshot_0_12_0_pre_asset_index_cols/postgres/pg_dump.txt"),
+    )
+
+    @op
+    def asset_op(_):
+        yield AssetObservation(asset_key=AssetKey(["a"]))
+        yield Output(1)
+
+    @job
+    def asset_job():
+        asset_op()
+
+    with tempfile.TemporaryDirectory() as tempdir:
+        with open(
+            file_relative_path(__file__, "dagster.yaml"), "r", encoding="utf8"
+        ) as template_fd:
+            with open(os.path.join(tempdir, "dagster.yaml"), "w", encoding="utf8") as target_fd:
+                template = template_fd.read().format(hostname=hostname)
+                target_fd.write(template)
+
+        with DagsterInstance.from_config(tempdir) as instance:
+            storage = instance._event_storage
+
+            assert not storage.has_secondary_index(ASSET_KEY_INDEX_COLS)
+
+            # make sure that executing the pipeline works
+            asset_job.execute_in_process(instance=instance)
+            assert storage.has_asset_key(AssetKey(["a"]))
+
+
 def _reconstruct_from_file(hostname, conn_string, path, username="test", password="test"):
     engine = create_engine(conn_string)
     engine.execute("drop schema public cascade;")
