Executor and dagster-docker typing (#7881)
smackesey committed May 17, 2022
1 parent 17fa439 commit f6d63d1
Showing 7 changed files with 131 additions and 58 deletions.
22 changes: 22 additions & 0 deletions python_modules/dagster/dagster/_check/__init__.py
@@ -394,6 +394,28 @@ def opt_dict_elem(
return _check_mapping_entries(value, key_type, value_type, mapping_type=dict)


+def opt_nullable_dict_elem(
+    obj: Dict[str, Any],
+    key: str,
+    key_type: Optional[TypeOrTupleOfTypes] = None,
+    value_type: Optional[TypeOrTupleOfTypes] = None,
+    additional_message: Optional[str] = None,
+) -> Optional[Dict]:
+    from dagster.utils import frozendict
+
+    dict_param(obj, "obj")
+    str_param(key, "key")
+
+    value = obj.get(key)
+
+    if value is None:
+        return None
+    elif not isinstance(value, (frozendict, dict)):
+        raise _element_check_error(key, value, obj, dict, additional_message)
+    else:
+        return _check_mapping_entries(value, key_type, value_type, mapping_type=dict)


def is_dict(
obj: Dict[T, U],
key_type: Optional[TypeOrTupleOfTypes] = None,
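The new `opt_nullable_dict_elem` differs from `opt_dict_elem` in returning `None` when the key is absent or explicitly `None`, following the `opt_nullable_*` naming convention used elsewhere in the module. A minimal usage sketch (the config dict and keys here are made up for illustration):

```python
import dagster._check as check

config = {"retries": {"enabled": {}}, "start_method": None}

# Key present with a dict value: entries are validated and the dict is returned.
retries = check.opt_nullable_dict_elem(config, "retries", key_type=str)

# Key explicitly None, or absent entirely: returns None instead of raising.
assert check.opt_nullable_dict_elem(config, "start_method") is None
assert check.opt_nullable_dict_elem(config, "missing_key") is None
```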
112 changes: 77 additions & 35 deletions python_modules/dagster/dagster/core/definitions/executor_definition.py
@@ -1,19 +1,33 @@
from enum import Enum as PyEnum
from functools import update_wrapper
-from typing import Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union, overload

+from typing_extensions import TypeAlias

import dagster._check as check
from dagster.builtins import Int
from dagster.config import Field, Selector
+from dagster.config.config_schema import ConfigSchemaType
from dagster.core.definitions.configurable import (
ConfiguredDefinitionConfigSchema,
NamedConfigurableDefinition,
)
+from dagster.core.definitions.pipeline_base import IPipeline
from dagster.core.definitions.reconstruct import ReconstructablePipeline
from dagster.core.errors import DagsterUnmetExecutorRequirementsError
from dagster.core.execution.retries import RetryMode, get_retries_config

-from .definition_config_schema import convert_user_facing_definition_config_schema
+from .definition_config_schema import (
+    IDefinitionConfigSchema,
+    convert_user_facing_definition_config_schema,
+)

+if TYPE_CHECKING:
+    from dagster.core.executor.base import Executor
+    from dagster.core.executor.in_process import InProcessExecutor
+    from dagster.core.executor.init import InitExecutorContext
+    from dagster.core.executor.multiprocess import MultiprocessExecutor
+    from dagster.core.instance import DagsterInstance


class ExecutorRequirement(PyEnum):
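A note on the `TYPE_CHECKING` block above: those imports are evaluated only by type checkers, never at runtime, which keeps `Executor` and friends out of the runtime import graph and avoids circular imports. The annotations below therefore refer to them as string forward references. A minimal sketch of the idiom (`make_executor` is a hypothetical name):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen by mypy/pyright only; no runtime import, no import cycle.
    from dagster.core.executor.base import Executor

def make_executor() -> "Executor":  # string annotation, resolved at type-check time
    ...
```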
@@ -33,14 +47,19 @@ class ExecutorRequirement(PyEnum):
PERSISTENT_OUTPUTS = "PERSISTENT_OUTPUTS"


-def multiple_process_executor_requirements():
+def multiple_process_executor_requirements() -> List[ExecutorRequirement]:
return [
ExecutorRequirement.RECONSTRUCTABLE_JOB,
ExecutorRequirement.NON_EPHEMERAL_INSTANCE,
ExecutorRequirement.PERSISTENT_OUTPUTS,
]


+ExecutorConfig = Dict[str, object]
+ExecutorCreationFunction: TypeAlias = Callable[["InitExecutorContext"], "Executor"]
+ExecutorRequirementsFunction: TypeAlias = Callable[[ExecutorConfig], List[ExecutorRequirement]]
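These aliases keep the later signatures readable. For illustration, functions satisfying each alias might look like this (hypothetical names, assuming the module-level names above are in scope):

```python
def create_my_executor(init_context: "InitExecutorContext") -> "Executor":
    ...  # an ExecutorCreationFunction

def my_requirements(config: ExecutorConfig) -> List[ExecutorRequirement]:
    # An ExecutorRequirementsFunction: requirements may depend on the resolved
    # config, exactly as _get_default_executor_requirements does further down.
    return multiple_process_executor_requirements() if "multiprocess" in config else []
```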


class ExecutorDefinition(NamedConfigurableDefinition):
"""
Args:
Expand All @@ -58,13 +77,16 @@ class ExecutorDefinition(NamedConfigurableDefinition):

def __init__(
self,
-        name,
-        config_schema=None,
-        requirements=None,
-        executor_creation_fn=None,
-        description=None,
+        name: str,
+        config_schema: Optional[ConfigSchemaType] = None,
+        requirements: Union[
+            ExecutorRequirementsFunction, Optional[List[ExecutorRequirement]]
+        ] = None,
+        executor_creation_fn: Optional[ExecutorCreationFunction] = None,
+        description: Optional[str] = None,
):
self._name = check.str_param(name, "name")
+        self._requirements_fn: ExecutorRequirementsFunction
if callable(requirements):
self._requirements_fn = requirements
else:
@@ -79,25 +101,25 @@ def __init__(
self._description = check.opt_str_param(description, "description")

@property
-    def name(self):
+    def name(self) -> str:
return self._name

@property
-    def description(self):
+    def description(self) -> Optional[str]:
return self._description

@property
-    def config_schema(self):
+    def config_schema(self) -> IDefinitionConfigSchema:
return self._config_schema

-    def get_requirements(self, executor_config: Dict[str, Any]):
+    def get_requirements(self, executor_config: Dict[str, object]) -> List[ExecutorRequirement]:
return self._requirements_fn(executor_config)

@property
-    def executor_creation_fn(self):
+    def executor_creation_fn(self) -> Optional[ExecutorCreationFunction]:
return self._executor_creation_fn

-    def copy_for_configured(self, name, description, config_schema, _):
+    def copy_for_configured(self, name, description, config_schema, _) -> "ExecutorDefinition":
return ExecutorDefinition(
name=name,
config_schema=config_schema,
@@ -148,11 +170,25 @@ def configured(
)


+@overload
+def executor(name: ExecutorCreationFunction) -> ExecutorDefinition:
+    ...
+
+
+@overload
+def executor(
+    name: Optional[str] = ...,
+    config_schema: Optional[ConfigSchemaType] = ...,
+    requirements: Optional[Union[ExecutorRequirementsFunction, List[ExecutorRequirement]]] = ...,
+) -> "_ExecutorDecoratorCallable":
+    ...


def executor(
-    name=None,
-    config_schema=None,
-    requirements=None,
-):
+    name: Union[ExecutorCreationFunction, Optional[str]] = None,
+    config_schema: Optional[ConfigSchemaType] = None,
+    requirements: Optional[Union[ExecutorRequirementsFunction, List[ExecutorRequirement]]] = None,
+) -> Union[ExecutorDefinition, "_ExecutorDecoratorCallable"]:
"""Define an executor.
The decorated function should accept an :py:class:`InitExecutorContext` and return an instance
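The overloads above capture the decorator's two calling conventions: bare (`@executor`, where the first overload receives the function itself) and parameterized (`@executor(...)`, which returns a `_ExecutorDecoratorCallable`). A sketch of both forms (names are illustrative; bodies elided):

```python
@executor  # bare form: name defaults to the function name
def my_executor(init_context):
    ...  # should return an Executor

@executor(name="my_mp_executor", requirements=multiple_process_executor_requirements())
def my_multiprocess_executor(init_context):
    ...  # should return an Executor
```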
Expand Down Expand Up @@ -181,7 +217,7 @@ def __init__(self, name=None, config_schema=None, requirements=None):
self.config_schema = config_schema # type check in definition
self.requirements = requirements

-    def __call__(self, fn):
+    def __call__(self, fn: ExecutorCreationFunction) -> ExecutorDefinition:
check.callable_param(fn, "fn")

if not self.name:
@@ -199,12 +235,12 @@ def __call__(self, fn):
return executor_def


-def _core_in_process_executor_creation(config: Dict[str, Any]):
+def _core_in_process_executor_creation(config: ExecutorConfig) -> "InProcessExecutor":
from dagster.core.executor.in_process import InProcessExecutor

return InProcessExecutor(
# shouldn't need to .get() here - issue with defaults in config setup
-        retries=RetryMode.from_config(config["retries"]),
+        retries=RetryMode.from_config(check.dict_elem(config, "retries")),
marker_to_close=config.get("marker_to_close"),
)

@@ -238,7 +274,7 @@ def in_process_executor(init_context):


@executor(name="execute_in_process_executor")
-def execute_in_process_executor(_):
+def execute_in_process_executor(_) -> "InProcessExecutor":
"""Executor used by execute_in_process.
Use of this executor triggers special behavior in the config system that ignores all incoming
@@ -253,21 +289,21 @@ def execute_in_process_executor(_):
)


-def _core_multiprocess_executor_creation(config: Dict[str, Any]):
+def _core_multiprocess_executor_creation(config: ExecutorConfig) -> "MultiprocessExecutor":
from dagster.core.executor.multiprocess import MultiprocessExecutor

# unpack optional selector
start_method = None
-    start_cfg = {}
-    start_selector = config.get("start_method")
+    start_cfg: Dict[str, object] = {}
+    start_selector = check.opt_dict_elem(config, "start_method")
if start_selector:
start_method, start_cfg = list(start_selector.items())[0]

return MultiprocessExecutor(
-        max_concurrent=config["max_concurrent"],
-        retries=RetryMode.from_config(config["retries"]),
+        max_concurrent=check.int_elem(config, "max_concurrent"),
+        retries=RetryMode.from_config(check.dict_elem(config, "retries")),  # type: ignore
start_method=start_method,
-        explicit_forkserver_preload=start_cfg.get("preload_modules"),
+        explicit_forkserver_preload=check.opt_list_elem(start_cfg, "preload_modules", of_type=str),
)
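The `start_method` field is a config selector, i.e. a dict with exactly one key naming the chosen variant; the `list(...)[0]` unpack above pulls out that single `(key, value)` pair. A sketch under a hypothetical config (field names follow the usage in this function):

```python
config = {
    "max_concurrent": 4,
    "retries": {"enabled": {}},
    "start_method": {"forkserver": {"preload_modules": ["my_package"]}},
}

start_selector = config.get("start_method")
if start_selector:
    start_method, start_cfg = list(start_selector.items())[0]

assert start_method == "forkserver"
assert start_cfg == {"preload_modules": ["my_package"]}
```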


@@ -334,7 +370,7 @@ def multiprocess_executor(init_context):
default_executors = [in_process_executor, multiprocess_executor]


-def check_cross_process_constraints(init_context):
+def check_cross_process_constraints(init_context: "InitExecutorContext") -> None:
from dagster.core.executor.init import InitExecutorContext

check.inst_param(init_context, "init_context", InitExecutorContext)
@@ -347,7 +383,7 @@ def check_cross_process_constraints(init_context):
_check_non_ephemeral_instance(init_context.instance)


-def _check_intra_process_pipeline(pipeline):
+def _check_intra_process_pipeline(pipeline: IPipeline) -> None:
from dagster.core.definitions import JobDefinition

if not isinstance(pipeline, ReconstructablePipeline):
@@ -363,7 +399,7 @@ def _check_intra_process_pipeline(pipeline):
)


-def _check_non_ephemeral_instance(instance):
+def _check_non_ephemeral_instance(instance: "DagsterInstance") -> None:
if instance.is_ephemeral:
raise DagsterUnmetExecutorRequirementsError(
"You have attempted to use an executor that uses multiple processes with an "
@@ -375,7 +411,9 @@ def _check_non_ephemeral_instance(instance):
)


-def _get_default_executor_requirements(executor_config):
+def _get_default_executor_requirements(
+    executor_config: ExecutorConfig,
+) -> List[ExecutorRequirement]:
return multiple_process_executor_requirements() if "multiprocess" in executor_config else []


@@ -389,7 +427,7 @@ def _get_default_executor_requirements(executor_config):
),
requirements=_get_default_executor_requirements,
)
-def multi_or_in_process_executor(init_context):
+def multi_or_in_process_executor(init_context: "InitExecutorContext") -> "Executor":
"""The default executor for a job.
This is the executor available by default on a :py:class:`JobDefinition`
@@ -428,6 +466,10 @@ def multi_or_in_process_executor(init_context):
and negative numbers can be used.
"""
if "multiprocess" in init_context.executor_config:
-        return _core_multiprocess_executor_creation(init_context.executor_config["multiprocess"])
+        return _core_multiprocess_executor_creation(
+            check.dict_elem(init_context.executor_config, "multiprocess")
+        )
else:
-        return _core_in_process_executor_creation(init_context.executor_config["in_process"])
+        return _core_in_process_executor_creation(
+            check.dict_elem(init_context.executor_config, "in_process")
+        )
@@ -421,7 +421,8 @@ def create_executor(context_creation_data: ContextCreationData) -> "Executor":
instance=context_creation_data.instance,
)
check_cross_process_constraints(init_context)
-    return context_creation_data.executor_def.executor_creation_fn(init_context)
+    creation_fn = check.not_none(context_creation_data.executor_def.executor_creation_fn)
+    return creation_fn(init_context)
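`check.not_none` is the runtime counterpart of the `Optional` narrowing the type checker needs here: `executor_creation_fn` is typed `Optional[ExecutorCreationFunction]`, so it is asserted non-`None` before being called. A sketch of the pattern (`maybe_get` is a hypothetical helper):

```python
from typing import Optional

import dagster._check as check

def maybe_get() -> Optional[str]:
    return "hello"

value: str = check.not_none(maybe_get())  # narrows Optional[str] to str; raises if None
```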


@contextmanager
3 changes: 2 additions & 1 deletion python_modules/dagster/dagster/core/execution/retries.py
@@ -24,9 +24,10 @@ class RetryMode(Enum):
DEFERRED = "deferred"

@staticmethod
-    def from_config(config_value: Dict[str, Dict]):
+    def from_config(config_value: Dict[str, Dict]) -> Optional["RetryMode"]:
for selector, _ in config_value.items():
return RetryMode(selector)
+        return None

@property
def enabled(self) -> bool:
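The added `return None` makes `from_config` honest about an empty selector dict, which is why its return type becomes `Optional["RetryMode"]` and callers such as `docker_executor` below wrap it in `check.not_none`. Assuming the remaining enum members follow the same value pattern as `DEFERRED = "deferred"` above:

```python
from dagster.core.execution.retries import RetryMode

assert RetryMode.from_config({"enabled": {}}) is RetryMode.ENABLED
assert RetryMode.from_config({}) is None  # the case the Optional now captures
```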
@@ -58,12 +58,14 @@ def docker_executor(init_context: InitExecutorContext) -> Executor:
launcher will also be set on the containers that are created for each step.
"""

-    image = init_context.executor_config.get("image")
-    registry = init_context.executor_config.get("registry")
-    env_vars = init_context.executor_config.get("env_vars")
-    network = init_context.executor_config.get("network")
-    networks = init_context.executor_config.get("networks")
-    container_kwargs = init_context.executor_config.get("container_kwargs")
+    config = init_context.executor_config
+    image = check.opt_str_elem(config, "image")
+    registry = check.opt_dict_elem(config, "registry", key_type=str)
+    env_vars = check.opt_list_elem(config, "env_vars", of_type=str)
+    network = check.opt_str_elem(config, "network")
+    networks = check.opt_list_elem(config, "networks", of_type=str)
+    container_kwargs = check.opt_dict_elem(config, "container_kwargs", key_type=str)
+    retries = check.dict_elem(config, "retries", key_type=str)

validate_docker_config(network, networks, container_kwargs)
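The net effect of the `check.*_elem` helpers over `executor_config.get(...)` is that each local gets a precise, validated type instead of an opaque `object`. A sketch with hypothetical values:

```python
import dagster._check as check

config: dict = {
    "image": "my-image:latest",
    "env_vars": ["DAGSTER_HOME=/opt/dagster/dagster_home"],
}

image = check.opt_str_elem(config, "image")                      # validated str (or None)
env_vars = check.opt_list_elem(config, "env_vars", of_type=str)  # each element checked to be str
network = check.opt_str_elem(config, "network")                  # absent key: no error
```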

@@ -79,7 +81,7 @@ def docker_executor(init_context: InitExecutorContext) -> Executor:

return StepDelegatingExecutor(
DockerStepHandler(image, container_context),
-        retries=RetryMode.from_config(init_context.executor_config["retries"]),
+        retries=check.not_none(RetryMode.from_config(retries)),
)


@@ -190,10 +192,11 @@ def launch_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
network = client.networks.get(network_name)
network.connect(step_container)

-        assert (
-            len(step_handler_context.execute_step_args.step_keys_to_execute) == 1
-        ), "Launching multiple steps is not currently supported"
-        step_key = step_handler_context.execute_step_args.step_keys_to_execute[0]
+        step_keys_to_execute = check.not_none(
+            step_handler_context.execute_step_args.step_keys_to_execute
+        )
+        assert len(step_keys_to_execute) == 1, "Launching multiple steps is not currently supported"
+        step_key = step_keys_to_execute[0]

events = [
DagsterEvent(
@@ -215,7 +218,10 @@ def launch_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
return events

def check_step_health(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
-        step_key = step_handler_context.execute_step_args.step_keys_to_execute[0]
+        step_keys_to_execute = check.not_none(
+            step_handler_context.execute_step_args.step_keys_to_execute
+        )
+        step_key = step_keys_to_execute[0]
container_context = self._get_docker_container_context(step_handler_context)

client = self._get_client(container_context)
@@ -281,10 +287,11 @@ def check_step_health(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
def terminate_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
container_context = self._get_docker_container_context(step_handler_context)

-        assert (
-            len(step_handler_context.execute_step_args.step_keys_to_execute) == 1
-        ), "Launching multiple steps is not currently supported"
-        step_key = step_handler_context.execute_step_args.step_keys_to_execute[0]
+        step_keys_to_execute = check.not_none(
+            step_handler_context.execute_step_args.step_keys_to_execute
+        )
+        assert len(step_keys_to_execute) == 1, "Launching multiple steps is not currently supported"
+        step_key = step_keys_to_execute[0]

events = [
DagsterEvent(
@@ -302,7 +309,7 @@ def terminate_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]:
container = client.containers.get(
self._get_container_name(
step_handler_context.execute_step_args.pipeline_run_id,
-                step_handler_context.execute_step_args.step_keys_to_execute[0],
+                step_keys_to_execute[0],
)
)
container.stop()
