Skip to content

Commit

Permalink
Make Output and DynamicOutput no longer be tuple types (#7740)
Browse files Browse the repository at this point in the history
* Make Output and DynamicOutput no longer be tuple types

* Get rid of complex 3.6 logic

* Fix metadata entries

* remove unused import
  • Loading branch information
dpeng817 committed May 5, 2022
1 parent 81a0421 commit ee3081f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 51 deletions.
78 changes: 39 additions & 39 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,7 @@ def __new__(cls, asset_key, partitions=None):
T = TypeVar("T")


class Output(
NamedTuple(
"_Output",
[
("value", Any),
("output_name", str),
("metadata_entries", List[Union[PartitionMetadataEntry, MetadataEntry]]),
],
),
Generic[T],
):
class Output(Generic[T]):
"""Event corresponding to one of a op's outputs.
Op compute functions must explicitly yield events of this type when they have more than
Expand All @@ -214,8 +204,8 @@ class Output(
list, and one of the data classes returned by a MetadataValue static method.
"""

def __new__(
cls,
def __init__(
self,
value: T,
output_name: Optional[str] = DEFAULT_OUTPUT,
metadata_entries: Optional[Sequence[Union[MetadataEntry, PartitionMetadataEntry]]] = None,
Expand All @@ -228,26 +218,24 @@ def __new__(
"metadata_entries",
of_type=(MetadataEntry, PartitionMetadataEntry),
)
self._value = value
self._output_name = check.str_param(output_name, "output_name")
self._metadata_entries = normalize_metadata(metadata, metadata_entries)

return super(Output, cls).__new__(
cls,
value,
check.str_param(output_name, "output_name"),
normalize_metadata(metadata, metadata_entries),
)
@property
def metadata_entries(self) -> List[Union[PartitionMetadataEntry, MetadataEntry]]:
return self._metadata_entries

@property
def value(self) -> Any:
return self._value

class DynamicOutput(
NamedTuple(
"_DynamicOutput",
[
("value", Any),
("mapping_key", str),
("output_name", str),
("metadata_entries", List[Union[PartitionMetadataEntry, MetadataEntry]]),
],
)
):
@property
def output_name(self) -> str:
return self._output_name


class DynamicOutput:
"""
Variant of :py:class:`Output <dagster.Output>` used to support
dynamic mapping & collect. Each ``DynamicOutput`` produced by an op represents
Expand All @@ -274,8 +262,8 @@ class DynamicOutput(
list, and one of the data classes returned by a MetadataValue static method.
"""

def __new__(
cls,
def __init__(
self,
value: Any,
mapping_key: str,
output_name: Optional[str] = DEFAULT_OUTPUT,
Expand All @@ -287,14 +275,26 @@ def __new__(
metadata_entries = check.opt_list_param(
metadata_entries, "metadata_entries", of_type=MetadataEntry
)
self._mapping_key = check_valid_name(check.str_param(mapping_key, "mapping_key"))
self._output_name = check.str_param(output_name, "output_name")
self._metadata_entries = normalize_metadata(metadata, metadata_entries)
self._value = value

return super(DynamicOutput, cls).__new__(
cls,
value=value,
mapping_key=check_valid_name(check.str_param(mapping_key, "mapping_key")),
output_name=check.str_param(output_name, "output_name"),
metadata_entries=normalize_metadata(metadata, metadata_entries),
)
@property
def metadata_entries(self) -> List[Union[PartitionMetadataEntry, MetadataEntry]]:
return self._metadata_entries

@property
def mapping_key(self) -> str:
return self._mapping_key

@property
def value(self) -> Any:
return self._value

@property
def output_name(self) -> str:
return self._output_name


@whitelist_for_serdes
Expand Down
14 changes: 4 additions & 10 deletions python_modules/dagster/dagster/core/types/dagster_type.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import sys
import typing as t
from abc import abstractmethod
from enum import Enum as PythonEnum
Expand Down Expand Up @@ -825,7 +824,9 @@ def resolve_dagster_type(dagster_type: object) -> DagsterType:

# First, check to see if we're using Dagster's generic output type to do the type catching.
if _is_generic_output_annotation(dagster_type):
dagster_type = get_args(dagster_type)[0]
type_args = get_args(dagster_type)
# If no inner type was provided, forward Any type.
dagster_type = type_args[0] if len(type_args) == 1 else Any

# Then, check to see if it is part of python's typing library
if is_typing_type(dagster_type):
Expand Down Expand Up @@ -879,14 +880,7 @@ def resolve_dagster_type(dagster_type: object) -> DagsterType:
def _is_generic_output_annotation(dagster_type: object) -> bool:
from dagster.seven.typing import get_origin

# On python version 3.6, get_origin cannot introspect the origin of a
# generic NamedTuple (returns tuple, not Output). So we need to use
# __origin__ directly. We only do this on python 3.6, since __origin__ is
# an internal implementation detail, and may not be supported in future versions.
if sys.version_info[0] == 3 and sys.version_info[1] <= 6:
return hasattr(dagster_type, "__origin__") and dagster_type.__origin__ == Output
else:
return get_origin(dagster_type) == Output
return dagster_type == Output or get_origin(dagster_type) == Output


def resolve_python_type_to_dagster_type(python_type: t.Type) -> DagsterType:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,10 @@ def test_type_annotations_with_output():
def my_op_returns_output() -> Output:
return Output(5)

with pytest.raises(DagsterTypeCheckDidNotPass):
my_op_returns_output()
output = my_op_returns_output()
assert output.value == 5
result = execute_op_in_graph(my_op_returns_output)
assert result.output_for_node("my_op_returns_output") == 5


# Document what happens when someone tries to use type annotations with generator
Expand Down

0 comments on commit ee3081f

Please sign in to comment.