Skip to content

Commit

Permalink
Revert "Allow python objects to be passed as top-level inputs to job (#…
Browse files Browse the repository at this point in the history
…7032)" (#7779)

This reverts commit afe23af.
  • Loading branch information
dpeng817 committed May 6, 2022
1 parent 9366a7e commit 68a99d6
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 322 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import re

from dagster_graphql.test.utils import execute_dagster_graphql, infer_pipeline_selector

from .graphql_context_test_suite import NonLaunchableGraphQLContextTestMatrix
Expand Down Expand Up @@ -68,7 +70,10 @@ def test_pipeline_with_invalid_definition_error(self, graphql_context):
assert result.data
assert result.data["runConfigSchemaOrError"]["__typename"] == "InvalidSubsetError"

assert (
"Input 'some_input' of solid 'fail_subset' has no upstream output, no default value, and no dagster type loader"
in result.data["runConfigSchemaOrError"]["message"]
assert re.match(
(
r".*DagsterInvalidSubsetError[\s\S]*"
r"add a dagster_type_loader for the type 'InputTypeWithoutHydration'"
),
result.data["runConfigSchemaOrError"]["message"],
)
11 changes: 4 additions & 7 deletions python_modules/dagster/dagster/core/definitions/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Callable,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Set,
Expand Down Expand Up @@ -573,7 +572,6 @@ def to_job(
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "JobDefinition":
if not isinstance(self.node_def, GraphDefinition):
raise DagsterInvalidInvocationError(
Expand All @@ -583,7 +581,6 @@ def to_job(

tags = check.opt_dict_param(tags, "tags", key_type=str)
hooks = check.opt_set_param(hooks, "hooks", HookDefinition)
input_values = check.opt_mapping_param(input_values, "input_values")
op_retry_policy = check.opt_inst_param(op_retry_policy, "op_retry_policy", RetryPolicy)
job_hooks: Set[HookDefinition] = set()
job_hooks.update(check.opt_set_param(hooks, "hooks", HookDefinition))
Expand All @@ -600,7 +597,6 @@ def to_job(
op_retry_policy=op_retry_policy,
version_strategy=version_strategy,
partitions_def=partitions_def,
input_values=input_values,
)

def execute_in_process(
Expand All @@ -610,7 +606,6 @@ def execute_in_process(
resources: Optional[Dict[str, Any]] = None,
raise_on_error: bool = True,
run_id: Optional[str] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "ExecuteInProcessResult":
if not isinstance(self.node_def, GraphDefinition):
raise DagsterInvalidInvocationError(
Expand All @@ -624,7 +619,10 @@ def execute_in_process(
from .executor_definition import execute_in_process_executor
from .job_definition import JobDefinition

input_values = check.opt_mapping_param(input_values, "input_values")
if len(self.node_def.input_defs) > 0:
raise DagsterInvariantViolationError(
"Graphs with inputs cannot be used with execute_in_process at this time."
)

ephemeral_job = JobDefinition(
name=self.given_alias,
Expand All @@ -634,7 +632,6 @@ def execute_in_process(
tags=self.tags,
hook_defs=self.hook_defs,
op_retry_policy=self.retry_policy,
_input_values=input_values,
)

return core_execute_in_process(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
from functools import update_wrapper
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
Mapping,
Optional,
Union,
overload,
)
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Dict, Optional, Union, overload

from dagster import check
from dagster.core.decorator_utils import format_docstring_for_description
Expand Down Expand Up @@ -42,7 +32,6 @@ def __init__(
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
input_values: Optional[Mapping[str, object]] = None,
):
self.name = name
self.description = description
Expand All @@ -55,7 +44,6 @@ def __init__(
self.op_retry_policy = op_retry_policy
self.version_strategy = version_strategy
self.partitions_def = partitions_def
self.input_values = input_values

def __call__(self, fn: Callable[..., Any]) -> JobDefinition:
check.callable_param(fn, "fn")
Expand Down Expand Up @@ -105,7 +93,6 @@ def __call__(self, fn: Callable[..., Any]) -> JobDefinition:
op_retry_policy=self.op_retry_policy,
version_strategy=self.version_strategy,
partitions_def=self.partitions_def,
input_values=self.input_values,
)
update_wrapper(job_def, fn)
return job_def
Expand All @@ -128,8 +115,6 @@ def job(
hooks: Optional[AbstractSet[HookDefinition]] = ...,
op_retry_policy: Optional[RetryPolicy] = ...,
version_strategy: Optional[VersionStrategy] = ...,
partitions_def: Optional["PartitionsDefinition"] = ...,
input_values: Optional[Mapping[str, object]] = ...,
) -> _Job:
...

Expand All @@ -146,7 +131,6 @@ def job(
op_retry_policy: Optional[RetryPolicy] = None,
version_strategy: Optional[VersionStrategy] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> Union[JobDefinition, _Job]:
"""Creates a job with the specified parameters from the decorated graph/op invocation function.
Expand Down Expand Up @@ -195,8 +179,6 @@ def job(
partitions_def (Optional[PartitionsDefinition]): Defines a discrete set of partition keys
that can parameterize the job. If this argument is supplied, the config argument
can't also be supplied.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps python objects to the top-level inputs of a job.
"""
if callable(name):
Expand All @@ -215,5 +197,4 @@ def job(
op_retry_policy=op_retry_policy,
version_strategy=version_strategy,
partitions_def=partitions_def,
input_values=input_values,
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Iterable,
Iterator,
List,
Mapping,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -463,7 +462,6 @@ def to_job(
op_selection: Optional[List[str]] = None,
partitions_def: Optional["PartitionsDefinition"] = None,
asset_layer: Optional["AssetLayer"] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "JobDefinition":
"""
Make this graph in to an executable Job by providing remaining components required for execution.
Expand Down Expand Up @@ -514,8 +512,6 @@ def to_job(
argument can't also be supplied.
asset_layer (Optional[AssetLayer]): Top level information about the assets this job
will produce. Generally should not be set manually.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps python objects to the top-level inputs of a job.
Returns:
JobDefinition
Expand All @@ -530,7 +526,6 @@ def to_job(
executor_def = check.opt_inst_param(
executor_def, "executor_def", ExecutorDefinition, default=multi_or_in_process_executor
)
input_values = check.opt_mapping_param(input_values, "input_values")

if resource_defs and "io_manager" in resource_defs:
resource_defs_with_defaults = resource_defs
Expand Down Expand Up @@ -588,7 +583,6 @@ def to_job(
version_strategy=version_strategy,
op_retry_policy=op_retry_policy,
asset_layer=asset_layer,
_input_values=input_values,
).get_job_def_for_op_selection(op_selection)

def coerce_to_job(self):
Expand Down Expand Up @@ -629,7 +623,6 @@ def execute_in_process(
raise_on_error: bool = True,
op_selection: Optional[List[str]] = None,
run_id: Optional[str] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "ExecuteInProcessResult":
"""
Execute this graph in-process, collecting results in-memory.
Expand All @@ -653,8 +646,6 @@ def execute_in_process(
(downstream dependencies) within 3 levels down.
* ``['*some_op', 'other_op_a', 'other_op_b+']``: select ``some_op`` and all its
ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps python objects to the top-level inputs of the graph.
Returns:
:py:class:`~dagster.ExecuteInProcessResult`
Expand All @@ -668,7 +659,6 @@ def execute_in_process(

instance = check.opt_inst_param(instance, "instance", DagsterInstance)
resources = check.opt_dict_param(resources, "resources", key_type=str)
input_values = check.opt_mapping_param(input_values, "input_values")

resource_defs = wrap_resources_for_execution(resources)

Expand All @@ -677,7 +667,6 @@ def execute_in_process(
graph_def=self,
executor_def=execute_in_process_executor,
resource_defs=resource_defs,
_input_values=input_values,
).get_job_def_for_op_selection(op_selection)

run_config = run_config if run_config is not None else {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,14 @@
)
from dagster.core.definitions.node_definition import NodeDefinition
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvalidSubsetError,
)
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidSubsetError
from dagster.core.selector.subset_selector import (
LeafNodeSelection,
OpSelectionData,
parse_op_selection,
)
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.core.utils import str_format_set
from dagster.utils import merge_dicts

from .asset_layer import AssetLayer
from .config import ConfigMapping
Expand Down Expand Up @@ -79,7 +74,6 @@ def __init__(
version_strategy: Optional[VersionStrategy] = None,
_op_selection_data: Optional[OpSelectionData] = None,
asset_layer: Optional[AssetLayer] = None,
_input_values: Optional[Mapping[str, object]] = None,
):

# Exists for backcompat - JobDefinition is implemented as a single-mode pipeline.
Expand All @@ -95,15 +89,6 @@ def __init__(
self._op_selection_data = check.opt_inst_param(
_op_selection_data, "_op_selection_data", OpSelectionData
)
self._input_values: Mapping[str, object] = check.opt_mapping_param(
_input_values, "_input_values"
)
for input_name in sorted(list(self._input_values.keys())):
if not graph_def.has_input(input_name):
job_name = name or graph_def.name
raise DagsterInvalidDefinitionError(
f"Error when constructing JobDefinition '{job_name}': Input value provided for key '{input_name}', but job has no top-level input with that name."
)

super(JobDefinition, self).__init__(
name=name,
Expand Down Expand Up @@ -157,7 +142,6 @@ def execute_in_process(
raise_on_error: bool = True,
op_selection: Optional[List[str]] = None,
run_id: Optional[str] = None,
input_values: Optional[Mapping[str, object]] = None,
) -> "ExecuteInProcessResult":
"""
Execute the Job in-process, gathering results in-memory.
Expand All @@ -184,8 +168,6 @@ def execute_in_process(
(downstream dependencies) within 3 levels down.
* ``['*some_op', 'other_op_a', 'other_op_b+']``: select ``some_op`` and all its
ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.
input_values (Optional[Mapping[str, Any]]):
A dictionary that maps python objects to the top-level inputs of the job. Input values provided here will override input values that have been provided to the job directly.
Returns:
:py:class:`~dagster.ExecuteInProcessResult`
Expand All @@ -196,12 +178,6 @@ def execute_in_process(
run_config = check.opt_dict_param(run_config, "run_config")
op_selection = check.opt_list_param(op_selection, "op_selection", str)
partition_key = check.opt_str_param(partition_key, "partition_key")
input_values = check.opt_mapping_param(input_values, "input_values")

# Combine provided input values at execute_in_process with input values
# provided to the definition. Input values provided at
# execute_in_process will override those provided on the definition.
input_values = merge_dicts(self._input_values, input_values)

resource_defs = dict(self.resource_defs)
logger_defs = dict(self.loggers)
Expand All @@ -218,7 +194,6 @@ def execute_in_process(
op_retry_policy=self._solid_retry_policy,
version_strategy=self.version_strategy,
asset_layer=self.asset_layer,
_input_values=input_values,
).get_job_def_for_op_selection(op_selection)

tags = None
Expand Down Expand Up @@ -360,13 +335,6 @@ def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
else None
)

def get_direct_input_value(self, input_name: str) -> object:
if input_name not in self._input_values:
raise DagsterInvalidInvocationError(
f"On job '{self.name}', attempted to retrieve input value for input named '{input_name}', but no value was provided. Provided input values: {sorted(list(self._input_values.keys()))}"
)
return self._input_values[input_name]


def _swap_default_io_man(resources: Dict[str, ResourceDefinition], job: PipelineDefinition):
"""
Expand Down Expand Up @@ -500,10 +468,3 @@ def get_subselected_graph_definition(
f"The attempted subset {str_format_set(resolved_op_selection_dict)} for graph "
f"{graph.name} results in an invalid graph."
) from exc


def get_direct_input_values_from_job(target: PipelineDefinition) -> Mapping[str, Any]:
if target.is_job:
return cast(JobDefinition, target)._input_values # pylint: disable=protected-access
else:
return {}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
RootInputManagerDefinition,
)
from dagster.core.storage.tags import MEMOIZED_RUN_TAG
from dagster.core.types.dagster_type import DagsterType
from dagster.core.types.dagster_type import DagsterType, DagsterTypeKind
from dagster.core.utils import str_format_set
from dagster.utils import frozentags, merge_dicts
from dagster.utils.backcompat import experimental_class_warning
Expand Down Expand Up @@ -970,6 +970,24 @@ def _checked_input_resource_reqs_for_mode(
else:
# input is unconnected
input_def = handle.input_def
if (
not input_def.dagster_type.loader
and not input_def.dagster_type.kind == DagsterTypeKind.NOTHING
and not input_def.root_manager_key
and not input_def.has_default_value
):
raise DagsterInvalidDefinitionError(
"Input '{input_name}' in {described_node} is not connected to "
"the output of a previous node and can not be loaded from configuration, "
"making it impossible to execute. "
"Possible solutions are:\n"
" * add a dagster_type_loader for the type '{dagster_type}'\n"
" * connect '{input_name}' to the output of another node\n".format(
described_node=node.describe_node(),
input_name=input_def.name,
dagster_type=input_def.dagster_type.display_name,
)
)

# If a root manager is provided, it's always used. I.e. it has priority over
# the other ways of loading unsatisfied inputs - dagster type loaders and
Expand Down Expand Up @@ -1030,7 +1048,6 @@ def _create_run_config_schema(
mode_definition: ModeDefinition,
required_resources: Set[str],
) -> "RunConfigSchema":
from .job_definition import get_direct_input_values_from_job
from .run_config import (
RunConfigSchemaCreationData,
construct_config_type_dictionary,
Expand Down Expand Up @@ -1066,7 +1083,6 @@ def _create_run_config_schema(
ignored_solids=ignored_solids,
required_resources=required_resources,
is_using_graph_job_op_apis=pipeline_def.is_job,
direct_inputs=get_direct_input_values_from_job(pipeline_def),
)
)

Expand Down

0 comments on commit 68a99d6

Please sign in to comment.