Skip to content

Commit

Permalink
allow flexible types for SourceAsset key arg (#7633)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Apr 28, 2022
1 parent cc07c17 commit a83f387
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import NamedTuple, Optional, Sequence, Union

import dagster.check as check
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.events import AssetKey, CoerceableToAssetKey
from dagster.core.definitions.metadata import (
MetadataEntry,
MetadataMapping,
Expand All @@ -28,7 +28,7 @@ class SourceAsset(
that it's referenced from.
Attributes:
key (AssetKey): The key of the asset.
key (Union[AssetKey, Sequence[str], str]): The key of the asset.
metadata_entries (List[MetadataEntry]): Metadata associated with the asset.
io_manager_key (str): The key for the IOManager that will be used to load the contents of
the asset when it's used as an input to other assets inside a job.
Expand All @@ -39,7 +39,7 @@ class SourceAsset(

def __new__(
cls,
key: AssetKey,
key: CoerceableToAssetKey,
metadata: Optional[MetadataUserInput] = None,
io_manager_key: str = "io_manager",
description: Optional[str] = None,
Expand All @@ -48,10 +48,9 @@ def __new__(

metadata = check.opt_dict_param(metadata, "metadata", key_type=str)
metadata_entries = normalize_metadata(metadata, [], allow_invalid=True)

return super().__new__(
cls,
key=check.inst_param(key, "key", AssetKey),
key=AssetKey.from_coerceable(key),
metadata_entries=metadata_entries,
io_manager_key=check.str_param(io_manager_key, "io_manager_key"),
description=check.opt_str_param(description, "description"),
Expand Down
18 changes: 17 additions & 1 deletion python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ def from_graphql_input(asset_key: Mapping[str, List[str]]) -> Optional["AssetKey
return AssetKey(asset_key["path"])
return None

@staticmethod
def from_coerceable(arg: "CoerceableToAssetKey") -> "AssetKey":
if isinstance(arg, AssetKey):
return check.inst_param(arg, "arg", AssetKey)
elif isinstance(arg, str):
return AssetKey([arg])
elif isinstance(arg, list):
check.list_param(arg, "arg", of_type=str)
return AssetKey(arg)
else:
check.tuple_param(arg, "arg", of_type=str)
return AssetKey(arg)


CoerceableToAssetKey = Union[AssetKey, str, Sequence[str]]


DynamicAssetKey = Callable[["OutputContext"], Optional[AssetKey]]

Expand Down Expand Up @@ -381,7 +397,7 @@ class AssetMaterialization(

def __new__(
cls,
asset_key: Union[List[str], AssetKey, str],
asset_key: CoerceableToAssetKey,
description: Optional[str] = None,
metadata_entries: Optional[List[Union[MetadataEntry, PartitionMetadataEntry]]] = None,
partition: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ def test_source_asset_metadata():
"foo": MetadataValue.text("bar"),
"baz": MetadataValue.text("[object] (unserializable)"),
}


def test_source_asset_key_args():
assert SourceAsset(key="foo").key == AssetKey(["foo"])
assert SourceAsset(key=["bar", "foo"]).key == AssetKey(["bar", "foo"])

0 comments on commit a83f387

Please sign in to comment.