Skip to content

Commit

Permalink
[docs] - Graph-backed asset examples (#8339)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Jun 13, 2022
1 parent d954334 commit 142cd80
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 1 deletion.
58 changes: 58 additions & 0 deletions docs/content/concepts/assets/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,64 @@ graph_asset = AssetsDefinition.from_graph(store_slack_files_in_sql)

**Note**: All output assets must be selected when using a graph-backed asset to create a job. Dagster will select all graph output automatically upon creating a job.

#### Defining basic dependencies

The `from_graph` attribute on the `AssetsDefinition` object infers upstream and downstream asset dependencies from the graph definition provided. In the most simple case when the graph returns a singular output, Dagster infers the name of the graph to be the outputted asset key.

In the example below, Dagster creates an asset with key `middle_asset` from the `middle_asset` graph. Just like assets defined via <PyObject object="asset" decorator />, each argument to the decorated graph function is an upstream asset name. `middle_asset` depends on `upstream_asset`, and `downstream_asset` depends on `middle_asset`.

```python file=/concepts/assets/graph_backed_asset.py startafter=start_basic_dependencies endbefore=end_basic_dependencies
@asset
def upstream_asset():
return 1


@graph
def middle_asset(upstream_asset):
return add_one(upstream_asset)


middle_asset = AssetsDefinition.from_graph(middle_asset)


@asset
def downstream_asset(middle_asset):
return middle_asset + 1
```

When your graph returns multiple outputs, Dagster infers each output name to be the outputted asset key. In the below example, `two_assets_graph` accepts `upstream_asset` and outputs two assets, `first_asset` and `second_asset`.

```python file=/concepts/assets/graph_backed_asset.py startafter=start_basic_dependencies_2 endbefore=end_basic_dependencies_2
@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
def two_assets_graph(upstream_asset):
one, two = two_outputs(upstream_asset)
return {"first_asset": one, "second_asset": two}


two_assets = AssetsDefinition.from_graph(two_assets_graph)
```

#### Defining explicit dependencies

You can also define dependencies for graph-backed assets explicitly via the `asset_keys_by_input_name` and `asset_keys_by_output_name` arguments to `from_graph`:

```python file=/concepts/assets/graph_backed_asset.py startafter=start_explicit_dependencies endbefore=end_explicit_dependencies
@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two(zero):
one, two = two_outputs(zero)
return {"one": one, "two": two}


explicit_deps_asset = AssetsDefinition.from_graph(
return_one_and_two,
asset_keys_by_input_name={"zero": AssetKey("upstream_asset")},
asset_keys_by_output_name={
"one": AssetKey("asset_one"),
"two": AssetKey("asset_two"),
},
)
```

### Asset context

Since a software-defined asset contains an op, all the typical functionality of an op - like the use of [resources](/concepts/resources) and [configuration](#asset-configuration) - is available to an asset. Supplying the `context` parameter provides access to system information for the op, for example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,20 @@

from pandas import DataFrame

from dagster import AssetGroup, AssetsDefinition, ResourceDefinition, graph, op
from dagster import (
AssetGroup,
AssetKey,
AssetsDefinition,
GraphOut,
Out,
Output,
ResourceDefinition,
asset,
build_assets_job,
graph,
op,
repository,
)


def create_db_connection():
Expand Down Expand Up @@ -48,3 +61,89 @@ def store_slack_files_in_sql():
[graph_asset],
resource_defs={"slack": ResourceDefinition.hardcoded_resource(slack_mock)},
).build_job("store_slack_files")


@op
def add_one(input_num):
return input_num + 1


# start_basic_dependencies


@asset
def upstream_asset():
return 1


@graph
def middle_asset(upstream_asset):
return add_one(upstream_asset)


middle_asset = AssetsDefinition.from_graph(middle_asset)


@asset
def downstream_asset(middle_asset):
return middle_asset + 1


# end_basic_dependencies

basic_deps_job = build_assets_job(
"basic_deps_job", [upstream_asset, middle_asset, downstream_asset]
)


@op(out={"one": Out(), "two": Out()})
def two_outputs(upstream):
yield Output(output_name="one", value=upstream)
yield Output(output_name="two", value=upstream)


# start_basic_dependencies_2


@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
def two_assets_graph(upstream_asset):
one, two = two_outputs(upstream_asset)
return {"first_asset": one, "second_asset": two}


two_assets = AssetsDefinition.from_graph(two_assets_graph)

# end_basic_dependencies_2

second_basic_deps_job = build_assets_job(
"second_basic_deps_job", [upstream_asset, two_assets]
)

# start_explicit_dependencies


@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two(zero):
one, two = two_outputs(zero)
return {"one": one, "two": two}


explicit_deps_asset = AssetsDefinition.from_graph(
return_one_and_two,
asset_keys_by_input_name={"zero": AssetKey("upstream_asset")},
asset_keys_by_output_name={
"one": AssetKey("asset_one"),
"two": AssetKey("asset_two"),
},
)

# end_explicit_dependencies

explicit_deps_job = build_assets_job(
"explicit_deps_job", [upstream_asset, explicit_deps_asset]
)


@repository
def my_repo():
return [basic_deps_job, store_slack_files, second_basic_deps_job, explicit_deps_job]

1 comment on commit 142cd80

@vercel
Copy link

@vercel vercel bot commented on 142cd80 Jun 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.