Skip to content

Commit

Permalink
namedtuple to NamedTuple (4th batch) (#6785)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 25, 2022
1 parent e0c66db commit 6327300
Show file tree
Hide file tree
Showing 15 changed files with 298 additions and 103 deletions.
23 changes: 16 additions & 7 deletions python_modules/dagster/dagster/core/definitions/preset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import namedtuple
from typing import Dict, List, NamedTuple, Optional

import pkg_resources
import yaml
Expand All @@ -12,7 +12,16 @@


class PresetDefinition(
namedtuple("_PresetDefinition", "name run_config solid_selection mode tags")
NamedTuple(
"_PresetDefinition",
[
("name", str),
("run_config", Optional[Dict[str, object]]),
("solid_selection", Optional[List[str]]),
("mode", Optional[str]),
("tags", Dict[str, object]),
],
)
):
"""Defines a preset configuration in which a pipeline can execute.
Expand Down Expand Up @@ -43,11 +52,11 @@ class PresetDefinition(

def __new__(
cls,
name,
run_config=None,
solid_selection=None,
mode=None,
tags=None,
name: str,
run_config: Optional[Dict[str, object]] = None,
solid_selection: Optional[List[str]] = None,
mode: Optional[str] = None,
tags: Dict[str, object] = None,
):

return super(PresetDefinition, cls).__new__(
Expand Down
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/core/execution/plan/handle.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import re
from typing import NamedTuple, Union, cast
from typing import NamedTuple, Optional, Union, cast

from dagster import check
from dagster.core.definitions.dependency import NodeHandle
Expand All @@ -10,7 +10,7 @@
class StepHandle(NamedTuple("_StepHandle", [("solid_handle", NodeHandle), ("key", str)])):
"""A reference to an ExecutionStep that was determined statically"""

def __new__(cls, solid_handle: NodeHandle, key: str = None):
def __new__(cls, solid_handle: NodeHandle, key: Optional[str] = None):
return super(StepHandle, cls).__new__(
cls,
solid_handle=check.inst_param(solid_handle, "solid_handle", NodeHandle),
Expand Down Expand Up @@ -68,7 +68,7 @@ class ResolvedFromDynamicStepHandle(
completed successfully.
"""

def __new__(cls, solid_handle: NodeHandle, mapping_key: str, key: str = None):
def __new__(cls, solid_handle: NodeHandle, mapping_key: str, key: Optional[str] = None):
return super(ResolvedFromDynamicStepHandle, cls).__new__(
cls,
solid_handle=check.inst_param(solid_handle, "solid_handle", NodeHandle),
Expand Down
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/core/execution/plan/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,11 @@ def get_step_output_handle_deps_with_placeholders(self) -> List[StepOutputHandle
return [self.source.get_step_output_handle_dep_with_placeholder()]


StepInputSourceTypes = (
StepInputSourceUnion = Union[
StepInputSource,
FromDynamicCollect,
FromUnresolvedStepOutput,
FromPendingDynamicStepOutput,
)
]

StepInputSourceTypes = StepInputSourceUnion.__args__ # type: ignore
56 changes: 44 additions & 12 deletions python_modules/dagster/dagster/core/execution/plan/step.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import abstractmethod, abstractproperty
from abc import abstractmethod
from enum import Enum
from typing import (
TYPE_CHECKING,
Expand All @@ -7,6 +7,7 @@
List,
NamedTuple,
Optional,
Sequence,
Set,
Type,
Union,
Expand Down Expand Up @@ -56,16 +57,47 @@ def is_executable_step(step: Union["ExecutionStep", "UnresolvedMappedExecutionSt


class IExecutionStep:
@abstractproperty
def handle(self):
@property
@abstractmethod
def handle(self) -> Union[StepHandle, UnresolvedStepHandle, ResolvedFromDynamicStepHandle]:
pass

@property
@abstractmethod
def key(self) -> str:
pass

@property
@abstractmethod
def solid_handle(self) -> "NodeHandle":
pass

@property
@abstractmethod
def kind(self) -> StepKind:
pass

@property
@abstractmethod
def tags(self) -> Optional[Dict[str, str]]:
pass

@abstractproperty
def key(self):
@property
@abstractmethod
def step_inputs(
self,
) -> Sequence[Union[StepInput, UnresolvedCollectStepInput, UnresolvedMappedStepInput]]:
pass

@property
@abstractmethod
def step_outputs(self) -> Sequence[StepOutput]:
pass

@abstractproperty
def solid_handle(self):
@abstractmethod
def step_input_named(
self, name: str
) -> Union[StepInput, UnresolvedCollectStepInput, UnresolvedMappedStepInput]:
pass

@abstractmethod
Expand Down Expand Up @@ -100,7 +132,7 @@ def __new__(
step_outputs: List[StepOutput],
tags: Optional[Dict[str, str]],
logging_tags: Optional[Dict[str, str]] = None,
key: str = None,
key: Optional[str] = None,
):
return super(ExecutionStep, cls).__new__(
cls,
Expand Down Expand Up @@ -382,14 +414,14 @@ def key(self) -> str:
def kind(self) -> StepKind:
return StepKind.UNRESOLVED_COLLECT

@property
def step_outputs(self) -> List[StepOutput]:
return list(self.step_output_dict.values())

@property
def step_inputs(self) -> List[Union[StepInput, UnresolvedCollectStepInput]]:
return list(self.step_input_dict.values())

@property
def step_outputs(self) -> List[StepOutput]:
return list(self.step_output_dict.values())

def step_input_named(self, name: str) -> Union[StepInput, UnresolvedCollectStepInput]:
check.str_param(name, "name")
return self.step_input_dict[name]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,15 @@ def __new__(cls, pipeline_name: str, mode: str, solid_selection: Optional[List[s


@whitelist_for_serdes
class ExternalSensorMetadata(namedtuple("_ExternalSensorMetadata", "asset_keys")):
class ExternalSensorMetadata(
NamedTuple("_ExternalSensorMetadata", [("asset_keys", Optional[List[AssetKey]])])
):
"""Stores additional sensor metadata which is available on the Dagit frontend."""

def __new__(cls, asset_keys=None):
def __new__(cls, asset_keys: Optional[List[AssetKey]] = None):
return super(ExternalSensorMetadata, cls).__new__(
cls, asset_keys=check.opt_nullable_list_param(asset_keys, "asset_keys")
cls,
asset_keys=check.opt_nullable_list_param(asset_keys, "asset_keys", of_type=AssetKey),
)


Expand All @@ -327,22 +330,32 @@ def skip_when_empty(cls) -> Set[str]:

@whitelist_for_serdes(serializer=ExternalSensorDataSerializer)
class ExternalSensorData(
namedtuple(
NamedTuple(
"_ExternalSensorData",
"name pipeline_name solid_selection mode min_interval description target_dict metadata default_status",
[
("name", str),
("pipeline_name", Optional[str]),
("solid_selection", Optional[List[str]]),
("mode", Optional[str]),
("min_interval", Optional[int]),
("description", Optional[str]),
("target_dict", Dict[str, ExternalTargetData]),
("metadata", Optional[ExternalSensorMetadata]),
("default_status", Optional[DefaultSensorStatus]),
],
)
):
def __new__(
cls,
name,
pipeline_name=None,
solid_selection=None,
mode=None,
min_interval=None,
description=None,
target_dict=None,
metadata=None,
default_status=None,
name: str,
pipeline_name: Optional[str] = None,
solid_selection: Optional[List[str]] = None,
mode: Optional[str] = None,
min_interval: Optional[int] = None,
description: Optional[str] = None,
target_dict: Dict[str, ExternalTargetData] = None,
metadata: Optional[ExternalSensorMetadata] = None,
default_status: Optional[DefaultSensorStatus] = None,
):
if pipeline_name and not target_dict:
# handle the legacy case where the ExternalSensorData was constructed from an earlier
Expand Down
45 changes: 34 additions & 11 deletions python_modules/dagster/dagster/core/host_representation/handle.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
from collections import namedtuple
from typing import TYPE_CHECKING, Dict, NamedTuple

from dagster import check
from dagster.core.host_representation.origin import ExternalRepositoryOrigin
from dagster.core.host_representation.origin import (
ExternalRepositoryOrigin,
RepositoryLocationOrigin,
)
from dagster.core.host_representation.selector import PipelineSelector
from dagster.core.origin import RepositoryPythonOrigin

if TYPE_CHECKING:
from dagster.core.host_representation.repository_location import RepositoryLocation


class RepositoryHandle(
namedtuple(
NamedTuple(
"_RepositoryHandle",
"repository_name repository_location_origin repository_python_origin display_metadata",
[
("repository_name", str),
("repository_location_origin", RepositoryLocationOrigin),
("repository_python_origin", RepositoryPythonOrigin),
("display_metadata", Dict[str, str]),
],
)
):
def __new__(cls, repository_name, repository_location):
def __new__(cls, repository_name: str, repository_location: "RepositoryLocation"):
from dagster.core.host_representation.repository_location import RepositoryLocation

check.inst_param(repository_location, "repository_location", RepositoryLocation)
Expand All @@ -37,8 +49,10 @@ def get_python_origin(self):
return self.repository_python_origin


class PipelineHandle(namedtuple("_PipelineHandle", "pipeline_name repository_handle")):
def __new__(cls, pipeline_name, repository_handle):
class PipelineHandle(
NamedTuple("_PipelineHandle", [("pipeline_name", str), ("repository_handle", RepositoryHandle)])
):
def __new__(cls, pipeline_name: str, repository_handle: RepositoryHandle):
return super(PipelineHandle, cls).__new__(
cls,
check.str_param(pipeline_name, "pipeline_name"),
Expand Down Expand Up @@ -66,8 +80,12 @@ def to_selector(self):
return PipelineSelector(self.location_name, self.repository_name, self.pipeline_name, None)


class InstigatorHandle(namedtuple("_InstigatorHandle", "instigator_name repository_handle")):
def __new__(cls, job_name, repository_handle):
class InstigatorHandle(
NamedTuple(
"_InstigatorHandle", [("instigator_name", str), ("repository_handle", RepositoryHandle)]
)
):
def __new__(cls, job_name: str, repository_handle: RepositoryHandle):
return super(InstigatorHandle, cls).__new__(
cls,
check.str_param(job_name, "job_name"),
Expand All @@ -88,8 +106,13 @@ def get_external_origin(self):
)


class PartitionSetHandle(namedtuple("_PartitionSetHandle", "partition_set_name repository_handle")):
def __new__(cls, partition_set_name, repository_handle):
class PartitionSetHandle(
NamedTuple(
"_PartitionSetHandle",
[("partition_set_name", str), ("repository_handle", RepositoryHandle)],
)
):
def __new__(cls, partition_set_name: str, repository_handle: RepositoryHandle):
return super(PartitionSetHandle, cls).__new__(
cls,
check.str_param(partition_set_name, "partition_set_name"),
Expand Down

0 comments on commit 6327300

Please sign in to comment.