Skip to content

Commit

Permalink
use ">" for multi-component selection in AssetGroup.build_job (#7661)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed May 3, 2022
1 parent 2f828aa commit 64c6d85
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
13 changes: 4 additions & 9 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,25 +282,20 @@ def _parse_asset_selection(self, selection: Union[str, List[str]], job_name: str
if asset.op.name not in op_names_to_asset_keys:
op_names_to_asset_keys[asset.op.name] = set()
for asset_key in asset.asset_keys:
asset_key_as_str = ".".join([piece for piece in asset_key.path])
asset_key_as_str = ">".join([piece for piece in asset_key.path])
op_names_to_asset_keys[asset.op.name].add(asset_key_as_str)
if not asset_key_as_str in asset_keys_to_ops:
asset_keys_to_ops[asset_key_as_str] = []
asset_keys_to_ops[asset_key_as_str].append(asset.op)

for asset in self.source_assets:
if isinstance(asset, SourceAsset):
asset_key_as_str = ".".join([piece for piece in asset.key.path])
source_asset_keys.add(asset_key_as_str)
else:
for asset_key in asset.asset_keys:
asset_key_as_str = ".".join([piece for piece in asset_key.path])
source_asset_keys.add(asset_key_as_str)
asset_key_as_str = ">".join([piece for piece in asset.key.path])
source_asset_keys.add(asset_key_as_str)

op_selection = []

for clause in selection:
token_matching = re.compile(r"^(\*?\+*)?([.\w\d\[\]?_-]+)(\+*\*?)?$").search(
token_matching = re.compile(r"^(\*?\+*)?([>.\w\d\[\]?_-]+)(\+*\*?)?$").search(
clause.strip()
)
parts = token_matching.groups() if token_matching is not None else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,22 @@ def follows_o2(o2):
group.build_job(name="test_subselect_only_one_key", selection="o1")


def test_asset_group_build_job_selection_multi_component():
source_asset = SourceAsset(["apple", "banana"])

@asset(namespace="abc")
def asset1():
...

group = AssetGroup([asset1], source_assets=[source_asset])
assert group.build_job(name="something", selection="abc>asset1").asset_layer.asset_keys == {
AssetKey(["abc", "asset1"])
}

with pytest.raises(DagsterInvalidDefinitionError, match="source asset"):
group.build_job(name="something", selection="apple>banana")


def test_asset_group_from_package_name():
from . import asset_package

Expand Down

0 comments on commit 64c6d85

Please sign in to comment.