Skip to content

Commit

Permalink
Basic asset config (#7590)
Browse files Browse the repository at this point in the history
* Backend capabilities for asset config
  • Loading branch information
smackesey committed Jun 1, 2022
1 parent 0c1dc58 commit 992e271
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 5 deletions.
5 changes: 3 additions & 2 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ def from_current_module(
)

def materialize(
self, selection: Optional[Union[str, List[str]]] = None
self, selection: Optional[Union[str, List[str]]] = None, run_config: Optional[Any] = None
) -> ExecuteInProcessResult:
"""
Executes an in-process run that materializes all assets in the group.
Expand All @@ -404,6 +404,7 @@ def materialize(
- ``['*some_asset_key']`` select ``some_asset_key`` and all its ancestors (upstream dependencies).
- ``['*some_asset_key+++']`` select ``some_asset_key``, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.
- ``['*some_asset_key', 'other_asset_key_a', 'other_asset_key_b+']`` select ``some_asset_key`` and all its ancestors, ``other_asset_key_a`` itself, and ``other_asset_key_b`` and its direct child asset keys. When subselecting into a multi-asset, all of the asset keys in that multi-asset must be selected.
run_config (Optional[Any]): The run config to use for the run that materializes the assets.
Returns:
ExecuteInProcessResult: The result of the execution.
Expand All @@ -417,7 +418,7 @@ def materialize(

return self.build_job(
name="in_process_materialization_job", selection=selection
).execute_in_process()
).execute_in_process(run_config=run_config)

def get_base_jobs(self) -> Sequence[JobDefinition]:
"""For internal use only."""
Expand Down
31 changes: 28 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dagster._check as check
from dagster.builtins import Nothing
from dagster.config import Field
from dagster.config.config_schema import ConfigSchemaType
from dagster.core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster.core.definitions.decorators.op_decorator import _Op
from dagster.core.definitions.events import AssetKey
Expand Down Expand Up @@ -51,6 +52,7 @@ def asset(
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
metadata: Optional[Mapping[str, Any]] = ...,
description: Optional[str] = ...,
config_schema: Optional[ConfigSchemaType] = None,
required_resource_keys: Optional[Set[str]] = ...,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = ...,
io_manager_def: Optional[IOManagerDefinition] = ...,
Expand All @@ -73,6 +75,7 @@ def asset(
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
metadata: Optional[Mapping[str, Any]] = None,
description: Optional[str] = None,
config_schema: Optional[ConfigSchemaType] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager_def: Optional[IOManagerDefinition] = None,
Expand Down Expand Up @@ -104,6 +107,9 @@ def asset(
and namespaces.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are
upstream dependencies, but do not pass an input to the asset.
config_schema (Optional[ConfigSchema]): The configuration schema for the asset's underlying
op. If set, Dagster will check that config provided for the op matches this schema and fail
if it does not. If not set, Dagster will accept any config provided for the op.
metadata (Optional[Dict[str, Any]]): A dict of metadata entries for the asset.
required_resource_keys (Optional[Set[str]]): Set of resource handles required by the op.
io_manager_key (Optional[str]): The resource key of the IOManager used
Expand Down Expand Up @@ -152,6 +158,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
non_argument_deps=_make_asset_keys(non_argument_deps),
metadata=metadata,
description=description,
config_schema=config_schema,
required_resource_keys=required_resource_keys,
resource_defs=resource_defs,
io_manager=io_manager_def or io_manager_key,
Expand All @@ -175,6 +182,7 @@ def __init__(
non_argument_deps: Optional[Set[AssetKey]] = None,
metadata: Optional[Mapping[str, Any]] = None,
description: Optional[str] = None,
config_schema: Optional[ConfigSchemaType] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager: Optional[Union[str, IOManagerDefinition]] = None,
Expand All @@ -196,6 +204,11 @@ def __init__(
required_resource_keys, "required_resource_keys"
)
self.io_manager = io_manager
self.config_schema = check.opt_dict_param(
config_schema,
"config_schema",
additional_message="Only dicts are supported for asset config_schema.",
)
self.compute_kind = compute_kind
self.dagster_type = dagster_type
self.partitions_def = partitions_def
Expand Down Expand Up @@ -237,7 +250,6 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
required_resource_keys.add(key)
for key in self.resource_defs.keys():
required_resource_keys.add(key)

op = _Op(
name="__".join(out_asset_key.path).replace("-", "_"),
description=self.description,
Expand All @@ -252,7 +264,8 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
"assets": {
"input_partitions": Field(dict, is_required=False),
"output_partitions": Field(dict, is_required=False),
}
},
**self.config_schema,
},
)(fn)

Expand Down Expand Up @@ -282,6 +295,7 @@ def multi_asset(
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
description: Optional[str] = None,
config_schema: Optional[ConfigSchemaType] = None,
required_resource_keys: Optional[Set[str]] = None,
compute_kind: Optional[str] = None,
internal_asset_deps: Optional[Mapping[str, Set[AssetKey]]] = None,
Expand All @@ -302,6 +316,10 @@ def multi_asset(
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to their metadata
and namespaces.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are upstream dependencies,
config_schema (Optional[ConfigSchema]): The configuration schema for the asset's underlying
op. If set, Dagster will check that config provided for the op matches this schema and fail
if it does not. If not set, Dagster will accept any config provided for the op.
non_argument_deps (Optional[Set[AssetKey]]): Set of asset keys that are upstream dependencies,
but do not pass an input to the multi_asset.
required_resource_keys (Optional[Set[str]]): Set of resource handles required by the op.
io_manager_key (Optional[str]): The resource key of the IOManager used for storing the
Expand Down Expand Up @@ -337,6 +355,11 @@ def multi_asset(
asset_deps = check.opt_dict_param(
internal_asset_deps, "internal_asset_deps", key_type=str, value_type=set
)
config_schema = check.opt_dict_param(
config_schema,
"config_schema",
additional_message="Only dicts are supported for asset config_schema.",
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:

Expand Down Expand Up @@ -380,7 +403,9 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
"assets": {
"input_partitions": Field(dict, is_required=False),
"output_partitions": Field(dict, is_required=False),
}
},
# Mypy scoping bug causing incorrect type inference here
**config_schema, # type: ignore
},
)(fn)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,17 @@ def asset_foo():
assert result.success


def test_materialize_with_config():
    """Materialize a single-asset group while passing run config to the asset.

    The asset declares a ``config_schema`` with a string field ``foo`` and
    returns that value from ``context.op_config``; the run must succeed when
    matching config is supplied through ``AssetGroup.materialize``.
    """

    @asset(config_schema={"foo": str})
    def asset_foo(context):
        return context.op_config["foo"]

    # The config for the asset is nested under "ops" and keyed by "asset_foo".
    run_config = {"ops": {"asset_foo": {"config": {"foo": "bar"}}}}
    outcome = AssetGroup(assets=[asset_foo]).materialize(run_config=run_config)

    assert outcome.success


def test_materialize_with_out_of_process_executor():
@asset
def asset_foo():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
DependencyDefinition,
Field,
GraphIn,
GraphOut,
IOManager,
Expand All @@ -23,6 +24,7 @@
multi_asset,
op,
)
from dagster.config.source import StringSource
from dagster.core.asset_defs import AssetIn, SourceAsset, asset, build_assets_job
from dagster.core.definitions.dependency import NodeHandle
from dagster.core.errors import DagsterInvalidSubsetError
Expand Down Expand Up @@ -71,6 +73,18 @@ def asset2(asset1):
assert job.execute_in_process().success


def test_single_asset_pipeline_with_config():
    """Build a job from one configured asset and execute it in process.

    The asset's ``config_schema`` declares ``foo`` as a ``Field(StringSource)``;
    the job must contain exactly the asset's op and succeed when ``foo`` is
    provided in the run config.
    """

    @asset(config_schema={"foo": Field(StringSource)})
    def asset1(context):
        return context.op_config["foo"]

    job = build_assets_job("a", [asset1])
    assert job.graph.node_defs == [asset1.op]

    # The config for the asset is nested under "ops" and keyed by "asset1".
    run_config = {"ops": {"asset1": {"config": {"foo": "bar"}}}}
    execution = job.execute_in_process(run_config=run_config)
    assert execution.success


def test_fork():
@asset
def asset1():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ def my_asset(arg1):
assert AssetKey("arg1") in my_asset.asset_keys_by_input_name.values()


def test_asset_with_config_schema():
    """Check that ``config_schema`` passed to ``@asset`` yields a truthy schema on the op."""

    @asset(config_schema={"foo": int})
    def my_asset(arg1):
        return arg1

    op_schema = my_asset.op.config_schema
    assert op_schema


def test_multi_asset_with_config_schema():
    """Check that ``config_schema`` passed to ``@multi_asset`` yields a truthy schema on the op."""

    @multi_asset(
        outs={"o1": Out(asset_key=AssetKey("o1"))},
        config_schema={"foo": int},
    )
    def my_asset(arg1):
        return arg1

    op_schema = my_asset.op.config_schema
    assert op_schema


def test_asset_with_compute_kind():
@asset(compute_kind="sql")
def my_asset(arg1):
Expand Down

0 comments on commit 992e271

Please sign in to comment.