Skip to content

Commit

Permalink
remove experimental materialization tags (#6650)
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jun 10, 2022
1 parent a8bc2a2 commit 6ec3af6
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def solid_partitioned_asset(_):

@solid
def tag_asset_solid(_):
yield AssetMaterialization(asset_key="a", tags={"foo": "FOO"})
yield AssetMaterialization(asset_key="a")
yield Output(1)


Expand Down
16 changes: 0 additions & 16 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import dagster._check as check
import dagster.seven as seven
from dagster.serdes import DefaultNamedTupleSerializer, whitelist_for_serdes
from dagster.utils.backcompat import experimental_class_param_warning

from .metadata import (
MetadataEntry,
Expand Down Expand Up @@ -373,7 +372,6 @@ class AssetMaterialization(
("description", Optional[str]),
("metadata_entries", List[Union[MetadataEntry, PartitionMetadataEntry]]),
("partition", Optional[str]),
("tags", Dict[str, str]),
],
)
):
Expand All @@ -395,9 +393,6 @@ class AssetMaterialization(
metadata_entries (Optional[List[Union[MetadataEntry, PartitionMetadataEntry]]]): Arbitrary metadata about the
materialized value.
partition (Optional[str]): The name of the partition that was materialized.
tags (Optional[Dict[str, str]]): (Experimental) Tag metadata for a given asset
materialization. Used for search and organization of the asset entry in the asset
catalog in Dagit.
metadata (Optional[Dict[str, RawMetadataValue]]):
Arbitrary metadata about the asset. Keys are displayed string labels, and values are
one of the following: string, float, int, JSON-serializable dict, JSON-serializable
Expand All @@ -410,7 +405,6 @@ def __new__(
description: Optional[str] = None,
metadata_entries: Optional[Sequence[Union[MetadataEntry, PartitionMetadataEntry]]] = None,
partition: Optional[str] = None,
tags: Optional[Mapping[str, str]] = None,
metadata: Optional[Mapping[str, RawMetadataValue]] = None,
):
if isinstance(asset_key, AssetKey):
Expand All @@ -424,9 +418,6 @@ def __new__(
check.tuple_param(asset_key, "asset_key", of_type=str)
asset_key = AssetKey(asset_key)

if tags:
experimental_class_param_warning("tags", "AssetMaterialization")

metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
metadata_entries = check.opt_sequence_param(
metadata_entries, "metadata_entries", of_type=(MetadataEntry, PartitionMetadataEntry)
Expand All @@ -438,7 +429,6 @@ def __new__(
description=check.opt_str_param(description, "description"),
metadata_entries=normalize_metadata(metadata, metadata_entries),
partition=check.opt_str_param(partition, "partition"),
tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str),
)

@property
Expand Down Expand Up @@ -485,7 +475,6 @@ class Materialization(
("metadata_entries", List[MetadataEntry]),
("asset_key", AssetKey),
("partition", Optional[str]),
("tags", Dict[str, str]),
],
)
):
Expand All @@ -507,9 +496,6 @@ class Materialization(
asset_key (Optional[Union[str, AssetKey]]): An optional parameter to identify the materialized asset
across runs
partition (Optional[str]): The name of the partition that was materialized.
tags (Optional[Dict[str, str]]): (Experimental) Tag metadata for a given asset
materialization. Used for search and organization of the asset entry in the asset
catalog in Dagit.
"""

def __new__(
Expand All @@ -519,7 +505,6 @@ def __new__(
metadata_entries: Optional[List[MetadataEntry]] = None,
asset_key: Optional[Union[str, AssetKey]] = None,
partition: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
skip_deprecation_warning: Optional[bool] = False,
):
if asset_key and isinstance(asset_key, str):
Expand Down Expand Up @@ -552,7 +537,6 @@ def __new__(
),
asset_key=asset_key,
partition=check.opt_str_param(partition, "partition"),
tags=check.opt_dict_param(tags, "tags"),
)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,6 @@ def _gen_fn():
description=materialization.description,
metadata_entries=manager_metadata_entries,
partition=materialization.partition,
tags=materialization.tags,
metadata=None,
)
yield DagsterEvent.asset_materialization(step_context, materialization, input_lineage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ def store_asset_event(self, event):

asset["last_materialization_timestamp"] = utc_datetime_from_timestamp(event.timestamp)
if event.dagster_event.is_step_materialization:
materialization = event.dagster_event.step_materialization_data.materialization
asset["last_materialization"] = event
asset["tags"] = materialization.tags if materialization.tags else None
if (
event.dagster_event.is_step_materialization
or event.dagster_event.is_asset_materialization_planned
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sqlalchemy as db
from tqdm import tqdm

from dagster import AssetKey, seven
from dagster import AssetKey
from dagster.core.events.log import EventLogEntry
from dagster.serdes import deserialize_json_to_dagster_namedtuple
from dagster.utils import utc_datetime_from_timestamp
Expand Down Expand Up @@ -142,14 +142,12 @@ def migrate_asset_keys_index_columns(event_log_storage, print_fn=None):
wipe_timestamp=utc_datetime_from_timestamp(wipe_timestamp)
if wipe_timestamp
else None,
tags=None,
)
.where(
AssetKeyTable.c.asset_key == asset_key.to_string(),
)
)
else:
tags = event.dagster_event.step_materialization_data.materialization.tags
conn.execute(
AssetKeyTable.update()
.values( # pylint: disable=no-value-for-parameter
Expand All @@ -158,7 +156,6 @@ def migrate_asset_keys_index_columns(event_log_storage, print_fn=None):
wipe_timestamp=utc_datetime_from_timestamp(wipe_timestamp)
if wipe_timestamp
else None,
tags=seven.json.dumps(tags) if tags else None,
)
.where(
AssetKeyTable.c.asset_key == asset_key.to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,11 @@ def _get_asset_entry_values(self, event, has_asset_key_index_cols):
}
)
if has_asset_key_index_cols:
materialization = event.dagster_event.step_materialization_data.materialization
entry_values.update(
{
"last_materialization_timestamp": utc_datetime_from_timestamp(
event.timestamp
),
"tags": seven.json.dumps(materialization.tags)
if materialization.tags
else None,
}
)
elif event.dagster_event.is_asset_materialization_planned:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,18 @@ def test_backcompat_asset_materializations():

@op
def materialize():
yield AssetMaterialization(AssetKey("c"), tags={"foo": "bar"})
yield AssetMaterialization(AssetKey("c"))
yield Output(None)

@job
def my_job():
materialize()

def _validate_materialization(asset_key, event, expected_tags):
def _validate_materialization(asset_key, event):
assert isinstance(event, EventLogEntry)
assert event.dagster_event
assert event.dagster_event.is_step_materialization
assert event.dagster_event.step_materialization_data.materialization.asset_key == asset_key
assert event.dagster_event.step_materialization_data.materialization.tags == expected_tags

a = AssetKey("a")
b = AssetKey("b")
Expand All @@ -110,45 +109,43 @@ def _validate_materialization(asset_key, event, expected_tags):
assert a_mat is None

b_mat = storage.get_latest_materialization_events([b]).get(b)
_validate_materialization(b, b_mat, expected_tags={})
_validate_materialization(b, b_mat)

c_mat = storage.get_latest_materialization_events([c]).get(c)
_validate_materialization(c, c_mat, expected_tags={})
_validate_materialization(c, c_mat)

mat_by_key = storage.get_latest_materialization_events([a, b, c])
assert mat_by_key.get(a) is None
_validate_materialization(b, mat_by_key.get(b), expected_tags={})
_validate_materialization(c, mat_by_key.get(c), expected_tags={})
_validate_materialization(b, mat_by_key.get(b))
_validate_materialization(c, mat_by_key.get(c))

# materialize c with tags
my_job.execute_in_process(instance=instance)

a_mat = storage.get_latest_materialization_events([a]).get(a)
assert a_mat is None

b_mat = storage.get_latest_materialization_events([b]).get(b)
_validate_materialization(b, b_mat, expected_tags={})
_validate_materialization(b, b_mat)

c_mat = storage.get_latest_materialization_events([c]).get(c)
_validate_materialization(c, c_mat, expected_tags={"foo": "bar"})
_validate_materialization(c, c_mat)

mat_by_key = storage.get_latest_materialization_events([a, b, c])
assert mat_by_key.get(a) is None
_validate_materialization(b, mat_by_key.get(b), expected_tags={})
_validate_materialization(c, c_mat, expected_tags={"foo": "bar"})
_validate_materialization(b, mat_by_key.get(b))
_validate_materialization(c, c_mat)


def test_backcompat_get_asset_records():
src_dir = file_relative_path(__file__, "compat_tests/snapshot_0_11_0_asset_materialization")
# should contain materialization events for asset keys a, b, c, d, e, f
# events a and b have been wiped, but b has been rematerialized

def _validate_materialization(asset_key, event, expected_tags):
def _validate_materialization(asset_key, event):
assert isinstance(event, EventLogEntry)
assert event.dagster_event
assert event.dagster_event.is_step_materialization
assert event.dagster_event.step_materialization_data.materialization.asset_key == asset_key
assert event.dagster_event.step_materialization_data.materialization.tags == expected_tags

b = AssetKey("b")

Expand All @@ -159,7 +156,7 @@ def _validate_materialization(asset_key, event, expected_tags):
records = storage.get_asset_records([b])
asset_entry = records[0].asset_entry
assert asset_entry.asset_key == b
_validate_materialization(b, asset_entry.last_materialization, expected_tags={})
_validate_materialization(b, asset_entry.last_materialization)


def test_asset_lazy_migration():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,22 +1449,22 @@ def a_pipe():
def test_latest_materializations(self, storage, instance):
@solid
def one(_):
yield AssetMaterialization(AssetKey("a"), tags={"num": str(1)})
yield AssetMaterialization(AssetKey("b"), tags={"num": str(1)})
yield AssetMaterialization(AssetKey("c"), tags={"num": str(1)})
yield AssetMaterialization(AssetKey("d"), tags={"num": str(1)})
yield AssetMaterialization(AssetKey("a"), partition="1")
yield AssetMaterialization(AssetKey("b"), partition="1")
yield AssetMaterialization(AssetKey("c"), partition="1")
yield AssetMaterialization(AssetKey("d"), partition="1")
yield AssetObservation(AssetKey("a"), metadata={"foo": "bar"})
yield Output(1)

@solid
def two(_):
yield AssetMaterialization(AssetKey("b"), tags={"num": str(2)})
yield AssetMaterialization(AssetKey("c"), tags={"num": str(2)})
yield AssetMaterialization(AssetKey("b"), partition="2")
yield AssetMaterialization(AssetKey("c"), partition="2")
yield Output(2)

def _event_tags(event):
def _event_partition(event):
assert event.dagster_event_type == DagsterEventType.ASSET_MATERIALIZATION
return event.dagster_event.step_materialization_data.materialization.tags
return event.dagster_event.step_materialization_data.materialization.partition

def _fetch_events(storage):
return storage.get_latest_materialization_events(
Expand All @@ -1487,10 +1487,6 @@ def _fetch_events(storage):

events_by_key = _fetch_events(storage)
assert len(events_by_key) == 4
assert _event_tags(events_by_key[AssetKey("a")])["num"] == "1"
assert _event_tags(events_by_key[AssetKey("b")])["num"] == "1"
assert _event_tags(events_by_key[AssetKey("c")])["num"] == "1"
assert _event_tags(events_by_key[AssetKey("d")])["num"] == "1"

# wipe 2 of the assets, make sure we respect that
if self.can_wipe():
Expand All @@ -1499,8 +1495,6 @@ def _fetch_events(storage):
events_by_key = _fetch_events(storage)
assert events_by_key.get(AssetKey("a")) is None
assert events_by_key.get(AssetKey("b")) is None
assert _event_tags(events_by_key[AssetKey("c")])["num"] == "1"
assert _event_tags(events_by_key[AssetKey("d")])["num"] == "1"

# rematerialize one of the wiped assets, one of the existing assets
events, _ = _synthesize_events(lambda: two(), run_id=run_id_2)
Expand All @@ -1509,19 +1503,12 @@ def _fetch_events(storage):

events_by_key = _fetch_events(storage)
assert events_by_key.get(AssetKey("a")) is None
assert _event_tags(events_by_key[AssetKey("b")])["num"] == "2"
assert _event_tags(events_by_key[AssetKey("c")])["num"] == "2"
assert _event_tags(events_by_key[AssetKey("d")])["num"] == "1"

else:
events, _ = _synthesize_events(lambda: two(), run_id=run_id_2)
for event in events:
storage.store_event(event)
events_by_key = _fetch_events(storage)
assert _event_tags(events_by_key[AssetKey("a")])["num"] == "1"
assert _event_tags(events_by_key[AssetKey("b")])["num"] == "2"
assert _event_tags(events_by_key[AssetKey("c")])["num"] == "2"
assert _event_tags(events_by_key[AssetKey("d")])["num"] == "1"

def test_asset_keys(self, storage, instance):
with instance_for_test() as created_instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,8 @@ def test_0_12_0_extract_asset_index_cols():

@solid
def asset_solid(_):
yield AssetMaterialization(
asset_key=AssetKey(["a"]), partition="partition_1", tags={"foo": "FOO"}
)
yield AssetMaterialization(asset_key=AssetKey(["b"]), tags={"bar": "BAR"})
yield AssetMaterialization(asset_key=AssetKey(["a"]), partition="partition_1")
yield AssetMaterialization(asset_key=AssetKey(["b"]))
yield Output(1)

@pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,7 @@ def test_0_12_0_extract_asset_index_cols(hostname, conn_string):

@solid
def asset_solid(_):
yield AssetMaterialization(
asset_key=AssetKey(["a"]), partition="partition_1", tags={"foo": "FOO"}
)
yield AssetMaterialization(asset_key=AssetKey(["a"]), partition="partition_1")
yield Output(1)

@pipeline
Expand Down

0 comments on commit 6ec3af6

Please sign in to comment.