un-asset-group-ify SDA guide (#8283)
* un-asset-group-ify SDA guide

* get rid of unused imports
dpeng817 committed Jun 9, 2022
1 parent 02e9d20 commit 0e1fc50
Showing 8 changed files with 48 additions and 35 deletions.
30 changes: 16 additions & 14 deletions docs/content/guides/dagster/software-defined-assets.mdx
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ title: Software-Defined Assets | Dagster
description: The "software-defined asset" APIs sit atop of the graph/job/op APIs and enable a novel approach to orchestration that puts assets at the forefront.
---

# Software-Defined Assets <Experimental />
# Software-Defined Assets

<CodeReferenceLink filePath="examples/software_defined_assets" />

@@ -61,30 +61,31 @@ def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:

The framework infers asset dependencies by looking at the names of the arguments to the decorated functions. The function that defines the `daily_temperature_highs` asset has an argument named `sfo_q2_weather_sample`, which corresponds to the asset definition of the same name.
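
As a rough illustration of this name-based inference (a minimal sketch using only the standard library, not Dagster's actual implementation), a function's parameter names can be read with `inspect.signature` and treated as upstream asset names. The helper `infer_upstream_names` and the stub function bodies are hypothetical:

```python
import inspect


def infer_upstream_names(fn):
    """Return fn's parameter names, treated here as upstream asset names."""
    return list(inspect.signature(fn).parameters)


# Stub bodies: only the function and argument names matter for this sketch.
def daily_temperature_highs(sfo_q2_weather_sample):
    ...


def hottest_dates(daily_temperature_highs):
    ...


# Map each asset name to the upstream names inferred from its arguments.
inferred_deps = {
    fn.__name__: infer_upstream_names(fn)
    for fn in (daily_temperature_highs, hottest_dates)
}
```

Here `inferred_deps` maps `daily_temperature_highs` to `sfo_q2_weather_sample` and `hottest_dates` to `daily_temperature_highs`, mirroring the relationship described above.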

### Combining the assets into a group
### Connecting assets to external services

Having defined some assets, we can combine them into an <PyObject object="AssetGroup" />, which allows working with them in Dagit. It also allows combining them with resources and IO managers that determine how they're stored and connect them to external services.
Having defined some assets, we can combine them with resources and IO managers to determine how they're stored, and connect them to external services. We use <PyObject object="with_resources" /> to provide resources to assets and source assets.

It's common to use a utility like <PyObject object="AssetGroup" method="from_module" /> or <PyObject object="AssetGroup" method="from_package_name" /> to pick up all the assets within a module or package, so you don't need to list them individually.
It's common to use a utility like <PyObject object="load_assets_from_modules" /> or <PyObject object="load_assets_from_package_name" /> to pick up all the assets within a module or package, so you don't need to list them individually.

```python file=../../software_defined_assets/software_defined_assets/weather_assets_group.py startafter=asset_group_start endbefore=asset_group_end
```python file=../../software_defined_assets/software_defined_assets/weather_assets.py startafter=gather_assets_start endbefore=gather_assets_end
# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
from . import assets
from dagster import load_assets_from_modules, with_resources

weather_assets = AssetGroup.from_modules(
    modules=[assets],
weather_assets = with_resources(
    load_assets_from_modules(modules=[assets]),
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)
```

The order that we supply the assets when constructing an <PyObject object="AssetGroup" /> doesn't matter, since the dependencies are determined by each asset definition.
The order that we supply the assets doesn't matter, since the dependencies are determined by each asset definition.
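
One way to see why supply order is irrelevant: once each asset declares its own upstream dependencies, an execution order can be derived by topological sorting. The sketch below uses Python's standard `graphlib` module (Python 3.9+) and is only an illustration, not how Dagster is implemented. The `execution_order` helper and the dependency dictionaries are assumptions made for the example:

```python
from graphlib import TopologicalSorter


def execution_order(deps):
    """Order asset names so that every asset comes after its upstream assets.

    `deps` maps an asset name to the set of asset names it depends on.
    """
    return list(TopologicalSorter(deps).static_order())


# The same dependencies, supplied in two different orders.
deps_a = {
    "hottest_dates": {"daily_temperature_highs"},
    "daily_temperature_highs": {"sfo_q2_weather_sample"},
}
deps_b = {
    "daily_temperature_highs": {"sfo_q2_weather_sample"},
    "hottest_dates": {"daily_temperature_highs"},
}

order_a = execution_order(deps_a)
order_b = execution_order(deps_b)
```

Because these three assets form a single chain, both calls produce the same order: the source data first, then `daily_temperature_highs`, then `hottest_dates`.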

The functions we used to define our assets describe how to compute their contents, but not how to read and write them to persistent storage. For reading and writing, we define an <PyObject object="IOManager" />. In this case, our `LocalFileSystemIOManager` stores DataFrames as CSVs on the local filesystem:

```python file=../../software_defined_assets/software_defined_assets/weather_assets_group.py startafter=io_manager_start endbefore=io_manager_end
```python file=../../software_defined_assets/software_defined_assets/weather_assets.py startafter=io_manager_start endbefore=io_manager_end
class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem."""

@@ -105,7 +106,7 @@ class LocalFileSystemIOManager(IOManager):

## Adding in Spark assets

Not all the assets in a group need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames, that depends on the `daily_temperature_highs` asset we defined above using Pandas.
Not all the assets in the same dependency graph need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames, that depends on the `daily_temperature_highs` asset we defined above using Pandas.

```python file=../../software_defined_assets/software_defined_assets/spark_asset.py
from pyspark.sql import DataFrame as SparkDF
@@ -130,11 +131,12 @@ def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF:

Here's an extended version of `weather_assets` that contains the new asset:

```python file=../../software_defined_assets/software_defined_assets/spark_weather_assets_group.py startafter=asset_group_start endbefore=asset_group_end
```python file=../../software_defined_assets/software_defined_assets/spark_weather_assets.py startafter=gather_assets_start endbefore=gather_assets_end
from . import assets, spark_asset
from dagster import load_assets_from_modules, with_resources

spark_weather_assets = AssetGroup.from_modules(
    modules=[assets, spark_asset],
spark_weather_assets = with_resources(
    load_assets_from_modules(modules=[assets, spark_asset]),
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
@@ -145,7 +147,7 @@ spark_weather_assets = AssetGroup.from_modules(

Because the same assets will be written and read into different Python types in different situations, we need to define an <PyObject object="IOManager" /> that can handle both of those types. Here's an extended version of the <PyObject object="IOManager" /> we defined before:

```python file=../../software_defined_assets/software_defined_assets/spark_weather_assets_group.py startafter=io_manager_start endbefore=io_manager_end
```python file=../../software_defined_assets/software_defined_assets/spark_weather_assets.py startafter=io_manager_start endbefore=io_manager_end
class LocalFileSystemIOManager(IOManager):
    def _get_fs_path(self, asset_key: AssetKey) -> str:
        return os.path.abspath(os.path.join(*asset_key.path))
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from software_defined_assets.spark_weather_assets_group import spark_weather_assets
from software_defined_assets.spark_weather_assets import spark_weather_assets

from dagster import repository

Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import SparkSession

from dagster import AssetGroup, AssetKey, IOManager, IOManagerDefinition, _check as check
from dagster import AssetKey, IOManager, IOManagerDefinition, _check as check

# io_manager_start
class LocalFileSystemIOManager(IOManager):
@@ -78,13 +78,14 @@ def load_input(self, context) -> Union[PandasDF, SparkDF]:

# io_manager_end

# asset_group_start
# gather_assets_start
from . import assets, spark_asset
from dagster import load_assets_from_modules, with_resources

spark_weather_assets = AssetGroup.from_modules(
    modules=[assets, spark_asset],
spark_weather_assets = with_resources(
    load_assets_from_modules(modules=[assets, spark_asset]),
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)
# asset_group_end
# gather_assets_end
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
import pandas as pd
from pandas import DataFrame

from dagster import AssetGroup, AssetKey, IOManager, IOManagerDefinition
from dagster import AssetKey, IOManager, IOManagerDefinition

# io_manager_start
class LocalFileSystemIOManager(IOManager):
@@ -32,15 +32,17 @@ def load_input(self, context):

# io_manager_end

# asset_group_start
# gather_assets_start
# imports the module called "assets" from the package containing the current module
# the "assets" module contains the asset definitions
from . import assets
from dagster import load_assets_from_modules, with_resources

weather_assets = AssetGroup.from_modules(
    modules=[assets],
weather_assets = with_resources(
    load_assets_from_modules(modules=[assets]),
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)
# asset_group_end

# gather_assets_end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from software_defined_assets.spark_weather_assets import spark_weather_assets

from dagster import materialize
from dagster.core.test_utils import instance_for_test


def test_airport_weather_assets():
    with instance_for_test() as instance:
        assert materialize(spark_weather_assets, instance=instance).success

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from software_defined_assets.weather_assets import weather_assets

from dagster import materialize
from dagster.core.test_utils import instance_for_test


def test_airport_weather_assets():
    with instance_for_test() as instance:
        assert materialize(weather_assets, instance=instance).success

This file was deleted.
