Skip to content

Commit

Permalink
[hooks] fix resource reqs in nested graphs (#7529)
Browse files Browse the repository at this point in the history
The hook def resources for mode calculation did not traverse the node invocation hierarchy, only the recursive definitions. This fixes that.

## Test Plan

Added a test that previously failed and an invariant to prevent similar problems in the future.
  • Loading branch information
alangenfeld committed Apr 22, 2022
1 parent 81e7cba commit 460e482
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Any, Dict, FrozenSet, List, Optional, Set, Union
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
FrozenSet,
Iterator,
List,
Optional,
Set,
Union,
)

from dagster import check
from dagster.core.definitions.policy import RetryPolicy
Expand Down Expand Up @@ -235,7 +246,7 @@ def __init__(
mode_def,
self._current_level_node_defs,
self._graph_def._dagster_type_dict,
self._graph_def._node_dict,
self._graph_def.node_dict,
self._hook_defs,
self._graph_def._dependency_structure,
)
Expand Down Expand Up @@ -694,11 +705,18 @@ def _get_pipeline_subset_def(
) from exc


def _iterate_all_nodes(root_node_dict: Dict[str, Node]) -> Iterator[Node]:
    """Yield every node reachable from ``root_node_dict``, depth first.

    Each node is yielded before the nodes of the graph it wraps, and a
    graph node's children are all yielded before that node's next sibling.

    Args:
        root_node_dict: Mapping of node name to node for the outermost graph.

    Yields:
        Each node in ``root_node_dict`` and, for nodes whose ``is_graph`` is
        true, every node found in the nested graph definition's ``node_dict``.
    """
    # Stack of partially consumed iterators, one per nesting level being walked.
    pending = [iter(root_node_dict.values())]
    while pending:
        for current in pending[-1]:
            yield current
            if current.is_graph:
                # Descend: the inner level is walked to completion before the
                # outer iterator (still on the stack) is resumed.
                inner_graph = current.definition.ensure_graph_def()
                pending.append(iter(inner_graph.node_dict.values()))
                break
        else:
            # The innermost level is exhausted; go back to its parent.
            pending.pop()


def _checked_resource_reqs_for_mode(
mode_def: ModeDefinition,
node_defs: List[NodeDefinition],
dagster_type_dict: Dict[str, DagsterType],
solid_dict: Dict[str, Node],
root_node_dict: Dict[str, Node],
pipeline_hook_defs: AbstractSet[HookDefinition],
dependency_structure: DependencyStructure,
) -> Set[str]:
Expand Down Expand Up @@ -751,11 +769,11 @@ def _checked_resource_reqs_for_mode(

# Validate unsatisfied inputs can be materialized from config
resource_reqs.update(
_checked_input_resource_reqs_for_mode(dependency_structure, solid_dict, mode_def)
_checked_input_resource_reqs_for_mode(dependency_structure, root_node_dict, mode_def)
)

for solid in solid_dict.values():
for hook_def in solid.hook_defs:
for node in _iterate_all_nodes(root_node_dict):
for hook_def in node.hook_defs:
for required_resource in hook_def.required_resource_keys:
resource_reqs.add(required_resource)
if required_resource not in mode_resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ def _core_resource_initialization_event_generator(
resource_instances,
resource_init_times,
)

delta_res_keys = resource_keys_to_init - set(resource_instances.keys())
check.invariant(
not delta_res_keys,
f"resources instances do not align with resource to init, difference: {delta_res_keys}",
)
yield ScopedResourcesBuilder(resource_instances, contains_generator)
except DagsterUserCodeExecutionError as dagster_user_error:
# Can only end up in this state if we attempt to initialize a resource, so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
check,
execute_pipeline,
graph,
job,
op,
pipeline,
reconstructable,
Expand All @@ -22,6 +23,7 @@
from dagster.core.definitions.events import HookExecutionResult
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.test_utils import instance_for_test


class SomeUserException(Exception):
Expand Down Expand Up @@ -469,6 +471,39 @@ def run_success_hook():
assert called.get("hook_two") == 1


# Success hook that declares a requirement on "resource_a" and checks that the
# resource handed to the hook context has the value 1.
@success_hook(required_resource_keys={"resource_a"})
def res_hook(context):
    resource_value = context.resources.resource_a
    assert resource_value == 1


# Minimal op with no inputs that always returns the constant 1; it is the op
# that res_hook gets attached to inside the `nested` graph.
@op
def emit():
    return 1


# Graph whose only node is `emit` invoked with `res_hook` attached, so the
# hook's resource requirement is declared on an invocation inside a graph.
@graph
def nested():
    hooked_emit = emit.with_hooks({res_hook})
    hooked_emit()


# Graph that only invokes `nested`, adding a second level of graph nesting
# around the hooked op invocation.
@graph
def nested_two():
    nested()


# Job that supplies "resource_a" and runs `nested_two`; the hook needing the
# resource sits on an op invocation nested inside two graphs.
@job(resource_defs={"resource_a": resource_a})
def res_hook_job():
    nested_two()


def test_multiproc_hook_resource_deps():
    """Check that a hook's required resource is available when the hooked op is nested in graphs.

    Runs the ``nested`` graph in process with the resource passed explicitly,
    runs ``res_hook_job`` in process, and finally runs the reconstructable job
    through ``execute_pipeline`` against a test instance.
    """
    # Graph executed directly, with the resource provided at call time.
    nested_result = nested.execute_in_process(resources={"resource_a": resource_a})
    assert nested_result.success

    # Job executed in process, with the resource coming from its resource_defs.
    in_process_result = res_hook_job.execute_in_process()
    assert in_process_result.success

    # NOTE(review): presumably this is the multiprocess path the test name
    # refers to (via the job's default executor) - confirm.
    with instance_for_test() as instance:
        reconstructable_job = reconstructable(res_hook_job)
        pipeline_result = execute_pipeline(reconstructable_job, instance=instance)
        assert pipeline_result.success


def test_hook_context_op_solid_provided():
@op
def hook_op(_):
Expand Down

0 comments on commit 460e482

Please sign in to comment.