Skip to content

Commit

Permalink
[top-level inputs 1/2] Move InvalidSubsetError try-catch up a level (#…
Browse files Browse the repository at this point in the history
…7780)

* Move InvalidSubsetError try-catch up a level

* Fix mypy error

* isort
  • Loading branch information
dpeng817 committed May 11, 2022
1 parent 42e35af commit 03c8691
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 43 deletions.
86 changes: 43 additions & 43 deletions python_modules/dagster/dagster/core/definitions/job_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,33 +242,42 @@ def get_job_def_for_op_selection(

resolved_op_selection_dict = parse_op_selection(self, op_selection)

sub_graph = get_subselected_graph_definition(self.graph, resolved_op_selection_dict)

return JobDefinition(
name=self.name,
description=self.description,
resource_defs=dict(self.resource_defs),
logger_defs=dict(self.loggers),
executor_def=self.executor_def,
config_mapping=self.config_mapping,
partitioned_config=self.partitioned_config,
preset_defs=self.preset_defs,
tags=self.tags,
hook_defs=self.hook_defs,
op_retry_policy=self._solid_retry_policy,
graph_def=sub_graph,
version_strategy=self.version_strategy,
_op_selection_data=OpSelectionData(
op_selection=op_selection,
resolved_op_selection=set(
resolved_op_selection_dict.keys()
), # equivalent to solids_to_execute. currently only gets top level nodes.
parent_job_def=self, # used by pipeline snapshot lineage
),
# TODO: subset this structure.
# https://github.com/dagster-io/dagster/issues/7541
asset_layer=self.asset_layer,
)
try:
sub_graph = get_subselected_graph_definition(self.graph, resolved_op_selection_dict)

return JobDefinition(
name=self.name,
description=self.description,
resource_defs=dict(self.resource_defs),
logger_defs=dict(self.loggers),
executor_def=self.executor_def,
config_mapping=self.config_mapping,
partitioned_config=self.partitioned_config,
preset_defs=self.preset_defs,
tags=self.tags,
hook_defs=self.hook_defs,
op_retry_policy=self._solid_retry_policy,
graph_def=sub_graph,
version_strategy=self.version_strategy,
_op_selection_data=OpSelectionData(
op_selection=op_selection,
resolved_op_selection=set(
resolved_op_selection_dict.keys()
), # equivalent to solids_to_execute. currently only gets top level nodes.
parent_job_def=self, # used by pipeline snapshot lineage
),
# TODO: subset this structure.
# https://github.com/dagster-io/dagster/issues/7541
asset_layer=self.asset_layer,
)
except DagsterInvalidDefinitionError as exc:
# This handles the case when you construct a subset such that an unsatisfied
# input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
# we re-raise a DagsterInvalidSubsetError.
raise DagsterInvalidSubsetError(
f"The attempted subset {str_format_set(resolved_op_selection_dict)} for graph "
f"{self.graph.name} results in an invalid graph."
) from exc

def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:

Expand Down Expand Up @@ -460,19 +469,10 @@ def get_subselected_graph_definition(
)
)

try:
return SubselectedGraphDefinition(
parent_graph_def=graph,
dependencies=deps,
node_defs=[definition for _, definition in selected_nodes],
input_mappings=new_input_mappings,
output_mappings=new_output_mappings,
)
except DagsterInvalidDefinitionError as exc:
# This handles the case when you construct a subset such that an unsatisfied
# input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
# we re-raise a DagsterInvalidSubsetError.
raise DagsterInvalidSubsetError(
f"The attempted subset {str_format_set(resolved_op_selection_dict)} for graph "
f"{graph.name} results in an invalid graph."
) from exc
return SubselectedGraphDefinition(
parent_graph_def=graph,
dependencies=deps,
node_defs=[definition for _, definition in selected_nodes],
input_mappings=new_input_mappings,
output_mappings=new_output_mappings,
)
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# type: ignore[return-value]
from datetime import datetime
from typing import List

import pytest
Expand Down Expand Up @@ -642,3 +644,23 @@ def my_super_graph():
assert result_sub_3_2.success
assert set(_success_step_keys(result_sub_3_2)) == {"my_graph.my_nested_graph.my_op"}
assert result_sub_3_2.output_for_node("my_graph.my_nested_graph.my_op") == "hello"


def test_op_selection_unsatisfied_input_failure():
@op
def basic() -> datetime:
return 5

@op
def ingest(x: datetime) -> str:
return str(x)

@graph
def the_graph():
ingest(basic())

with pytest.raises(DagsterInvalidSubsetError):
the_graph.execute_in_process(op_selection=["ingest"])

with pytest.raises(DagsterInvalidSubsetError):
the_graph.to_job(op_selection=["ingest"])

0 comments on commit 03c8691

Please sign in to comment.