Skip to content

Commit

Permalink
[asset-resources 4/n][rfc] Refactor resource requirement checking code (
Browse files Browse the repository at this point in the history
#7947)

* Refactor resource requirement checking code

Move implementation of getting required graph resources to the graph itself

Keep resource requirement retrieval as its own code path

Revamp resource requirement checking process

Add ResourceRequirement, RequiresResources

Get resource dependencies

* Get errors as part of method

* Incorporate assets layer

* Make things non-optional that don't need to be optional
  • Loading branch information
dpeng817 committed May 31, 2022
1 parent 1429ac2 commit 9af83ae
Show file tree
Hide file tree
Showing 30 changed files with 583 additions and 504 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,4 @@ def test_pipeline_with_invalid_definition_error(self, graphql_context):
error_msg = result.data["runConfigSchemaOrError"]["message"]

assert "DagsterInvalidSubsetError" in error_msg
assert (
"Input 'some_input' of solid 'fail_subset' has no upstream output, no default value, and no dagster type loader."
in error_msg
)
assert "Input 'some_input' of solid 'fail_subset' has no way of being resolved" in error_msg
35 changes: 35 additions & 0 deletions python_modules/dagster/dagster/core/definitions/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
AbstractSet,
Any,
Dict,
Iterator,
List,
NamedTuple,
Optional,
Expand All @@ -32,9 +33,11 @@
from .utils import DEFAULT_OUTPUT, struct_to_string, validate_tags

if TYPE_CHECKING:
from .asset_layer import AssetLayer
from .composition import MappedInputPlaceholder
from .graph_definition import GraphDefinition
from .node_definition import NodeDefinition
from .resource_requirement import ResourceRequirement


class NodeInvocation(
Expand Down Expand Up @@ -241,6 +244,38 @@ def hook_defs(self) -> AbstractSet[HookDefinition]:
def retry_policy(self) -> Optional[RetryPolicy]:
return self._retry_policy

def get_resource_requirements(
    self,
    asset_layer: "AssetLayer",
    outer_container: "GraphDefinition",
    parent_handle: Optional["NodeHandle"] = None,
) -> Iterator["ResourceRequirement"]:
    """Yield the resource requirements of this node and of any nodes nested inside it.

    Args:
        asset_layer: Passed, paired with this node's handle, as the outer context when
            asking an op definition for its requirements; forwarded unchanged to nested
            nodes.
        outer_container: The graph whose dependency structure is consulted to decide
            whether an input already has an upstream dependency.
        parent_handle: Handle of the enclosing node, if any. It becomes the parent of the
            handle built for this node.

    Yields:
        For a graph node, the requirements of every node inside the graph (recursively).
        Otherwise, the requirements reported by the op definition -- skipping input manager
        requirements whose input has dependencies in ``outer_container`` -- followed by
        the requirements of each hook attached to this node.
    """
    # Imported at call time rather than module level -- presumably to avoid a circular
    # import with resource_requirement; confirm before hoisting.
    from .resource_requirement import InputManagerRequirement

    handle = NodeHandle(self.name, parent_handle)

    if self.is_graph:
        # NOTE(review): hooks attached to the graph node itself are not visited on this
        # path -- confirm that is intended.
        inner_graph = self.definition.ensure_graph_def()
        for child in inner_graph.node_dict.values():
            yield from child.get_resource_requirements(
                asset_layer=asset_layer,
                outer_container=inner_graph,
                parent_handle=handle,
            )
        return

    op_def = self.definition.ensure_solid_def()
    for requirement in op_def.get_resource_requirements((handle, asset_layer)):
        if isinstance(requirement, InputManagerRequirement):
            # A root input manager requirement is ignored when the corresponding input
            # already has an upstream output feeding it.
            input_handle = SolidInputHandle(
                self, op_def.input_def_named(requirement.input_name)
            )
            if outer_container.dependency_structure.has_deps(input_handle):
                continue
        yield requirement

    for hook in self.hook_defs:
        yield from hook.get_resource_requirements(self.describe_node())


class NodeHandleSerializer(DefaultNamedTupleSerializer):
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from .node_definition import NodeDefinition
from .output import OutputDefinition, OutputMapping
from .preset import PresetDefinition
from .resource_requirement import ResourceRequirement
from .solid_container import create_execution_structure, validate_dependency_dict
from .version_strategy import VersionStrategy

Expand Down Expand Up @@ -241,6 +242,31 @@ def _solids_in_topological_order(self):

return [self.solid_named(solid_name) for solid_name in order]

def get_inputs_must_be_resolved_top_level(
    self, asset_layer: "AssetLayer", handle: Optional[NodeHandle] = None
) -> List[InputDefinition]:
    """Collect this graph's own inputs that stand in for unresolved inner inputs.

    Each node's definition is asked for the inputs it needs resolved at the top level.
    Inputs that have dependencies in this graph's dependency structure are skipped; the
    remaining ones must be mapped from this graph's inputs.

    Args:
        asset_layer: Forwarded unchanged to each node definition.
        handle: Handle of this graph within its parent, if any; used as the parent of the
            handles built for the nodes of this graph.

    Returns:
        The definitions of this graph's mapped inputs, one per inner input that has no
        dependency and is mapped from the container.

    Raises:
        DagsterInvalidDefinitionError: If an inner input has no dependency and is not
            mapped from this graph's inputs.
    """
    top_level_input_defs = []
    for node in self.node_dict.values():
        node_handle = NodeHandle(node.name, handle)
        pending_input_defs = node.definition.get_inputs_must_be_resolved_top_level(
            asset_layer, node_handle
        )
        for input_def in pending_input_defs:
            # Already fed by another node inside this graph: nothing to surface.
            if self.dependency_structure.has_deps(SolidInputHandle(node, input_def)):
                continue

            # Neither connected nor mapped from the container: the input can never
            # receive a value.
            if not node.container_maps_input(input_def.name):
                raise DagsterInvalidDefinitionError(
                    f"Input '{input_def.name}' of {node.describe_node()} "
                    "has no way of being resolved. Must provide a resolution to this "
                    "input via another op/graph, or via a direct input value mapped from the "
                    "top-level graph. To "
                    "learn more, see the docs for unconnected inputs: "
                    "https://docs.dagster.io/concepts/io-management/unconnected-inputs#unconnected-inputs."
                )

            # Mapped from the container: responsibility moves up to this graph's input.
            top_level_input_defs.append(
                node.container_mapped_input(input_def.name).definition
            )
    return top_level_input_defs

@property
def node_type_str(self) -> str:
return "graph"
Expand Down Expand Up @@ -299,6 +325,16 @@ def iterate_solid_defs(self) -> Iterator["SolidDefinition"]:
for outer_node_def in self._node_defs:
yield from outer_node_def.iterate_solid_defs()

def iterate_node_handles(
    self, parent_node_handle: Optional[NodeHandle] = None
) -> Iterator[NodeHandle]:
    """Yield a handle for every node in this graph, descending into nested graphs.

    For a node that is itself a graph, the handles of its inner nodes are yielded before
    the handle of the graph node.

    Args:
        parent_node_handle: Handle to use as the parent of this graph's direct nodes.
    """
    for child in self.node_dict.values():
        child_handle = NodeHandle(child.name, parent_node_handle)
        if child.is_graph:
            # Descendants first, parented under the handle just built.
            yield from child.definition.ensure_graph_def().iterate_node_handles(child_handle)
        yield child_handle

@property
def input_mappings(self) -> List[InputMapping]:
return self._input_mappings
Expand Down Expand Up @@ -704,6 +740,13 @@ def parent_graph_def(self) -> Optional["GraphDefinition"]:
def is_subselected(self) -> bool:
return False

def get_resource_requirements(self, asset_layer: "AssetLayer") -> Iterator[ResourceRequirement]:
    """Yield every resource requirement of this graph.

    Requirements come from two sources, in this order: each node in ``node_dict`` (with
    this graph as its outer container), then each dagster type returned by
    ``all_dagster_types()``.

    Args:
        asset_layer: Forwarded unchanged to every node.
    """
    # Node-level requirements.
    for inner_node in self.node_dict.values():
        yield from inner_node.get_resource_requirements(
            outer_container=self,
            asset_layer=asset_layer,
        )

    # Type-level requirements.
    for used_type in self.all_dagster_types():
        yield from used_type.get_resource_requirements()


class SubselectedGraphDefinition(GraphDefinition):
"""Defines a subselected graph.
Expand Down
16 changes: 14 additions & 2 deletions python_modules/dagster/dagster/core/definitions/hook_definition.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import AbstractSet, Any, Callable, NamedTuple, Optional
from typing import AbstractSet, Any, Callable, Iterator, NamedTuple, Optional, cast

import dagster._check as check

from ..decorator_utils import get_function_params
from ..errors import DagsterInvalidInvocationError
from .resource_requirement import HookResourceRequirement, RequiresResources, ResourceRequirement
from .utils import check_valid_name


Expand All @@ -16,7 +17,8 @@ class HookDefinition(
("required_resource_keys", AbstractSet[str]),
("decorated_fn", Optional[Callable]),
],
)
),
RequiresResources,
):
"""Define a hook which can be triggered during an op execution (e.g. a callback on the step
execution failure event during an op execution).
Expand Down Expand Up @@ -134,3 +136,13 @@ def a_job():
kwargs[context_arg_name], context_arg_name, HookContext
)
return hook_invocation_result(self, context)

def get_resource_requirements(
    self, outer_context: Optional[object] = None
) -> Iterator[ResourceRequirement]:
    """Yield one ``HookResourceRequirement`` per required resource key, in sorted key order.

    Args:
        outer_context: What this hook is attached to. It is treated as an optional string
            and passed through as ``attached_to`` on every yielded requirement.

    Yields:
        A ``HookResourceRequirement`` for each key in ``required_resource_keys``, carrying
        this hook's name.
    """
    # For hooks, outer_context is a descriptive string naming the pipeline/job or the node
    # the hook is attached to. cast() only informs the type checker; nothing is converted
    # or validated at runtime.
    attached_to = cast(Optional[str], outer_context)

    # sorted() accepts any iterable and returns a new list, so the key set is passed
    # directly instead of being copied into a list first.
    for resource_key in sorted(self.required_resource_keys):
        yield HookResourceRequirement(
            key=resource_key, attached_to=attached_to, hook_name=self.name
        )
15 changes: 5 additions & 10 deletions python_modules/dagster/dagster/core/definitions/hook_invocation.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import TYPE_CHECKING, List, NamedTuple, Optional, cast
from typing import TYPE_CHECKING, List, Optional

import dagster._check as check

from ..errors import DagsterInvariantViolationError
from ..execution.context.hook import BoundHookContext, UnboundHookContext
from .resource_requirement import ensure_requirements_satisfied

if TYPE_CHECKING:
from ..events import DagsterEvent
Expand All @@ -21,15 +21,10 @@ def hook_invocation_result(
)

# Validate that all required resources are provided in the context
for key in hook_def.required_resource_keys:
resources = cast(NamedTuple, hook_context.resources)
if key not in resources._asdict():
raise DagsterInvariantViolationError(
f"The hook '{hook_def.name}' requires resource '{key}', which was not provided by "
"the context."
)

# pylint: disable=protected-access
ensure_requirements_satisfied(
hook_context._resource_defs, list(hook_def.get_resource_requirements())
)

bound_context = BoundHookContext(
hook_def=hook_def,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import TYPE_CHECKING, Mapping, Sequence
from typing import TYPE_CHECKING, List, Mapping, Optional, Sequence

import dagster._check as check
from dagster.core.definitions.configurable import NamedConfigurableDefinition
Expand All @@ -10,6 +10,8 @@
from .utils import check_valid_name, validate_tags

if TYPE_CHECKING:
from .asset_layer import AssetLayer
from .dependency import NodeHandle
from .graph_definition import GraphDefinition
from .input import InputDefinition
from .output import OutputDefinition
Expand Down Expand Up @@ -230,3 +232,9 @@ def ensure_solid_def(self) -> "SolidDefinition":
return self

check.failed(f"{self.name} is not a SolidDefinition")

@abstractmethod
def get_inputs_must_be_resolved_top_level(
    self,
    asset_layer: "AssetLayer",
    handle: Optional["NodeHandle"] = None,
) -> List["InputDefinition"]:
    """Return the input definitions that must be resolved at the top level.

    Abstract: this base implementation only raises. Concrete node definitions supply the
    actual behavior.

    Args:
        asset_layer: Asset layer to consult. How it is used is up to the concrete
            implementation.
        handle: Handle of the node being inspected, if any.

    Raises:
        NotImplementedError: Always, when the base implementation is reached.
    """
    raise NotImplementedError()

0 comments on commit 9af83ae

Please sign in to comment.