Use AssetGroup in hacker news demo (#6965)

sryza committed Mar 7, 2022
1 parent c193ccd commit 803063e
Showing 8 changed files with 116 additions and 126 deletions.
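In a nutshell: instead of wiring explicit asset lists into build_assets_job for each environment, the demo now builds one AssetGroup per environment and carves each job out of it with build_job plus an asset selection. A minimal sketch of the pattern, assuming the dagster 0.14-era API this commit targets (the asset and job names below are illustrative, not taken from the diff):

from dagster import AssetGroup, asset


@asset
def upstream_table():
    # Hypothetical asset standing in for the demo's real assets.
    return [1, 2, 3]


@asset
def downstream_table(upstream_table):
    # Depends on upstream_table via the argument name.
    return len(upstream_table)


# One group holds every asset; jobs are selections over the group rather
# than hand-assembled asset lists.
group = AssetGroup([upstream_table, downstream_table])
my_job = group.build_job(name="my_job", selection=["*downstream_table"])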
@@ -0,0 +1,9 @@
+from hacker_news_assets.resources import RESOURCES_LOCAL, RESOURCES_PROD, RESOURCES_STAGING
+
+from dagster import AssetGroup, in_process_executor
+
+prod_assets = AssetGroup.from_package_name(__name__, resource_defs=RESOURCES_PROD)
+staging_assets = AssetGroup.from_package_name(__name__, resource_defs=RESOURCES_STAGING)
+local_assets = AssetGroup.from_package_name(
+    __name__, resource_defs=RESOURCES_LOCAL, executor_def=in_process_executor
+)
@@ -0,0 +1,39 @@
+import json
+import os
+
+import pandas as pd
+from dagster_dbt import dbt_cli_resource
+from dagster_dbt.asset_defs import load_assets_from_dbt_manifest
+from hacker_news_assets.resources.snowflake_io_manager import (
+    SHARED_SNOWFLAKE_CONF,
+    connect_snowflake,
+)
+
+from dagster import MetadataValue
+from dagster.utils import file_relative_path
+
+DBT_PROJECT_DIR = file_relative_path(__file__, "../../hacker_news_dbt")
+DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"
+dbt_staging_resource = dbt_cli_resource.configured(
+    {"profiles-dir": DBT_PROFILES_DIR, "project-dir": DBT_PROJECT_DIR, "target": "staging"}
+)
+dbt_prod_resource = dbt_cli_resource.configured(
+    {"profiles_dir": DBT_PROFILES_DIR, "project_dir": DBT_PROJECT_DIR, "target": "prod"}
+)
+
+
+def asset_metadata(_context, model_info):
+    config = dict(SHARED_SNOWFLAKE_CONF)
+    config["schema"] = model_info["schema"]
+    with connect_snowflake(config=config) as con:
+        df = pd.read_sql(f"SELECT * FROM {model_info['name']} LIMIT 5", con=con)
+        num_rows = con.execute(f"SELECT COUNT(*) FROM {model_info['name']}").fetchone()
+
+    return {"Data sample": MetadataValue.md(df.to_markdown()), "Rows": num_rows[0]}
+
+
+assets = load_assets_from_dbt_manifest(
+    json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"))),
+    runtime_metadata_fn=asset_metadata,
+    io_manager_key="warehouse_io_manager",
+)
@@ -1,54 +1,14 @@
-import json
-import os
+from hacker_news_assets.assets import prod_assets, staging_assets
 
-import pandas as pd
-from dagster_dbt import dbt_cli_resource
-from dagster_dbt.asset_defs import load_assets_from_dbt_manifest
-from hacker_news_assets.resources import RESOURCES_PROD, RESOURCES_STAGING
-from hacker_news_assets.resources.snowflake_io_manager import (
-    SHARED_SNOWFLAKE_CONF,
-    connect_snowflake,
-)
+from dagster import AssetGroup, JobDefinition
 
-from dagster import MetadataValue, build_assets_job
-from dagster.utils import file_relative_path
-
-DBT_PROJECT_DIR = file_relative_path(__file__, "../../hacker_news_dbt")
-DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"
-dbt_staging_resource = dbt_cli_resource.configured(
-    {"profiles-dir": DBT_PROFILES_DIR, "project-dir": DBT_PROJECT_DIR, "target": "staging"}
-)
-dbt_prod_resource = dbt_cli_resource.configured(
-    {"profiles_dir": DBT_PROFILES_DIR, "project_dir": DBT_PROJECT_DIR, "target": "prod"}
-)
+
+def make_activity_stats_job(asset_group: AssetGroup) -> JobDefinition:
+    return asset_group.build_job(
+        name="activity_stats",
+        selection=["comment_daily_stats", "story_daily_stats", "activity_daily_stats"],
+    )
 
 
-def asset_metadata(_context, model_info):
-    config = dict(SHARED_SNOWFLAKE_CONF)
-    config["schema"] = model_info["schema"]
-    with connect_snowflake(config=config) as con:
-        df = pd.read_sql(f"SELECT * FROM {model_info['name']} LIMIT 5", con=con)
-        num_rows = con.execute(f"SELECT COUNT(*) FROM {model_info['name']}").fetchone()
-
-    return {"Data sample": MetadataValue.md(df.to_markdown()), "Rows": num_rows[0]}
-
-
-# this list has one element per dbt model
-assets = load_assets_from_dbt_manifest(
-    json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"))),
-    runtime_metadata_fn=asset_metadata,
-    io_manager_key="warehouse_io_manager",
-)
-activity_stats_staging_job = build_assets_job(
-    "activity_stats",
-    assets,
-    [],
-    resource_defs={**RESOURCES_STAGING, **{"dbt": dbt_prod_resource}},
-)
-
-activity_stats_prod_job = build_assets_job(
-    "activity_stats",
-    assets,
-    [],
-    resource_defs={**RESOURCES_PROD, **{"dbt": dbt_prod_resource}},
-)
+activity_stats_prod_job = make_activity_stats_job(prod_assets)
+activity_stats_staging_job = make_activity_stats_job(staging_assets)
@@ -1,49 +1,24 @@
-from hacker_news_assets.assets.id_range_for_time import id_range_for_time
-from hacker_news_assets.assets.items import comments, items, stories
-from hacker_news_assets.resources import RESOURCES_LOCAL, RESOURCES_PROD, RESOURCES_STAGING
-from hacker_news_assets.resources.hn_resource import hn_api_subsample_client, hn_snapshot_client
+from hacker_news_assets.assets import local_assets, prod_assets, staging_assets
 
-from dagster import build_assets_job, in_process_executor
+from dagster import AssetGroup, JobDefinition
 
-DOWNLOAD_TAGS = {
-    "dagster-k8s/config": {
-        "container_config": {
-            "resources": {
-                "requests": {"cpu": "500m", "memory": "2Gi"},
-            }
-        },
-    }
-}
-
-
-ASSETS = [id_range_for_time, items, comments, stories]
-
-
-download_prod_job = build_assets_job(
-    "hacker_news_api_download",
-    assets=ASSETS,
-    resource_defs={
-        **{"hn_client": hn_api_subsample_client.configured({"sample_rate": 10})},
-        **RESOURCES_PROD,
-    },
-    tags=DOWNLOAD_TAGS,
-)
-
-download_staging_job = build_assets_job(
-    "hacker_news_api_download",
-    assets=ASSETS,
-    resource_defs={
-        **{"hn_client": hn_api_subsample_client.configured({"sample_rate": 10})},
-        **RESOURCES_STAGING,
-    },
-    tags=DOWNLOAD_TAGS,
-)
-
-download_local_job = build_assets_job(
-    "hacker_news_api_download",
-    assets=ASSETS,
-    resource_defs={**{"hn_client": hn_snapshot_client}, **RESOURCES_LOCAL},
-    tags=DOWNLOAD_TAGS,
-    executor_def=in_process_executor,
-)
+
+def make_download_job(asset_group: AssetGroup) -> JobDefinition:
+    return asset_group.build_job(
+        name="hacker_news_api_download",
+        selection=["*comments", "*stories"],
+        tags={
+            "dagster-k8s/config": {
+                "container_config": {
+                    "resources": {
+                        "requests": {"cpu": "500m", "memory": "2Gi"},
+                    }
+                },
+            }
+        },
+    )
+
+
+download_prod_job = make_download_job(prod_assets)
+download_staging_job = make_download_job(staging_assets)
+download_local_job = make_download_job(local_assets)
@@ -1,32 +1,14 @@
-from hacker_news_assets.assets.comment_stories import comment_stories
-from hacker_news_assets.assets.items import comments, stories
-from hacker_news_assets.assets.recommender_model import component_top_stories, recommender_model
-from hacker_news_assets.assets.user_story_matrix import user_story_matrix
-from hacker_news_assets.assets.user_top_recommended_stories import user_top_recommended_stories
-from hacker_news_assets.resources import RESOURCES_PROD, RESOURCES_STAGING
+from hacker_news_assets.assets import prod_assets, staging_assets
 
-from dagster import build_assets_job
+from dagster import AssetGroup, JobDefinition
 
-assets = [
-    comment_stories,
-    user_story_matrix,
-    recommender_model,
-    component_top_stories,
-    user_top_recommended_stories,
-]
-
-source_assets = [comments, stories]
+
+def make_story_recommender_job(asset_group: AssetGroup) -> JobDefinition:
+    return asset_group.build_job(
+        name="story_recommender",
+        selection=["comment_stories*"],
+    )
 
-story_recommender_prod_job = build_assets_job(
-    "story_recommender",
-    assets=assets,
-    source_assets=source_assets,
-    resource_defs=RESOURCES_PROD,
-)
-
-story_recommender_staging_job = build_assets_job(
-    "story_recommender",
-    assets=assets,
-    source_assets=source_assets,
-    resource_defs=RESOURCES_STAGING,
-)
+story_recommender_prod_job = make_story_recommender_job(prod_assets)
+story_recommender_staging_job = make_story_recommender_job(staging_assets)
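A note on the selection strings used by these jobs: in Dagster's asset selection syntax, a leading * (as in "*comments") selects an asset together with everything upstream of it, while a trailing * (as in "comment_stories*") selects an asset together with everything downstream. A small sketch under the same 0.14-era API, with invented asset names:

from dagster import AssetGroup, asset


@asset
def raw_events():
    return list(range(10))


@asset
def cleaned_events(raw_events):
    return [e for e in raw_events if e % 2 == 0]


group = AssetGroup([raw_events, cleaned_events])

# "*cleaned_events": cleaned_events plus all assets upstream of it.
upstream_job = group.build_job(name="upstream_job", selection=["*cleaned_events"])

# "raw_events*": raw_events plus all assets downstream of it.
downstream_job = group.build_job(name="downstream_job", selection=["raw_events*"])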
3 changes: 3 additions & 0 deletions examples/hacker_news_assets/hacker_news_assets/repo.py
@@ -1,5 +1,6 @@
 from dagster import repository, schedule_from_partitions
 
+from .assets import prod_assets, staging_assets
 from .jobs.activity_stats import activity_stats_prod_job, activity_stats_staging_job
 from .jobs.hacker_news_api_download import download_prod_job, download_staging_job
 from .jobs.story_recommender import story_recommender_prod_job, story_recommender_staging_job
@@ -10,6 +11,7 @@
 @repository
 def hacker_news_assets_prod():
     return [
+        prod_assets,
         schedule_from_partitions(download_prod_job),
         make_slack_on_failure_sensor(base_url="my_dagit_url.com"),
         make_hn_tables_updated_sensor(activity_stats_prod_job),
@@ -20,6 +22,7 @@ def hacker_news_assets_prod():
 @repository
 def hacker_news_assets_staging():
     return [
+        staging_assets,
         schedule_from_partitions(download_staging_job),
         make_slack_on_failure_sensor(base_url="my_dagit_url.com"),
         make_hn_tables_updated_sensor(activity_stats_staging_job),
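Returning the AssetGroup from the @repository, alongside the jobs, schedules, and sensors, is what surfaces the assets themselves in Dagit rather than only the jobs built from them. A minimal sketch, again with a hypothetical asset:

from dagster import AssetGroup, asset, repository


@asset
def my_table():
    return 1


@repository
def my_repo():
    # An AssetGroup can be returned alongside jobs, schedules, and sensors.
    return [AssetGroup([my_table])]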
@@ -1,17 +1,30 @@
 import os
 
 from dagster_aws.s3 import s3_resource
+from dagster_dbt import dbt_cli_resource
 from dagster_pyspark import pyspark_resource
 
 from dagster import ResourceDefinition
+from dagster.utils import file_relative_path
 
 from .common_bucket_s3_pickle_io_manager import common_bucket_s3_pickle_io_manager
 from .hn_resource import hn_api_subsample_client, hn_snapshot_client
 from .parquet_io_manager import (
     local_partitioned_parquet_io_manager,
     s3_partitioned_parquet_io_manager,
 )
 from .snowflake_io_manager import snowflake_io_manager
 
+DBT_PROJECT_DIR = file_relative_path(__file__, "../../hacker_news_dbt")
+DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"
+dbt_staging_resource = dbt_cli_resource.configured(
+    {"profiles-dir": DBT_PROFILES_DIR, "project-dir": DBT_PROJECT_DIR, "target": "staging"}
+)
+dbt_prod_resource = dbt_cli_resource.configured(
+    {"profiles_dir": DBT_PROFILES_DIR, "project_dir": DBT_PROJECT_DIR, "target": "prod"}
+)
+
+
 configured_pyspark = pyspark_resource.configured(
     {
         "spark_conf": {
@@ -41,6 +54,8 @@
     "warehouse_io_manager": snowflake_io_manager_prod,
     "pyspark": configured_pyspark,
     "warehouse_loader": snowflake_io_manager_prod,
+    "hn_client": hn_api_subsample_client.configured({"sample_rate": 10}),
+    "dbt": dbt_prod_resource,
 }
 
 snowflake_io_manager_staging = snowflake_io_manager.configured(
@@ -56,6 +71,8 @@
     "warehouse_io_manager": snowflake_io_manager_staging,
     "pyspark": configured_pyspark,
     "warehouse_loader": snowflake_io_manager_staging,
+    "hn_client": hn_api_subsample_client.configured({"sample_rate": 10}),
+    "dbt": dbt_staging_resource,
 }


@@ -64,4 +81,6 @@
     "warehouse_io_manager": local_partitioned_parquet_io_manager,
     "pyspark": configured_pyspark,
     "warehouse_loader": snowflake_io_manager_prod,
+    "hn_client": hn_snapshot_client,
+    "dbt": ResourceDefinition.none_resource(),
 }
@@ -1,18 +1,16 @@
 import tempfile
 
 from dagster_pyspark import pyspark_resource
-from hacker_news_assets.jobs.hacker_news_api_download import ASSETS
 from hacker_news_assets.resources.hn_resource import hn_snapshot_client
 from hacker_news_assets.resources.parquet_io_manager import local_partitioned_parquet_io_manager
 
-from dagster import ResourceDefinition, build_assets_job, fs_io_manager, mem_io_manager
+from dagster import AssetGroup, ResourceDefinition, fs_io_manager, mem_io_manager
 
 
 def test_download():
     with tempfile.TemporaryDirectory() as temp_dir:
-        test_job = build_assets_job(
-            "test_job",
-            assets=ASSETS,
+        test_job = AssetGroup.from_package_name(
+            "hacker_news_assets.assets",
             resource_defs={
                 "io_manager": fs_io_manager,
                 "partition_start": ResourceDefinition.string_resource(),
@@ -23,8 +21,13 @@ def test_download():
                 "warehouse_io_manager": mem_io_manager,
                 "pyspark": pyspark_resource,
                 "hn_client": hn_snapshot_client,
+                "dbt": ResourceDefinition.none_resource(),
             },
+        ).build_job(
+            "test_job",
+            selection=["*comments", "*stories"],
         )
 
         result = test_job.execute_in_process(partition_key="2020-12-30-00:00")
 
         assert result.success
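The test exercises the same asset package the repositories load, but swaps in in-memory and no-op resources (mem_io_manager, ResourceDefinition.none_resource()) so the selected assets can run in-process against the snapshot client.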
