Skip to content

Commit

Permalink
[with_resources changes 1/n] with_resources docstring, config argument (
Browse files Browse the repository at this point in the history
#8322)

* Rename config arg on with_resources, add docstring

* Add config validation

* Docstring fixes
  • Loading branch information
dpeng817 committed Jun 13, 2022
1 parent 13f7f16 commit 8d5806e
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 8 deletions.
83 changes: 77 additions & 6 deletions python_modules/dagster/dagster/core/execution/with_resources.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,99 @@
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, TypeVar, cast
from typing import Any, Iterable, List, Mapping, Optional, Sequence, TypeVar, cast

from dagster import _check as check
from dagster.utils import merge_dicts

from ...config import Shape
from ..definitions import ResourceDefinition
from ..definitions.resource_requirement import ResourceAddable
from ..errors import DagsterInvalidConfigError, DagsterInvalidInvocationError

T = TypeVar("T", bound=ResourceAddable)


def with_resources(
definitions: Iterable[T],
resource_defs: Mapping[str, ResourceDefinition],
config: Optional[Dict[str, Any]] = None,
resource_config_by_key: Optional[Mapping[str, Any]] = None,
) -> Sequence[T]:
"""Adds dagster resources to copies of resource-requiring dagster definitions.
An error will be thrown if any provided definitions have a conflicting
resource definition provided for a key provided to resource_defs. Resource
config can be provided, with keys in the config dictionary corresponding to
the keys for each resource definition. If any definition has unsatisfied
resource keys after applying with_resources, an error will be thrown.
Args:
definitions (Iterable[ResourceAddable]): Dagster definitions to provide resources to.
resource_defs (Mapping[str, ResourceDefinition]):
Mapping of resource keys to ResourceDefinition objects to satisfy
resource requirements of provided dagster definitions.
resource_config_by_key (Optional[Mapping[str, Any]]):
Specifies config for provided resources. The key in this dictionary
corresponds to configuring the same key in the resource_defs
dictionary.
Examples:
.. code-block:: python
from dagster import asset, resource, with_resources
@resource(config_schema={"bar": str})
def foo_resource():
...
@asset(required_resource_keys={"foo"})
def asset1(context):
foo = context.resources.foo
...
@asset(required_resource_keys={"foo"})
def asset2(context):
foo = context.resources.foo
...
asset1_with_foo, asset2_with_foo = with_resources(
[the_asset, other_asset],
resource_config_by_key={
"foo": {
"config": {"bar": ...}
}
}
)
"""
from dagster.config.validate import validate_config
from dagster.core.storage.fs_io_manager import fs_io_manager

check.mapping_param(resource_defs, "resource_defs")
config = check.opt_dict_param(config, "config")
resource_config_by_key = check.opt_mapping_param(
resource_config_by_key, "resource_config_by_key"
)

resource_defs = merge_dicts({"io_manager": fs_io_manager}, resource_defs)
for key in resource_defs.keys():
if key in config:
resource_defs[key] = resource_defs[key].configured(config[key])

for key, resource_def in resource_defs.items():
if key in resource_config_by_key:
resource_config = resource_config_by_key[key]
if not isinstance(resource_config, dict) or "config" not in resource_config:
raise DagsterInvalidInvocationError(
f"Error with config for resource key '{key}': Expected a "
"dictionary of the form {'config': ...}, but received "
f"{str(resource_config)}"
)

outer_config_shape = Shape({"config": resource_def.get_config_field()})
config_evr = validate_config(outer_config_shape, resource_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
f"Error when applying config for resource with key '{key}' ",
config_evr.errors,
resource_config,
)
resource_defs[key] = resource_defs[key].configured(resource_config["config"])

transformed_defs: List[T] = []
for definition in definitions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@

from dagster import (
AssetKey,
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
IOManager,
ResourceDefinition,
build_op_context,
io_manager,
mem_io_manager,
resource,
)
from dagster.core.asset_defs import AssetsDefinition, SourceAsset, asset, build_assets_job
from dagster.core.errors import (
DagsterInvalidConfigError,
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvariantViolationError,
)
from dagster.core.execution.with_resources import with_resources
from dagster.core.storage.mem_io_manager import InMemoryIOManager

Expand Down Expand Up @@ -344,3 +349,88 @@ def bar():
DagsterInvariantViolationError, match='Resource key "bar" transitively depends on itself.'
):
with_resources([the_asset], resource_defs={"foo": foo, "bar": bar})


def get_resource_and_asset_for_config_tests():
@asset(required_resource_keys={"foo", "bar"})
def the_asset(context):
assert context.resources.foo == "blah"
assert context.resources.bar == "baz"

@resource(config_schema=str)
def the_resource(context):
return context.resource_config

return the_asset, the_resource


def test_config():
the_asset, the_resource = get_resource_and_asset_for_config_tests()

transformed_asset = with_resources(
[the_asset],
resource_defs={"foo": the_resource, "bar": the_resource},
resource_config_by_key={"foo": {"config": "blah"}, "bar": {"config": "baz"}},
)[0]

transformed_asset(build_op_context())


def test_config_not_satisfied():
the_asset, the_resource = get_resource_and_asset_for_config_tests()

transformed_asset = with_resources(
[the_asset],
resource_defs={"foo": the_resource, "bar": the_resource},
)[0]

result = build_assets_job(
"test",
[transformed_asset],
config={"resources": {"foo": {"config": "blah"}, "bar": {"config": "baz"}}},
).execute_in_process()

assert result.success


def test_bad_key_provided():

the_asset, the_resource = get_resource_and_asset_for_config_tests()

transformed_asset = with_resources(
[the_asset],
resource_defs={"foo": the_resource, "bar": the_resource},
resource_config_by_key={
"foo": {"config": "blah"},
"bar": {"config": "baz"},
"bad": "whatever",
},
)[0]

transformed_asset(build_op_context())


def test_bad_config_provided():
the_asset, the_resource = get_resource_and_asset_for_config_tests()

with pytest.raises(
DagsterInvalidConfigError, match="Error when applying config for resource with key 'foo'"
):
with_resources(
[the_asset],
resource_defs={"foo": the_resource, "bar": the_resource},
resource_config_by_key={
"foo": {"config": object()},
},
)

with pytest.raises(
DagsterInvalidInvocationError, match="Error with config for resource key 'foo'"
):
with_resources(
[the_asset],
resource_defs={"foo": the_resource, "bar": the_resource},
resource_config_by_key={
"foo": "bad",
},
)

0 comments on commit 8d5806e

Please sign in to comment.