Skip to content

Commit

Permalink
[docs] - Clean up graph-backed asset example, put under test (#8893)
Browse files Browse the repository at this point in the history
* clean up graph-backed asset example, put under test

* un-remove asset_group.py

* fix snaps
  • Loading branch information
dpeng817 authored Jul 19, 2022
1 parent c00de5b commit 31f3283
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 27 deletions.
21 changes: 17 additions & 4 deletions docs/content/concepts/assets/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,14 @@ Graph-backed assets are useful if you have an existing graph that produces and c
To define a graph-backed asset, use the `from_graph` attribute on the <PyObject object="AssetsDefinition" /> object. The value returned from the graph that becomes the graph-backed asset will be stored in persistent storage as the asset:

```python file=/concepts/assets/graph_backed_asset.py startafter=start example endbefore=end example
import pandas as pd
from dagster import AssetsDefinition, asset, graph, op


@op(required_resource_keys={"slack"})
def fetch_files_from_slack(context) -> DataFrame:
def fetch_files_from_slack(context) -> pd.DataFrame:
files = context.resources.slack.files_list(channel="#random")
return DataFrame(
return pd.DataFrame(
[
{
"id": file.get("id"),
Expand Down Expand Up @@ -245,6 +249,9 @@ The `from_graph` attribute on the `AssetsDefinition` object infers upstream and
In the example below, Dagster creates an asset with key `middle_asset` from the `middle_asset` graph. Just like assets defined via <PyObject object="asset" decorator />, each argument to the decorated graph function is an upstream asset name. `middle_asset` depends on `upstream_asset`, and `downstream_asset` depends on `middle_asset`:

```python file=/concepts/assets/graph_backed_asset.py startafter=start_basic_dependencies endbefore=end_basic_dependencies
from dagster import AssetsDefinition, asset, graph


@asset
def upstream_asset():
return 1
Expand All @@ -266,6 +273,9 @@ def downstream_asset(middle_asset):
When your graph returns multiple outputs, Dagster infers each output name to be the outputted asset key. In the below example, `two_assets_graph` accepts `upstream_asset` and outputs two assets, `first_asset` and `second_asset`:

```python file=/concepts/assets/graph_backed_asset.py startafter=start_basic_dependencies_2 endbefore=end_basic_dependencies_2
from dagster import AssetsDefinition, GraphOut, graph


@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
def two_assets_graph(upstream_asset):
one, two = two_outputs(upstream_asset)
Expand All @@ -280,6 +290,9 @@ two_assets = AssetsDefinition.from_graph(two_assets_graph)
You can also define dependencies for graph-backed assets explicitly via the `asset_keys_by_input_name` and `asset_keys_by_output_name` arguments to `from_graph`:

```python file=/concepts/assets/graph_backed_asset.py startafter=start_explicit_dependencies endbefore=end_explicit_dependencies
from dagster import AssetsDefinition, GraphOut, graph


@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two(zero):
one, two = two_outputs(zero)
Expand All @@ -288,8 +301,8 @@ def return_one_and_two(zero):

explicit_deps_asset = AssetsDefinition.from_graph(
return_one_and_two,
asset_keys_by_input_name={"zero": AssetKey("upstream_asset")},
asset_keys_by_output_name={
keys_by_input_name={"zero": AssetKey("upstream_asset")},
keys_by_output_name={
"one": AssetKey("asset_one"),
"two": AssetKey("asset_two"),
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
from unittest import mock

from pandas import DataFrame

# isort: skip_file
# pylint: disable=reimported
from dagster import (
AssetGroup,
AssetKey,
load_assets_from_current_module,
AssetsDefinition,
with_resources,
GraphOut,
Out,
Output,
ResourceDefinition,
AssetSelection,
asset,
build_assets_job,
define_asset_job,
graph,
op,
repository,
)
from mock import MagicMock


def create_db_connection():
return "yay"
return MagicMock()


# start example
import pandas as pd
from dagster import AssetsDefinition, asset, graph, op


@op(required_resource_keys={"slack"})
def fetch_files_from_slack(context) -> DataFrame:
def fetch_files_from_slack(context) -> pd.DataFrame:
files = context.resources.slack.files_list(channel="#random")
return DataFrame(
return pd.DataFrame(
[
{
"id": file.get("id"),
Expand All @@ -55,12 +58,11 @@ def store_slack_files_in_sql():

# end example

slack_mock = mock.MagicMock()
slack_mock = MagicMock()

store_slack_files = AssetGroup(
[graph_asset],
resource_defs={"slack": ResourceDefinition.hardcoded_resource(slack_mock)},
).build_job("store_slack_files")
store_slack_files = define_asset_job(
"store_slack_files", selection=AssetSelection.assets(graph_asset)
)


@op
Expand All @@ -69,6 +71,7 @@ def add_one(input_num):


# start_basic_dependencies
from dagster import AssetsDefinition, asset, graph


@asset
Expand All @@ -91,8 +94,9 @@ def downstream_asset(middle_asset):

# end_basic_dependencies

basic_deps_job = build_assets_job(
"basic_deps_job", [upstream_asset, middle_asset, downstream_asset]
basic_deps_job = define_asset_job(
"basic_deps_job",
AssetSelection.assets(upstream_asset, middle_asset, downstream_asset),
)


Expand All @@ -103,6 +107,7 @@ def two_outputs(upstream):


# start_basic_dependencies_2
from dagster import AssetsDefinition, GraphOut, graph


@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
Expand All @@ -115,11 +120,12 @@ def two_assets_graph(upstream_asset):

# end_basic_dependencies_2

second_basic_deps_job = build_assets_job(
"second_basic_deps_job", [upstream_asset, two_assets]
second_basic_deps_job = define_asset_job(
"second_basic_deps_job", AssetSelection.assets(upstream_asset, two_assets)
)

# start_explicit_dependencies
from dagster import AssetsDefinition, GraphOut, graph


@graph(out={"one": GraphOut(), "two": GraphOut()})
Expand All @@ -130,20 +136,29 @@ def return_one_and_two(zero):

explicit_deps_asset = AssetsDefinition.from_graph(
return_one_and_two,
asset_keys_by_input_name={"zero": AssetKey("upstream_asset")},
asset_keys_by_output_name={
keys_by_input_name={"zero": AssetKey("upstream_asset")},
keys_by_output_name={
"one": AssetKey("asset_one"),
"two": AssetKey("asset_two"),
},
)

# end_explicit_dependencies

explicit_deps_job = build_assets_job(
"explicit_deps_job", [upstream_asset, explicit_deps_asset]
explicit_deps_job = define_asset_job(
"explicit_deps_job", AssetSelection.assets(upstream_asset, explicit_deps_asset)
)


@repository
def my_repo():
return [basic_deps_job, store_slack_files, second_basic_deps_job, explicit_deps_job]
return [
basic_deps_job,
store_slack_files,
second_basic_deps_job,
explicit_deps_job,
*with_resources(
load_assets_from_current_module(),
{"slack": ResourceDefinition.hardcoded_resource(slack_mock)},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest

from docs_snippets.concepts.assets.graph_backed_asset import my_repo


@pytest.mark.parametrize(
"job",
[
"basic_deps_job",
"store_slack_files",
"second_basic_deps_job",
"explicit_deps_job",
],
)
def test_jobs(job):
job = my_repo.get_job(job)
assert job.execute_in_process().success

1 comment on commit 31f3283

@vercel
Copy link

@vercel vercel bot commented on 31f3283 Jul 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.