Skip to content

Commit

Permalink
[docs] - SDA guide for existing Dagster users [CON-30] (#8188)
Browse files Browse the repository at this point in the history
* asset upgrade guide

* Updates

* Update title and page description

* Update docs/content/guides/dagster/enriching-with-software-defined-assets.mdx

Co-authored-by: Claire Lin <claire@elementl.com>

* Specify input/output

* Rename example section and move concepts down

* Re-work ergonomics

* Break apart bullet item

* Run snapshot; add OpExecutionContext

* Clarify ops in assets

* Update intro

* sda new apis

* fix broken anchor link

Co-authored-by: Erin Cochran <erin.k.cochran@gmail.com>
Co-authored-by: Claire Lin <claire@elementl.com>
  • Loading branch information
3 people committed Jun 9, 2022
1 parent e0879bf commit 82d510b
Show file tree
Hide file tree
Showing 15 changed files with 718 additions and 9 deletions.
19 changes: 10 additions & 9 deletions docs/content/guides.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

This section explains how to accomplish common tasks in Dagster and showcases Dagster's experimental features.

| Name | Description | |
| ------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------- | - |
| [Migrating to Graphs, Jobs, and Ops](/guides/dagster/graph_job_op) | This guide describes how to migrate to the Graph, Job, and Op APIs from the legacy Dagster APIs (Solids and Pipelines). | |
| [Versioning and Memoization](/guides/dagster/memoization) | This guide describes how to use Dagster's versioning and memoization features. <Experimental /> | |
| [Software-Defined Assets with Pandas and PySpark](/guides/dagster/software-defined-assets) | This guide offers a fast introduction to software-defined assets, with Pandas and PySpark. <Experimental /> | |
| [Run Attribution](/guides/dagster/run-attribution) | This guide describes how to perform Run Attribution by using a Custom Run Coordinator <Experimental /> | |
| [Re-execution](/guides/dagster/re-execution) | This guide describes how to re-execute a job within Dagit and using Dagster's APIs. | |
| [Fully-Featured Example Project](/guides/dagster/example_project) | This guide describes the Hacker News example project, which takes advantage of many of Dagster's features | |
| [Validating Data with Dagster Type Factories](/guides/dagster/dagster_type_factories) | This guide illustrates the use of a Dagster Type factory to validate Pandas dataframes using the third-party library Pandera. | |
| Name | Description | |
| ---------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------- | - |
| [Upgrading to Software-Defined Assets](/guides/dagster/enriching-with-software-defined-assets) | This guide describes how to enrich what you've built in Dagster with software-defined assets. | |
| [Migrating to Graphs, Jobs, and Ops](/guides/dagster/graph_job_op) | This guide describes how to migrate to the Graph, Job, and Op APIs from the legacy Dagster APIs (Solids and Pipelines). | |
| [Versioning and Memoization](/guides/dagster/memoization) | This guide describes how to use Dagster's versioning and memoization features. <Experimental /> | |
| [Software-Defined Assets with Pandas and PySpark](/guides/dagster/software-defined-assets) | This guide offers a fast introduction to software-defined assets, with Pandas and PySpark. | |
| [Run Attribution](/guides/dagster/run-attribution) | This guide describes how to perform Run Attribution by using a Custom Run Coordinator <Experimental /> | |
| [Re-execution](/guides/dagster/re-execution) | This guide describes how to re-execute a job within Dagit and using Dagster's APIs. | |
| [Fully-Featured Example Project](/guides/dagster/example_project) | This guide describes the Hacker News example project, which takes advantage of many of Dagster's features | |
| [Validating Data with Dagster Type Factories](/guides/dagster/dagster_type_factories) | This guide illustrates the use of a Dagster Type factory to validate Pandas dataframes using the third-party library Pandera. | |
407 changes: 407 additions & 0 deletions docs/content/guides/dagster/enriching-with-software-defined-assets.mdx

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Any
from unittest.mock import MagicMock

from pandas import DataFrame

from dagster import IOManager, io_manager


def create_db_connection() -> Any:
return MagicMock()


def train_recommender_model(df: DataFrame) -> Any:
del df


def pickle_to_s3(object: Any, key: str) -> None:
pass


def fetch_products() -> DataFrame:
return DataFrame({"product": ["knive"], "category": ["kitchenware"]})


@io_manager
def snowflake_io_manager():
class SnowflakeIOManager(IOManager):
def handle_output(self, context, obj):
del context
del obj

def load_input(self, context):
return DataFrame()

return SnowflakeIOManager()


@io_manager
def s3_io_manager():
class S3IOManager(IOManager):
def handle_output(self, context, obj):
del context
del obj

def load_input(self, context):
return None

return S3IOManager()
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from pandas import DataFrame

from dagster import AssetsDefinition, GraphOut, define_asset_job, graph, op, repository

from .mylib import create_db_connection, fetch_products


@op
def extract_products() -> DataFrame:
return fetch_products()


@op
def get_categories(products: DataFrame) -> DataFrame:
return DataFrame({"category": products["category"].unique()})


@op
def write_products_table(products: DataFrame) -> None:
products.to_sql(name="categories", con=create_db_connection())


@op
def write_categories_table(categories: DataFrame) -> None:
categories.to_sql(name="categories", con=create_db_connection())


@graph(out={"products": GraphOut(), "categories": GraphOut()})
def ingest_graph():
products = extract_products()
product_categories = get_categories(products)
return write_products_table(products), write_categories_table(product_categories)


two_tables = AssetsDefinition.from_graph(ingest_graph)


@repository
def repo():
return [two_tables, define_asset_job("products_and_categories_job")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pandas import DataFrame

from dagster import SourceAsset, asset, define_asset_job, repository, with_resources

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")


@asset(io_manager_key="warehouse")
def users(raw_users: DataFrame) -> DataFrame:
users_df = raw_users.dropna()
return users_df


@asset(io_manager_key="object_store")
def user_recommender_model(users: DataFrame):
users_recommender_model = train_recommender_model(users)
return users_recommender_model


@repository
def repo():
return [
*with_resources(
[raw_users, users, user_recommender_model],
resource_defs={
"warehouse": snowflake_io_manager,
"object_store": s3_io_manager,
},
),
define_asset_job("users_recommender_job"),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from pandas import read_sql

from dagster import asset, define_asset_job, repository

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model


@asset(non_argument_deps={"raw_users"})
def users():
raw_users_df = read_sql(f"select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
users_df.to_sql(name="users", con=create_db_connection())


@asset(non_argument_deps={"users"})
def user_recommender_model():
users_df = read_sql(f"select * from users", con=create_db_connection())
users_recommender_model = train_recommender_model(users_df)
pickle_to_s3(users_recommender_model, key="users_recommender_model")


@repository
def repo():
return [users, user_recommender_model, define_asset_job("users_recommender_job")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pandas import DataFrame

from dagster import job, op, repository

from .mylib import create_db_connection, fetch_products


@op
def extract_products() -> DataFrame:
return fetch_products()


@op
def get_categories(products: DataFrame) -> DataFrame:
return DataFrame({"category": products["category"].unique()})


@op
def write_products_table(products: DataFrame) -> None:
products.to_sql(name="categories", con=create_db_connection())


@op
def write_categories_table(categories: DataFrame) -> None:
categories.to_sql(name="categories", con=create_db_connection())


@job
def ingest_products_and_categories():
products = extract_products()
product_categories = get_categories(products)
return write_products_table(products), write_categories_table(product_categories)


@repository
def repo():
return [ingest_products_and_categories]
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from pandas import DataFrame

from dagster import In, Out, job, op, repository

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model


@op(
ins={"raw_users": In(root_manager_key="warehouse")},
out={"users": Out(io_manager_key="warehouse")},
)
def build_users(raw_users: DataFrame) -> DataFrame:
users_df = raw_users.dropna()
return users_df


@op(out={"users_recommender_model": Out(io_manager_key="object_store")})
def build_user_recommender_model(users: DataFrame):
users_recommender_model = train_recommender_model(users)
return users_recommender_model


@job(resource_defs={"warehouse": snowflake_io_manager, "object_store": s3_io_manager})
def users_recommender_job():
build_user_recommender_model(build_users())


@repository
def repo():
return [users_recommender_job]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pandas import read_sql

from dagster import In, Nothing, job, op, repository

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model


@op
def build_users():
raw_users_df = read_sql(f"select * from raw_users", con=create_db_connection())
users_df = raw_users_df.dropna()
users_df.to_sql(name="users", con=create_db_connection())


@op(ins={"users": In(Nothing)})
def build_user_recommender_model():
users_df = read_sql(f"select * from users", con=create_db_connection())
users_recommender_model = train_recommender_model(users_df)
pickle_to_s3(users_recommender_model, key="users_recommender_model")


@job
def users_recommender_job():
build_user_recommender_model(build_users())


@repository
def repo():
return [users_recommender_job]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from docs_snippets.guides.dagster.enriching_with_software_defined_assets.sda_graph import (
repo,
)


def test_sda_graph():
assert repo.get_job("products_and_categories_job").execute_in_process().success
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from docs_snippets.guides.dagster.enriching_with_software_defined_assets.sda_io_manager import (
repo,
)


def test_sda_nothing():
assert repo.get_job("users_recommender_job").execute_in_process().success
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from docs_snippets.guides.dagster.enriching_with_software_defined_assets.sda_nothing import (
repo,
)


def test_sda_nothing():
assert repo.get_job("users_recommender_job").execute_in_process().success
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from docs_snippets.guides.dagster.enriching_with_software_defined_assets.vanilla_graph import (
ingest_products_and_categories,
)


def test_ingest_products_and_categories():
assert ingest_products_and_categories.execute_in_process().success
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from unittest.mock import patch

from pandas import DataFrame

from docs_snippets.guides.dagster.enriching_with_software_defined_assets.vanilla_io_manager import (
users_recommender_job,
)


def test_users_recommender_job():
with patch(
"docs_snippets.guides.dagster.enriching_with_software_defined_assets.vanilla_nothing.read_sql"
) as mock_read_sql:
mock_read_sql.return_value = DataFrame([{"COL1": "a", "COL2": 1}])

assert users_recommender_job.execute_in_process().success
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from unittest.mock import patch

from pandas import DataFrame

from docs_snippets.guides.dagster.enriching_with_software_defined_assets.vanilla_nothing import (
users_recommender_job,
)


def test_users_recommender_job():
with patch(
"docs_snippets.guides.dagster.enriching_with_software_defined_assets.vanilla_nothing.read_sql"
) as mock_read_sql:
mock_read_sql.return_value = DataFrame([{"COL1": "a", "COL2": 1}])

assert users_recommender_job.execute_in_process().success

1 comment on commit 82d510b

@vercel
Copy link

@vercel vercel bot commented on 82d510b Jun 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.