Skip to content

Commit

Permalink
Add namespaces / prefixes to the hacker news assets demo (#7782)
Browse files Browse the repository at this point in the history
* validate collisions

* do not error on source asset collision

* hacker news namespaced
  • Loading branch information
sryza committed May 9, 2022
1 parent 5f1e9fb commit 8cfa136
Show file tree
Hide file tree
Showing 49 changed files with 251 additions and 182 deletions.
5 changes: 5 additions & 0 deletions examples/hacker_news_assets/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Regenerate the dbt manifest.json
# `dbt ls` compiles the project and writes target/manifest.json, which
# hacker_news_assets/activity_analytics/__init__.py loads at import time to
# build its software-defined assets.
# NOTE(review): make recipe lines must be tab-indented — confirm the
# indentation below survived copy/paste.
dbt:
	dbt ls \
		--project-dir hacker_news_assets/activity_analytics/hacker_news_dbt \
		--profiles-dir hacker_news_assets/activity_analytics/hacker_news_dbt/config
49 changes: 48 additions & 1 deletion examples/hacker_news_assets/hacker_news_assets/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,48 @@
from .repo import hacker_news_assets_local, hacker_news_assets_prod, hacker_news_assets_staging
from hacker_news_assets.activity_analytics import (
activity_analytics_definitions_local,
activity_analytics_definitions_prod,
activity_analytics_definitions_staging,
)
from hacker_news_assets.core import (
core_definitions_local,
core_definitions_prod,
core_definitions_staging,
)
from hacker_news_assets.recommender import (
recommender_definitions_local,
recommender_definitions_prod,
recommender_definitions_staging,
)

from dagster import repository

from .sensors.slack_on_failure_sensor import make_slack_on_failure_sensor


@repository
def prod():
    """Repository exposed to the production deployment.

    Combines the three asset packages' prod definitions and adds a sensor
    that posts to Slack when any run fails.
    """
    asset_definitions = (
        core_definitions_prod
        + recommender_definitions_prod
        + activity_analytics_definitions_prod
    )
    return asset_definitions + [make_slack_on_failure_sensor(base_url="my_dagit_url")]


@repository
def staging():
    """Repository exposed to the staging deployment.

    Mirrors ``prod``: the three asset packages' staging definitions plus a
    Slack failure sensor.
    """
    asset_definitions = (
        core_definitions_staging
        + recommender_definitions_staging
        + activity_analytics_definitions_staging
    )
    return asset_definitions + [make_slack_on_failure_sensor(base_url="my_dagit_url")]


@repository
def local():
    """Repository for local development.

    Same asset definitions as the deployed repositories, but without the
    Slack failure sensor.
    """
    return (
        core_definitions_local
        + recommender_definitions_local
        + activity_analytics_definitions_local
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import json
import os

from dagster_dbt import dbt_cli_resource
from dagster_dbt.asset_defs import load_assets_from_dbt_manifest
from hacker_news_assets.resources import RESOURCES_LOCAL, RESOURCES_PROD, RESOURCES_STAGING
from hacker_news_assets.sensors.hn_tables_updated_sensor import make_hn_tables_updated_sensor

from dagster import AssetGroup
from dagster.utils import file_relative_path

from . import assets

DBT_PROJECT_DIR = file_relative_path(__file__, "hacker_news_dbt")
DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"

# dbt CLI resources, one per deployment target. The dbt_cli_resource config
# schema uses snake_case keys (profiles_dir / project_dir); the staging
# resource previously used hyphenated keys ("profiles-dir" / "project-dir"),
# which do not match the schema — both targets now use the same snake_case
# keys. (Verify against the installed dagster-dbt version's config schema.)
dbt_staging_resource = dbt_cli_resource.configured(
    {"profiles_dir": DBT_PROFILES_DIR, "project_dir": DBT_PROJECT_DIR, "target": "staging"}
)
dbt_prod_resource = dbt_cli_resource.configured(
    {"profiles_dir": DBT_PROFILES_DIR, "project_dir": DBT_PROJECT_DIR, "target": "prod"}
)


# The manifest is pre-generated (see the repo Makefile's `dbt` target) and
# loaded once at import time. Use a context manager so the file handle is
# closed instead of leaking (`json.load(open(...))` never closes it).
with open(
    os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"), encoding="utf-8"
) as _manifest_file:
    _DBT_MANIFEST = json.load(_manifest_file)

# One software-defined asset per dbt model in the manifest; they all share the
# warehouse IO manager so downstream assets can read their tables.
dbt_assets = load_assets_from_dbt_manifest(
    _DBT_MANIFEST,
    io_manager_key="warehouse_io_manager",
)

def _activity_analytics_asset_group(resource_defs):
    """Build the asset group for one deployment target.

    Combines the Python assets in this package (prefixed with
    "activity_analytics") with the dbt-derived assets, all bound to the
    target's resources.
    """
    python_assets = AssetGroup.from_package_module(
        package_module=assets, resource_defs=resource_defs
    ).prefixed("activity_analytics")
    return python_assets + AssetGroup(dbt_assets, resource_defs=resource_defs)


activity_analytics_assets_prod = _activity_analytics_asset_group(RESOURCES_PROD)
activity_analytics_assets_staging = _activity_analytics_asset_group(RESOURCES_STAGING)
activity_analytics_assets_local = _activity_analytics_asset_group(RESOURCES_LOCAL)


# For each deployment target: a sensor that launches the story activity
# analytics job when the upstream HN tables update, paired with the list of
# definitions the repository exposes for that target.
activity_analytics_assets_sensor_prod = make_hn_tables_updated_sensor(
    activity_analytics_assets_prod.build_job(name="story_activity_analytics_job")
)
activity_analytics_definitions_prod = [
    activity_analytics_assets_prod,
    activity_analytics_assets_sensor_prod,
]

activity_analytics_assets_sensor_staging = make_hn_tables_updated_sensor(
    activity_analytics_assets_staging.build_job(name="story_activity_analytics_job")
)
activity_analytics_definitions_staging = [
    activity_analytics_assets_staging,
    activity_analytics_assets_sensor_staging,
]

activity_analytics_assets_sensor_local = make_hn_tables_updated_sensor(
    activity_analytics_assets_local.build_job(name="story_activity_analytics_job")
)
activity_analytics_definitions_local = [
    activity_analytics_assets_local,
    activity_analytics_assets_sensor_local,
]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
from dagster import AssetIn, asset


@asset(ins={"activity_daily_stats": AssetIn(namespace="activity_analytics")})
def activity_forecast(activity_daily_stats: DataFrame) -> DataFrame:
    """Return the first 100 rows of the daily activity stats.

    The upstream asset lives under the "activity_analytics" namespace, so the
    input is mapped explicitly via AssetIn. (The captured text retained the
    pre-rename decorator with namespace="hackernews" alongside this one; only
    the current decorator is kept.)
    """
    return activity_daily_stats.head(100)
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
hacker_news_dbt:
target: dev
target: local
outputs:
local:
type: duckdb
path: hackernews.duckdb
schema: hackernews
schema: activity_analytics

staging:
type: snowflake
Expand All @@ -14,9 +14,9 @@ hacker_news_dbt:
user: "{{ env_var('SNOWFLAKE_USER') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"

database: DEMO_DB
database: DEMO_DB_STAGING
warehouse: TINY_WAREHOUSE
schema: hackernews_dev
schema: activity_analytics
client_session_keep_alive: False

prod:
Expand All @@ -29,5 +29,5 @@ hacker_news_dbt:

database: DEMO_DB
warehouse: TINY_WAREHOUSE
schema: hackernews
schema: activity_analytics
client_session_keep_alive: False
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Daily comment activity: for each calendar day, the number of distinct
-- commenting users and the total number of comments.
-- (The captured diff retained both the old `source('hackernews', ...)` and
-- the new `source('core', ...)` FROM lines; only the current one is kept.)
SELECT date_trunc('day', to_timestamp(time::int)) as date,
    count(DISTINCT user_id) AS commenting_users,
    count(*) AS num_comments
FROM {{ source('core', 'comments') }}
GROUP BY 1
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: 2

# Upstream tables produced by the `core` asset group that the dbt models in
# this project read from. (The captured diff retained both the old
# `hackernews` source name and the new `core` name; only `core` is kept.)
sources:
  - name: core
    tables:
      - name: comments
      - name: stories
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Daily posting activity: for each calendar day, the number of distinct
-- posting users and the total number of stories.
-- (The captured diff retained both the old `source('hackernews', ...)` and
-- the new `source('core', ...)` FROM lines; only the current one is kept.)
SELECT date_trunc('day', to_timestamp(time::int)) as date,
    count(DISTINCT user_id) AS posting_users,
    count(*) AS num_stories
FROM {{ source('core', 'stories') }}
GROUP BY 1

Large diffs are not rendered by default.

This file was deleted.

This file was deleted.

42 changes: 42 additions & 0 deletions examples/hacker_news_assets/hacker_news_assets/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from hacker_news_assets.resources import RESOURCES_LOCAL, RESOURCES_PROD, RESOURCES_STAGING

from dagster import AssetGroup, schedule_from_partitions

from . import assets

def _core_asset_group(resource_defs):
    """Asset group for the assets in this package, prefixed with "core"."""
    return AssetGroup.from_package_module(
        package_module=assets, resource_defs=resource_defs
    ).prefixed("core")


core_assets_prod = _core_asset_group(RESOURCES_PROD)
core_assets_staging = _core_asset_group(RESOURCES_STAGING)
core_assets_local = _core_asset_group(RESOURCES_LOCAL)

# Kubernetes container resource requests attached to every run of the core job.
RUN_TAGS = {
    "dagster-k8s/config": {
        "container_config": {
            "resources": {
                "requests": {"cpu": "500m", "memory": "2Gi"},
            }
        },
    }
}


def _core_schedule(asset_group):
    """Partition-driven schedule for one target's core job."""
    return schedule_from_partitions(
        asset_group.build_job(name="core_job", tags=RUN_TAGS)
    )


core_assets_schedule_prod = _core_schedule(core_assets_prod)
core_assets_schedule_staging = _core_schedule(core_assets_staging)
core_assets_schedule_local = _core_schedule(core_assets_local)

# Definition lists consumed by the repositories in the package __init__.
core_definitions_prod = [core_assets_prod, core_assets_schedule_prod]
core_definitions_staging = [core_assets_staging, core_assets_schedule_staging]
core_definitions_local = [core_assets_local, core_assets_schedule_local]

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from hacker_news_assets.core import core_assets_local, core_assets_prod, core_assets_staging
from hacker_news_assets.resources import RESOURCES_LOCAL, RESOURCES_PROD, RESOURCES_STAGING
from hacker_news_assets.sensors.hn_tables_updated_sensor import make_hn_tables_updated_sensor

from dagster import AssetGroup

from . import assets

def _recommender_asset_group(upstream_core_assets, resource_defs):
    """Build the "recommender"-prefixed asset group for one deployment target.

    The same target's core assets are passed in as source assets so the
    recommender assets can depend on them across group boundaries.
    """
    return AssetGroup.from_package_module(
        package_module=assets,
        extra_source_assets=upstream_core_assets.to_source_assets(),
        resource_defs=resource_defs,
    ).prefixed("recommender")


def _recommender_sensor(asset_group):
    """Sensor that runs the story recommender job when the HN tables update."""
    return make_hn_tables_updated_sensor(
        asset_group.build_job(name="story_recommender_job")
    )


recommender_assets_prod = _recommender_asset_group(core_assets_prod, RESOURCES_PROD)
recommender_assets_staging = _recommender_asset_group(core_assets_staging, RESOURCES_STAGING)
recommender_assets_local = _recommender_asset_group(core_assets_local, RESOURCES_LOCAL)

recommender_assets_sensor_prod = _recommender_sensor(recommender_assets_prod)
recommender_assets_sensor_staging = _recommender_sensor(recommender_assets_staging)
recommender_assets_sensor_local = _recommender_sensor(recommender_assets_local)

# Definition lists consumed by the repositories in the package __init__.
recommender_definitions_prod = [recommender_assets_prod, recommender_assets_sensor_prod]
recommender_definitions_staging = [recommender_assets_staging, recommender_assets_sensor_staging]
recommender_definitions_local = [recommender_assets_local, recommender_assets_sensor_local]
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

@asset(
ins={
"stories": AssetIn(metadata={"columns": ["id"]}),
"comments": AssetIn(metadata={"columns": ["id", "user_id", "parent"]}),
"stories": AssetIn(namespace="core", metadata={"columns": ["id"]}),
"comments": AssetIn(namespace="core", metadata={"columns": ["id", "user_id", "parent"]}),
},
io_manager_key="warehouse_io_manager",
)
Expand Down

0 comments on commit 8cfa136

Please sign in to comment.