Skip to content

Commit

Permalink
AssetsDefinition.from_graph (#7620)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Apr 28, 2022
1 parent ba9d867 commit b8be100
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 3 deletions.
121 changes: 118 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import warnings
from typing import AbstractSet, Iterable, Mapping, Optional, cast
from typing import AbstractSet, Dict, Iterable, Mapping, Optional, Set, cast

from dagster import check
from dagster.core.definitions import NodeDefinition, OpDefinition
from dagster.core.definitions import GraphDefinition, NodeDefinition, OpDefinition
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.partition import PartitionsDefinition
from dagster.utils.backcompat import ExperimentalWarning
from dagster.utils.backcompat import ExperimentalWarning, experimental

from .partition_mapping import PartitionMapping

Expand Down Expand Up @@ -55,6 +55,65 @@ def __init__(
def __call__(self, *args, **kwargs):
    """Invoke the wrapped node definition, forwarding every argument unchanged."""
    node_def = self._node_def
    return node_def(*args, **kwargs)

@staticmethod
@experimental
def from_graph(
    graph_def: GraphDefinition,
    asset_keys_by_input_name: Optional[Mapping[str, AssetKey]] = None,
    asset_keys_by_output_name: Optional[Mapping[str, AssetKey]] = None,
    internal_asset_deps: Optional[Mapping[str, Set[AssetKey]]] = None,
) -> "AssetsDefinition":
    """
    Constructs an AssetsDefinition from a GraphDefinition.

    Args:
        graph_def (GraphDefinition): The GraphDefinition that is an asset.
        asset_keys_by_input_name (Optional[Mapping[str, AssetKey]]): A mapping of the input
            names of the graph to their corresponding asset keys. If not provided,
            the input asset keys will be created from the graph input names.
        asset_keys_by_output_name (Optional[Mapping[str, AssetKey]]): A mapping of the output
            names of the graph to their corresponding asset keys. If not provided,
            the output asset keys will be created from the graph output names.
        internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]): By default, it is assumed
            that all assets produced by the graph depend on all assets that are consumed by that
            graph. If this default is not correct, pass in a map of output names to a
            corrected set of AssetKeys that they depend on. Any AssetKeys in these sets must be
            either used as input to the asset or produced within the graph.

    Returns:
        AssetsDefinition: An assets definition whose node is ``graph_def``.

    Raises:
        A check error if a mapping has the wrong key/value types, if a supplied key mapping
        does not cover exactly the graph's inputs/outputs, or if ``internal_asset_deps``
        names an output that the graph does not have.
    """
    asset_keys_by_input_name = check.opt_dict_param(
        asset_keys_by_input_name, "asset_keys_by_input_name", key_type=str, value_type=AssetKey
    )
    asset_keys_by_output_name = check.opt_dict_param(
        asset_keys_by_output_name,
        "asset_keys_by_output_name",
        key_type=str,
        value_type=AssetKey,
    )
    internal_asset_deps = check.opt_dict_param(
        internal_asset_deps, "internal_asset_deps", key_type=str, value_type=set
    )

    # Resolve the output asset keys up front so internal_asset_deps is validated and
    # translated against the graph's real outputs. Checking only the caller-supplied
    # mapping would reject valid output names whenever the output keys are inferred.
    resolved_asset_keys_by_output_name = _infer_asset_keys_by_output_names(
        graph_def, asset_keys_by_output_name or {}
    )

    transformed_internal_asset_deps = {}
    if internal_asset_deps:
        for output_name, asset_keys in internal_asset_deps.items():
            check.invariant(
                output_name in resolved_asset_keys_by_output_name,
                f"output_name {output_name} specified in internal_asset_deps does not exist in the decorated function",
            )
            # Re-key the dependency sets by the asset key each output produces.
            output_asset_key = resolved_asset_keys_by_output_name[output_name]
            transformed_internal_asset_deps[output_asset_key] = asset_keys

    return AssetsDefinition(
        asset_keys_by_input_name=_infer_asset_keys_by_input_names(
            graph_def,
            asset_keys_by_input_name or {},
        ),
        asset_keys_by_output_name=resolved_asset_keys_by_output_name,
        node_def=graph_def,
        # An empty mapping is passed as None, matching the original call.
        asset_deps=transformed_internal_asset_deps or None,
    )

@property
def op(self) -> OpDefinition:
check.invariant(
Expand Down Expand Up @@ -131,3 +190,59 @@ def with_replaced_asset_keys(
partitions_def=self.partitions_def,
partition_mappings=self._partition_mappings,
)


def _infer_asset_keys_by_input_names(
    graph_def: GraphDefinition, asset_keys_by_input_name: Mapping[str, AssetKey]
) -> Mapping[str, AssetKey]:
    """Return an asset key for every input of ``graph_def``.

    When ``asset_keys_by_input_name`` is non-empty, its keys must be exactly the graph's
    input names; otherwise the invariant check fails. Any input without a supplied key
    gets an ``AssetKey`` built from the input name itself.
    """
    graph_input_names = {mapping.definition.name for mapping in graph_def.input_mappings}

    if asset_keys_by_input_name:
        supplied_names = set(asset_keys_by_input_name.keys())
        check.invariant(
            supplied_names == graph_input_names,
            "The set of input names keys specified in the asset_keys_by_input_name argument must "
            "equal the set of asset keys inputted by this GraphDefinition. \n"
            f"asset_keys_by_input_name keys: {supplied_names} \n"
            f"expected keys: {graph_input_names}",
        )

    # Fall back to a key derived from the input name when none was supplied.
    asset_keys_by_name: Dict[str, AssetKey] = {}
    for name in graph_input_names:
        asset_keys_by_name[name] = asset_keys_by_input_name.get(name, AssetKey([name]))

    return asset_keys_by_name


def _infer_asset_keys_by_output_names(
    graph_def: GraphDefinition, asset_keys_by_output_name: Mapping[str, AssetKey]
) -> Mapping[str, AssetKey]:
    """Return an asset key for every output of ``graph_def``.

    When ``asset_keys_by_output_name`` is non-empty, its keys must be exactly the graph's
    output names; otherwise the invariant check fails. Outputs without a supplied key are
    inferred: a lone output with the default name "result" takes the graph's name, and any
    other output takes its own output name.
    """
    output_names = [output_def.name for output_def in graph_def.output_defs]
    if asset_keys_by_output_name:
        supplied_names = set(asset_keys_by_output_name.keys())
        check.invariant(
            supplied_names == set(output_names),
            "The set of output names keys specified in the asset_keys_by_output_name argument must "
            "equal the set of asset keys outputted by this GraphDefinition. \n"
            # Label the supplied keys with the argument they actually came from.
            f"asset_keys_by_output_name keys: {supplied_names} \n"
            f"expected keys: {set(output_names)}",
        )

    # Copy so the caller's mapping is never mutated by the inference below.
    inferred_asset_keys_by_output_names: Dict[str, AssetKey] = dict(asset_keys_by_output_name)

    if (
        len(output_names) == 1
        and output_names[0] not in asset_keys_by_output_name
        and output_names[0] == "result"
    ):
        # If there is only one output and the name is the default "result", generate asset key
        # from the name of the node
        inferred_asset_keys_by_output_names[output_names[0]] = AssetKey([graph_def.name])

    # Any output still without a key gets one derived from its own name.
    for output_name in output_names:
        if output_name not in inferred_asset_keys_by_output_names:
            inferred_asset_keys_by_output_names[output_name] = AssetKey([output_name])
    return inferred_asset_keys_by_output_names
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
DependencyDefinition,
GraphIn,
GraphOut,
IOManager,
Out,
Expand Down Expand Up @@ -637,6 +638,80 @@ def bar(foo):
job.execute_in_process()


def test_internal_asset_deps():
    # Build the op and graph outside the pytest.raises block so that only the
    # from_graph call under test can satisfy the expected-exception assertion.
    @op
    def my_op(x, y):  # pylint: disable=unused-argument
        return x

    @graph(ins={"x": GraphIn()})
    def my_graph(x, y):
        my_op(x, y)

    # internal_asset_deps names an output the graph does not have, which must be rejected.
    with pytest.raises(Exception, match="output_name non_exist_output_name"):
        AssetsDefinition.from_graph(
            graph_def=my_graph, internal_asset_deps={"non_exist_output_name": {AssetKey("b")}}
        )


def test_asset_def_from_graph_inputs():
    # Explicit input asset keys passed to from_graph must be kept as given.
    @op
    def my_op(x, y):  # pylint: disable=unused-argument
        return x

    @graph(ins={"x": GraphIn(), "y": GraphIn()})
    def my_graph(x, y):
        my_op(x, y)

    supplied_input_keys = {"x": AssetKey("x_asset"), "y": AssetKey("y_asset")}
    assets_def = AssetsDefinition.from_graph(
        graph_def=my_graph,
        asset_keys_by_input_name=supplied_input_keys,
    )

    resolved_input_keys = assets_def.asset_keys_by_input_name
    assert resolved_input_keys["x"] == AssetKey("x_asset")
    assert resolved_input_keys["y"] == AssetKey("y_asset")


def test_asset_def_from_graph_outputs():
    # Explicit output asset keys passed to from_graph must be kept as given.
    @op
    def x_op(x):
        return x

    @op
    def y_op(y):
        return y

    @graph(out={"x": GraphOut(), "y": GraphOut()})
    def my_graph(x, y):
        return {"x": x_op(x), "y": y_op(y)}

    supplied_output_keys = {"y": AssetKey("y_asset"), "x": AssetKey("x_asset")}
    assets_def = AssetsDefinition.from_graph(
        graph_def=my_graph,
        asset_keys_by_output_name=supplied_output_keys,
    )

    resolved_output_keys = assets_def.asset_keys_by_output_name
    assert resolved_output_keys["y"] == AssetKey("y_asset")
    assert resolved_output_keys["x"] == AssetKey("x_asset")


def test_graph_asset_decorator_no_args():
    # With no explicit mappings, input keys come from the input names and the single
    # default "result" output takes the graph's name.
    @op
    def my_op(x, y):  # pylint: disable=unused-argument
        return x

    @graph
    def my_graph(x, y):
        return my_op(x, y)

    assets_def = AssetsDefinition.from_graph(graph_def=my_graph)

    input_keys = assets_def.asset_keys_by_input_name
    output_keys = assets_def.asset_keys_by_output_name
    assert input_keys["x"] == AssetKey("x")
    assert input_keys["y"] == AssetKey("y")
    assert output_keys["result"] == AssetKey("my_graph")


def test_all_assets_job():
@asset
def a1():
Expand Down

0 comments on commit b8be100

Please sign in to comment.