Skip to content

Commit

Permalink
uniform prefix parsing (#8332)
Browse files (browse the repository at this point in the history)
  • Loading branch information
OwenKephart committed Jun 10, 2022
1 parent ab62a16 commit 5ba7a9f
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 56 deletions.
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import dagster._check as check
from dagster.core.definitions.dependency import NodeHandle
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKeyPrefix
from dagster.core.definitions.executor_definition import in_process_executor
from dagster.core.errors import DagsterUnmetExecutorRequirementsError
from dagster.core.execution.execute_in_process_result import ExecuteInProcessResult
Expand Down Expand Up @@ -418,7 +418,7 @@ def get_base_jobs(self) -> Sequence[JobDefinition]:

return jobs

def prefixed(self, key_prefix: str):
def prefixed(self, key_prefix: CoercibleToAssetKeyPrefix):
"""
Returns an AssetGroup that's identical to this AssetGroup, but with prefixes on all the
asset keys. The prefix is not added to source assets.
Expand Down
17 changes: 7 additions & 10 deletions python_modules/dagster/dagster/core/asset_defs/asset_in.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Union
from typing import Any, Mapping, NamedTuple, Optional, Sequence

import dagster._check as check
from dagster.core.definitions.events import ASSET_KEY_DELIMITER, AssetKey, CoerceableToAssetKey
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix
from dagster.utils.backcompat import canonicalize_backcompat_args


Expand All @@ -17,28 +17,25 @@ class AssetIn(
):
def __new__(
cls,
asset_key: Optional[CoerceableToAssetKey] = None,
asset_key: Optional[CoercibleToAssetKey] = None,
metadata: Optional[Mapping[str, Any]] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
):
key_prefix = canonicalize_backcompat_args(
key_prefix, "key_prefix", namespace, "namespace", "0.16.0"
)
if isinstance(key_prefix, str):
key_prefix = [key_prefix]

check.invariant(
not (asset_key and key_prefix),
("Asset key and key_prefix cannot both be set on AssetIn"),
)

# if user inputs a single string, split on delimiter
key_prefix = (
key_prefix.split(ASSET_KEY_DELIMITER) if isinstance(key_prefix, str) else key_prefix
)

return super(AssetIn, cls).__new__(
cls,
asset_key=AssetKey.from_coerceable(asset_key) if asset_key is not None else None,
metadata=check.opt_inst_param(metadata, "metadata", Mapping),
key_prefix=check.opt_list_param(key_prefix, "key_prefix", str),
key_prefix=check.opt_list_param(key_prefix, "key_prefix", of_type=str),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import dagster._check as check
from dagster.core.asset_defs.assets import AssetsDefinition
from dagster.core.definitions.events import AssetKey, CoerceableToAssetKey
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster.core.selector.subset_selector import (
fetch_connected,
generate_asset_dep_graph,
Expand All @@ -25,7 +25,7 @@ def assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection":
)

@staticmethod
def keys(*asset_keys: CoerceableToAssetKey) -> "KeysAssetSelection":
def keys(*asset_keys: CoercibleToAssetKey) -> "KeysAssetSelection":
_asset_keys = [AssetKey.from_coerceable(key) for key in asset_keys]
return KeysAssetSelection(*_asset_keys)

Expand Down
25 changes: 11 additions & 14 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dagster.config.config_schema import ConfigSchemaType
from dagster.core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster.core.definitions.decorators.op_decorator import _Op
from dagster.core.definitions.events import ASSET_KEY_DELIMITER, AssetKey
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKeyPrefix
from dagster.core.definitions.input import In
from dagster.core.definitions.output import Out
from dagster.core.definitions.partition import PartitionsDefinition
Expand Down Expand Up @@ -48,7 +48,7 @@ def asset(
def asset(
name: Optional[str] = ...,
namespace: Optional[Sequence[str]] = ...,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = ...,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
metadata: Optional[Mapping[str, Any]] = ...,
Expand All @@ -71,7 +71,7 @@ def asset(
def asset(
name: Optional[Union[Callable[..., Any], Optional[str]]] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
metadata: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -104,9 +104,8 @@ def asset(
decorated function.
namespace (Optional[Sequence[str]]): **Deprecated (use `key_prefix`)**. The namespace that
the asset resides in. The namespace + the name forms the asset key.
key_prefix (Optional[Union[str, Sequence[str]]]): Optional prefix to apply to the asset key. If `Sequence[str]`,
elements are prepended to function name to form the asset key. If `str`, will be split on "{asset_key_delimiter}"
and then prepended. If `None` asset key is simply the name of the function. name forms the asset key.
key_prefix (Optional[Union[str, Sequence[str]]]): Optional prefix to apply to the asset key.
If `None`, asset key is simply the name of the function.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to their metadata
and namespaces.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are
Expand Down Expand Up @@ -147,9 +146,7 @@ def asset(
@asset
def my_asset(my_upstream_asset: int) -> int:
return my_upstream_asset + 1
""".format(
asset_key_delimiter=ASSET_KEY_DELIMITER
)
"""
if callable(name):
return _Asset()(name)

Expand Down Expand Up @@ -188,7 +185,7 @@ class _Asset:
def __init__(
self,
name: Optional[str] = None,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Set[AssetKey]] = None,
metadata: Optional[Mapping[str, Any]] = None,
Expand All @@ -205,10 +202,10 @@ def __init__(
group_name: Optional[str] = None,
):
self.name = name
# if user inputs a single string, coerce to list
self.key_prefix = (
key_prefix.split(ASSET_KEY_DELIMITER) if isinstance(key_prefix, str) else key_prefix
)

if isinstance(key_prefix, str):
key_prefix = [key_prefix]
self.key_prefix = key_prefix
self.ins = ins or {}
self.non_argument_deps = non_argument_deps
self.metadata = metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Dict, Generator, Iterable, List, Optional, Sequence, Set, Tuple, Union

import dagster._check as check
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKeyPrefix

from ..errors import DagsterInvalidDefinitionError
from .assets import AssetsDefinition
Expand Down Expand Up @@ -73,7 +73,7 @@ def assets_and_source_assets_from_modules(
def load_assets_from_modules(
modules: Iterable[ModuleType],
group_name: Optional[str] = None,
key_prefix: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets from the given modules.
Expand All @@ -83,14 +83,14 @@ def load_assets_from_modules(
group_name (Optional[str]):
Group name to apply to the loaded assets. The returned assets will be copies of the
loaded objects, with the group name added
key_prefix (Optional[str]): Asset key prefix for assets within the modules.
key_prefix (Optional[Union[str, List[str]]]): Asset key prefix for assets within the modules.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
A list containing assets and source assets defined in the given modules.
"""
group_name = check.opt_str_param(group_name, "group_name")
key_prefix = check.opt_str_param(key_prefix, "key_prefix")
key_prefix = check.opt_inst_param(key_prefix, "key_prefix", (str, list))

assets, source_assets = assets_and_source_assets_from_modules(modules)
if key_prefix:
Expand All @@ -109,7 +109,7 @@ def load_assets_from_modules(

def load_assets_from_current_module(
group_name: Optional[str] = None,
key_prefix: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets from the module where this function is called.
Expand All @@ -118,7 +118,7 @@ def load_assets_from_current_module(
group_name (Optional[str]):
Group name to apply to the loaded assets. The returned assets will be copies of the
loaded objects, with the group name added
key_prefix (Optional[str]): Asset key prefix for assets within the module.
key_prefix (Optional[Union[str, List[str]]]): Asset key prefix for assets within the module.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
Expand Down Expand Up @@ -160,7 +160,7 @@ def assets_and_source_assets_from_package_module(
def load_assets_from_package_module(
package_module: ModuleType,
group_name: Optional[str] = None,
key_prefix: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets that includes all asset
Expand All @@ -173,14 +173,14 @@ def load_assets_from_package_module(
group_name (Optional[str]):
Group name to apply to the loaded assets. The returned assets will be copies of the
loaded objects, with the group name added
key_prefix (Optional[str]): Asset key prefix for assets within the modules.
key_prefix (Optional[Union[str, List[str]]]): Asset key prefix for assets within the modules.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
A list containing assets and source assets defined in the module.
"""
group_name = check.opt_str_param(group_name, "group_name")
key_prefix = check.opt_str_param(key_prefix, "key_prefix")
key_prefix = check.opt_inst_param(key_prefix, "key_prefix", (str, list))

assets, source_assets = assets_and_source_assets_from_package_module(package_module)
if key_prefix:
Expand All @@ -200,7 +200,7 @@ def load_assets_from_package_module(
def load_assets_from_package_name(
package_name: str,
group_name: Optional[str] = None,
key_prefix: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets that include all asset
Expand All @@ -211,7 +211,7 @@ def load_assets_from_package_name(
group_name (Optional[str]):
Group name to apply to the loaded assets. The returned assets will be copies of the
loaded objects, with the group name added
key_prefix (Optional[str]): Asset key prefix for assets within the modules.
key_prefix (Optional[Union[str, List[str]]]): Asset key prefix for assets within the modules.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
Expand Down Expand Up @@ -242,7 +242,7 @@ def _find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]


def prefix_assets(
assets_defs: Sequence[AssetsDefinition], key_prefix: str
assets_defs: Sequence[AssetsDefinition], key_prefix: CoercibleToAssetKeyPrefix
) -> List[AssetsDefinition]:
"""
Given a list of assets, prefix the input and output asset keys with key_prefix.
Expand Down Expand Up @@ -282,17 +282,20 @@ def asset2(asset1):
"""
asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.asset_keys}

if isinstance(key_prefix, str):
key_prefix = [key_prefix]
key_prefix = check.is_list(key_prefix, of_type=str)

result_assets: List[AssetsDefinition] = []
for assets_def in assets_defs:
output_asset_key_replacements = {
asset_key: AssetKey([key_prefix] + asset_key.path)
for asset_key in assets_def.asset_keys
asset_key: AssetKey(key_prefix + asset_key.path) for asset_key in assets_def.asset_keys
}
input_asset_key_replacements = {}
for dep_asset_key in assets_def.dependency_asset_keys:
if dep_asset_key in asset_keys:
input_asset_key_replacements[dep_asset_key] = AssetKey(
(key_prefix, *dep_asset_key.path)
key_prefix + dep_asset_key.path
)

result_assets.append(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Dict, Mapping, NamedTuple, Optional, Sequence, Union, cast

import dagster._check as check
from dagster.core.definitions.events import AssetKey, CoerceableToAssetKey
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster.core.definitions.metadata import (
MetadataEntry,
MetadataMapping,
Expand Down Expand Up @@ -49,7 +49,7 @@ class SourceAsset(

def __new__(
cls,
key: CoerceableToAssetKey,
key: CoercibleToAssetKey,
metadata: Optional[MetadataUserInput] = None,
io_manager_key: Optional[str] = None,
io_manager_def: Optional[IOManagerDefinition] = None,
Expand Down
7 changes: 4 additions & 3 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def from_graphql_input(asset_key: Mapping[str, List[str]]) -> Optional["AssetKey
return None

@staticmethod
def from_coerceable(arg: "CoerceableToAssetKey") -> "AssetKey":
def from_coerceable(arg: "CoercibleToAssetKey") -> "AssetKey":
if isinstance(arg, AssetKey):
return check.inst_param(arg, "arg", AssetKey)
elif isinstance(arg, str):
Expand All @@ -164,7 +164,8 @@ def from_coerceable(arg: "CoerceableToAssetKey") -> "AssetKey":
return AssetKey(arg)


CoerceableToAssetKey = Union[AssetKey, str, Sequence[str]]
CoercibleToAssetKey = Union[AssetKey, str, Sequence[str]]
CoercibleToAssetKeyPrefix = Union[str, Sequence[str]]


DynamicAssetKey = Callable[["OutputContext"], Optional[AssetKey]]
Expand Down Expand Up @@ -401,7 +402,7 @@ class AssetMaterialization(

def __new__(
cls,
asset_key: CoerceableToAssetKey,
asset_key: CoercibleToAssetKey,
description: Optional[str] = None,
metadata_entries: Optional[Sequence[Union[MetadataEntry, PartitionMetadataEntry]]] = None,
partition: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,15 @@ def test_respect_existing_groups():
load_assets_from_current_module(group_name="yay")


def test_prefix():
@pytest.mark.parametrize(
"prefix",
[
"my_cool_prefix",
["foo", "my_cool_prefix"],
["foo", "bar", "baz", "my_cool_prefix"],
],
)
def test_prefix(prefix):
from . import asset_package
from .asset_package import module_with_assets

Expand All @@ -136,12 +144,13 @@ def check_asset_prefix(assets):
if isinstance(asset, AssetsDefinition):
asset_keys = asset.asset_keys
for asset_key in asset_keys:
assert asset_key.path[0] == "my_cool_prefix"
observed_prefix = asset_key.path[:-1]
if len(observed_prefix) == 1:
observed_prefix = observed_prefix[0]
assert observed_prefix == prefix

assets = load_assets_from_modules(
[asset_package, module_with_assets], key_prefix="my_cool_prefix"
)
assets = load_assets_from_modules([asset_package, module_with_assets], key_prefix=prefix)
check_asset_prefix(assets)

assets = load_assets_from_package_module(asset_package, key_prefix="my_cool_prefix")
assets = load_assets_from_package_module(asset_package, key_prefix=prefix)
check_asset_prefix(assets)
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def multi_component_list_asset():
AssetKey(["one", "two", "three", "multi_component_list_asset"])
}

@asset(key_prefix="one/two/three")
@asset(key_prefix=["one", "two", "three"])
def multi_component_str_asset():
pass

Expand Down Expand Up @@ -285,7 +285,7 @@ def my_asset(arg1):


def test_input_key_prefix_str():
@asset(ins={"arg1": AssetIn(key_prefix="abc/xyz")})
@asset(ins={"arg1": AssetIn(key_prefix=["abc", "xyz"])})
def my_asset(arg1):
assert arg1

Expand Down

0 comments on commit 5ba7a9f

Please sign in to comment.