Skip to content

Commit

Permalink
Make asset invocation error if resources conflict (#8390)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 authored and clairelin135 committed Jun 14, 2022
1 parent 14696ae commit e03eefe
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
21 changes: 14 additions & 7 deletions python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ def __call__(self, *args, **kwargs):
solid_def = self.op
provided_context: Optional[OpExecutionContext] = None
if len(args) > 0 and isinstance(args[0], OpExecutionContext):
provided_context = _build_invocation_context_with_included_resources(
self.resource_defs, args[0]
)
provided_context = _build_invocation_context_with_included_resources(self, args[0])
new_args = [provided_context, *args[1:]]
return solid_def(*new_args, **kwargs)
elif (
Expand All @@ -115,7 +113,7 @@ def __call__(self, *args, **kwargs):
context_param_name = get_function_params(solid_def.compute_fn.decorated_fn)[0].name
if context_param_name in kwargs:
provided_context = _build_invocation_context_with_included_resources(
self.resource_defs, kwargs[context_param_name]
self, kwargs[context_param_name]
)
new_kwargs = dict(kwargs)
new_kwargs[context_param_name] = provided_context
Expand Down Expand Up @@ -530,15 +528,24 @@ def _infer_keys_by_output_names(


def _build_invocation_context_with_included_resources(
resource_defs: Dict[str, ResourceDefinition], context: OpExecutionContext
assets_def: AssetsDefinition,
context: OpExecutionContext,
) -> OpExecutionContext:
from dagster.core.execution.context.invocation import (
UnboundSolidExecutionContext,
build_op_context,
)

override_resources = context.resources._asdict()
all_resources = merge_dicts(resource_defs, override_resources)
resource_defs = assets_def.resource_defs
invocation_resources = context.resources._asdict()
for resource_key in sorted(list(invocation_resources.keys())):
if resource_key in resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {str(assets_def)}: resource '{resource_key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
)
all_resources = merge_dicts(resource_defs, invocation_resources)

if isinstance(context, UnboundSolidExecutionContext):
context = cast(UnboundSolidExecutionContext, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,11 @@ def asset_resource_overrides(context):
assert context.resources.foo == "override_foo"
assert context.resources.bar == "orig_bar"

asset_resource_overrides(build_op_context(resources={"foo": "override_foo"}))
with pytest.raises(
DagsterInvalidInvocationError,
match="resource 'foo' provided on both the definition and invocation context.",
):
asset_resource_overrides(build_op_context(resources={"foo": "override_foo"}))


def test_asset_invocation_resource_errors():
Expand Down

0 comments on commit e03eefe

Please sign in to comment.