Skip to content

Commit

Permalink
AssetsDefinition property name changes (#8317)
Browse files Browse the repository at this point in the history
* AssetsDefinition.group_names -> group_names_by_asset_key

* asset_key -> key
  • Loading branch information
sryza committed Jun 13, 2022
1 parent c212e85 commit 8856374
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ def _validate_resource_reqs_for_asset_group(
f"Missing resource keys: {missing_resource_keys}"
)

for output_name, asset_key in asset_def.asset_keys_by_output_name.items():
for output_name, asset_key in asset_def.keys_by_output_name.items():
output_def, _ = asset_def.node_def.resolve_output_to_origin(
output_name, NodeHandle(name=asset_def.node_def.name, parent=None)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,6 @@ def _resolve(self, node: AssetSelection) -> AbstractSet[str]:
def _match_groups(assets_def: AssetsDefinition, groups: AbstractSet[str]) -> AbstractSet[str]:
return {
asset_key.to_user_string()
for asset_key, group in assets_def.group_names.items()
for asset_key, group in assets_def.group_names_by_key.items()
if group in groups
}
179 changes: 90 additions & 89 deletions python_modules/dagster/dagster/core/asset_defs/assets.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def build_source_assets_by_key(
if isinstance(asset_source, SourceAsset):
source_assets_by_key[asset_source.key] = asset_source
elif isinstance(asset_source, AssetsDefinition):
for output_name, asset_key in asset_source.asset_keys_by_output_name.items():
for output_name, asset_key in asset_source.keys_by_output_name.items():
if asset_key:
source_assets_by_key[asset_key] = asset_source.node_def.output_def_named(
output_name
Expand Down Expand Up @@ -210,7 +210,7 @@ def build_deps(

# unique handle for each AssetsDefinition
assets_defs_by_node_handle[NodeHandle(node_alias, parent=None)] = assets_def
for output_name, key in assets_def.asset_keys_by_output_name.items():
for output_name, key in assets_def.keys_by_output_name.items():
node_alias_and_output_by_asset_key[key] = (node_alias, output_name)

deps: Dict[Union[str, NodeInvocation], Dict[str, IDependencyDefinition]] = {}
Expand All @@ -224,7 +224,7 @@ def build_deps(
deps[node_key] = {}

# connect each input of this AssetsDefinition to the proper upstream node
for input_name, upstream_asset_key in assets_def.asset_keys_by_input_name.items():
for input_name, upstream_asset_key in assets_def.keys_by_input_name.items():
if upstream_asset_key in node_alias_and_output_by_asset_key:
upstream_node_alias, upstream_output_name = node_alias_and_output_by_asset_key[
upstream_asset_key
Expand Down
22 changes: 11 additions & 11 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,22 +285,22 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
},
)(fn)

asset_keys_by_input_name = {
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
return AssetsDefinition(
asset_keys_by_input_name=asset_keys_by_input_name,
asset_keys_by_output_name={"result": out_asset_key},
keys_by_input_name=keys_by_input_name,
keys_by_output_name={"result": out_asset_key},
node_def=op,
partitions_def=self.partitions_def,
partition_mappings={
asset_keys_by_input_name[input_name]: partition_mapping
keys_by_input_name[input_name]: partition_mapping
for input_name, partition_mapping in self.partition_mappings.items()
}
if self.partition_mappings
else None,
resource_defs=self.resource_defs,
group_names={out_asset_key: self.group_name} if self.group_name else None,
group_names_by_key={out_asset_key: self.group_name} if self.group_name else None,
)


Expand Down Expand Up @@ -423,20 +423,20 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
},
)(fn)

asset_keys_by_input_name = {
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
asset_keys_by_output_name = {
keys_by_output_name = {
output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()
}
return AssetsDefinition(
asset_keys_by_input_name=asset_keys_by_input_name,
asset_keys_by_output_name=asset_keys_by_output_name,
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
node_def=op,
asset_deps={asset_keys_by_output_name[name]: asset_deps[name] for name in asset_deps},
asset_deps={keys_by_output_name[name]: asset_deps[name] for name in asset_deps},
partitions_def=partitions_def,
partition_mappings={
asset_keys_by_input_name[input_name]: partition_mapping
keys_by_input_name[input_name]: partition_mapping
for input_name, partition_mapping in partition_mappings.items()
}
if partition_mappings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def load_assets_from_modules(
if group_name:
assets = [
asset.with_prefix_or_group(
group_names={asset_key: group_name for asset_key in asset.keys}
group_names_by_key={asset_key: group_name for asset_key in asset.keys}
)
for asset in assets
]
Expand Down Expand Up @@ -194,7 +194,7 @@ def load_assets_from_package_module(
if group_name:
assets = [
asset.with_prefix_or_group(
group_names={asset_key: group_name for asset_key in asset.keys}
group_names_by_key={asset_key: group_name for asset_key in asset.keys}
)
for asset in assets
]
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _asset_key_to_dep_node_handles(
dep_node_handles_by_node: Dict[
NodeHandle, List[NodeHandle]
] = {} # memoized map of nodehandle to all node handle dependencies that are ops
for output_name, asset_key in assets_defs.node_asset_keys_by_output_name.items():
for output_name, asset_key in assets_defs.node_keys_by_output_name.items():
output_def = assets_defs.node_def.output_def_named(output_name)
output_name = output_def.name

Expand Down Expand Up @@ -501,7 +501,7 @@ def from_graph_and_assets_node_mapping(
for node_handle, assets_def in assets_defs_by_node_handle.items():
asset_deps.update(assets_def.asset_deps)

for input_name, asset_key in assets_def.node_asset_keys_by_input_name.items():
for input_name, asset_key in assets_def.node_keys_by_input_name.items():
asset_key_by_input[NodeInputHandle(node_handle, input_name)] = asset_key
# resolve graph input to list of op inputs that consume it
node_input_handles = _resolve_input_to_destinations(
Expand All @@ -510,7 +510,7 @@ def from_graph_and_assets_node_mapping(
for node_input_handle in node_input_handles:
asset_key_by_input[node_input_handle] = asset_key

for output_name, asset_key in assets_def.node_asset_keys_by_output_name.items():
for output_name, asset_key in assets_def.node_keys_by_output_name.items():
# resolve graph output to the op output it comes from
inner_output_def, inner_node_handle = assets_def.node_def.resolve_output_to_origin(
output_name, handle=node_handle
Expand Down Expand Up @@ -606,9 +606,9 @@ def asset_info_for_output(

def group_names_by_assets(self) -> Mapping[AssetKey, str]:
group_names: Dict[AssetKey, str] = {
key: assets_def.group_names[key]
key: assets_def.group_names_by_key[key]
for key, assets_def in self._assets_defs_by_key.items()
if key in assets_def.group_names
if key in assets_def.group_names_by_key
}

group_names.update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1345,17 +1345,17 @@ def basic():
other_resource = ResourceDefinition.hardcoded_resource("baz")

the_asset = AssetsDefinition(
asset_keys_by_input_name={},
asset_keys_by_output_name={"result": AssetKey("the_asset")},
keys_by_input_name={},
keys_by_output_name={"result": AssetKey("the_asset")},
node_def=basic,
resource_defs={"foo": the_resource},
)
no_conflict_group = AssetGroup([the_asset])
assert no_conflict_group.materialize().success

other_asset = AssetsDefinition(
asset_keys_by_input_name={},
asset_keys_by_output_name={"result": AssetKey("other_asset")},
keys_by_input_name={},
keys_by_output_name={"result": AssetKey("other_asset")},
node_def=basic,
resource_defs={"foo": other_resource},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def asset1(input1, input2):
}
assert replaced.keys == {AssetKey(["prefix1", "asset1_changed"])}

assert replaced.asset_keys_by_input_name["input1"] == AssetKey("input1")
assert replaced.keys_by_input_name["input1"] == AssetKey("input1")

assert replaced.asset_keys_by_input_name["input2"] == AssetKey(["apple", "banana"])
assert replaced.keys_by_input_name["input2"] == AssetKey(["apple", "banana"])

assert replaced.asset_keys_by_output_name["result"] == AssetKey(["prefix1", "asset1_changed"])
assert replaced.keys_by_output_name["result"] == AssetKey(["prefix1", "asset1_changed"])


@pytest.mark.parametrize(
Expand Down Expand Up @@ -74,8 +74,8 @@ def abc_(context, in1, in2, in3): # pylint: disable=unused-argument
{AssetKey(key) for key in expected_keys.split(",")} if expected_keys else set()
)

assert len(subbed.asset_keys_by_input_name) == expected_inputs
assert len(subbed.asset_keys_by_output_name) == expected_outputs
assert len(subbed.keys_by_input_name) == expected_inputs
assert len(subbed.keys_by_output_name) == expected_outputs

# the asset dependency structure should stay the same
assert subbed.asset_deps == abc_.asset_deps
Expand All @@ -89,7 +89,7 @@ def bar():
replaced = bar.with_prefix_or_group(
output_asset_key_replacements={AssetKey(["bar"]): AssetKey(["baz"])}
)
assert replaced.group_names[AssetKey("baz")] == "foo"
assert replaced.group_names_by_key[AssetKey("baz")] == "foo"


def test_chain_replace_and_subset_for():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def check_asset_group(assets):
if isinstance(asset, AssetsDefinition):
asset_keys = asset.keys
for asset_key in asset_keys:
assert asset.group_names.get(asset_key) == "my_cool_group"
assert asset.group_names_by_key.get(asset_key) == "my_cool_group"
elif isinstance(asset, SourceAsset):
assert asset.group_name == "my_cool_group"

Expand All @@ -121,7 +121,7 @@ def check_asset_group(assets):

def test_respect_existing_groups():
assets = load_assets_from_current_module()
assert assets[0].group_names.get(AssetKey("asset_in_current_module")) == "my_group"
assert assets[0].group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group"

with pytest.raises(DagsterInvalidDefinitionError):
load_assets_from_current_module(group_name="yay")
Expand Down

0 comments on commit 8856374

Please sign in to comment.