Skip to content

Commit

Permalink
AssetGroup.prefixed (#7395)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Apr 28, 2022
1 parent febd18f commit ba9d867
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 6 deletions.
85 changes: 85 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __new__(
executor_def: Optional[ExecutorDefinition] = None,
):
check.sequence_param(assets, "assets", of_type=AssetsDefinition)

source_assets = check.opt_sequence_param(
source_assets, "source_assets", of_type=SourceAsset
)
Expand Down Expand Up @@ -558,6 +559,90 @@ def get_base_jobs(self) -> Sequence[JobDefinition]:

return jobs

def prefixed(self, key_prefix: str):
"""
Returns an AssetGroup that's identical to this AssetGroup, but with prefixes on all the
asset keys. The prefix is not added to source assets.
Input asset keys that reference other assets within the group are "brought along" -
i.e. prefixed as well.
Example with a single asset:
.. code-block:: python
@asset
def asset1():
...
result = AssetGroup([asset1]).prefixed("my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])
Example with dependencies within the list of assets:
.. code-block:: python
@asset
def asset1():
...
@asset
def asset2(asset1):
...
result = AssetGroup([asset1, asset2]).prefixed("my_prefix")
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"])
assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"])
assert result.assets[1].dependency_asset_keys == {AssetKey(["my_prefix", "asset1"])}
Examples with input prefixes provided by source assets:
.. code-block:: python
asset1 = SourceAsset(AssetKey(["upstream_prefix", "asset1"]))
@asset
def asset2(asset1):
...
result = AssetGroup([asset2], source_assets=[asset1]).prefixed("my_prefix")
assert len(result.assets) == 1
assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset2"])
assert result.assets[0].dependency_asset_keys == {AssetKey(["upstream_prefix", "asset1"])}
assert result.source_assets[0].key == AssetKey(["upstream_prefix", "asset1"])
"""

asset_keys = {
asset_key for assets_def in self.assets for asset_key in assets_def.asset_keys
}

result_assets: List[AssetsDefinition] = []
for assets_def in self.assets:
output_asset_key_replacements = {
asset_key: AssetKey([key_prefix] + asset_key.path)
for asset_key in assets_def.asset_keys
}
input_asset_key_replacements = {}
for dep_asset_key in assets_def.dependency_asset_keys:
if dep_asset_key in asset_keys:
input_asset_key_replacements[dep_asset_key] = AssetKey(
(key_prefix, *dep_asset_key.path)
)

result_assets.append(
assets_def.with_replaced_asset_keys(
output_asset_key_replacements=output_asset_key_replacements,
input_asset_key_replacements=input_asset_key_replacements,
)
)

return AssetGroup(
assets=result_assets,
source_assets=self.source_assets,
resource_defs={k: r for k, r in self.resource_defs.items() if k != "root_manager"},
executor_def=self.executor_def,
)


def _find_assets_in_module(
module: ModuleType,
Expand Down
41 changes: 40 additions & 1 deletion python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import AbstractSet, Mapping, Optional, cast
import warnings
from typing import AbstractSet, Iterable, Mapping, Optional, cast

from dagster import check
from dagster.core.definitions import NodeDefinition, OpDefinition
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.partition import PartitionsDefinition
from dagster.utils.backcompat import ExperimentalWarning

from .partition_mapping import PartitionMapping

Expand All @@ -17,6 +19,7 @@ def __init__(
partitions_def: Optional[PartitionsDefinition] = None,
partition_mappings: Optional[Mapping[AssetKey, PartitionMapping]] = None,
asset_deps: Optional[Mapping[AssetKey, AbstractSet[AssetKey]]] = None,
# if adding new fields, make sure to handle them in the with_replaced_asset_keys method
):
self._node_def = node_def
self._asset_keys_by_input_name = check.dict_param(
Expand Down Expand Up @@ -68,6 +71,16 @@ def node_def(self) -> NodeDefinition:
def asset_deps(self) -> Mapping[AssetKey, AbstractSet[AssetKey]]:
return self._asset_deps

@property
def asset_key(self) -> AssetKey:
check.invariant(
len(self._asset_keys_by_output_name) == 1,
"Tried to retrieve asset key from an assets definition with multiple asset keys: "
+ ", ".join([str(ak.to_string()) for ak in self._asset_keys_by_output_name.values()]),
)

return next(iter(self._asset_keys_by_output_name.values()))

@property
def asset_keys(self) -> AbstractSet[AssetKey]:
return set(self.asset_keys_by_output_name.values())
Expand All @@ -84,6 +97,10 @@ def asset_keys_by_input_name(self) -> Mapping[str, AssetKey]:
def partitions_def(self) -> Optional[PartitionsDefinition]:
return self._partitions_def

@property
def dependency_asset_keys(self) -> Iterable[AssetKey]:
return self._asset_keys_by_input_name.values()

def get_partition_mapping(self, in_asset_key: AssetKey) -> PartitionMapping:
if self._partitions_def is None:
check.failed("Asset is not partitioned")
Expand All @@ -92,3 +109,25 @@ def get_partition_mapping(self, in_asset_key: AssetKey) -> PartitionMapping:
in_asset_key,
self._partitions_def.get_default_partition_mapping(),
)

def with_replaced_asset_keys(
self,
output_asset_key_replacements: Mapping[AssetKey, AssetKey],
input_asset_key_replacements: Mapping[AssetKey, AssetKey],
) -> "AssetsDefinition":
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)

return self.__class__(
asset_keys_by_input_name={
input_name: input_asset_key_replacements.get(key, key)
for input_name, key in self.asset_keys_by_input_name.items()
},
asset_keys_by_output_name={
output_name: output_asset_key_replacements.get(key, key)
for output_name, key in self.asset_keys_by_output_name.items()
},
node_def=self.node_def,
partitions_def=self.partitions_def,
partition_mappings=self._partition_mappings,
)
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def __eq__(self, other):
return self.to_string() == other.to_string()

def to_string(self, legacy: Optional[bool] = False) -> Optional[str]:
"""
E.g. '["first_component", "second_component"]'
"""
if not self.path:
return None
if legacy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,3 +570,76 @@ def unpartitioned_asset():
frozenset(["hourly_asset", "unpartitioned_asset"]),
frozenset(["daily_asset_different_start_date", "unpartitioned_asset"]),
}


def test_assets_prefixed_single_asset():
@asset
def asset1():
...

result = AssetGroup([asset1]).prefixed("my_prefix").assets
assert result[0].asset_key == AssetKey(["my_prefix", "asset1"])


def test_assets_prefixed_internal_dep():
@asset
def asset1():
...

@asset
def asset2(asset1):
del asset1

result = AssetGroup([asset1, asset2]).prefixed("my_prefix").assets
assert result[0].asset_key == AssetKey(["my_prefix", "asset1"])
assert result[1].asset_key == AssetKey(["my_prefix", "asset2"])
assert set(result[1].dependency_asset_keys) == {AssetKey(["my_prefix", "asset1"])}


def test_assets_prefixed_disambiguate():
asset1 = SourceAsset(AssetKey(["core", "apple"]))

@asset(name="apple")
def asset2():
...

@asset(ins={"apple": AssetIn(namespace="core")})
def orange(apple):
del apple

@asset
def banana(apple):
del apple

result = (
AssetGroup([asset2, orange, banana], source_assets=[asset1]).prefixed("my_prefix").assets
)
assert len(result) == 3
assert result[0].asset_key == AssetKey(["my_prefix", "apple"])
assert result[1].asset_key == AssetKey(["my_prefix", "orange"])
assert set(result[1].dependency_asset_keys) == {AssetKey(["core", "apple"])}
assert result[2].asset_key == AssetKey(["my_prefix", "banana"])
assert set(result[2].dependency_asset_keys) == {AssetKey(["my_prefix", "apple"])}


def test_assets_prefixed_source_asset():
asset1 = SourceAsset(key=AssetKey(["upstream_prefix", "asset1"]))

@asset(ins={"asset1": AssetIn(namespace="upstream_prefix")})
def asset2(asset1):
del asset1

result = AssetGroup([asset2], source_assets=[asset1]).prefixed("my_prefix").assets
assert len(result) == 1
assert result[0].asset_key == AssetKey(["my_prefix", "asset2"])
assert set(result[0].dependency_asset_keys) == {AssetKey(["upstream_prefix", "asset1"])}


def test_assets_prefixed_no_matches():
@asset
def orange(apple):
del apple

result = AssetGroup([orange]).prefixed("my_prefix").assets
assert result[0].asset_key == AssetKey(["my_prefix", "orange"])
assert set(result[0].dependency_asset_keys) == {AssetKey("apple")}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@ def check_experimental_warnings():
with warnings.catch_warnings(record=True) as record:
yield

raises_warning = False
for w in record:
if "asset_key" in w.message.args[0]:
raises_warning = True
break

assert not raises_warning
assert False, f"Unexpected warning: {w.message.args[0]}"


def test_asset_no_decorator_args():
Expand Down Expand Up @@ -349,3 +345,31 @@ def my_asset():
...

assert my_asset.op.tags == tags_stringified


def test_with_replaced_asset_keys():
@asset(ins={"input2": AssetIn(namespace="something_else")})
def asset1(input1, input2):
assert input1
assert input2

replaced = asset1.with_replaced_asset_keys(
output_asset_key_replacements={
AssetKey(["asset1"]): AssetKey(["prefix1", "asset1_changed"])
},
input_asset_key_replacements={
AssetKey(["something_else", "input2"]): AssetKey(["apple", "banana"])
},
)

assert set(replaced.dependency_asset_keys) == {
AssetKey("input1"),
AssetKey(["apple", "banana"]),
}
assert replaced.asset_keys == {AssetKey(["prefix1", "asset1_changed"])}

assert replaced.asset_keys_by_input_name["input1"] == AssetKey("input1")

assert replaced.asset_keys_by_input_name["input2"] == AssetKey(["apple", "banana"])

assert replaced.asset_keys_by_output_name["result"] == AssetKey(["prefix1", "asset1_changed"])

0 comments on commit ba9d867

Please sign in to comment.