Skip to content

Commit

Permalink
graph_asset and graph_multi_asset decorators (#10152)
Browse files Browse the repository at this point in the history
### Summary & Motivation

I was inspired to write this when I came across this example:
https://docs.dagster.io/concepts/assets/graph-backed-assets#defining-basic-dependencies-for-graph-backed-assets.

I think `AssetsDefinition.from_graph` is useful in situations where
someone has a general / reusable graph that they want to build assets
from, but the decorators in this PR will be a lot more ergonomic in
situations where someone has specific assets in mind when they define
their graph.

This PR also updates the docs to focus primarily on the new decorators:
[preview](https://dagster-git-graph-backed-asset-decorator-elementl.vercel.app/concepts/assets/graph-backed-assets).

### How I Tested These Changes

Added tests
  • Loading branch information
sryza authored and clairelin135 committed Feb 22, 2023
1 parent 314b968 commit c4f158d
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 110 deletions.
102 changes: 41 additions & 61 deletions docs/content/concepts/assets/graph-backed-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@ description: Defining a software-defined asset with multiple discrete computatio

# Graph-Backed Assets

[Basic software-defined assets](/concepts/assets/software-defined-assets#a-basic-software-defined-asset) are computed using a single op. If generating an asset involves multiple discrete computations, you can use graph-backed assets by separating each computation into an op and building a graph to combine your computations. This allows you to launch re-executions of runs at the op boundaries but doesn't require you to link each intermediate value to an asset in persistent storage.

Graph-backed assets are useful if you have an existing graph that produces and consumes assets. Wrapping your graph inside a software-defined asset gives you all the benefits of software-defined assets — like cross-job lineage — without requiring you to change the code inside your graph.

Additionally, graph-backed assets allow you to reuse an existing op across other graph-backed assets and jobs, or within the same graph.
[Basic software-defined assets](/concepts/assets/software-defined-assets#a-basic-software-defined-asset) are computed using a single op. If generating an asset involves multiple discrete computations, you can use graph-backed assets by separating each computation into an op and building a graph to combine your computations. This allows you to launch re-executions of runs at the op boundaries, but doesn't require you to link each intermediate value to an asset in persistent storage.

---

## Relevant APIs

| Name | Description |
| ---------------------------------------------------------- | --------------------------------------------- |
| <PyObject object="AssetsDefinition" method="from_graph" /> | Constructs an asset given a graph definition. |
| Name | Description |
| ---------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| <PyObject object="graph_asset" decorator /> | Decorator for defining an asset that's computed using a graph of ops. The dependencies between the ops are specified inside the body of the decorated function. |
| <PyObject object="graph_multi_asset" decorator /> | Decorator for defining a set of assets that are computed using a graph of ops. The dependencies between the ops are specified inside the body of the decorated function. |
| <PyObject object="AssetsDefinition" method="from_graph" /> | Constructs an asset, given a graph definition. Useful if you have a single graph that you want to use to power multiple different assets. |

---

## Defining graph-backed assets

To define a graph-backed asset, use the `from_graph` attribute on the <PyObject object="AssetsDefinition" /> object. The value returned from the graph that becomes the graph-backed asset will be stored in persistent storage as the asset:
To define a graph-backed asset, use the <PyObject object="graph_asset" decorator /> decorator. The decorated function defines the dependencies between a set of ops, which are combined to compute the asset.

In this case, when you tell Dagster to materialize the `slack_files_table` asset, Dagster will invoke `fetch_files_from_slack` and then invoke `store_files` after `fetch_files_from_slack` has completed.

```python file=/concepts/assets/graph_backed_asset.py startafter=start example endbefore=end example
import pandas as pd
from dagster import AssetsDefinition, graph, op
from dagster import graph_asset, op


@op(required_resource_keys={"slack"})
Expand All @@ -51,79 +51,60 @@ def store_files(files):
return files.to_sql(name="slack_files", con=create_db_connection())


@graph
def store_slack_files_in_sql():
@graph_asset
def slack_files_table():
return store_files(fetch_files_from_slack())


graph_asset = AssetsDefinition.from_graph(store_slack_files_in_sql)
```

### Defining basic dependencies for graph-backed assets

The `from_graph` attribute on the `AssetsDefinition` object infers upstream and downstream asset dependencies from the graph definition provided. In the most simple case when the graph returns a singular output, Dagster infers the name of the graph to be the outputted asset key.
Similar to with basic software-defined assets, Dagster infers the upstream assets from the names of the arguments to the decorated function.

In the example below, Dagster creates an asset with key `middle_asset` from the `middle_asset` graph. Just like assets defined via <PyObject object="asset" decorator />, each argument to the decorated graph function is an upstream asset name. `middle_asset` depends on `upstream_asset`, and `downstream_asset` depends on `middle_asset`:
The example below includes an asset named `middle_asset`. `middle_asset` depends on `upstream_asset`, and `downstream_asset` depends on `middle_asset`:

```python file=/concepts/assets/graph_backed_asset.py startafter=start_basic_dependencies endbefore=end_basic_dependencies
from dagster import AssetsDefinition, asset, graph
from dagster import asset, graph_asset, op


@asset
def upstream_asset():
return 1


@graph
def middle_asset(upstream_asset):
return add_one(upstream_asset)
@op
def add_one(input_num):
return input_num + 1


middle_asset = AssetsDefinition.from_graph(middle_asset)
@op
def multiply_by_two(input_num):
return input_num * 2


@graph_asset
def middle_asset(upstream_asset):
return multiply_by_two(add_one(upstream_asset))


@asset
def downstream_asset(middle_asset):
return middle_asset + 1
return middle_asset + 7
```

When your graph returns multiple outputs, Dagster infers each output name to be the outputted asset key. In the below example, `two_assets_graph` accepts `upstream_asset` and outputs two assets, `first_asset` and `second_asset`:
### Graph-backed multi-assets

Using the <PyObject object="graph_multi_asset" decorator />, you can create a combined definition of multiple assets that are computed using the same graph of ops and same upstream assets.

In the below example, `two_assets` accepts `upstream_asset` and outputs two assets, `first_asset` and `second_asset`:

```python file=/concepts/assets/graph_backed_asset.py startafter=start_basic_dependencies_2 endbefore=end_basic_dependencies_2
from dagster import AssetsDefinition, GraphOut, graph
from dagster import AssetOut, graph_multi_asset


@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
def two_assets_graph(upstream_asset):
@graph_multi_asset(outs={"first_asset": AssetOut(), "second_asset": AssetOut()})
def two_assets(upstream_asset):
one, two = two_outputs(upstream_asset)
return {"first_asset": one, "second_asset": two}


two_assets = AssetsDefinition.from_graph(two_assets_graph)
```

### Defining explicit dependencies for graph-backed assets

You can also define dependencies for graph-backed assets explicitly via the `asset_keys_by_input_name` and `asset_keys_by_output_name` arguments to `from_graph`:

```python file=/concepts/assets/graph_backed_asset.py startafter=start_explicit_dependencies endbefore=end_explicit_dependencies
from dagster import AssetsDefinition, GraphOut, graph


@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two(zero):
one, two = two_outputs(zero)
return {"one": one, "two": two}


explicit_deps_asset = AssetsDefinition.from_graph(
return_one_and_two,
keys_by_input_name={"zero": AssetKey("upstream_asset")},
keys_by_output_name={
"one": AssetKey("asset_one"),
"two": AssetKey("asset_two"),
},
)
```

### Advanced: Subsetting graph-backed assets
Expand Down Expand Up @@ -176,17 +157,16 @@ def baz(foo_2, bar_2):
return foo_2 + bar_2


@graph(out={"foo_asset": GraphOut(), "baz_asset": GraphOut()})
def my_graph():
@graph_multi_asset(
outs={"foo_asset": AssetOut(), "baz_asset": AssetOut()}, can_subset=True
)
def my_graph_assets():
bar_1, bar_2 = bar()
foo_1, foo_2 = foo(bar_1)
return {"foo_asset": foo_1, "baz_asset": baz(foo_2, bar_2)}


defs = Definitions(
assets=[AssetsDefinition.from_graph(my_graph, can_subset=True)],
jobs=[define_asset_job("graph_asset")],
)
defs = Definitions(assets=[my_graph_assets], jobs=[define_asset_job("graph_asset")])
```

Depending on how outputs are returned from the ops within a graph-backed asset, there could be unexpected materializations. For example, the following `foo` implementation would unexpectedly materialize `foo_asset` if `baz_asset` was the only asset selected for execution.
Expand All @@ -198,7 +178,7 @@ def foo():


# Will unexpectedly materialize foo_asset
defs.get_job_def("graph_asset").execute_in_process(
defs.get_job_def("my_graph_assets").execute_in_process(
asset_selection=[AssetKey("baz_asset")]
)
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def create_db_connection():

# start example
import pandas as pd
from dagster import AssetsDefinition, graph, op
from dagster import graph_asset, op


@op(required_resource_keys={"slack"})
Expand All @@ -41,54 +41,54 @@ def store_files(files):
return files.to_sql(name="slack_files", con=create_db_connection())


@graph
def store_slack_files_in_sql():
@graph_asset
def slack_files_table():
return store_files(fetch_files_from_slack())


graph_asset = AssetsDefinition.from_graph(store_slack_files_in_sql)

# end example

slack_mock = MagicMock()

store_slack_files = define_asset_job(
"store_slack_files", selection=AssetSelection.assets(graph_asset)
"store_slack_files", selection=AssetSelection.assets(slack_files_table)
)


@op
def add_one(input_num):
return input_num + 1


# start_basic_dependencies
from dagster import AssetsDefinition, asset, graph
from dagster import asset, graph_asset, op


@asset
def upstream_asset():
return 1


@graph
def middle_asset(upstream_asset):
return add_one(upstream_asset)
@op
def add_one(input_num):
return input_num + 1


@op
def multiply_by_two(input_num):
return input_num * 2

middle_asset = AssetsDefinition.from_graph(middle_asset) # type: ignore

@graph_asset
def middle_asset(upstream_asset):
return multiply_by_two(add_one(upstream_asset))


@asset
def downstream_asset(middle_asset):
return middle_asset + 1
return middle_asset + 7


# end_basic_dependencies

basic_deps_job = define_asset_job(
"basic_deps_job",
AssetSelection.assets(upstream_asset, middle_asset, downstream_asset), # type: ignore
AssetSelection.assets(upstream_asset, middle_asset, downstream_asset),
)


Expand All @@ -99,46 +99,35 @@ def two_outputs(upstream):


# start_basic_dependencies_2
from dagster import AssetsDefinition, GraphOut, graph
from dagster import AssetOut, graph_multi_asset


@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
def two_assets_graph(upstream_asset):
@graph_multi_asset(outs={"first_asset": AssetOut(), "second_asset": AssetOut()})
def two_assets(upstream_asset):
one, two = two_outputs(upstream_asset)
return {"first_asset": one, "second_asset": two}


two_assets = AssetsDefinition.from_graph(two_assets_graph)

# end_basic_dependencies_2

second_basic_deps_job = define_asset_job(
"second_basic_deps_job", AssetSelection.assets(upstream_asset, two_assets)
)

# start_explicit_dependencies
from dagster import AssetsDefinition, GraphOut, graph

from dagster import AssetOut, graph_multi_asset

@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two(zero):
one, two = two_outputs(zero)
return {"one": one, "two": two}

@graph_multi_asset(outs={"asset_one": AssetOut(), "asset_two": AssetOut()})
def one_and_two(upstream_asset):
one, two = two_outputs(upstream_asset)
return {"asset_one": one, "asset_two": two}

explicit_deps_asset = AssetsDefinition.from_graph(
return_one_and_two,
keys_by_input_name={"zero": AssetKey("upstream_asset")},
keys_by_output_name={
"one": AssetKey("asset_one"),
"two": AssetKey("asset_two"),
},
)

# end_explicit_dependencies

explicit_deps_job = define_asset_job(
"explicit_deps_job", AssetSelection.assets(upstream_asset, explicit_deps_asset)
"explicit_deps_job", AssetSelection.assets(upstream_asset, one_and_two)
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from dagster import (
AssetsDefinition,
AssetOut,
Definitions,
GraphOut,
Out,
Output,
define_asset_job,
graph,
graph_multi_asset,
op,
)

Expand Down Expand Up @@ -35,16 +34,15 @@ def baz(foo_2, bar_2):
return foo_2 + bar_2


@graph(out={"foo_asset": GraphOut(), "baz_asset": GraphOut()})
def my_graph():
@graph_multi_asset(
outs={"foo_asset": AssetOut(), "baz_asset": AssetOut()}, can_subset=True
)
def my_graph_assets():
bar_1, bar_2 = bar()
foo_1, foo_2 = foo(bar_1)
return {"foo_asset": foo_1, "baz_asset": baz(foo_2, bar_2)}


defs = Definitions(
assets=[AssetsDefinition.from_graph(my_graph, can_subset=True)],
jobs=[define_asset_job("graph_asset")],
)
defs = Definitions(assets=[my_graph_assets], jobs=[define_asset_job("graph_asset")])

# end_graph_backed_asset_example
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def foo():


# Will unexpectedly materialize foo_asset
defs.get_job_def("graph_asset").execute_in_process(
defs.get_job_def("my_graph_assets").execute_in_process(
asset_selection=[AssetKey("baz_asset")]
)
# end_unexpected_materialization_foo
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
from dagster._core.definitions.configurable import configured as configured
from dagster._core.definitions.decorators.asset_decorator import (
asset as asset,
graph_asset as graph_asset,
graph_multi_asset as graph_multi_asset,
multi_asset as multi_asset,
)
from dagster._core.definitions.decorators.config_mapping_decorator import (
Expand Down

0 comments on commit c4f158d

Please sign in to comment.