Skip to content

Commit

Permalink
[top-level inputs 2/2] top level inputs on job and execute_in_process (
Browse files Browse the repository at this point in the history
…#7786)

* Initial pass at adding arbitrary python objects as input values

* Create placeholder types so that we can resolve to the correct dagster type at runtime

* Get rid of changes to RootInputConfig

* Fix docstring

* Fix lints

* Get rid of placeholders, update to use new input def finding mechanism

* Make input values work with non-serializable types

* Fix types, rename root -> direct

* Docstrings

* Add more tests; raise an error when an input value is provided for a name that isn't a top-level graph input

* Add error message if no type loader is set, and no direct inputs are provided.

* Fixing tests

* Fix tests, nothing case

* Any -> object in type annotations

* Fix test_unconfigurable_inputs_pipeline

* Fix test

* Test api snapshot pipeline

* More in-depth check for whether input is resolvable in pipeline_def

* Properly detect unmapped input cases.

* Fix tests + lint + typing

* Fix tests and lint

* Style and clarity

* lint

* Fix nits
  • Loading branch information
dpeng817 committed May 12, 2022
1 parent f06689b commit 8455843
Show file tree
Hide file tree
Showing 18 changed files with 472 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import re

from dagster_graphql.test.utils import execute_dagster_graphql, infer_pipeline_selector

from .graphql_context_test_suite import NonLaunchableGraphQLContextTestMatrix
Expand Down Expand Up @@ -70,10 +68,10 @@ def test_pipeline_with_invalid_definition_error(self, graphql_context):
assert result.data
assert result.data["runConfigSchemaOrError"]["__typename"] == "InvalidSubsetError"

assert re.match(
(
r".*DagsterInvalidSubsetError[\s\S]*"
r"add a dagster_type_loader for the type 'InputTypeWithoutHydration'"
),
result.data["runConfigSchemaOrError"]["message"],
error_msg = result.data["runConfigSchemaOrError"]["message"]

assert "DagsterInvalidSubsetError" in error_msg
assert (
"Input 'some_input' of solid 'fail_subset' has no upstream output, no default value, and no dagster type loader."
in error_msg
)
11 changes: 7 additions & 4 deletions python_modules/dagster/dagster/core/definitions/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Callable,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Set,
Expand Down Expand Up @@ -572,6 +573,7 @@ def to_job(
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "JobDefinition":
if not isinstance(self.node_def, GraphDefinition):
raise DagsterInvalidInvocationError(
Expand All @@ -581,6 +583,7 @@ def to_job(

tags = check.opt_dict_param(tags, "tags", key_type=str)
hooks = check.opt_set_param(hooks, "hooks", HookDefinition)
input_values = check.opt_mapping_param(input_values, "input_values")
op_retry_policy = check.opt_inst_param(op_retry_policy, "op_retry_policy", RetryPolicy)
job_hooks: Set[HookDefinition] = set()
job_hooks.update(check.opt_set_param(hooks, "hooks", HookDefinition))
Expand All @@ -597,6 +600,7 @@ def to_job(
op_retry_policy=op_retry_policy,
version_strategy=version_strategy,
partitions_def=partitions_def,
input_values=input_values,
)

def execute_in_process(
Expand All @@ -606,6 +610,7 @@ def execute_in_process(
resources: Optional[Dict[str, Any]] = None,
raise_on_error: bool = True,
run_id: Optional[str] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "ExecuteInProcessResult":
if not isinstance(self.node_def, GraphDefinition):
raise DagsterInvalidInvocationError(
Expand All @@ -619,10 +624,7 @@ def execute_in_process(
from .executor_definition import execute_in_process_executor
from .job_definition import JobDefinition

if len(self.node_def.input_defs) > 0:
raise DagsterInvariantViolationError(
"Graphs with inputs cannot be used with execute_in_process at this time."
)
input_values = check.opt_mapping_param(input_values, "input_values")

ephemeral_job = JobDefinition(
name=self.given_alias,
Expand All @@ -632,6 +634,7 @@ def execute_in_process(
tags=self.tags,
hook_defs=self.hook_defs,
op_retry_policy=self.retry_policy,
_input_values=input_values,
)

return core_execute_in_process(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Dict, Optional, Union, overload
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
Mapping,
Optional,
Union,
overload,
)

import dagster._check as check
from dagster.core.decorator_utils import format_docstring_for_description
Expand Down Expand Up @@ -32,6 +42,7 @@ def __init__(
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
input_values: Optional[Mapping[str, object]] = None,
):
self.name = name
self.description = description
Expand All @@ -44,6 +55,7 @@ def __init__(
self.op_retry_policy = op_retry_policy
self.version_strategy = version_strategy
self.partitions_def = partitions_def
self.input_values = input_values

def __call__(self, fn: Callable[..., Any]) -> JobDefinition:
check.callable_param(fn, "fn")
Expand Down Expand Up @@ -93,6 +105,7 @@ def __call__(self, fn: Callable[..., Any]) -> JobDefinition:
op_retry_policy=self.op_retry_policy,
version_strategy=self.version_strategy,
partitions_def=self.partitions_def,
input_values=self.input_values,
)
update_wrapper(job_def, fn)
return job_def
Expand All @@ -115,6 +128,8 @@ def job(
hooks: Optional[AbstractSet[HookDefinition]] = ...,
op_retry_policy: Optional[RetryPolicy] = ...,
version_strategy: Optional[VersionStrategy] = ...,
partitions_def: Optional["PartitionsDefinition"] = ...,
input_values: Optional[Mapping[str, object]] = ...,
) -> _Job:
...

Expand All @@ -131,6 +146,7 @@ def job(
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> Union[JobDefinition, _Job]:
"""Creates a job with the specified parameters from the decorated graph/op invocation function.
Expand Down Expand Up @@ -179,6 +195,8 @@ def job(
partitions_def (Optional[PartitionsDefinition]): Defines a discrete set of partition keys
that can parameterize the job. If this argument is supplied, the config argument
can't also be supplied.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps the names of the job's top-level inputs to the Python objects to use as their values.
"""
if callable(name):
Expand All @@ -197,4 +215,5 @@ def job(
op_retry_policy=op_retry_policy,
version_strategy=version_strategy,
partitions_def=partitions_def,
input_values=input_values,
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Iterable,
Iterator,
List,
Mapping,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -462,6 +463,7 @@ def to_job(
op_selection: Optional[List[str]] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
asset_layer: Optional["AssetLayer"] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "JobDefinition":
"""
Make this graph into an executable Job by providing remaining components required for execution.
Expand Down Expand Up @@ -512,6 +514,8 @@ def to_job(
argument can't also be supplied.
asset_layer (Optional[AssetLayer]): Top level information about the assets this job
will produce. Generally should not be set manually.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps the names of the job's top-level inputs to the Python objects to use as their values.
Returns:
JobDefinition
Expand All @@ -526,6 +530,7 @@ def to_job(
executor_def = check.opt_inst_param(
executor_def, "executor_def", ExecutorDefinition, default=multi_or_in_process_executor
)
input_values = check.opt_mapping_param(input_values, "input_values")

if resource_defs and "io_manager" in resource_defs:
resource_defs_with_defaults = resource_defs
Expand Down Expand Up @@ -583,6 +588,7 @@ def to_job(
version_strategy=version_strategy,
op_retry_policy=op_retry_policy,
asset_layer=asset_layer,
_input_values=input_values,
).get_job_def_for_op_selection(op_selection)

def coerce_to_job(self):
Expand Down Expand Up @@ -623,6 +629,7 @@ def execute_in_process(
raise_on_error: bool = True,
op_selection: Optional[List[str]] = None,
run_id: Optional[str] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "ExecuteInProcessResult":
"""
Execute this graph in-process, collecting results in-memory.
Expand All @@ -646,6 +653,8 @@ def execute_in_process(
(downstream dependencies) within 3 levels down.
* ``['*some_op', 'other_op_a', 'other_op_b+']``: select ``some_op`` and all its
ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps the names of the graph's top-level inputs to the Python objects to use as their values.
Returns:
:py:class:`~dagster.ExecuteInProcessResult`
Expand All @@ -659,6 +668,7 @@ def execute_in_process(

instance = check.opt_inst_param(instance, "instance", DagsterInstance)
resources = check.opt_dict_param(resources, "resources", key_type=str)
input_values = check.opt_mapping_param(input_values, "input_values")

resource_defs = wrap_resources_for_execution(resources)

Expand All @@ -667,6 +677,7 @@ def execute_in_process(
graph_def=self,
executor_def=execute_in_process_executor,
resource_defs=resource_defs,
_input_values=input_values,
).get_job_def_for_op_selection(op_selection)

run_config = run_config if run_config is not None else {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
)
from dagster.core.definitions.node_definition import NodeDefinition
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidSubsetError
from dagster.core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvalidSubsetError,
)
from dagster.core.selector.subset_selector import (
LeafNodeSelection,
OpSelectionData,
parse_op_selection,
)
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.core.utils import str_format_set
from dagster.utils import merge_dicts

from .asset_layer import AssetLayer
from .config import ConfigMapping
Expand Down Expand Up @@ -74,6 +79,7 @@ def __init__(
version_strategy: Optional[VersionStrategy] = None,
_op_selection_data: Optional[OpSelectionData] = None,
asset_layer: Optional[AssetLayer] = None,
_input_values: Optional[Mapping[str, object]] = None,
):

# Exists for backcompat - JobDefinition is implemented as a single-mode pipeline.
Expand All @@ -89,6 +95,15 @@ def __init__(
self._op_selection_data = check.opt_inst_param(
_op_selection_data, "_op_selection_data", OpSelectionData
)
self._input_values: Mapping[str, object] = check.opt_mapping_param(
_input_values, "_input_values"
)
for input_name in sorted(list(self._input_values.keys())):
if not graph_def.has_input(input_name):
job_name = name or graph_def.name
raise DagsterInvalidDefinitionError(
f"Error when constructing JobDefinition '{job_name}': Input value provided for key '{input_name}', but job has no top-level input with that name."
)

super(JobDefinition, self).__init__(
name=name,
Expand Down Expand Up @@ -142,6 +157,7 @@ def execute_in_process(
raise_on_error: bool = True,
op_selection: Optional[List[str]] = None,
run_id: Optional[str] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "ExecuteInProcessResult":
"""
Execute the Job in-process, gathering results in-memory.
Expand All @@ -168,6 +184,8 @@ def execute_in_process(
(downstream dependencies) within 3 levels down.
* ``['*some_op', 'other_op_a', 'other_op_b+']``: select ``some_op`` and all its
ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps the names of the job's top-level inputs to the Python objects to use as their values. Input values provided here will override input values that were provided on the job definition.
Returns:
:py:class:`~dagster.ExecuteInProcessResult`
Expand All @@ -178,6 +196,12 @@ def execute_in_process(
run_config = check.opt_dict_param(run_config, "run_config")
op_selection = check.opt_list_param(op_selection, "op_selection", str)
partition_key = check.opt_str_param(partition_key, "partition_key")
input_values = check.opt_mapping_param(input_values, "input_values")

# Combine provided input values at execute_in_process with input values
# provided to the definition. Input values provided at
# execute_in_process will override those provided on the definition.
input_values = merge_dicts(self._input_values, input_values)

resource_defs = dict(self.resource_defs)
logger_defs = dict(self.loggers)
Expand All @@ -194,6 +218,7 @@ def execute_in_process(
op_retry_policy=self._solid_retry_policy,
version_strategy=self.version_strategy,
asset_layer=self.asset_layer,
_input_values=input_values,
).get_job_def_for_op_selection(op_selection)

tags = None
Expand Down Expand Up @@ -352,6 +377,16 @@ def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
else None
)

def has_direct_input_value(self, input_name: str) -> bool:
    """Report whether a direct input value is stored for ``input_name``.

    Args:
        input_name (str): Name of the input to look up among the job's direct input values.

    Returns:
        bool: True if ``input_name`` is a key of ``self._input_values``, False otherwise.
    """
    provided_values = self._input_values
    return input_name in provided_values

def get_direct_input_value(self, input_name: str) -> object:
    """Return the direct input value stored for ``input_name``.

    Args:
        input_name (str): Name of the input whose value should be returned.

    Returns:
        object: The value stored under ``input_name`` in ``self._input_values``.

    Raises:
        DagsterInvalidInvocationError: If no value is stored for ``input_name``.
    """
    # Use a membership test rather than ``.get``: values are arbitrary objects, so
    # None could be a legitimately stored value.
    if input_name not in self._input_values:
        # Sorting the mapping iterates its keys directly, giving a deterministic
        # listing without the intermediate ``list(... .keys())`` copy.
        provided_names = sorted(self._input_values)
        raise DagsterInvalidInvocationError(
            f"On job '{self.name}', attempted to retrieve input value for input named '{input_name}', but no value was provided. Provided input values: {provided_names}"
        )
    return self._input_values[input_name]


def _swap_default_io_man(resources: Dict[str, ResourceDefinition], job: PipelineDefinition):
"""
Expand Down Expand Up @@ -476,3 +511,10 @@ def get_subselected_graph_definition(
input_mappings=new_input_mappings,
output_mappings=new_output_mappings,
)


def get_direct_input_values_from_job(target: PipelineDefinition) -> Mapping[str, Any]:
    """Return the direct input values attached to ``target`` when it is a job.

    Args:
        target (PipelineDefinition): The definition to inspect.

    Returns:
        Mapping[str, Any]: ``target``'s ``_input_values`` when ``target.is_job`` is truthy,
        otherwise an empty dict.
    """
    if not target.is_job:
        return {}
    job_def = cast(JobDefinition, target)
    return job_def._input_values  # pylint: disable=protected-access

0 comments on commit 8455843

Please sign in to comment.