Skip to content

Commit

Permalink
namedtuple to NamedTuple (3rd batch) (#6767)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 25, 2022
1 parent 032f442 commit e0c66db
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict, namedtuple
from datetime import datetime
from typing import Dict, List, Mapping, NamedTuple, Optional, Sequence, Set, Tuple
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence, Set, Tuple

from dagster import StaticPartitionsDefinition, check
from dagster.core.asset_defs import SourceAsset
Expand Down Expand Up @@ -196,18 +196,34 @@ def __new__(

@whitelist_for_serdes
class ExternalPresetData(
namedtuple("_ExternalPresetData", "name run_config solid_selection mode tags")
NamedTuple(
"_ExternalPresetData",
[
("name", str),
("run_config", Dict[str, Any]),
("solid_selection", Optional[List[str]]),
("mode", str),
("tags", Dict[str, str]),
],
)
):
def __new__(cls, name, run_config, solid_selection, mode, tags):
def __new__(
cls,
name: str,
run_config: Dict[str, Any],
solid_selection: Optional[List[str]],
mode: str,
tags: Dict[str, str],
):
return super(ExternalPresetData, cls).__new__(
cls,
name=check.str_param(name, "name"),
run_config=check.opt_dict_param(run_config, "run_config"),
run_config=check.opt_dict_param(run_config, "run_config", key_type=str),
solid_selection=check.opt_nullable_list_param(
solid_selection, "solid_selection", of_type=str
),
mode=check.str_param(mode, "mode"),
tags=check.opt_dict_param(tags, "tags"),
tags=check.opt_dict_param(tags, "tags", key_type=str, value_type=str),
)


Expand Down Expand Up @@ -268,9 +284,9 @@ def __new__(

@whitelist_for_serdes
class ExternalScheduleExecutionErrorData(
namedtuple("_ExternalScheduleExecutionErrorData", "error")
NamedTuple("_ExternalScheduleExecutionErrorData", [("error", Optional[SerializableErrorInfo])])
):
def __new__(cls, error):
def __new__(cls, error: Optional[SerializableErrorInfo]):
return super(ExternalScheduleExecutionErrorData, cls).__new__(
cls,
error=check.opt_inst_param(error, "error", SerializableErrorInfo),
Expand All @@ -279,17 +295,12 @@ def __new__(cls, error):

@whitelist_for_serdes
class ExternalTargetData(
namedtuple(
NamedTuple(
"_ExternalTargetData",
"pipeline_name mode solid_selection",
[("pipeline_name", str), ("mode", str), ("solid_selection", Optional[List[str]])],
)
):
def __new__(
cls,
pipeline_name,
mode,
solid_selection,
):
def __new__(cls, pipeline_name: str, mode: str, solid_selection: Optional[List[str]]):
return super(ExternalTargetData, cls).__new__(
cls,
pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
Expand Down
62 changes: 41 additions & 21 deletions python_modules/dagster/dagster/core/host_representation/selector.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
from collections import namedtuple
from typing import List, NamedTuple, Optional

from dagster import check


class PipelineSelector(
namedtuple("_PipelineSelector", "location_name repository_name pipeline_name solid_selection")
NamedTuple(
"_PipelineSelector",
[
("location_name", str),
("repository_name", str),
("pipeline_name", str),
("solid_selection", Optional[List[str]]),
],
)
):
"""
The information needed to resolve a pipeline within a host process.
"""

def __new__(
cls,
location_name,
repository_name,
pipeline_name,
solid_selection,
location_name: str,
repository_name: str,
pipeline_name: str,
solid_selection: Optional[List[str]],
):
return super(PipelineSelector, cls).__new__(
cls,
Expand Down Expand Up @@ -45,8 +53,10 @@ def with_solid_selection(self, solid_selection):
)


class RepositorySelector(namedtuple("_RepositorySelector", "location_name repository_name")):
def __new__(cls, location_name, repository_name):
class RepositorySelector(
NamedTuple("_RepositorySelector", [("location_name", str), ("repository_name", str)])
):
def __new__(cls, location_name: str, repository_name: str):
return super(RepositorySelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
Expand All @@ -68,9 +78,12 @@ def from_graphql_input(graphql_data):


class ScheduleSelector(
namedtuple("_ScheduleSelector", "location_name repository_name schedule_name")
NamedTuple(
"_ScheduleSelector",
[("location_name", str), ("repository_name", str), ("schedule_name", str)],
)
):
def __new__(cls, location_name, repository_name, schedule_name):
def __new__(cls, location_name: str, repository_name: str, schedule_name: str):
return super(ScheduleSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
Expand All @@ -94,8 +107,12 @@ def from_graphql_input(graphql_data):
)


class SensorSelector(namedtuple("_SensorSelector", "location_name repository_name sensor_name")):
def __new__(cls, location_name, repository_name, sensor_name):
class SensorSelector(
NamedTuple(
"_SensorSelector", [("location_name", str), ("repository_name", str), ("sensor_name", str)]
)
):
def __new__(cls, location_name: str, repository_name: str, sensor_name: str):
return super(SensorSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
Expand All @@ -119,8 +136,12 @@ def from_graphql_input(graphql_data):
)


class InstigationSelector(namedtuple("_InstigationSelector", "location_name repository_name name")):
def __new__(cls, location_name, repository_name, name):
class InstigationSelector(
NamedTuple(
"_InstigationSelector", [("location_name", str), ("repository_name", str), ("name", str)]
)
):
def __new__(cls, location_name: str, repository_name: str, name: str):
return super(InstigationSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
Expand All @@ -144,17 +165,16 @@ def from_graphql_input(graphql_data):
)


class GraphSelector(namedtuple("_GraphSelector", "location_name repository_name graph_name")):
class GraphSelector(
NamedTuple(
"_GraphSelector", [("location_name", str), ("repository_name", str), ("graph_name", str)]
)
):
"""
The information needed to resolve a graph within a host process.
"""

def __new__(
cls,
location_name,
repository_name,
graph_name,
):
def __new__(cls, location_name: str, repository_name: str, graph_name: str):
return super(GraphSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
Expand Down
53 changes: 43 additions & 10 deletions python_modules/dagster/dagster/core/scheduler/instigation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import namedtuple
from enum import Enum
from inspect import Parameter
from typing import Any, Dict, Mapping, NamedTuple, Type
from typing import Any, Dict, Mapping, NamedTuple, Optional, Type, Union

from dagster import check
from dagster.core.definitions.run_request import InstigatorType
Expand Down Expand Up @@ -37,9 +37,23 @@ class InstigatorStatus(Enum):

@whitelist_for_serdes
class SensorInstigatorData(
namedtuple("_SensorInstigatorData", "last_tick_timestamp last_run_key min_interval cursor")
NamedTuple(
"_SensorInstigatorData",
[
("last_tick_timestamp", Optional[float]),
("last_run_key", Optional[str]),
("min_interval", Optional[int]),
("cursor", Optional[str]),
],
)
):
def __new__(cls, last_tick_timestamp=None, last_run_key=None, min_interval=None, cursor=None):
def __new__(
cls,
last_tick_timestamp: Optional[float] = None,
last_run_key: Optional[str] = None,
min_interval: Optional[int] = None,
cursor: Optional[str] = None,
):
return super(SensorInstigatorData, cls).__new__(
cls,
check.opt_float_param(last_tick_timestamp, "last_tick_timestamp"),
Expand All @@ -56,10 +70,12 @@ def __new__(cls, last_tick_timestamp=None, last_run_key=None, min_interval=None,

@whitelist_for_serdes
class ScheduleInstigatorData(
namedtuple("_ScheduleInstigatorData", "cron_schedule start_timestamp")
NamedTuple(
"_ScheduleInstigatorData", [("cron_schedule", str), ("start_timestamp", Optional[float])]
)
):
# removed scheduler, 1/5/2022 (0.13.13)
def __new__(cls, cron_schedule, start_timestamp=None):
def __new__(cls, cron_schedule: str, start_timestamp: Optional[float] = None):
return super(ScheduleInstigatorData, cls).__new__(
cls,
check.str_param(cron_schedule, "cron_schedule"),
Expand All @@ -75,7 +91,10 @@ def __new__(cls, cron_schedule, start_timestamp=None):
ScheduleJobData = ScheduleInstigatorData


def check_instigator_data(instigator_type, instigator_data):
def check_instigator_data(
instigator_type: InstigatorType,
instigator_data: Optional[Union[ScheduleInstigatorData, SensorInstigatorData]],
):
if instigator_type == InstigatorType.SCHEDULE:
check.inst_param(instigator_data, "instigator_data", ScheduleInstigatorData)
elif instigator_type == InstigatorType.SENSOR:
Expand Down Expand Up @@ -138,9 +157,23 @@ def value_to_storage_dict(

@whitelist_for_serdes(serializer=InstigatorStateSerializer)
class InstigatorState(
namedtuple("_InstigationState", "origin instigator_type status instigator_data")
NamedTuple(
"_InstigationState",
[
("origin", ExternalInstigatorOrigin),
("instigator_type", InstigatorType),
("status", InstigatorStatus),
("instigator_data", Optional[Union[ScheduleInstigatorData, SensorInstigatorData]]),
],
)
):
def __new__(cls, origin, instigator_type, status, instigator_data=None):
def __new__(
cls,
origin: ExternalInstigatorOrigin,
instigator_type: InstigatorType,
status: InstigatorStatus,
instigator_data: Optional[Union[ScheduleInstigatorData, SensorInstigatorData]] = None,
):
return super(InstigatorState, cls).__new__(
cls,
check.inst_param(origin, "origin", ExternalInstigatorOrigin),
Expand Down Expand Up @@ -250,8 +283,8 @@ def value_to_storage_dict(


@whitelist_for_serdes(serializer=TickSerializer)
class InstigatorTick(namedtuple("_InstigatorTick", "tick_id tick_data")):
def __new__(cls, tick_id, tick_data):
class InstigatorTick(NamedTuple("_InstigatorTick", [("tick_id", int), ("tick_data", "TickData")])):
def __new__(cls, tick_id: int, tick_data: "TickData"):
return super(InstigatorTick, cls).__new__(
cls,
check.int_param(tick_id, "tick_id"),
Expand Down
20 changes: 17 additions & 3 deletions python_modules/dagster/dagster/core/scheduler/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import os
from collections import namedtuple
from typing import List, NamedTuple

from dagster import check
from dagster.config import Field
Expand Down Expand Up @@ -28,9 +28,23 @@ class DagsterScheduleDoesNotExist(DagsterSchedulerError):


class SchedulerDebugInfo(
namedtuple("SchedulerDebugInfo", "errors scheduler_config_info scheduler_info schedule_storage")
NamedTuple(
"SchedulerDebugInfo",
[
("errors", List[str]),
("scheduler_config_info", str),
("scheduler_info", str),
("schedule_storage", List[str]),
],
)
):
def __new__(cls, errors, scheduler_config_info, scheduler_info, schedule_storage):
def __new__(
cls,
errors: List[str],
scheduler_config_info: str,
scheduler_info: str,
schedule_storage: List[str],
):
return super(SchedulerDebugInfo, cls).__new__(
cls,
errors=check.list_param(errors, "errors", of_type=str),
Expand Down

0 comments on commit e0c66db

Please sign in to comment.