Skip to content

Commit

Permalink
[rfc] generic + returnable dynamic outputs (#7744)
Browse files Browse the repository at this point in the history
* Make Output and DynamicOutput no longer be tuple types

* Fix metadata entries

* Allow dynamic outputs to be used in a return context

* Add test cases, fix implementations

* Expand test cases, fix the error message in the execute-in-process result, and fix the dynamic output case where no outputs have been returned

* fix error message

* Add comments

* Fix error in test
  • Loading branch information
dpeng817 committed May 11, 2022
1 parent d099e43 commit 0356320
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 15 deletions.
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def output_name(self) -> str:
return self._output_name


class DynamicOutput:
class DynamicOutput(Generic[T]):
"""
Variant of :py:class:`Output <dagster.Output>` used to support
dynamic mapping & collect. Each ``DynamicOutput`` produced by an op represents
Expand Down Expand Up @@ -256,7 +256,7 @@ class DynamicOutput:

def __init__(
self,
value: Any,
value: T,
mapping_key: str,
output_name: Optional[str] = DEFAULT_OUTPUT,
metadata_entries: Optional[List[Union[PartitionMetadataEntry, MetadataEntry]]] = None,
Expand All @@ -281,7 +281,7 @@ def mapping_key(self) -> str:
return self._mapping_key

@property
def value(self) -> Any:
def value(self) -> T:
return self._value

@property
Expand Down
20 changes: 15 additions & 5 deletions python_modules/dagster/dagster/core/definitions/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from dagster.core.definitions.events import AssetKey, DynamicAssetKey
from dagster.core.definitions.metadata import MetadataEntry, MetadataUserInput, normalize_metadata
from dagster.core.errors import DagsterError, DagsterInvalidDefinitionError
from dagster.core.types.dagster_type import DagsterType, resolve_dagster_type
from dagster.core.types.dagster_type import (
DagsterType,
is_dynamic_output_annotation,
resolve_dagster_type,
)
from dagster.utils.backcompat import experimental_arg_warning

from .inference import InferredOutputProps
Expand Down Expand Up @@ -231,10 +235,16 @@ def mapping_from(self, solid_name: str, output_name: Optional[str] = None) -> "O

@staticmethod
def create_from_inferred(inferred: InferredOutputProps) -> "OutputDefinition":
return OutputDefinition(
dagster_type=_checked_inferred_type(inferred.annotation),
description=inferred.description,
)
if is_dynamic_output_annotation(inferred.annotation):
return DynamicOutputDefinition(
dagster_type=_checked_inferred_type(inferred.annotation),
description=inferred.description,
)
else:
return OutputDefinition(
dagster_type=_checked_inferred_type(inferred.annotation),
description=inferred.description,
)

def combine_with_inferred(self: TOut, inferred: InferredOutputProps) -> TOut:
dagster_type = self.dagster_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,7 @@ def _filter_outputs_by_handle(
mapped_outputs[step_output_handle.mapping_key] = value

if not output_found:
raise DagsterInvariantViolationError(f"No outputs found for node '{node_handle}'.")
raise DagsterInvariantViolationError(
f"No outputs found for output '{output_name}' from node '{node_handle}'."
)
return mapped_outputs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dagster._check as check
from dagster.core.definitions import (
AssetMaterialization,
DynamicOutput,
ExpectationResult,
Materialization,
Output,
Expand Down Expand Up @@ -88,6 +89,14 @@ def _validate_and_coerce_solid_result_to_iterator(result, context, output_defs):
yield event
elif isinstance(result, Output):
yield result
elif len(output_defs) == 1 and output_defs[0].is_dynamic:
if isinstance(result, list) and all([isinstance(event, DynamicOutput) for event in result]):
for event in result:
yield event
elif result is not None:
check.failed(
f"{context.describe_op()} has a single dynamic output named '{output_defs[0].name}', which expects either a list of DynamicOutputs to be returned, or DynamicOutput objects to be yielded. Received instead an object of type {type(result)}"
)
elif len(output_defs) == 1:
if result is None and output_defs[0].is_required is False:
context.log.warn(
Expand Down Expand Up @@ -131,6 +140,30 @@ def _validate_and_coerce_solid_result_to_iterator(result, context, output_defs):
value=element.value,
metadata_entries=element.metadata_entries,
)
elif isinstance(element, list) and all(
[isinstance(event, DynamicOutput) for event in element]
):
if not output_def.is_dynamic:
raise DagsterInvariantViolationError(
f"Received a list of DynamicOutputs for output named '{output_def.name}', but output is not dynamic."
)
for dynamic_output in element:
if (
not dynamic_output.output_name == DEFAULT_OUTPUT
and not dynamic_output.output_name == output_def.name
):
raise DagsterInvariantViolationError(
f"Bad state: Received a tuple of outputs. An output was "
f"explicitly named '{dynamic_output.output_name}', which does "
"not match the dynamic output definition specified for "
f"position {position}: '{output_def.name}'."
)
yield DynamicOutput(
output_name=output_def.name,
value=dynamic_output.value,
mapping_key=dynamic_output.mapping_key,
metadata_entries=dynamic_output.metadata_entries,
)
else:
# If an output object was not returned, then construct one from any metadata that has been logged within the op's body.
metadata = context.get_output_metadata(output_def.name)
Expand Down
30 changes: 29 additions & 1 deletion python_modules/dagster/dagster/core/types/dagster_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dagster.builtins import BuiltinEnum
from dagster.config.config_type import Array, ConfigType
from dagster.config.config_type import Noneable as ConfigNoneable
from dagster.core.definitions.events import Output, TypeCheck
from dagster.core.definitions.events import DynamicOutput, Output, TypeCheck
from dagster.core.definitions.metadata import MetadataEntry, RawMetadataValue, normalize_metadata
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster.serdes import whitelist_for_serdes
Expand Down Expand Up @@ -827,6 +827,10 @@ def resolve_dagster_type(dagster_type: object) -> DagsterType:
type_args = get_args(dagster_type)
# If no inner type was provided, forward Any type.
dagster_type = type_args[0] if len(type_args) == 1 else Any
elif is_dynamic_output_annotation(dagster_type):
dynamic_out_annotation = get_args(dagster_type)[0]
type_args = get_args(dynamic_out_annotation)
dagster_type = type_args[0] if len(type_args) == 1 else Any

# Then, check to see if it is part of python's typing library
if is_typing_type(dagster_type):
Expand Down Expand Up @@ -877,6 +881,30 @@ def resolve_dagster_type(dagster_type: object) -> DagsterType:
)


def is_dynamic_output_annotation(dagster_type: object) -> bool:
    """Report whether an annotation is a one-argument list of ``DynamicOutput``.

    Args:
        dagster_type: The return annotation to inspect.

    Returns:
        True when the annotation's origin is ``list``, it carries exactly one
        type argument, and that argument is ``DynamicOutput`` (bare or
        parameterized). False for every other annotation that passes the
        checks below.

    Raises:
        DagsterInvariantViolationError: If the annotation itself is
            ``DynamicOutput`` (bare or parameterized) rather than a list of
            them.

    A ``check.invariant`` call also rejects ``ConfigType`` and ``DagsterType``
    subclasses before any annotation inspection happens.
    """
    from dagster.seven.typing import get_args, get_origin

    def _matches_dynamic_output(annotation: object) -> bool:
        # Covers both the bare class and a subscripted form such as
        # DynamicOutput[int], whose origin is the class itself.
        return annotation == DynamicOutput or get_origin(annotation) == DynamicOutput

    is_class = isinstance(dagster_type, type)

    check.invariant(
        not (is_class and issubclass(dagster_type, ConfigType)),
        "Cannot resolve a config type to a runtime type",
    )
    check.invariant(
        not (is_class and issubclass(dagster_type, DagsterType)),
        "Do not pass runtime type classes. Got {}".format(dagster_type),
    )

    # A DynamicOutput outside of a list is never a valid return annotation.
    if _matches_dynamic_output(dagster_type):
        raise DagsterInvariantViolationError(
            "Op annotated with return type DynamicOutput. DynamicOutputs can only be returned in the context of a List. If only one output is needed, use the Output API."
        )

    if get_origin(dagster_type) != list:
        return False

    inner_args = get_args(dagster_type)
    if len(inner_args) != 1:
        return False

    return _matches_dynamic_output(inner_args[0])


def _is_generic_output_annotation(dagster_type: object) -> bool:
from dagster.seven.typing import get_origin

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,3 +900,175 @@ def the_op() -> Tuple[Output[int], Output[str]]:
match="Bad state: Received a tuple of outputs. An output was explicitly named 'out2', which does not match the output definition specified for position 0: 'out1'.",
):
execute_op_in_graph(the_op)


def test_generic_dynamic_output():
    """A returned list of typed DynamicOutputs is collected by mapping key."""

    @op
    def basic() -> List[DynamicOutput[int]]:
        return [DynamicOutput(mapping_key=str(number), value=number) for number in (1, 2)]

    run_result = execute_op_in_graph(basic)

    assert run_result.success
    expected_by_key = {"1": 1, "2": 2}
    assert run_result.output_for_node("basic") == expected_by_key


def test_generic_dynamic_output_type_mismatch():
    """A DynamicOutput whose value violates the annotated inner type fails the type check."""

    @op
    def basic() -> List[DynamicOutput[int]]:
        valid = DynamicOutput(mapping_key="1", value=1)
        # The string value contradicts the DynamicOutput[int] annotation.
        invalid = DynamicOutput(mapping_key="2", value="2")
        return [valid, invalid]

    expected_message = 'Type check failed for step output "result" - expected type "Int". Description: Value "2" of python type "str" must be a int.'

    with pytest.raises(DagsterTypeCheckDidNotPass, match=expected_message):
        execute_op_in_graph(basic)


def test_generic_dynamic_output_mix_with_regular():
    """A tuple return may combine a regular Output with a list of DynamicOutputs."""

    @op(out={"regular": Out(), "dynamic": DynamicOut()})
    def basic() -> Tuple[Output[int], List[DynamicOutput[str]]]:
        regular_part = Output(5)
        dynamic_part = [
            DynamicOutput(mapping_key="1", value="foo"),
            DynamicOutput(mapping_key="2", value="bar"),
        ]
        return (regular_part, dynamic_part)

    run_result = execute_op_in_graph(basic)
    assert run_result.success

    regular_value = run_result.output_for_node("basic", "regular")
    dynamic_values = run_result.output_for_node("basic", "dynamic")
    assert regular_value == 5
    assert dynamic_values == {"1": "foo", "2": "bar"}


def test_generic_dynamic_output_mix_with_regular_type_mismatch():
    """In a mixed tuple return, a mistyped dynamic value fails the type check for that output."""

    @op(out={"regular": Out(), "dynamic": DynamicOut()})
    def basic() -> Tuple[Output[int], List[DynamicOutput[str]]]:
        dynamic_part = [
            DynamicOutput(mapping_key="1", value="foo"),
            # The int value contradicts the DynamicOutput[str] annotation.
            DynamicOutput(mapping_key="2", value=5),
        ]
        return (Output(5), dynamic_part)

    expected_message = 'Type check failed for step output "dynamic" - expected type "String". Description: Value "5" of python type "int" must be a string.'

    with pytest.raises(DagsterTypeCheckDidNotPass, match=expected_message):
        execute_op_in_graph(basic)


def test_generic_dynamic_output_name_not_provided():
    """An explicit output_name on an op that declares no named outputs is rejected."""

    @op
    def basic() -> List[DynamicOutput[int]]:
        misnamed = DynamicOutput(value=5, mapping_key="blah", output_name="blah")
        return [misnamed]

    expected_message = 'Core compute for op "basic" returned an output "blah" that does not exist.'

    with pytest.raises(DagsterInvariantViolationError, match=expected_message):
        execute_op_in_graph(basic)


def test_generic_dynamic_output_name_mismatch():
    """An output_name that differs from the declared dynamic output name is rejected."""

    @op(out={"the_name": DynamicOut()})
    def basic() -> List[DynamicOutput[int]]:
        misnamed = DynamicOutput(value=5, mapping_key="blah", output_name="bad_name")
        return [misnamed]

    expected_message = (
        'Core compute for op "basic" returned an output "bad_name" that does not exist.'
    )

    with pytest.raises(DagsterInvariantViolationError, match=expected_message):
        execute_op_in_graph(basic)


def test_generic_dynamic_output_bare_list():
    """An unparameterized ``List[DynamicOutput]`` annotation is accepted."""

    @op
    def basic() -> List[DynamicOutput]:
        only_output = DynamicOutput(4, mapping_key="1")
        return [only_output]

    run_result = execute_op_in_graph(basic)

    assert run_result.success
    assert run_result.output_for_node("basic") == {"1": 4}


def test_generic_dynamic_output_bare():
    """Annotating an op's return as DynamicOutput outside of a List fails at definition time."""

    # Both the bare and the parameterized annotation must produce this message.
    expected_message = "Op annotated with return type DynamicOutput. DynamicOutputs can only be returned in the context of a List. If only one output is needed, use the Output API."

    # Bare annotation.
    with pytest.raises(DagsterInvariantViolationError, match=expected_message):

        @op
        def basic() -> DynamicOutput:
            pass

    # Parameterized annotation.
    with pytest.raises(DagsterInvariantViolationError, match=expected_message):

        @op
        def basic() -> DynamicOutput[int]:
            pass


def test_generic_dynamic_output_empty():
    """Returning an empty list of DynamicOutputs succeeds, but no output can be fetched."""

    @op
    def basic() -> List[DynamicOutput]:
        return []

    returned_result = execute_op_in_graph(basic)
    assert returned_result.success

    missing_output_message = "No outputs found for output 'result' from node 'basic'."
    with pytest.raises(DagsterInvariantViolationError, match=missing_output_message):
        returned_result.output_for_node("basic")

    # This behavior isn't exactly correct: a required dynamic output that
    # yields no outputs should raise an error.
    # https://github.com/dagster-io/dagster/issues/5948#issuecomment-997037163
    @op(out=DynamicOut())
    def basic_yield():
        pass

    yielded_result = execute_op_in_graph(basic_yield)
    assert yielded_result.success


def test_generic_dynamic_output_empty_with_type():
    """An empty list is accepted for a typed ``List[DynamicOutput[str]]`` annotation."""

    @op
    def basic() -> List[DynamicOutput[str]]:
        return []

    returned_result = execute_op_in_graph(basic)
    assert returned_result.success

    # Equivalent behavior in the dynamic yield case. is_required doesn't
    # actually do anything on a DynamicOut right now:
    # https://github.com/dagster-io/dagster/issues/5948#issuecomment-997037163
    @op(out=DynamicOut(dagster_type=str, is_required=False))
    def basic_yield():
        pass

    yielded_result = execute_op_in_graph(basic_yield)
    assert yielded_result.success


def test_generic_dynamic_multiple_outputs_empty():
    """In a tuple return, an empty dynamic list succeeds but that output cannot be fetched."""

    @op(out={"out1": Out(), "out2": DynamicOut()})
    def basic() -> Tuple[Output, List[DynamicOutput]]:
        no_dynamic_outputs = []
        return (Output(5), no_dynamic_outputs)

    run_result = execute_op_in_graph(basic)
    assert run_result.success

    missing_output_message = "No outputs found for output 'out2' from node 'basic'."
    with pytest.raises(DagsterInvariantViolationError, match=missing_output_message):
        run_result.output_for_node("basic", "out2")
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
import objgraph
import pytest

from dagster import DynamicOut, DynamicOutput, DynamicOutputDefinition, Out
from dagster import _check as check
from dagster import (
DynamicOut,
DynamicOutput,
DynamicOutputDefinition,
Out,
build_solid_context,
execute_pipeline,
execute_solid,
Expand Down Expand Up @@ -76,7 +74,10 @@ def should_fail(_):
def should_also_fail(_):
return 1

with pytest.raises(DagsterInvariantViolationError, match="must yield DynamicOutput"):
with pytest.raises(
check.CheckError,
match="expects either a list of DynamicOutputs to be returned, or DynamicOutput objects to be yielded. Received instead an object of type <class 'int'>",
):
execute_solid(should_also_fail)


Expand Down

0 comments on commit 0356320

Please sign in to comment.