namedtuple to NamedTuple (5th batch) (#6917)

smackesey committed Mar 7, 2022
1 parent ca7aedf commit d1d6003
Showing 29 changed files with 698 additions and 336 deletions.
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/api/snapshot_execution_plan.py
@@ -31,8 +31,8 @@ def sync_get_external_execution_plan_grpc(

     check.inst_param(api_client, "api_client", DagsterGrpcClient)
     check.inst_param(pipeline_origin, "pipeline_origin", ExternalPipelineOrigin)
-    check.opt_list_param(solid_selection, "solid_selection", of_type=str)
-    check.dict_param(run_config, "run_config", key_type=str)
+    solid_selection = check.opt_list_param(solid_selection, "solid_selection", of_type=str)
+    run_config = check.dict_param(run_config, "run_config", key_type=str)
     check.str_param(mode, "mode")
     check.opt_nullable_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
     check.str_param(pipeline_snapshot_id, "pipeline_snapshot_id")
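The change above swaps side-effect-only `check` calls for assignments of their return values. Beyond the runtime validation, the reassignment is what lets a static type checker narrow an `Optional` parameter to its concrete type from that line onward. A minimal sketch of the idea, using a hypothetical, simplified stand-in for `dagster.check.opt_list_param` (not the exact Dagster signature):

from typing import List, Optional, TypeVar

T = TypeVar("T")

def opt_list_param(obj: Optional[List[T]], param_name: str) -> List[T]:
    # Hypothetical, simplified stand-in for dagster.check.opt_list_param:
    # validates the argument and normalizes None to an empty list.
    if obj is None:
        return []
    if not isinstance(obj, list):
        raise TypeError(f"Param {param_name} must be a list or None")
    return obj

def count_selected(solid_selection: Optional[List[str]] = None) -> int:
    # Reassigning narrows the type: after this line mypy treats
    # solid_selection as List[str], not Optional[List[str]].
    solid_selection = opt_list_param(solid_selection, "solid_selection")
    return len(solid_selection)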
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/api/snapshot_pipeline.py
@@ -19,8 +19,8 @@ def sync_get_external_pipeline_subset_grpc(
     from dagster.grpc.client import DagsterGrpcClient

     check.inst_param(api_client, "api_client", DagsterGrpcClient)
-    check.inst_param(pipeline_origin, "pipeline_origin", ExternalPipelineOrigin)
-    check.opt_list_param(solid_selection, "solid_selection", of_type=str)
+    pipeline_origin = check.inst_param(pipeline_origin, "pipeline_origin", ExternalPipelineOrigin)
+    solid_selection = check.opt_list_param(solid_selection, "solid_selection", of_type=str)

     result = deserialize_as(
         api_client.external_pipeline_subset(
19 changes: 19 additions & 0 deletions python_modules/dagster/dagster/check/__init__.py
@@ -805,6 +805,25 @@ def opt_set_param(obj: object, param_name: str, of_type: TypeOrTupleOfTypes = None
     return _check_set_items(obj, of_type)


+def opt_nullable_set_param(
+    obj: object, param_name: str, of_type: Optional[TypeOrTupleOfTypes] = None
+) -> Optional[AbstractSet]:
+    """Ensures argument obj is a set or None, and returns it.
+    Returns None if the input is None.
+    If the of_type argument is provided, also ensures that set items conform to the type specified
+    by of_type.
+    """
+    if obj is None:
+        return None
+    elif not isinstance(obj, (frozenset, set)):
+        raise _param_type_mismatch_exception(obj, (frozenset, set), param_name)
+    elif not of_type:
+        return obj
+
+    return _check_set_items(obj, of_type)
+
+
 def _check_set_items(obj_set: AbstractSet, of_type: TypeOrTupleOfTypes) -> AbstractSet:
     for obj in obj_set:
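The new `opt_nullable_set_param` preserves `None` (unlike `opt_set_param`, which normalizes it to an empty set), so callers can distinguish "no value supplied" from "empty set". A usage sketch, assuming the semantics shown in the hunk above (`select_tags` is a made-up caller, not Dagster code):

from typing import AbstractSet, Optional

from dagster import check

def select_tags(tag_keys: Optional[AbstractSet[str]] = None) -> Optional[AbstractSet[str]]:
    # None means "no filter" and passes through unchanged; an actual set
    # is validated to contain only strings.
    return check.opt_nullable_set_param(tag_keys, "tag_keys", of_type=str)

assert select_tags(None) is None
assert select_tags(frozenset({"a", "b"})) == {"a", "b"}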
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/cli/api.py
@@ -335,7 +335,7 @@ def _execute_step_command_body(
     success = verify_step(
         instance,
         pipeline_run,
-        args.known_state.get_retry_state(),
+        check.not_none(args.known_state).get_retry_state(),
         args.step_keys_to_execute,
     )
     if not success:
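`check.not_none` asserts at runtime that `args.known_state` is present and, because its return type drops the `Optional`, makes the subsequent `.get_retry_state()` call typecheck. A minimal sketch of the pattern, with a hypothetical, simplified signature assumed from the usage above:

from typing import Optional, TypeVar

T = TypeVar("T")

def not_none(value: Optional[T]) -> T:
    # Hypothetical, simplified stand-in for dagster.check.not_none.
    if value is None:
        raise ValueError("Expected non-None value")
    return value

def retry_count(known_state: Optional[dict]) -> int:
    # Key access on an Optional is a type error; wrapping in not_none()
    # documents and enforces the non-None assumption at runtime.
    return not_none(known_state)["retries"]

print(retry_count({"retries": 3}))  # 3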
35 changes: 28 additions & 7 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
@@ -1,4 +1,16 @@
-from typing import Any, Callable, Dict, Mapping, Optional, Sequence, Set, Union, cast, overload
+from typing import (
+    AbstractSet,
+    Any,
+    Callable,
+    Dict,
+    Mapping,
+    Optional,
+    Sequence,
+    Set,
+    Union,
+    cast,
+    overload,
+)

 from dagster import check
 from dagster.builtins import Nothing
@@ -191,15 +203,19 @@ def partition_fn(context):  # pylint: disable=function-redefined
             },
         )(fn)

+        # NOTE: we can `cast` below because we know the Ins returned by `build_asset_ins` always
+        # have a plain AssetKey asset key. Dynamic asset keys will be deprecated in 0.15.0, when
+        # they are gone we can remove this cast.
         return AssetsDefinition(
             input_names_by_asset_key={
-                in_def.asset_key: input_name for input_name, in_def in asset_ins.items()
+                cast(AssetKey, in_def.asset_key): input_name
+                for input_name, in_def in asset_ins.items()
             },
             output_names_by_asset_key={out_asset_key: "result"},
             op=op,
             partitions_def=self.partitions_def,
             partition_mappings={
-                asset_ins[input_name].asset_key: partition_mapping
+                cast(AssetKey, asset_ins[input_name].asset_key): partition_mapping
                 for input_name, partition_mapping in self.partition_mappings.items()
             }
             if self.partition_mappings
@@ -264,12 +280,16 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
         tags={"kind": compute_kind} if compute_kind else None,
     )(fn)

+    # NOTE: we can `cast` below because we know the Ins returned by `build_asset_ins` always
+    # have a plain AssetKey asset key. Dynamic asset keys will be deprecated in 0.15.0, when
+    # they are gone we can remove this cast.
    return AssetsDefinition(
        input_names_by_asset_key={
-            in_def.asset_key: input_name for input_name, in_def in asset_ins.items()
+            cast(AssetKey, in_def.asset_key): input_name
+            for input_name, in_def in asset_ins.items()
        },
        output_names_by_asset_key={
-            out_def.asset_key: output_name for output_name, out_def in asset_outs.items()  # type: ignore
+            cast(AssetKey, out_def.asset_key): output_name for output_name, out_def in asset_outs.items()  # type: ignore
        },
        op=op,
    )
@@ -331,7 +351,7 @@ def build_asset_ins(
     fn: Callable,
     asset_namespace: Optional[Sequence[str]],
     asset_ins: Mapping[str, AssetIn],
-    non_argument_deps: Optional[Set[AssetKey]],
+    non_argument_deps: Optional[AbstractSet[AssetKey]],
 ) -> Dict[str, In]:

     non_argument_deps = check.opt_set_param(non_argument_deps, "non_argument_deps", AssetKey)
@@ -381,6 +401,7 @@ def build_asset_ins(
     for asset_key in non_argument_deps:
         stringified_asset_key = "_".join(asset_key.path)
         if stringified_asset_key:
-            ins[stringified_asset_key] = In(dagster_type=Nothing, asset_key=asset_key)
+            # cast due to mypy bug-- doesn't understand Nothing is a type
+            ins[stringified_asset_key] = In(dagster_type=cast(type, Nothing), asset_key=asset_key)

     return ins
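The `cast(AssetKey, ...)` and `cast(type, Nothing)` calls above are type-checker-only: `typing.cast` performs no runtime conversion or check, which makes it the right tool when an invariant (here, that `build_asset_ins` only ever produces plain `AssetKey`s) is known to the author but not expressible in the annotations. A standalone illustration of that behavior (not Dagster code):

from typing import Union, cast

def shout(key: Union[str, int]) -> str:
    # cast() only tells the type checker to treat `key` as str from
    # here on; nothing is converted or verified at runtime.
    s = cast(str, key)
    return s.upper()  # typechecks, but would still raise if key were an int

print(shout("events"))  # EVENTS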
48 changes: 30 additions & 18 deletions python_modules/dagster/dagster/core/definitions/input.py
@@ -1,5 +1,5 @@
-from collections import namedtuple
-from typing import NamedTuple, Optional, Set, Union
+from types import FunctionType
+from typing import TYPE_CHECKING, Any, Callable, Mapping, NamedTuple, Optional, Set, Type, Union

 from dagster import check
 from dagster.core.definitions.events import AssetKey
@@ -15,6 +15,9 @@
 from .inference import InferredInputProps
 from .utils import NoValueSentinel, check_valid_name

+if TYPE_CHECKING:
+    from dagster.core.execution.context.input import InputContext
+

 # unfortunately since type_check functions need TypeCheckContext which is only available
 # at runtime, we can only check basic types before runtime
@@ -344,10 +347,17 @@ def describe(self) -> str:


 class In(
-    namedtuple(
+    NamedTuple(
         "_In",
-        "dagster_type description default_value root_manager_key metadata "
-        "asset_key asset_partitions",
+        [
+            ("dagster_type", Optional[Union[type, DagsterType]]),
+            ("description", Optional[str]),
+            ("default_value", Any),
+            ("root_manager_key", Optional[str]),
+            ("metadata", Optional[Mapping[str, Any]]),
+            ("asset_key", Optional[Union[AssetKey, Callable[["InputContext"], AssetKey]]]),
+            ("asset_partitions", Optional[Union[Set[str], Callable[["InputContext"], Set[str]]]]),
+        ],
     )
 ):
     """
@@ -376,23 +386,25 @@ class In(

     def __new__(
         cls,
-        dagster_type=NoValueSentinel,
-        description=None,
-        default_value=NoValueSentinel,
-        root_manager_key=None,
-        metadata=None,
-        asset_key=None,
-        asset_partitions=None,
+        dagster_type: Optional[Union[Type, DagsterType]] = NoValueSentinel,
+        description: Optional[str] = None,
+        default_value: Any = NoValueSentinel,
+        root_manager_key: Optional[str] = None,
+        metadata: Optional[Mapping[str, Any]] = None,
+        asset_key: Optional[Union[AssetKey, Callable[["InputContext"], AssetKey]]] = None,
+        asset_partitions: Optional[Union[Set[str], Callable[["InputContext"], Set[str]]]] = None,
     ):
         return super(In, cls).__new__(
             cls,
-            dagster_type=dagster_type,
-            description=description,
+            dagster_type=check.opt_inst_param(dagster_type, "dagster_type", (type, DagsterType)),
+            description=check.opt_str_param(description, "description"),
             default_value=default_value,
-            root_manager_key=root_manager_key,
-            metadata=metadata,
-            asset_key=asset_key,
-            asset_partitions=asset_partitions,
+            root_manager_key=check.opt_str_param(root_manager_key, "root_manager_key"),
+            metadata=check.opt_dict_param(metadata, "metadata", key_type=str),
+            asset_key=check.opt_inst_param(asset_key, "asset_key", (AssetKey, FunctionType)),
+            asset_partitions=check.opt_inst_param(
+                asset_partitions, "asset_partitions", (Set[str], FunctionType)
+            ),
         )

     @staticmethod
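The `In` conversion shows the core pattern of this batch: a string-spec `collections.namedtuple` base class becomes a functional-form `typing.NamedTuple` with per-field types, while the `__new__` override keeps runtime validation and defaults. A self-contained sketch of the same pattern with toy fields (not the real `In` definition):

from typing import NamedTuple, Optional

class Point(
    NamedTuple(
        "_Point",
        [("x", float), ("y", float), ("label", Optional[str])],
    )
):
    # Subclassing the functional NamedTuple form (rather than using the
    # class syntax directly) lets __new__ apply defaults and validation
    # while keeping the typed, immutable tuple underneath.
    def __new__(cls, x: float, y: float, label: Optional[str] = None):
        if label is not None and not isinstance(label, str):
            raise TypeError("label must be a str or None")
        return super(Point, cls).__new__(cls, float(x), float(y), label)

p = Point(1, 2)
assert p.x == 1.0 and p.label is None

One caveat worth flagging in the hunk above: `isinstance` against a subscripted generic such as `Set[str]` raises `TypeError` on modern Python, so if `opt_inst_param` dispatches to `isinstance`, the `(Set[str], FunctionType)` tuple passed for `asset_partitions` would be expected to fail at runtime whenever a non-`None` set is supplied; a runtime check would normally use `(set, frozenset)` instead.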
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/core/execution/api.py
@@ -118,7 +118,7 @@ def gen_ignore_duplicate_run_worker():
         # note that when we receive the solids to execute via PipelineRun, it won't support
         # solid selection query syntax
         pipeline = pipeline.subset_for_execution_from_existing_pipeline(
-            pipeline_run.solids_to_execute
+            frozenset(pipeline_run.solids_to_execute)
         )

     execution_plan = _get_execution_plan_from_run(pipeline, pipeline_run, instance)
@@ -211,7 +211,7 @@ def execute_run(
         # note that when we receive the solids to execute via PipelineRun, it won't support
         # solid selection query syntax
         pipeline = pipeline.subset_for_execution_from_existing_pipeline(
-            pipeline_run.solids_to_execute
+            frozenset(pipeline_run.solids_to_execute)
        )

     execution_plan = _get_execution_plan_from_run(pipeline, pipeline_run, instance)
@@ -737,7 +737,7 @@ def create_execution_plan(
     run_config: Optional[dict] = None,
     mode: Optional[str] = None,
     step_keys_to_execute: Optional[List[str]] = None,
-    known_state: KnownExecutionState = None,
+    known_state: Optional[KnownExecutionState] = None,
     instance_ref: Optional[InstanceRef] = None,
     tags: Optional[Dict[str, str]] = None,
 ) -> ExecutionPlan:
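The `known_state` fix addresses an implicit-`Optional` annotation: PEP 484 deprecates treating `x: T = None` as implicitly `Optional[T]`, and mypy's `--no-implicit-optional` flag rejects it outright. For example:

from typing import Optional

# Rejected under mypy --no-implicit-optional: the default None
# does not match the declared type str.
# def greet(name: str = None) -> str: ...

# The explicit Optional states the contract and typechecks cleanly.
def greet(name: Optional[str] = None) -> str:
    return f"hello {name or 'world'}"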
40 changes: 24 additions & 16 deletions python_modules/dagster/dagster/core/execution/backfill.py
@@ -1,5 +1,5 @@
-from collections import namedtuple
 from enum import Enum
+from typing import Dict, List, NamedTuple, Optional

 from dagster import check
 from dagster.core.execution.plan.resume_retry import get_retry_steps_from_parent_run
@@ -45,26 +45,34 @@ def from_graphql_input(graphql_str):

 @whitelist_for_serdes
 class PartitionBackfill(
-    namedtuple(
+    NamedTuple(
         "_PartitionBackfill",
-        (
-            "backfill_id partition_set_origin status partition_names from_failure "
-            "reexecution_steps tags backfill_timestamp last_submitted_partition_name error"
-        ),
+        [
+            ("backfill_id", str),
+            ("partition_set_origin", ExternalPartitionSetOrigin),
+            ("status", BulkActionStatus),
+            ("partition_names", List[str]),
+            ("from_failure", bool),
+            ("reexecution_steps", List[str]),
+            ("tags", Dict[str, str]),
+            ("backfill_timestamp", float),
+            ("last_submitted_partition_name", Optional[str]),
+            ("error", Optional[SerializableErrorInfo]),
+        ],
     ),
 ):
     def __new__(
         cls,
-        backfill_id,
-        partition_set_origin,
-        status,
-        partition_names,
-        from_failure,
-        reexecution_steps,
-        tags,
-        backfill_timestamp,
-        last_submitted_partition_name=None,
-        error=None,
+        backfill_id: str,
+        partition_set_origin: ExternalPartitionSetOrigin,
+        status: BulkActionStatus,
+        partition_names: List[str],
+        from_failure: bool,
+        reexecution_steps: List[str],
+        tags: Dict[str, str],
+        backfill_timestamp: float,
+        last_submitted_partition_name: Optional[str] = None,
+        error: Optional[SerializableErrorInfo] = None,
    ):
        return super(PartitionBackfill, cls).__new__(
            cls,
@@ -623,7 +623,7 @@ def _get_source_run_id_from_logs(self, step_output_handle: StepOutputHandle) ->

     def _should_load_from_previous_runs(self, step_output_handle: StepOutputHandle) -> bool:
         return (  # this is re-execution
-            self.pipeline_run.parent_run_id
+            self.pipeline_run.parent_run_id is not None
             # we are not re-executing the entire pipeline
             and self.pipeline_run.step_keys_to_execute is not None
             # this step is not being executed
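Adding `is not None` matters for the declared `-> bool` return type: Python's `and` returns whichever operand short-circuits it, so a bare `parent_run_id and ...` chain can evaluate to `None` or a string rather than a `bool`. A quick illustration with hypothetical helpers:

from typing import Optional

def is_reexecution_loose(parent_run_id: Optional[str], has_steps: bool):
    # Evaluates to None (not False!) when parent_run_id is None,
    # because `and` returns its first falsy operand.
    return parent_run_id and has_steps

def is_reexecution_strict(parent_run_id: Optional[str], has_steps: bool) -> bool:
    # Comparing against None first yields a genuine bool.
    return parent_run_id is not None and has_steps

assert is_reexecution_loose(None, True) is None
assert is_reexecution_strict(None, True) is False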
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/core/execution/host_mode.py
@@ -186,7 +186,7 @@ def execute_run_host_mode(

     if pipeline_run.solids_to_execute:
         pipeline = pipeline.subset_for_execution_from_existing_pipeline(
-            pipeline_run.solids_to_execute
+            frozenset(pipeline_run.solids_to_execute)
         )

     execution_plan_snapshot = instance.get_execution_plan_snapshot(
@@ -195,7 +195,7 @@ def step_run_ref_to_step_context(
 ) -> StepExecutionContext:
     check.inst_param(instance, "instance", DagsterInstance)
     pipeline = step_run_ref.recon_pipeline.subset_for_execution_from_existing_pipeline(
-        step_run_ref.pipeline_run.solids_to_execute
+        frozenset(step_run_ref.pipeline_run.solids_to_execute or set())
     )

     execution_plan = create_execution_plan(
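The repeated `frozenset(...)` wrappers exist because the callee now expects an immutable set while `solids_to_execute` is an optional mutable set; `frozenset(x or set())` handles both the `None` case and the coercion in one expression. A minimal sketch of the idiom, with types assumed from the diff rather than the exact Dagster signatures:

from typing import FrozenSet, Optional, Set

def subset_for_execution(solids: FrozenSet[str]) -> str:
    # Hypothetical stand-in for subset_for_execution_from_existing_pipeline:
    # requires an immutable, hashable set of solid names.
    return ",".join(sorted(solids))

def run(solids_to_execute: Optional[Set[str]]) -> str:
    # `or set()` folds None into an empty set; frozenset() satisfies
    # the immutable parameter type expected by the callee.
    return subset_for_execution(frozenset(solids_to_execute or set()))

assert run(None) == ""
assert run({"b", "a"}) == "a,b"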
@@ -110,7 +110,7 @@ def _core_resource_initialization_event_generator(
     pipeline_def_for_backwards_compat: Optional[PipelineDefinition],
 ):

-    pipeline_name = None
+    pipeline_name = ""  # Must be initialized to a string to satisfy typechecker
     contains_generator = False
     if emit_persistent_events:
         check.invariant(
@@ -124,7 +124,7 @@
     try:
         if emit_persistent_events and resource_keys_to_init:
             yield DagsterEvent.resource_init_start(
-                cast(str, pipeline_name),
+                pipeline_name,
                 cast(ExecutionPlan, execution_plan),
                 resource_log_manager,
                 resource_keys_to_init,
@@ -169,7 +169,7 @@ def _core_resource_initialization_event_generator(

         if emit_persistent_events and resource_keys_to_init:
             yield DagsterEvent.resource_init_success(
-                cast(str, pipeline_name),
+                pipeline_name,
                 cast(ExecutionPlan, execution_plan),
                 resource_log_manager,
                 resource_instances,
@@ -181,7 +181,7 @@
         # resource_keys_to_init cannot be empty
         if emit_persistent_events:
             yield DagsterEvent.resource_init_failure(
-                cast(str, pipeline_name),
+                pipeline_name,
                 cast(ExecutionPlan, execution_plan),
                 resource_log_manager,
                 resource_keys_to_init,
