build_assets_job -> AssetGroup in SDA guide (#6879)
sryza committed Mar 3, 2022
1 parent c14ac93 commit 6b57eb6
Showing 8 changed files with 57 additions and 58 deletions.
44 changes: 24 additions & 20 deletions docs/content/guides/dagster/software-defined-assets.mdx
@@ -1,13 +1,13 @@
---
title: Software-Defined Assets | Dagster
description: The "software-defined asset" APIs sit atop of the graph/job/op APIs and enable a novel way of constructing Dagster jobs that puts assets at the forefront.
description: The "software-defined asset" APIs sit atop of the graph/job/op APIs and enable a novel approach to orchestration that puts assets at the forefront.
---

# Software-Defined Assets <Experimental />

<CodeReferenceLink filePath="examples/software_defined_assets" />

The "Software-defined asset" APIs sit atop of the graph/job/op APIs and enable a novel way of constructing Dagster jobs that puts assets at the forefront. As a reminder, to Dagster, an "asset" is a data product: an object produced by a data pipeline, e.g. a table, ML model, or report.
The "Software-defined asset" APIs sit atop of the graph/job/op APIs and enable a novel approach to orchestration that puts assets at the forefront. As a reminder, to Dagster, an "asset" is a data product: an object produced by a data pipeline, e.g. a table, ML model, or report.

Conceptually, software-defined assets invert the typical relationship between assets and computation. Instead of defining a graph of ops and recording which assets those ops end up materializing, you define a set of assets, each of which knows how to compute its contents from upstream assets.

@@ -17,7 +17,7 @@ Taking a software-defined asset approach has a few main benefits:
- **Track cross-job dependencies via asset lineage** - Dagit allows you to find the parents and children of any asset, even if they live in different jobs. This is useful for finding the sources of problems and for understanding the consequences of changing or removing an asset.
- **Know when you need to take action on an asset** - In a unified view, Dagster compares the assets you've defined in code to the assets you've materialized in storage. You can catch that you've deployed code for generating a new table, but that you haven't yet materialized it. Or that you've deployed code that adds a column to a table, but that your stored table is still missing that column. Or that you've removed an asset definition, but the table still exists in storage.

In this example, we'll define some tables and generate a Dagster job that updates them. We have a table of temperature samples collected in five-minute increments, and we want to compute a table of the highest temperatures for each day.
In this example, we'll define some tables with dependencies on each other. We have a table of temperature samples collected in five-minute increments, and we want to compute a table of the highest temperatures for each day.

## Assets computed with Pandas and stored as CSVs

@@ -58,26 +58,30 @@ def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:

The framework infers asset dependencies by looking at the names of the arguments to the decorated functions. E.g. the function that defines the `daily_temperature_highs` asset has an argument named `sfo_q2_weather_sample` - corresponding to the asset of the same name.
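
The name-matching rule can be illustrated without Dagster at all. The following is a minimal sketch, not Dagster's implementation: `infer_upstream` is a hypothetical helper that uses the standard library's `inspect.signature` to read argument names, and the two functions are plain stand-ins for the assets defined above.

```python
import inspect


def daily_temperature_highs(sfo_q2_weather_sample):
    """Stand-in for the asset of the same name; the body is irrelevant here."""
    return sfo_q2_weather_sample


def hottest_dates(daily_temperature_highs):
    """Stand-in for the downstream asset."""
    return daily_temperature_highs


def infer_upstream(fn):
    """Hypothetical helper: treat each argument name as an upstream asset name."""
    return list(inspect.signature(fn).parameters)


# Map each asset name to the upstream names implied by its arguments.
deps = {
    fn.__name__: infer_upstream(fn)
    for fn in (daily_temperature_highs, hottest_dates)
}
print(deps)
```

Renaming an argument in this sketch changes the inferred dependency, which is why the argument names in the asset functions need to match the upstream asset names.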

### Building a job
### Combining the assets into a group

Having defined some assets, we can use <PyObject module="dagster.core.asset_defs" object="build_assets_job" /> to build a job that, when it runs, materializes them - i.e. it computes their contents and writes them to a location in persistent storage.
Having defined some assets, we can combine them into an <PyObject object="AssetGroup" />, which allows working with them in Dagit. It also allows combining them with resources and IO managers that determine how they're stored and connect them to external services.

```python file=../../software_defined_assets/software_defined_assets/weather_job.py startafter=build_assets_job_start endbefore=build_assets_job_end
weather_job = build_assets_job(
    "weather",
    assets=[daily_temperature_highs, hottest_dates],
    source_assets=[sfo_q2_weather_sample],
It's common to use a utility like <PyObject object="AssetGroup" method="from_modules" /> or <PyObject object="AssetGroup" method="from_package_name" /> to pick up all the assets within a module or package, so you don't need to list them individually.

```python file=../../software_defined_assets/software_defined_assets/weather_assets_group.py startafter=asset_group_start endbefore=asset_group_end
# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
from . import assets

weather_assets = AssetGroup.from_modules(
    modules=[assets],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)
```

The order that we supply the assets to <PyObject module="dagster.core.asset_defs" object="build_assets_job" /> doesn't matter. <PyObject module="dagster.core.asset_defs" object="build_assets_job" /> determines the execution dependencies based on the dependencies declared in the assets. `sfo_q2_weather_sample` is included in `source_assets` because it's not computed as part of the job - just consumed by other assets in the job.
The order that we supply the assets when constructing an <PyObject object="AssetGroup" /> doesn't matter - the dependencies are determined by what's declared inside each asset.
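
To see why the supply order is irrelevant, consider a minimal sketch that derives an execution order purely from declared dependencies. This is an illustration and not how Dagster resolves the order internally: it assumes Python 3.9+ for the standard library's `graphlib`, and `execution_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each asset name maps to the set of asset names it depends on.
declared_deps = {
    "hottest_dates": {"daily_temperature_highs"},
    "daily_temperature_highs": {"sfo_q2_weather_sample"},
}


def execution_order(deps):
    """Hypothetical helper: order assets so each comes after its upstream assets."""
    return list(TopologicalSorter(deps).static_order())


# Supplying the same declarations in a different order gives the same result.
order_a = execution_order(declared_deps)
order_b = execution_order(dict(reversed(list(declared_deps.items()))))
print(order_a)
```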

The functions we used to define our assets describe how to compute their contents, but not how to read and write them to persistent storage. For reading and writing, we define an <PyObject object="IOManager" />. In this case, our `LocalFileSystemIOManager` stores DataFrames as CSVs on the local filesystem:

```python file=../../software_defined_assets/software_defined_assets/weather_job.py startafter=io_manager_start endbefore=io_manager_end
```python file=../../software_defined_assets/software_defined_assets/weather_assets_group.py startafter=io_manager_start endbefore=io_manager_end
class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem."""

@@ -98,7 +102,7 @@ class LocalFileSystemIOManager(IOManager):

## Adding in Spark assets

Not all the assets in a job need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames, that depends on the `daily_temperature_highs` asset we defined above using Pandas.
Not all the assets in a group need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames, that depends on the `daily_temperature_highs` asset we defined above using Pandas.

```python file=../../software_defined_assets/software_defined_assets/spark_asset.py
from pyspark.sql import DataFrame as SparkDF
@@ -121,13 +125,13 @@ def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF:
)
```

Here's an extended version of `weather_job` that contains the new asset:
Here's an extended version of `weather_assets` that contains the new asset:

```python file=../../software_defined_assets/software_defined_assets/spark_weather_assets_group.py startafter=asset_group_start endbefore=asset_group_end
from . import assets, spark_asset

```python file=../../software_defined_assets/software_defined_assets/spark_weather_job.py startafter=build_assets_job_start endbefore=build_assets_job_end
spark_weather_job = build_assets_job(
    "spark_weather",
    assets=[daily_temperature_highs, hottest_dates, daily_temperature_high_diffs],
    source_assets=[sfo_q2_weather_sample],
spark_weather_assets = AssetGroup.from_modules(
    modules=[assets, spark_asset],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
@@ -138,7 +142,7 @@ spark_weather_job = build_assets_job(

Because the same assets will be written and read into different Python types in different situations, we need to define an <PyObject object="IOManager" /> that can handle both of those types. Here's an extended version of the <PyObject object="IOManager" /> we defined before:

```python file=../../software_defined_assets/software_defined_assets/spark_weather_job.py startafter=io_manager_start endbefore=io_manager_end
```python file=../../software_defined_assets/software_defined_assets/spark_weather_assets_group.py startafter=io_manager_start endbefore=io_manager_end
class LocalFileSystemIOManager(IOManager):
    def _get_fs_path(self, asset_key: AssetKey) -> str:
        return os.path.abspath(os.path.join(*asset_key.path))
@@ -1,8 +1,8 @@
from software_defined_assets.spark_weather_job import spark_weather_job
from software_defined_assets.spark_weather_assets_group import spark_weather_assets

from dagster import repository


@repository
def software_defined_assets():
    return [spark_weather_job]
    return [spark_weather_assets]
@@ -1,5 +1,6 @@
"""
Defines a job that computes the weather assets.
"""isort:skip_file
Defines a group of the weather assets.
Data is stored in Parquet files using the "Hadoop-style" layout in which each table corresponds to a
directory, and each file within the directory contains some of the rows.
@@ -16,11 +17,7 @@
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import SparkSession

from dagster import AssetKey, IOManager, IOManagerDefinition, build_assets_job, check

from .assets import daily_temperature_highs, hottest_dates, sfo_q2_weather_sample
from .spark_asset import daily_temperature_high_diffs

from dagster import AssetGroup, AssetKey, IOManager, IOManagerDefinition, check

# io_manager_start
class LocalFileSystemIOManager(IOManager):
@@ -81,15 +78,13 @@ def load_input(self, context) -> Union[PandasDF, SparkDF]:

# io_manager_end

# build_assets_job_start
# asset_group_start
from . import assets, spark_asset

spark_weather_job = build_assets_job(
    "spark_weather",
    assets=[daily_temperature_highs, hottest_dates, daily_temperature_high_diffs],
    source_assets=[sfo_q2_weather_sample],
spark_weather_assets = AssetGroup.from_modules(
    modules=[assets, spark_asset],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)

# build_assets_job_end
# asset_group_end
@@ -1,5 +1,6 @@
"""
Defines a job that computes the weather assets.
"""isort:skip_file
Defines a group of weather assets.
Data is stored in CSV files on the local filesystem.
"""
@@ -8,10 +9,7 @@
import pandas as pd
from pandas import DataFrame

from dagster import AssetKey, IOManager, IOManagerDefinition, build_assets_job

from .assets import daily_temperature_highs, hottest_dates, sfo_q2_weather_sample

from dagster import AssetGroup, AssetKey, IOManager, IOManagerDefinition

# io_manager_start
class LocalFileSystemIOManager(IOManager):
@@ -34,13 +32,15 @@ def load_input(self, context):

# io_manager_end

# build_assets_job_start
weather_job = build_assets_job(
    "weather",
    assets=[daily_temperature_highs, hottest_dates],
    source_assets=[sfo_q2_weather_sample],
# asset_group_start
# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
from . import assets

weather_assets = AssetGroup.from_modules(
    modules=[assets],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)
# build_assets_job_end
# asset_group_end
@@ -0,0 +1,5 @@
from software_defined_assets.spark_weather_assets_group import spark_weather_assets


def test_airport_weather_assets():
    assert spark_weather_assets.build_job(name="job").execute_in_process().success

This file was deleted.

@@ -0,0 +1,5 @@
from software_defined_assets.weather_assets_group import weather_assets


def test_airport_weather_assets():
    assert weather_assets.build_job(name="job").execute_in_process().success

This file was deleted.
