Skip to content

Commit

Permalink
AssetOut (#8359)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jun 13, 2022
1 parent ff22edd commit 227a336
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 52 deletions.
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dagster.config.config_type import Array, Noneable, ScalarUnion
from dagster.core.asset_defs import (
AssetIn,
AssetOut,
AssetSelection,
AssetsDefinition,
SourceAsset,
Expand Down Expand Up @@ -396,6 +397,7 @@ def __dir__() -> typing.List[str]:
"AssetIn",
"AssetMaterialization",
"AssetObservation",
"AssetOut",
"AssetSelection",
"AssetSensorDefinition",
"AssetsDefinition",
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/core/asset_defs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .asset_group import AssetGroup
from .asset_in import AssetIn
from .asset_out import AssetOut
from .asset_selection import AssetSelection
from .assets import AssetsDefinition
from .assets_job import build_assets_job
Expand Down
81 changes: 81 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/asset_out.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Type, Union

import dagster._check as check
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix
from dagster.core.definitions.input import NoValueSentinel
from dagster.core.definitions.metadata import MetadataUserInput
from dagster.core.definitions.output import Out
from dagster.core.types.dagster_type import DagsterType, resolve_dagster_type


class AssetOut(
NamedTuple(
"_AssetOut",
[
("key", Optional[AssetKey]),
("key_prefix", Optional[Sequence[str]]),
("metadata", Optional[Mapping[str, Any]]),
("io_manager_key", str),
("description", Optional[str]),
("is_required", bool),
("dagster_type", Union[DagsterType, Type[NoValueSentinel]]),
],
)
):
"""
Args:
key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
concatenation of the key_prefix and the asset's name, which defaults to the name of
the decorated function. Only one of the "key_prefix" and "key" arguments should be
provided.
key (Optional[Union[str, Sequence[str], AssetKey]]): The asset's key. Only one of the
"key_prefix" and "key" arguments should be provided.
dagster_type (Optional[Union[Type, DagsterType]]]):
The type of this output. Should only be set if the correct type can not
be inferred directly from the type signature of the decorated function.
description (Optional[str]): Human-readable description of the output.
is_required (bool): Whether the presence of this field is required. (default: True)
io_manager_key (Optional[str]): The resource key of the IO manager used for this output.
(default: "io_manager").
metadata (Optional[Dict[str, Any]]): A dict of the metadata for the output.
For example, users can provide a file path if the data object will be stored in a
filesystem, or provide information of a database table when it is going to load the data
into the table.
"""

def __new__(
cls,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
key: Optional[CoercibleToAssetKey] = None,
dagster_type: Union[Type, DagsterType] = NoValueSentinel,
description: Optional[str] = None,
is_required: bool = True,
io_manager_key: Optional[str] = None,
metadata: Optional[MetadataUserInput] = None,
):
if isinstance(key_prefix, str):
key_prefix = [key_prefix]

return super(AssetOut, cls).__new__(
cls,
key=AssetKey.from_coerceable(key) if key is not None else None,
key_prefix=check.opt_list_param(key_prefix, "key_prefix", of_type=str),
dagster_type=NoValueSentinel
if dagster_type is NoValueSentinel
else resolve_dagster_type(dagster_type),
description=check.opt_str_param(description, "description"),
is_required=check.bool_param(is_required, "is_required"),
io_manager_key=check.opt_str_param(
io_manager_key, "io_manager_key", default="io_manager"
),
metadata=check.opt_dict_param(metadata, "metadata", key_type=str),
)

def to_out(self) -> Out:
return Out(
dagster_type=self.dagster_type,

This comment has been minimized.

Copy link
@mcminis1

mcminis1 Jun 21, 2022

should this also include?

asset_key=self.key
description=self.description,
metadata=self.metadata,
is_required=self.is_required,
io_manager_key=self.io_manager_key,
)
66 changes: 50 additions & 16 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@
from dagster.core.storage.io_manager import IOManagerDefinition
from dagster.core.types.dagster_type import DagsterType
from dagster.seven import funcsigs
from dagster.utils.backcompat import ExperimentalWarning, canonicalize_backcompat_args
from dagster.utils.backcompat import (
ExperimentalWarning,
canonicalize_backcompat_args,
deprecation_warning,
)

from .asset_in import AssetIn
from .asset_out import AssetOut
from .assets import AssetsDefinition
from .partition_mapping import PartitionMapping

Expand Down Expand Up @@ -104,8 +109,9 @@ def asset(
decorated function.
namespace (Optional[Sequence[str]]): **Deprecated (use `key_prefix`)**. The namespace that
the asset resides in. The namespace + the name forms the asset key.
key_prefix (Optional[Union[str, Sequence[str]]]): Optional prefix to apply to the asset key.
If `None`, asset key is simply the name of the function.
key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
concatenation of the key_prefix and the asset's name, which defaults to the name of
the decorated function.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to their metadata
and namespaces.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are
Expand Down Expand Up @@ -299,7 +305,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:


def multi_asset(
outs: Dict[str, Out],
outs: Mapping[str, Union[Out, AssetOut]],
name: Optional[str] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
Expand Down Expand Up @@ -355,11 +361,6 @@ def multi_asset(
can_subset (bool): If this asset's computation can emit a subset of the asset
keys based on the context.selected_assets argument. Defaults to False.
"""

check.invariant(
all(out.asset_key is None or isinstance(out.asset_key, AssetKey) for out in outs.values()),
"The asset_key argument for Outs supplied to a multi_asset must be a constant or None, not a function. ",
)
asset_deps = check.opt_dict_param(
internal_asset_deps, "internal_asset_deps", key_type=str, value_type=set
)
Expand All @@ -368,19 +369,24 @@ def multi_asset(
"config_schema",
additional_message="Only dicts are supported for asset config_schema.",
)
for out in outs.values():
if isinstance(out, Out) and not isinstance(out, AssetOut):
deprecation_warning(
"Passing Out objects as values for the out argument of @multi_asset",
"0.16.0",
additional_warn_txt="Use AssetOut instead.",
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:

op_name = name or fn.__name__
asset_ins = build_asset_ins(
fn, None, ins or {}, non_argument_deps=_make_asset_keys(non_argument_deps)
)
asset_outs = build_asset_outs(outs)

# validate that the asset_deps make sense
valid_asset_deps = set(asset_ins.keys())
valid_asset_deps.update(
cast(AssetKey, out.asset_key or AssetKey([name])) for name, out in outs.items()
)
valid_asset_deps = set(asset_ins.keys()) | set(asset_outs.keys())
for out_name, asset_keys in asset_deps.items():
check.invariant(
out_name in outs,
Expand All @@ -401,7 +407,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
name=op_name,
description=description,
ins=dict(asset_ins.values()),
out=outs,
out=dict(asset_outs.values()),
required_resource_keys=required_resource_keys,
tags={
**({"kind": compute_kind} if compute_kind else {}),
Expand All @@ -421,7 +427,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
asset_keys_by_output_name = {
name: cast(AssetKey, out.asset_key or AssetKey([name])) for name, out in outs.items()
output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()
}
return AssetsDefinition(
asset_keys_by_input_name=asset_keys_by_input_name,
Expand Down Expand Up @@ -450,7 +456,6 @@ def build_asset_ins(
"""
Creates a mapping from AssetKey to (name of input, In object)
"""

non_argument_deps = check.opt_set_param(non_argument_deps, "non_argument_deps", AssetKey)

params = get_function_params(fn)
Expand Down Expand Up @@ -501,6 +506,35 @@ def build_asset_ins(
return ins_by_asset_key


def build_asset_outs(
asset_outs: Mapping[str, Union[Out, AssetOut]]
) -> Mapping[AssetKey, Tuple[str, Out]]:
"""
Creates a mapping from AssetKey to (name of output, Out object)
"""
outs_by_asset_key: Dict[AssetKey, Tuple[str, Out]] = {}
for output_name, asset_out in asset_outs.items():
if isinstance(asset_out, AssetOut):
out = asset_out.to_out()
asset_key = asset_out.key or AssetKey(
list(filter(None, [*(asset_out.key_prefix or []), output_name]))
)
elif isinstance(asset_out, Out):
out = asset_out
if isinstance(asset_out.asset_key, AssetKey):
asset_key = asset_out.asset_key
elif asset_out.asset_key is None:
asset_key = AssetKey(output_name)
else:
check.failed("Expected AssetKey or None")
else:
check.failed("Expected Out or AssetOut")

outs_by_asset_key[asset_key] = (output_name.replace("-", "_"), out)

return outs_by_asset_key


def _make_asset_keys(deps: Optional[Union[Set[AssetKey], Set[str]]]) -> Optional[Set[AssetKey]]:
"""Convert all str items to AssetKey in the set."""
if deps is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Callable,
Dict,
List,
Mapping,
Optional,
Sequence,
Set,
Expand Down Expand Up @@ -47,7 +48,7 @@ def __init__(
decorator_takes_context: Optional[bool] = True,
retry_policy: Optional[RetryPolicy] = None,
ins: Optional[Dict[str, In]] = None,
out: Optional[Union[Out, Dict[str, Out]]] = None,
out: Optional[Union[Out, Mapping[str, Out]]] = None,
):
self.name = check.opt_str_param(name, "name")
self.input_defs = check.opt_nullable_sequence_param(
Expand Down Expand Up @@ -141,14 +142,14 @@ def __call__(self, fn: Callable[..., Any]) -> "OpDefinition":


def _resolve_output_defs_from_outs(
inferred_out: InferredOutputProps, out: Optional[Union[Out, dict]]
inferred_out: InferredOutputProps, out: Optional[Union[Out, Mapping]]
) -> Optional[List[OutputDefinition]]:
if out is None:
return None
if isinstance(out, Out):
return [out.to_definition(inferred_out.annotation, name=None)]
else:
check.dict_param(out, "out", key_type=str, value_type=Out)
check.mapping_param(out, "out", key_type=str, value_type=Out)

# If only a single entry has been provided to the out dict, then slurp the
# annotation into the entry.
Expand Down Expand Up @@ -206,7 +207,7 @@ def op(
name: Optional[Union[Callable[..., Any], str]] = None,
description: Optional[str] = None,
ins: Optional[Dict[str, In]] = None,
out: Optional[Union[Out, Dict[str, Out]]] = None,
out: Optional[Union[Out, Mapping[str, Out]]] = None,
config_schema: Optional[ConfigSchemaType] = None,
required_resource_keys: Optional[Set[str]] = None,
tags: Optional[Dict[str, Any]] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dagster import (
AssetKey,
AssetOut,
AssetsDefinition,
DagsterEventType,
DagsterInvalidDefinitionError,
Expand Down Expand Up @@ -268,9 +269,9 @@ def c(b):

@multi_asset(
outs={
"a": Out(is_required=False),
"b": Out(is_required=False),
"c": Out(is_required=False),
"a": AssetOut(is_required=False),
"b": AssetOut(is_required=False),
"c": AssetOut(is_required=False),
},
internal_asset_deps={
"a": {AssetKey("start")},
Expand Down Expand Up @@ -302,9 +303,9 @@ def f(d, e):

@multi_asset(
outs={
"d": Out(is_required=False),
"e": Out(is_required=False),
"f": Out(is_required=False),
"d": AssetOut(is_required=False),
"e": AssetOut(is_required=False),
"f": AssetOut(is_required=False),
},
internal_asset_deps={
"d": {AssetKey("a"), AssetKey("b")},
Expand Down Expand Up @@ -553,7 +554,7 @@ def test_subset_does_not_respect_context():
def start():
return 1

@multi_asset(outs={"a": Out(), "b": Out(), "c": Out()}, can_subset=True)
@multi_asset(outs={"a": AssetOut(), "b": AssetOut(), "c": AssetOut()}, can_subset=True)
def abc(start):
# this asset declares that it can subset its computation but will always produce all outputs
yield Output(1 + start, "a")
Expand Down Expand Up @@ -610,7 +611,7 @@ def test_subset_cycle_resolution_embed_assets_in_complex_graph():
io_manager_obj.db[AssetKey(item)] = None

@multi_asset(
outs={name: Out(is_required=False) for name in "a,b,c,d,e,f,g,h".split(",")},
outs={name: AssetOut(is_required=False) for name in "a,b,c,d,e,f,g,h".split(",")},
internal_asset_deps={
"a": set(),
"b": set(),
Expand Down Expand Up @@ -690,7 +691,7 @@ def test_subset_cycle_resolution_complex():
io_manager_obj.db[AssetKey(item)] = None

@multi_asset(
outs={name: Out(is_required=False) for name in "a,b,c,d,e,f".split(",")},
outs={name: AssetOut(is_required=False) for name in "a,b,c,d,e,f".split(",")},
internal_asset_deps={
"a": set(),
"b": {AssetKey("x")},
Expand Down Expand Up @@ -754,7 +755,7 @@ def test_subset_cycle_resolution_basic():
s = SourceAsset("s")

@multi_asset(
outs={"a": Out(is_required=False), "b": Out(is_required=False)},
outs={"a": AssetOut(is_required=False), "b": AssetOut(is_required=False)},
internal_asset_deps={
"a": {AssetKey("s")},
"b": {AssetKey("a_prime")},
Expand All @@ -769,7 +770,7 @@ def foo(context, s, a_prime):
yield Output(a_prime + 1, "b")

@multi_asset(
outs={"a_prime": Out(is_required=False), "b_prime": Out(is_required=False)},
outs={"a_prime": AssetOut(is_required=False), "b_prime": AssetOut(is_required=False)},
internal_asset_deps={
"a_prime": {AssetKey("a")},
"b_prime": {AssetKey("b")},
Expand Down Expand Up @@ -1040,7 +1041,7 @@ def test_materialize_with_selection():
def start_asset():
return "foo"

@multi_asset(outs={"o1": Out(asset_key=AssetKey("o1")), "o2": Out(asset_key=AssetKey("o2"))})
@multi_asset(outs={"o1": AssetOut(), "o2": AssetOut()})
def middle_asset(start_asset):
return (start_asset, start_asset)

Expand Down Expand Up @@ -1269,8 +1270,8 @@ def my_asset():

@multi_asset(
outs={
"my_out_name": Out(asset_key=AssetKey("my_asset_name")),
"my_other_out_name": Out(asset_key=AssetKey("my_other_asset")),
"my_out_name": AssetOut(key=AssetKey("my_asset_name")),
"my_other_out_name": AssetOut(key=AssetKey("my_other_asset")),
}
)
def my_multi_asset():
Expand Down

0 comments on commit 227a336

Please sign in to comment.