Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce storage kind tag, helpers to @asset, @multi_asset #22037

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._serdes.serdes import whitelist_for_serdes

from .auto_materialize_policy import AutoMaterializePolicy
Expand Down Expand Up @@ -125,12 +126,18 @@ def __new__(
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
owners: Optional[Sequence[str]] = None,
tags: Optional[Mapping[str, str]] = None,
storage_kind: Optional[str] = None,
):
from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates

key = AssetKey.from_coercible(key)
asset_deps = coerce_to_deps_and_check_duplicates(deps, key)

all_tags = {
**(tags or {}),
**(StorageKindTagSet(storage_kind=storage_kind) if storage_kind else {}),
} or None

return super().__new__(
cls,
key=key,
Expand All @@ -151,5 +158,5 @@ def __new__(
AutoMaterializePolicy,
),
owners=check.opt_sequence_param(owners, "owners", of_type=str),
tags=validate_tags_strict(tags) or {},
tags=validate_tags_strict(all_tags),
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
from dagster._core.definitions.resource_annotation import (
get_resource_args,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterType
from dagster._utils.merger import deep_merge_dicts
from dagster._utils.warnings import (
disable_dagster_warnings,
)
Expand Down Expand Up @@ -94,6 +96,7 @@ def asset(
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
check_specs: Optional[Sequence[AssetCheckSpec]] = ...,
owners: Optional[Sequence[str]] = ...,
storage_kind: Optional[str] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...


Expand Down Expand Up @@ -136,6 +139,7 @@ def asset(
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners: Optional[Sequence[str]] = None,
storage_kind: Optional[str] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.

Expand Down Expand Up @@ -215,6 +219,7 @@ def asset(
owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each
string can be a user's email address, or a team name prefixed with `team:`,
e.g. `team:finops`.
storage_kind (Optional[str]): A storage kind tag for the asset.

Examples:
.. code-block:: python
Expand Down Expand Up @@ -256,6 +261,7 @@ def create_asset():
check_specs=check_specs,
key=key,
owners=owners,
storage_kind=check.opt_str_param(storage_kind, "storage_kind"),
)

if compute_fn is not None:
Expand Down Expand Up @@ -330,6 +336,7 @@ def __init__(
key: Optional[CoercibleToAssetKey] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners: Optional[Sequence[str]] = None,
storage_kind: Optional[str] = None,
):
self.name = name
self.key_prefix = key_prefix
Expand Down Expand Up @@ -359,6 +366,7 @@ def __init__(
self.check_specs = check_specs
self.key = key
self.owners = owners
self.storage_kind = storage_kind

def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition:
from dagster._config.pythonic_config import (
Expand Down Expand Up @@ -494,6 +502,12 @@ def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition:
deps=deps,
)

all_tags = {
**(self.tags or {}),
**(StorageKindTagSet(storage_kind=self.storage_kind) if self.storage_kind else {}),
}
tags_by_key = {out_asset_key: all_tags} if all_tags else None

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name={"result": out_asset_key},
Expand Down Expand Up @@ -535,6 +549,7 @@ def multi_asset(
code_version: Optional[str] = None,
specs: Optional[Sequence[AssetSpec]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
storage_kind: Optional[str] = None,
# deprecated
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
Expand Down Expand Up @@ -592,6 +607,7 @@ def multi_asset(
by this function.
check_specs (Optional[Sequence[AssetCheckSpec]]): (Experimental) Specs for asset checks that
execute in the decorated function after materializing the assets.
storage_kind (Optional[str]): A storage kind tag for the assets materialized by this function.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
Set of asset keys that are upstream dependencies, but do not pass an input to the
multi_asset.
Expand Down Expand Up @@ -864,6 +880,14 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
)
resolved_specs = [spec._replace(group_name=group_name) for spec in resolved_specs]

if storage_kind:
check.invariant(
all(spec.storage_kind is None for spec in resolved_specs),
"Cannot set storage_kind parameter on multi_asset if one or more of the"
" AssetSpecs/AssetOuts supplied to this multi_asset have a storage_kind defined.",
)
resolved_specs = [spec._replace(storage_kind=storage_kind) for spec in resolved_specs]

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
Expand Down Expand Up @@ -893,21 +917,6 @@ def get_function_params_without_context_or_config_or_resources(

resource_arg_names = {arg.name for arg in get_resource_args(fn)}

new_input_args = []
for input_arg in input_params:
if input_arg.name != "config" and input_arg.name not in resource_arg_names:
new_input_args.append(input_arg)

return new_input_args


def stringify_asset_key_to_input_name(asset_key: AssetKey) -> str:
return "_".join(asset_key.path).replace("-", "_")


def build_asset_ins(
fn: Callable[..., Any],
asset_ins: Mapping[str, AssetIn],
deps: Optional[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
"""Creates a mapping from AssetKey to (name of input, In object)."""
Expand Down
103 changes: 103 additions & 0 deletions python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2157,6 +2157,21 @@ def asset2(): ...
}


def test_asset_with_storage_kind() -> None:
@asset(storage_kind="snowflake")
def asset1(): ...

assert asset1.tags_by_key[asset1.key] == {"dagster/storage_kind": "snowflake"}

@asset(storage_kind="snowflake", tags={"a": "b"})
def asset2(): ...

assert asset2.tags_by_key[asset2.key] == {
"dagster/storage_kind": "snowflake",
"a": "b",
}


def test_asset_spec_with_tags():
@multi_asset(specs=[AssetSpec("asset1", tags={"a": "b"})])
def assets(): ...
Expand All @@ -2179,3 +2194,91 @@ def assets(): ...

@multi_asset(outs={"asset1": AssetOut(tags={"a%": "b"})}) # key has illegal character
def assets(): ...


def test_multi_assets_asset_spec_with_storage_kind() -> None:
@multi_asset(specs=[AssetSpec("asset1"), AssetSpec("asset2")], storage_kind="snowflake")
def assets(): ...

assert assets.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "snowflake"}

@multi_asset(
specs=[AssetSpec("asset1", tags={"a": "b"}), AssetSpec("asset2", tags={"c": "d"})],
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {
"dagster/storage_kind": "snowflake",
"a": "b",
}
assert assets2.tags_by_key[AssetKey("asset2")] == {
"dagster/storage_kind": "snowflake",
"c": "d",
}


def test_multi_assets_asset_out_with_storage_kind() -> None:
@multi_asset(outs={"asset1": AssetOut(), "asset2": AssetOut()}, storage_kind="snowflake")
def assets(): ...

assert assets.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "snowflake"}

@multi_asset(
outs={"asset1": AssetOut(tags={"a": "b"}), "asset2": AssetOut(tags={"c": "d"})},
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {
"dagster/storage_kind": "snowflake",
"a": "b",
}
assert assets2.tags_by_key[AssetKey("asset2")] == {
"dagster/storage_kind": "snowflake",
"c": "d",
}


def test_multi_assets_asset_spec_with_storage_kind_override() -> None:
@multi_asset(
specs=[
AssetSpec("asset1"),
AssetSpec("asset2", storage_kind="bigquery"),
],
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets2.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "bigquery"}


def test_multi_assets_asset_spec_with_storage_kind_tag_override() -> None:
@multi_asset(
specs=[
AssetSpec("asset1"),
AssetSpec("asset2", tags={**StorageKindTagSet(storage_kind="bigquery")}),
],
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets2.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "bigquery"}


def test_multi_assets_asset_out_with_storage_kind_override() -> None:
@multi_asset(
outs={
"asset1": AssetOut(),
"asset2": AssetOut(tags={**StorageKindTagSet(storage_kind="bigquery")}),
},
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets2.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "bigquery"}