Skip to content

Commit

Permalink
Introduce storage kind tag, helpers to @asset, @multi_asset
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed May 22, 2024
1 parent 6f93b35 commit 7b6f045
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
from dagster._core.definitions.resource_annotation import (
get_resource_args,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterType
from dagster._utils.merger import deep_merge_dicts
from dagster._utils.warnings import (
disable_dagster_warnings,
)
Expand Down Expand Up @@ -95,6 +97,7 @@ def asset(
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
check_specs: Optional[Sequence[AssetCheckSpec]] = ...,
owners: Optional[Sequence[str]] = ...,
storage_kind: Optional[str] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...


Expand Down Expand Up @@ -137,6 +140,7 @@ def asset(
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners: Optional[Sequence[str]] = None,
storage_kind: Optional[str] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -216,6 +220,7 @@ def asset(
owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each
string can be a user's email address, or a team name prefixed with `team:`,
e.g. `team:finops`.
storage_kind (Optional[str]): A storage kind tag for the asset.
Examples:
.. code-block:: python
Expand Down Expand Up @@ -257,6 +262,7 @@ def create_asset():
check_specs=check_specs,
key=key,
owners=owners,
storage_kind=check.opt_str_param(storage_kind, "storage_kind"),
)

if compute_fn is not None:
Expand Down Expand Up @@ -331,6 +337,7 @@ def __init__(
key: Optional[CoercibleToAssetKey] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners: Optional[Sequence[str]] = None,
storage_kind: Optional[str] = None,
):
self.name = name
self.key_prefix = key_prefix
Expand Down Expand Up @@ -360,6 +367,7 @@ def __init__(
self.check_specs = check_specs
self.key = key
self.owners = owners
self.storage_kind = storage_kind

def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition:
from dagster._config.pythonic_config import (
Expand Down Expand Up @@ -475,6 +483,12 @@ def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition:
partition_mappings=partition_mappings, deps=self.deps, asset_name=asset_name
)

all_tags = {
**(self.tags or {}),
**(StorageKindTagSet(storage_kind=self.storage_kind) if self.storage_kind else {}),
}
tags_by_key = {out_asset_key: all_tags} if all_tags else None

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name={"result": out_asset_key},
Expand All @@ -498,7 +512,7 @@ def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition:
selected_asset_keys=None, # no subselection in decorator
can_subset=False,
metadata_by_key={out_asset_key: self.metadata} if self.metadata else None,
tags_by_key={out_asset_key: self.tags} if self.tags else None,
tags_by_key=tags_by_key,
# see comment in @multi_asset's call to dagster_internal_init for the gory details
# this is best understood as an _override_ which @asset does not support
descriptions_by_key=None,
Expand Down Expand Up @@ -534,6 +548,7 @@ def multi_asset(
code_version: Optional[str] = None,
specs: Optional[Sequence[AssetSpec]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
storage_kind: Optional[str] = None,
# deprecated
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
Expand Down Expand Up @@ -591,6 +606,7 @@ def multi_asset(
by this function.
check_specs (Optional[Sequence[AssetCheckSpec]]): (Experimental) Specs for asset checks that
execute in the decorated function after materializing the assets.
storage_kind (Optional[str]): A storage kind tag for the assets materialized by this function.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
Set of asset keys that are upstream dependencies, but do not pass an input to the
multi_asset.
Expand Down Expand Up @@ -887,11 +903,24 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
for asset_key, props in props_by_asset_key.items()
if props.metadata is not None
}

tags_by_key = {
asset_key: props.tags
for asset_key, props in props_by_asset_key.items()
if props.tags is not None
}
storage_kind_tags = (
{
asset_key: dict(StorageKindTagSet(storage_kind=storage_kind))
for asset_key in output_tuples_by_asset_key.keys()
}
if storage_kind
else {}
)
all_tags = cast(
Dict[AssetKey, Mapping[str, str]], deep_merge_dicts(storage_kind_tags, tags_by_key)
)

owners_by_key = {
asset_key: props.owners
for asset_key, props in props_by_asset_key.items()
Expand Down Expand Up @@ -923,7 +952,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
# in OutputDefinitions in @multi_asset
descriptions_by_key=None,
metadata_by_key=metadata_by_key,
tags_by_key=tags_by_key,
tags_by_key=all_tags,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None, # no subselection in decorator
is_subset=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,21 @@ def asset2(): ...
}


def test_asset_with_storage_kind() -> None:
@asset(storage_kind="snowflake")
def asset1(): ...

assert asset1.tags_by_key[asset1.key] == {"dagster/storage_kind": "snowflake"}

@asset(storage_kind="snowflake", tags={"a": "b"})
def asset2(): ...

assert asset2.tags_by_key[asset2.key] == {
"dagster/storage_kind": "snowflake",
"a": "b",
}


def test_asset_spec_with_tags():
@multi_asset(specs=[AssetSpec("asset1", tags={"a": "b"})])
def assets(): ...
Expand All @@ -2184,3 +2199,77 @@ def assets(): ...

@multi_asset(outs={"asset1": AssetOut(tags={"a%": "b"})}) # key has illegal character
def assets(): ...


def test_multi_assets_asset_spec_with_storage_kind() -> None:
@multi_asset(specs=[AssetSpec("asset1"), AssetSpec("asset2")], storage_kind="snowflake")
def assets(): ...

assert assets.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "snowflake"}

@multi_asset(
specs=[AssetSpec("asset1", tags={"a": "b"}), AssetSpec("asset2", tags={"c": "d"})],
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {
"dagster/storage_kind": "snowflake",
"a": "b",
}
assert assets2.tags_by_key[AssetKey("asset2")] == {
"dagster/storage_kind": "snowflake",
"c": "d",
}


def test_multi_assets_asset_out_with_storage_kind() -> None:
@multi_asset(outs={"asset1": AssetOut(), "asset2": AssetOut()}, storage_kind="snowflake")
def assets(): ...

assert assets.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "snowflake"}

@multi_asset(
outs={"asset1": AssetOut(tags={"a": "b"}), "asset2": AssetOut(tags={"c": "d"})},
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {
"dagster/storage_kind": "snowflake",
"a": "b",
}
assert assets2.tags_by_key[AssetKey("asset2")] == {
"dagster/storage_kind": "snowflake",
"c": "d",
}


def test_multi_assets_asset_spec_with_storage_kind_override() -> None:
@multi_asset(
specs=[
AssetSpec("asset1"),
AssetSpec("asset2", tags={**StorageKindTagSet(storage_kind="bigquery")}),
],
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets2.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "bigquery"}


def test_multi_assets_asset_out_with_storage_kind_override() -> None:
@multi_asset(
outs={
"asset1": AssetOut(),
"asset2": AssetOut(tags={**StorageKindTagSet(storage_kind="bigquery")}),
},
storage_kind="snowflake",
)
def assets2(): ...

assert assets2.tags_by_key[AssetKey("asset1")] == {"dagster/storage_kind": "snowflake"}
assert assets2.tags_by_key[AssetKey("asset2")] == {"dagster/storage_kind": "bigquery"}

0 comments on commit 7b6f045

Please sign in to comment.