Skip to content

Commit

Permalink
Include partition in asset materialization planned event (#12333)
Browse files Browse the repository at this point in the history
Record which partition of an asset a run plans to materialize, so that we can
later use it to report that a materialization attempt for that partition failed.

Leaving out asset partition ranges for now. These will require resolving
the range to actual partitions inside a host process.
  • Loading branch information
johannkm committed Feb 27, 2023
1 parent 3a5ff3e commit 7d2b6be
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 10 deletions.
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/definitions/mode.py
Expand Up @@ -48,7 +48,7 @@ class ModeDefinition(
executors (:py:data:`~dagster.default_executors`).
description (Optional[str]): A human-readable description of the mode.
_config_mapping (Optional[ConfigMapping]): Only for internal use.
_partitions (Optional[PartitionedConfig]): Only for internal use.
_partitioned_config (Optional[PartitionedConfig]): Only for internal use.
"""

def __new__(
Expand Down
13 changes: 10 additions & 3 deletions python_modules/dagster/dagster/_core/events/__init__.py
Expand Up @@ -652,6 +652,8 @@ def partition(self) -> Optional[str]:
return self.step_materialization_data.materialization.partition
elif self.event_type == DagsterEventType.ASSET_OBSERVATION:
return self.asset_observation_data.asset_observation.partition
elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED:
return self.asset_materialization_planned_data.partition
else:
return None

Expand Down Expand Up @@ -1451,11 +1453,16 @@ def __new__(

@whitelist_for_serdes
class AssetMaterializationPlannedData(
NamedTuple("_AssetMaterializationPlannedData", [("asset_key", AssetKey)])
NamedTuple(
"_AssetMaterializationPlannedData",
[("asset_key", AssetKey), ("partition", Optional[str])],
)
):
def __new__(cls, asset_key: AssetKey):
def __new__(cls, asset_key: AssetKey, partition: Optional[str] = None):
return super(AssetMaterializationPlannedData, cls).__new__(
cls, asset_key=check.inst_param(asset_key, "asset_key", AssetKey)
cls,
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
partition=check.opt_str_param(partition, "partition"),
)


Expand Down
Expand Up @@ -79,6 +79,7 @@ def create_step_outputs(
is_asset=asset_info is not None,
should_materialize=output_def.name in config_output_names,
asset_key=asset_info.key if asset_info and asset_info.is_required else None,
is_asset_partitioned=bool(asset_info.partitions_def) if asset_info else False,
),
)
)
Expand Down
Expand Up @@ -24,6 +24,7 @@ class StepOutputProperties(
("is_asset", bool),
("should_materialize", bool),
("asset_key", Optional[AssetKey]),
("is_asset_partitioned", bool),
],
)
):
Expand All @@ -34,6 +35,7 @@ def __new__(
is_asset: bool,
should_materialize: bool,
asset_key: Optional[AssetKey] = None,
is_asset_partitioned: bool = False,
):
return super(StepOutputProperties, cls).__new__(
cls,
Expand All @@ -42,6 +44,7 @@ def __new__(
check.bool_param(is_asset, "is_asset"),
check.bool_param(should_materialize, "should_materialize"),
check.opt_inst_param(asset_key, "asset_key", AssetKey),
check.bool_param(is_asset_partitioned, "is_asset_partitioned"),
)


Expand Down
45 changes: 39 additions & 6 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Expand Up @@ -61,7 +61,14 @@
RunsFilter,
TagBucket,
)
from dagster._core.storage.tags import PARENT_RUN_ID_TAG, RESUME_RETRY_TAG, ROOT_RUN_ID_TAG
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
PARENT_RUN_ID_TAG,
PARTITION_NAME_TAG,
RESUME_RETRY_TAG,
ROOT_RUN_ID_TAG,
)
from dagster._core.utils import str_format_list
from dagster._serdes import ConfigurableClass
from dagster._seven import get_current_datetime_in_utc
Expand Down Expand Up @@ -1133,31 +1140,57 @@ def _ensure_persisted_execution_plan_snapshot(

return execution_plan_snapshot_id

def _log_asset_materialization_planned_events(self, pipeline_run, execution_plan_snapshot):
def _log_asset_materialization_planned_events(
self, run: DagsterRun, execution_plan_snapshot: ExecutionPlanSnapshot
):
from dagster._core.events import (
AssetMaterializationPlannedData,
DagsterEvent,
DagsterEventType,
)

pipeline_name = pipeline_run.pipeline_name
pipeline_name = run.pipeline_name

for step in execution_plan_snapshot.steps:
if step.key in execution_plan_snapshot.step_keys_to_execute:
for output in step.outputs:
asset_key = output.properties.asset_key
asset_key = check.not_none(output.properties).asset_key
if asset_key:
# Logs and stores asset_materialization_planned event
partition_tag = run.tags.get(PARTITION_NAME_TAG)
partition_range_start, partition_range_end = run.tags.get(
ASSET_PARTITION_RANGE_START_TAG
), run.tags.get(ASSET_PARTITION_RANGE_END_TAG)

check.invariant(
not (partition_tag and partition_range_start),
"Cannot have both a partition and a partition range",
)

if partition_range_start:
check.invariant(
partition_range_end, "Partition range start set but not end"
)
# TODO: resolve which partitions are in the range, and emit an event for each

partition = (
partition_tag
if check.not_none(output.properties).is_asset_partitioned
else None
)

event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
pipeline_name=pipeline_name,
message=(
f"{pipeline_name} intends to materialize asset"
f" {asset_key.to_string()}"
),
event_specific_data=AssetMaterializationPlannedData(asset_key),
event_specific_data=AssetMaterializationPlannedData(
asset_key, partition=partition
),
)
self.report_dagster_event(event, pipeline_run.run_id, logging.DEBUG)
self.report_dagster_event(event, run.run_id, logging.DEBUG)

def create_run(
self,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -6,9 +6,11 @@
Output,
asset,
job,
materialize_to_memory,
multi_asset,
op,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.test_utils import instance_for_test
from dagster._legacy import build_assets_job

Expand Down Expand Up @@ -87,3 +89,31 @@ def my_asset():

assert instance.run_ids_for_asset_key(AssetKey("my_asset_name")) == [run_id]
assert instance.run_ids_for_asset_key(AssetKey("my_other_asset")) == [run_id]


def test_asset_partition_materialization_planned_events():
    """Check the partition recorded on ASSET_MATERIALIZATION_PLANNED events.

    Materializes one partitioned and one unpartitioned asset in a single run
    with ``partition_key="b"``. The planned event for the partitioned asset is
    expected to carry ``"b"``, while the unpartitioned asset's planned event is
    expected to carry no partition.
    """

    @asset(partitions_def=StaticPartitionsDefinition(["a", "b"]))
    def my_asset():
        return 0

    @asset()
    def my_other_asset(my_asset):
        pass

    def planned_partition(instance, asset_name):
        # The single-element destructuring doubles as an assertion that exactly
        # one planned event was stored for this asset key.
        [planned_record] = instance.get_event_records(
            EventRecordsFilter(
                DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
                AssetKey(asset_name),
            )
        )
        return planned_record.event_log_entry.dagster_event.event_specific_data.partition

    with instance_for_test() as instance:
        materialize_to_memory([my_asset, my_other_asset], instance=instance, partition_key="b")

        assert planned_partition(instance, "my_asset") == "b"
        assert planned_partition(instance, "my_other_asset") is None

0 comments on commit 7d2b6be

Please sign in to comment.