Skip to content

Commit

Permalink
Resolve multi-asset deps when they have the same group (#10222)
Browse files Browse the repository at this point in the history
  • Loading branch information
peay committed Oct 28, 2022
1 parent ffa01e1 commit a5f0fa9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,9 @@ def resolve_assets_def_deps(

result: Dict[int, Mapping[AssetKey, AssetKey]] = {}
for assets_def in assets_defs:
group_name = (
next(iter(assets_def.group_names_by_key.values()))
if len(assets_def.group_names_by_key) == 1
else None
)
# If all keys have the same group name, use that
group_names = set(assets_def.group_names_by_key.values())
group_name = next(iter(group_names)) if len(group_names) == 1 else None

resolved_keys_by_unresolved_key: Dict[AssetKey, AssetKey] = {}
for input_name, upstream_key in assets_def.keys_by_input_name.items():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import AssetIn, asset
from dagster import AssetIn, AssetKey, AssetOut, asset, multi_asset
from dagster._core.definitions.resolved_asset_deps import resolve_assets_def_deps


Expand All @@ -16,3 +16,19 @@ def asset3(apple):
del apple

assert len(resolve_assets_def_deps([asset1, asset2, asset3], [])) == 0


def test_multi_asset_group_name():
@asset(group_name="somegroup", key_prefix=["some", "path"])
def upstream():
pass

@multi_asset(group_name="somegroup", outs={"a": AssetOut(), "b": AssetOut()})
def multi_downstream(upstream): # pylint: disable=unused-argument
pass

resolved = resolve_assets_def_deps([upstream, multi_downstream], [])
assert len(resolved) == 1

resolution = next(iter(resolved.values()))
assert resolution == {AssetKey(["upstream"]): AssetKey(["some", "path", "upstream"])}

0 comments on commit a5f0fa9

Please sign in to comment.