Skip to content

Commit

Permalink
namedtuple to NamedTuple (1st batch) (#6752)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 25, 2022
1 parent 308429b commit 47b9a68
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 88 deletions.
8 changes: 5 additions & 3 deletions python_modules/automation/automation/docker/dagster_docker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
import os
from collections import namedtuple
from typing import Callable, NamedTuple, Optional

import yaml
from dagster import check
Expand All @@ -25,7 +25,9 @@ def do_nothing(_cwd):
yield


class DagsterDockerImage(namedtuple("_DagsterDockerImage", "image build_cm path")):
class DagsterDockerImage(
NamedTuple("_DagsterDockerImage", [("image", str), ("build_cm", Callable), ("path", str)])
):
"""Represents a Dagster image.
Properties:
Expand All @@ -35,7 +37,7 @@ class DagsterDockerImage(namedtuple("_DagsterDockerImage", "image build_cm path"
path (Optional(str)): The path to the image's path. Defaults to docker/images/<IMAGE NAME>
"""

def __new__(cls, image, build_cm=do_nothing, path=None):
def __new__(cls, image: str, build_cm: Callable = do_nothing, path: Optional[str] = None):
return super(DagsterDockerImage, cls).__new__(
cls,
check.str_param(image, "image"),
Expand Down
29 changes: 23 additions & 6 deletions python_modules/dagster/dagster/config/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from collections import namedtuple
from enum import Enum
from typing import Dict, List, NamedTuple
from typing import Dict, List, NamedTuple, Union

from dagster import check
from dagster.utils.error import SerializableErrorInfo
Expand Down Expand Up @@ -99,7 +98,7 @@ def __new__(cls, config_type_snap, incoming_fields):
)


ERROR_DATA_TYPES = (
ERROR_DATA_UNION = Union[
FieldNotDefinedErrorData,
FieldsNotDefinedErrorData,
MissingFieldErrorData,
Expand All @@ -108,11 +107,29 @@ def __new__(cls, config_type_snap, incoming_fields):
SelectorTypeErrorData,
SerializableErrorInfo,
FieldAliasCollisionErrorData,
)
]

ERROR_DATA_TYPES = ERROR_DATA_UNION.__args__ # type: ignore

class EvaluationError(namedtuple("_EvaluationError", "stack reason message error_data")):
def __new__(cls, stack, reason, message, error_data):

class EvaluationError(
NamedTuple(
"_EvaluationError",
[
("stack", EvaluationStack),
("reason", DagsterEvaluationErrorReason),
("message", str),
("error_data", ERROR_DATA_UNION),
],
)
):
def __new__(
cls,
stack: EvaluationStack,
reason: DagsterEvaluationErrorReason,
message: str,
error_data: ERROR_DATA_UNION,
):
return super(EvaluationError, cls).__new__(
cls,
check.inst_param(stack, "stack", EvaluationStack),
Expand Down
7 changes: 4 additions & 3 deletions python_modules/dagster/dagster/config/snap.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections import namedtuple
from typing import Any, Dict, List, NamedTuple, Optional, Set, cast

from dagster import check
Expand Down Expand Up @@ -196,8 +195,10 @@ def has_enum_value(self, value: object) -> bool:


@whitelist_for_serdes
class ConfigEnumValueSnap(namedtuple("_ConfigEnumValueSnap", "value description")):
def __new__(cls, value, description):
class ConfigEnumValueSnap(
NamedTuple("_ConfigEnumValueSnap", [("value", str), ("description", Optional[str])])
):
def __new__(cls, value: str, description: Optional[str]):
return super(ConfigEnumValueSnap, cls).__new__(
cls,
value=check.str_param(value, "value"),
Expand Down
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/core/assets.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from collections import namedtuple
from typing import NamedTuple, Optional

from dagster import check
from dagster.serdes import deserialize_json_to_dagster_namedtuple, whitelist_for_serdes


@whitelist_for_serdes
class AssetDetails(namedtuple("_AssetDetails", "last_wipe_timestamp")):
class AssetDetails(NamedTuple("_AssetDetails", [("last_wipe_timestamp", Optional[float])])):
"""
Set of asset fields that do not change with every materialization. These are generally updated
on some non-materialization action (e.g. wipe)
"""

def __new__(cls, last_wipe_timestamp=None):
def __new__(cls, last_wipe_timestamp: Optional[float] = None):
check.opt_float_param(last_wipe_timestamp, "last_wipe_timestamp")
return super(AssetDetails, cls).__new__(cls, last_wipe_timestamp)

Expand Down
45 changes: 33 additions & 12 deletions python_modules/dagster/dagster/core/code_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import sys
from abc import ABC, abstractmethod
from collections import namedtuple
from typing import List, NamedTuple, Optional

from dagster import check
from dagster.core.errors import DagsterImportError, DagsterInvariantViolationError
Expand Down Expand Up @@ -147,15 +147,18 @@ def load_python_module(module_name, working_directory, remove_from_path_fn=None)

@whitelist_for_serdes
class FileCodePointer(
namedtuple("_FileCodePointer", "python_file fn_name working_directory"), CodePointer
NamedTuple(
"_FileCodePointer",
[("python_file", str), ("fn_name", str), ("working_directory", Optional[str])],
),
CodePointer,
):
def __new__(cls, python_file, fn_name, working_directory=None):
check.opt_str_param(working_directory, "working_directory")
def __new__(cls, python_file: str, fn_name: str, working_directory: Optional[str] = None):
return super(FileCodePointer, cls).__new__(
cls,
check.str_param(python_file, "python_file"),
check.str_param(fn_name, "fn_name"),
working_directory,
check.opt_str_param(working_directory, "working_directory"),
)

def load_target(self):
Expand All @@ -180,9 +183,13 @@ def describe(self):

@whitelist_for_serdes
class ModuleCodePointer(
namedtuple("_ModuleCodePointer", "module fn_name working_directory"), CodePointer
NamedTuple(
"_ModuleCodePointer",
[("module", str), ("fn_name", str), ("working_directory", Optional[str])],
),
CodePointer,
):
def __new__(cls, module, fn_name, working_directory=None):
def __new__(cls, module: str, fn_name: str, working_directory: Optional[str] = None):
return super(ModuleCodePointer, cls).__new__(
cls,
check.str_param(module, "module"),
Expand All @@ -207,9 +214,13 @@ def describe(self):

@whitelist_for_serdes
class PackageCodePointer(
namedtuple("_PackageCodePointer", "module attribute working_directory"), CodePointer
NamedTuple(
"_PackageCodePointer",
[("module", str), ("attribute", str), ("working_directory", Optional[str])],
),
CodePointer,
):
def __new__(cls, module, attribute, working_directory=None):
def __new__(cls, module: str, attribute: str, working_directory: Optional[str] = None):
return super(PackageCodePointer, cls).__new__(
cls,
check.str_param(module, "module"),
Expand Down Expand Up @@ -244,12 +255,22 @@ def get_python_file_from_target(target):

@whitelist_for_serdes
class CustomPointer(
namedtuple(
"_CustomPointer", "reconstructor_pointer reconstructable_args reconstructable_kwargs"
NamedTuple(
"_CustomPointer",
[
("reconstructor_pointer", ModuleCodePointer),
("reconstructable_args", List[object]),
("reconstructable_kwargs", List[List]),
],
),
CodePointer,
):
def __new__(cls, reconstructor_pointer, reconstructable_args, reconstructable_kwargs):
def __new__(
cls,
reconstructor_pointer: ModuleCodePointer,
reconstructable_args: List[object],
reconstructable_kwargs: List[List],
):
check.inst_param(reconstructor_pointer, "reconstructor_pointer", ModuleCodePointer)
# These are lists rather than tuples to circumvent the tuple serdes machinery -- since these
# are user-provided, they aren't whitelisted for serdes.
Expand Down
22 changes: 14 additions & 8 deletions python_modules/dagster/dagster/core/debug.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import namedtuple
from typing import List, NamedTuple

from dagster import check
from dagster.core.events.log import EventLogEntry
Expand All @@ -9,18 +9,24 @@

@whitelist_for_serdes
class DebugRunPayload(
namedtuple(
NamedTuple(
"_DebugRunPayload",
"version pipeline_run event_list pipeline_snapshot execution_plan_snapshot",
[
("version", str),
("pipeline_run", PipelineRun),
("event_list", List[EventLogEntry]),
("pipeline_snapshot", PipelineSnapshot),
("execution_plan_snapshot", ExecutionPlanSnapshot),
],
)
):
def __new__(
cls,
version,
pipeline_run,
event_list,
pipeline_snapshot,
execution_plan_snapshot,
version: str,
pipeline_run: PipelineRun,
event_list: List[EventLogEntry],
pipeline_snapshot: PipelineSnapshot,
execution_plan_snapshot: ExecutionPlanSnapshot,
):
return super(DebugRunPayload, cls).__new__(
cls,
Expand Down
5 changes: 3 additions & 2 deletions python_modules/dagster/dagster/core/definitions/dependency.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from collections import defaultdict, namedtuple
from collections import defaultdict
from enum import Enum
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -262,7 +262,8 @@ def value_to_storage_dict(
@whitelist_for_serdes(serializer=NodeHandleSerializer)
class NodeHandle(
# mypy does not yet support recursive types
namedtuple("_NodeHandle", "name parent")
# NamedTuple("_NodeHandle", [("name", str), ("parent", Optional["NodeHandle"])])
NamedTuple("_NodeHandle", [("name", str), ("parent", Any)])
):
"""
A structured object to identify nodes in the potentially recursive graph structure.
Expand Down
29 changes: 11 additions & 18 deletions python_modules/dagster/dagster/core/definitions/graph_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,32 +804,25 @@ def _validate_in_mappings(
solid_input_handle = SolidInputHandle(target_solid, target_input)

if mapping.maps_to_fan_in:
maps_to = cast(FanInInputPointer, mapping.maps_to)
if not dependency_structure.has_fan_in_deps(solid_input_handle):
raise DagsterInvalidDefinitionError(
"In {class_name} '{name}' input mapping target "
'"{mapping.maps_to.solid_name}.{mapping.maps_to.input_name}" (index {mapping.maps_to.fan_in_index} of fan-in) '
"is not a MultiDependencyDefinition.".format(
name=name, mapping=mapping, class_name=class_name
)
f"In {class_name} '{name}' input mapping target "
f'"{maps_to.solid_name}.{maps_to.input_name}" (index {maps_to.fan_in_index} of fan-in) '
f"is not a MultiDependencyDefinition."
)
inner_deps = dependency_structure.get_fan_in_deps(solid_input_handle)
if (mapping.maps_to.fan_in_index >= len(inner_deps)) or (
inner_deps[mapping.maps_to.fan_in_index] is not MappedInputPlaceholder
if (maps_to.fan_in_index >= len(inner_deps)) or (
inner_deps[maps_to.fan_in_index] is not MappedInputPlaceholder
):
raise DagsterInvalidDefinitionError(
"In {class_name} '{name}' input mapping target "
'"{mapping.maps_to.solid_name}.{mapping.maps_to.input_name}" index {mapping.maps_to.fan_in_index} in '
"the MultiDependencyDefinition is not a MappedInputPlaceholder".format(
name=name, mapping=mapping, class_name=class_name
)
)
mapping_keys.add(
"{mapping.maps_to.solid_name}.{mapping.maps_to.input_name}.{mapping.maps_to.fan_in_index}".format(
mapping=mapping
f"In {class_name} '{name}' input mapping target "
f'"{maps_to.solid_name}.{maps_to.input_name}" index {maps_to.fan_in_index} in '
f"the MultiDependencyDefinition is not a MappedInputPlaceholder"
)
)
mapping_keys.add(f"{maps_to.solid_name}.{maps_to.input_name}.{maps_to.fan_in_index}")
target_type = target_input.dagster_type.get_inner_type_for_fan_in()
fan_in_msg = " (index {} of fan-in)".format(mapping.maps_to.fan_in_index)
fan_in_msg = " (index {} of fan-in)".format(maps_to.fan_in_index)
else:
if dependency_structure.has_deps(solid_input_handle):
raise DagsterInvalidDefinitionError(
Expand Down
15 changes: 11 additions & 4 deletions python_modules/dagster/dagster/core/definitions/hook_definition.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from collections import namedtuple
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Optional
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, NamedTuple, Optional

from dagster import check

Expand All @@ -12,7 +11,15 @@


class HookDefinition(
namedtuple("_HookDefinition", "name hook_fn required_resource_keys decorated_fn")
NamedTuple(
"_HookDefinition",
[
("name", str),
("hook_fn", Callable),
("required_resource_keys", AbstractSet[str]),
("decorated_fn", Callable),
],
)
):
"""Define a hook which can be triggered during a op execution (e.g. a callback on the step
execution failure event during a op execution).
Expand All @@ -38,7 +45,7 @@ def __new__(
required_resource_keys=frozenset(
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
),
decorated_fn=check.opt_callable_param(decorated_fn, "decorated_fn"),
decorated_fn=check.callable_param(decorated_fn, "decorated_fn"),
)

def __call__(self, *args, **kwargs):
Expand Down
23 changes: 16 additions & 7 deletions python_modules/dagster/dagster/core/definitions/input.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections import namedtuple
from typing import NamedTuple, Optional, Set
from typing import NamedTuple, Optional, Set, Union

from dagster import check
from dagster.core.definitions.events import AssetKey
Expand Down Expand Up @@ -286,17 +286,21 @@ def _checked_inferred_type(inferred: InferredInputProps) -> DagsterType:
return resolved_type


class InputPointer(namedtuple("_InputPointer", "solid_name input_name")):
def __new__(cls, solid_name, input_name):
class InputPointer(NamedTuple("_InputPointer", [("solid_name", str), ("input_name", str)])):
def __new__(cls, solid_name: str, input_name: str):
return super(InputPointer, cls).__new__(
cls,
check.str_param(solid_name, "solid_name"),
check.str_param(input_name, "input_name"),
)


class FanInInputPointer(namedtuple("_FanInInputPointer", "solid_name input_name fan_in_index")):
def __new__(cls, solid_name, input_name, fan_in_index):
class FanInInputPointer(
NamedTuple(
"_FanInInputPointer", [("solid_name", str), ("input_name", str), ("fan_in_index", int)]
)
):
def __new__(cls, solid_name: str, input_name: str, fan_in_index: int):
return super(FanInInputPointer, cls).__new__(
cls,
check.str_param(solid_name, "solid_name"),
Expand All @@ -305,7 +309,12 @@ def __new__(cls, solid_name, input_name, fan_in_index):
)


class InputMapping(namedtuple("_InputMapping", "definition maps_to")):
class InputMapping(
NamedTuple(
"_InputMapping",
[("definition", InputDefinition), ("maps_to", Union[InputPointer, FanInInputPointer])],
)
):
"""Defines an input mapping for a composite solid.
Args:
Expand All @@ -314,7 +323,7 @@ class InputMapping(namedtuple("_InputMapping", "definition maps_to")):
input_name (str): The name of the input to the child solid onto which to map the input.
"""

def __new__(cls, definition, maps_to):
def __new__(cls, definition: InputDefinition, maps_to: Union[InputPointer, FanInInputPointer]):
return super(InputMapping, cls).__new__(
cls,
check.inst_param(definition, "definition", InputDefinition),
Expand Down

0 comments on commit 47b9a68

Please sign in to comment.