Skip to content

Commit

Permalink
partitioned assets from graphs (#7837)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed May 11, 2022
1 parent d60c3c8 commit 3125227
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def from_graph(
asset_keys_by_input_name: Optional[Mapping[str, AssetKey]] = None,
asset_keys_by_output_name: Optional[Mapping[str, AssetKey]] = None,
internal_asset_deps: Optional[Mapping[str, Set[AssetKey]]] = None,
partitions_def: Optional[PartitionsDefinition] = None,
) -> "AssetsDefinition":
"""
Constructs an AssetsDefinition from a GraphDefinition.
Expand All @@ -80,6 +81,8 @@ def from_graph(
graph. If this default is not correct, you pass in a map of output names to a
corrected set of AssetKeys that they depend on. Any AssetKeys in this list must be
either used as input to the asset or produced within the graph.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the assets.
"""
asset_keys_by_input_name = check.opt_dict_param(
asset_keys_by_input_name, "asset_keys_by_input_name", key_type=str, value_type=AssetKey
Expand Down Expand Up @@ -113,6 +116,9 @@ def from_graph(
),
node_def=graph_def,
asset_deps=transformed_internal_asset_deps or None,
partitions_def=check.opt_inst_param(
partitions_def, "partitions_def", PartitionsDefinition
),
)

@property
Expand Down
19 changes: 13 additions & 6 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)

import dagster._check as check
from dagster.config import Shape
from dagster.core.definitions.asset_layer import AssetLayer
from dagster.core.definitions.config import ConfigMapping
from dagster.core.definitions.decorators.op_decorator import op
Expand Down Expand Up @@ -189,14 +190,20 @@ def run_config_for_partition_fn(partition_key: str) -> Dict[str, Any]:
"end": upstream_partition_key_range.end,
}

ops_config[assets_def.op.name] = {
"config": {
"assets": {
"input_partitions": inputs_dict,
"output_partitions": outputs_dict,
config_schema = assets_def.node_def.config_schema
if (
config_schema
and isinstance(config_schema.config_type, Shape)
and "assets" in config_schema.config_type.fields
):
ops_config[assets_def.node_def.name] = {
"config": {
"assets": {
"input_partitions": inputs_dict,
"output_partitions": outputs_dict,
}
}
}
}

return {"ops": ops_config}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from dagster import (
AssetGroup,
AssetKey,
AssetsDefinition,
DagsterInvalidDefinitionError,
Expand All @@ -14,6 +15,7 @@
Out,
Output,
ResourceDefinition,
StaticPartitionsDefinition,
graph,
io_manager,
multi_asset,
Expand Down Expand Up @@ -712,6 +714,22 @@ def my_graph(x, y):
assert assets_def.asset_keys_by_output_name["result"] == AssetKey("my_graph")


def test_graph_asset_partitioned():
@op
def my_op(context):
assert context.partition_key == "a"

@graph
def my_graph():
return my_op()

assets_def = AssetsDefinition.from_graph(
graph_def=my_graph, partitions_def=StaticPartitionsDefinition(["a", "b", "c"])
)

AssetGroup([assets_def]).build_job("abc").execute_in_process(partition_key="a")


def test_all_assets_job():
@asset
def a1():
Expand Down

0 comments on commit 3125227

Please sign in to comment.