Commit: hn local repo that uses duckdb as warehouse (#6968)

sryza committed Mar 16, 2022
1 parent ee146ef commit 0f61c12
Showing 14 changed files with 122 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -154,3 +154,6 @@ pythonenv*/
 
 # Vim project-local settings
 .vim
+
+# DuckDB
+*.duckdb
2 changes: 1 addition & 1 deletion examples/hacker_news_assets/hacker_news_assets/__init__.py
@@ -1 +1 @@
-from .repo import hacker_news_assets_prod, hacker_news_assets_staging
+from .repo import hacker_news_assets_local, hacker_news_assets_prod, hacker_news_assets_staging
examples/hacker_news_assets/hacker_news_assets/jobs/activity_stats.py
@@ -1,4 +1,4 @@
-from hacker_news_assets.assets import prod_assets, staging_assets
+from hacker_news_assets.assets import local_assets, prod_assets, staging_assets
 
 from dagster import AssetGroup, JobDefinition
 
@@ -17,3 +17,4 @@ def make_activity_stats_job(asset_group: AssetGroup) -> JobDefinition:
 
 activity_stats_prod_job = make_activity_stats_job(prod_assets)
 activity_stats_staging_job = make_activity_stats_job(staging_assets)
+activity_stats_local_job = make_activity_stats_job(local_assets)
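The body of make_activity_stats_job is collapsed in this view. As a purely hypothetical sketch of what such a factory usually looks like (not the commit's actual code; the job name and selection string are assumptions), it would wrap AssetGroup.build_job:

    def make_activity_stats_job(asset_group: AssetGroup) -> JobDefinition:
        # build a runnable job from a subset of the group's assets;
        # the selection string here is illustrative only
        return asset_group.build_job(name="activity_stats", selection="activity_daily_stats*")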
examples/hacker_news_assets/hacker_news_assets/jobs/story_recommender.py
@@ -1,4 +1,4 @@
-from hacker_news_assets.assets import prod_assets, staging_assets
+from hacker_news_assets.assets import local_assets, prod_assets, staging_assets
 
 from dagster import AssetGroup, JobDefinition
 
@@ -12,3 +12,4 @@ def make_story_recommender_job(asset_group: AssetGroup) -> JobDefinition:
 
 story_recommender_prod_job = make_story_recommender_job(prod_assets)
 story_recommender_staging_job = make_story_recommender_job(staging_assets)
+story_recommender_local_job = make_story_recommender_job(local_assets)
25 changes: 21 additions & 4 deletions examples/hacker_news_assets/hacker_news_assets/repo.py
@@ -1,9 +1,21 @@
 from dagster import repository, schedule_from_partitions
 
-from .assets import prod_assets, staging_assets
-from .jobs.activity_stats import activity_stats_prod_job, activity_stats_staging_job
-from .jobs.hacker_news_api_download import download_prod_job, download_staging_job
-from .jobs.story_recommender import story_recommender_prod_job, story_recommender_staging_job
+from .assets import local_assets, prod_assets, staging_assets
+from .jobs.activity_stats import (
+    activity_stats_local_job,
+    activity_stats_prod_job,
+    activity_stats_staging_job,
+)
+from .jobs.hacker_news_api_download import (
+    download_local_job,
+    download_prod_job,
+    download_staging_job,
+)
+from .jobs.story_recommender import (
+    story_recommender_local_job,
+    story_recommender_prod_job,
+    story_recommender_staging_job,
+)
 from .sensors.hn_tables_updated_sensor import make_hn_tables_updated_sensor
 from .sensors.slack_on_failure_sensor import make_slack_on_failure_sensor
 
@@ -28,3 +40,8 @@ def hacker_news_assets_staging():
         make_hn_tables_updated_sensor(activity_stats_staging_job),
         make_hn_tables_updated_sensor(story_recommender_staging_job),
     ]
+
+
+@repository
+def hacker_news_assets_local():
+    return [local_assets, download_local_job, activity_stats_local_job, story_recommender_local_job]
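With the local repository registered, the example can run end to end without S3, Snowflake, or Slack credentials. A minimal sketch (assuming the package is installed; a partitioned job such as download_local_job would also need a partition_key argument):

    from hacker_news_assets.jobs.activity_stats import activity_stats_local_job

    # executes the job in the current process against the local DuckDB warehouse
    result = activity_stats_local_job.execute_in_process()
    assert result.success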
examples/hacker_news_assets/hacker_news_assets/resources/__init__.py
@@ -8,7 +8,8 @@
 from dagster.utils import file_relative_path
 
 from .common_bucket_s3_pickle_io_manager import common_bucket_s3_pickle_io_manager
-from .hn_resource import hn_api_subsample_client, hn_snapshot_client
+from .duckdb_parquet_io_manager import duckdb_partitioned_parquet_io_manager
+from .hn_resource import hn_api_client, hn_api_subsample_client
 from .parquet_io_manager import (
     local_partitioned_parquet_io_manager,
     s3_partitioned_parquet_io_manager,
@@ -17,6 +18,9 @@
 
 DBT_PROJECT_DIR = file_relative_path(__file__, "../../hacker_news_dbt")
 DBT_PROFILES_DIR = DBT_PROJECT_DIR + "/config"
+dbt_local_resource = dbt_cli_resource.configured(
+    {"profiles_dir": DBT_PROFILES_DIR, "project_dir": DBT_PROJECT_DIR, "target": "local"}
+)
 dbt_staging_resource = dbt_cli_resource.configured(
     {"profiles-dir": DBT_PROFILES_DIR, "project-dir": DBT_PROJECT_DIR, "target": "staging"}
 )
@@ -76,8 +80,10 @@
 
 RESOURCES_LOCAL = {
     "parquet_io_manager": local_partitioned_parquet_io_manager,
-    "warehouse_io_manager": local_partitioned_parquet_io_manager,
+    "warehouse_io_manager": duckdb_partitioned_parquet_io_manager.configured(
+        {"duckdb_path": os.path.join(DBT_PROJECT_DIR, "hackernews.duckdb")},
+    ),
     "pyspark": configured_pyspark,
-    "hn_client": hn_snapshot_client,
-    "dbt": ResourceDefinition.none_resource(),
+    "hn_client": hn_api_client,
+    "dbt": dbt_local_resource,
 }
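duckdb_partitioned_parquet_io_manager.configured({...}) above pins the duckdb_path config value onto the resource definition ahead of time. A standalone illustration of dagster's configured API, with made-up names:

    from dagster import resource

    @resource(config_schema={"duckdb_path": str})
    def warehouse_path_resource(init_context):
        return init_context.resource_config["duckdb_path"]

    # returns a copy of the resource definition with its config pre-filled
    pinned = warehouse_path_resource.configured({"duckdb_path": "/tmp/hackernews.duckdb"})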
examples/hacker_news_assets/hacker_news_assets/resources/duckdb_parquet_io_manager.py (new file)
@@ -0,0 +1,57 @@
+import os
+
+import duckdb
+import pandas as pd
+
+from dagster import Field, check, io_manager
+from dagster.seven.temp_dir import get_system_temp_directory
+
+from .parquet_io_manager import PartitionedParquetIOManager
+
+
+class DuckDBPartitionedParquetIOManager(PartitionedParquetIOManager):
+    """Stores data in parquet files and creates duckdb views over those files."""
+
+    def handle_output(self, context, obj):
+        if obj is not None:  # if this is a dbt output, then the value will be None
+            yield from super().handle_output(context, obj)
+        con = self._connect_duckdb(context)
+
+        path = self._get_path(context)
+        if context.has_asset_partitions:
+            to_scan = os.path.join(os.path.dirname(path), "*.pq", "*.parquet")
+        else:
+            to_scan = path
+        con.execute("create schema if not exists hackernews;")
+        con.execute(
+            f"create or replace view {self._table_path(context)} as "
+            f"select * from parquet_scan('{to_scan}');"
+        )
+
+    def load_input(self, context):
+        check.invariant(not context.has_asset_partitions, "Can't load partitioned inputs")
+
+        if context.dagster_type.typing_type == pd.DataFrame:
+            con = self._connect_duckdb(context)
+            return con.execute(f"SELECT * FROM {self._table_path(context)}").fetchdf()
+
+        check.failed(
+            f"Inputs of type {context.dagster_type} not supported. Please specify a valid type "
+            "for this input on the argument of the @asset-decorated function."
+        )
+
+    def _table_path(self, context):
+        return f"hackernews.{context.asset_key.path[-1]}"
+
+    def _connect_duckdb(self, context):
+        return duckdb.connect(database=context.resource_config["duckdb_path"], read_only=False)
+
+
+@io_manager(
+    config_schema={"base_path": Field(str, is_required=False), "duckdb_path": str},
+    required_resource_keys={"pyspark"},
+)
+def duckdb_partitioned_parquet_io_manager(init_context):
+    return DuckDBPartitionedParquetIOManager(
+        base_path=init_context.resource_config.get("base_path", get_system_temp_directory())
+    )
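The pattern above is the heart of the change: instead of copying data into a warehouse, DuckDB gets views defined over the parquet files the assets already produce, via parquet_scan. A standalone sketch of the pattern, with made-up paths:

    import duckdb

    con = duckdb.connect("/tmp/example.duckdb")
    con.execute("create schema if not exists hackernews;")
    # the view re-reads the parquet files on every query; nothing is copied into duckdb
    con.execute(
        "create or replace view hackernews.comments as "
        "select * from parquet_scan('/tmp/warehouse/comments/*.pq')"
    )
    print(con.execute("select count(*) from hackernews.comments").fetchdf())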
examples/hacker_news_assets/hacker_news_assets/resources/parquet_io_manager.py
@@ -25,10 +25,13 @@ def __init__(self, base_path):
     def handle_output(
         self, context: OutputContext, obj: Union[pandas.DataFrame, pyspark.sql.DataFrame]
     ):
-
         path = self._get_path(context)
+        if "://" not in self._base_path:
+            os.makedirs(os.path.dirname(path), exist_ok=True)
+
         if isinstance(obj, pandas.DataFrame):
             row_count = len(obj)
+            context.log.info(f"Row count: {row_count}")
             obj.to_parquet(path=path, index=False)
         elif isinstance(obj, pyspark.sql.DataFrame):
             row_count = obj.count()
@@ -39,7 +42,6 @@ def handle_output(
 
         yield MetadataEntry.path(path=path, label="path")
 
     def load_input(self, context) -> Union[pyspark.sql.DataFrame, str]:
-        # In this load_input function, we vary the behavior based on the type of the downstream input
         path = self._get_path(context.upstream_output)
         if context.dagster_type.typing_type == pyspark.sql.DataFrame:
             # return pyspark dataframe
@@ -51,17 +53,15 @@ def load_input(self, context) -> Union[pyspark.sql.DataFrame, str]:
         )
 
     def _get_path(self, context: OutputContext):
-        # filesystem-friendly string that is scoped to the start/end times of the data slice
-        start, end = context.asset_partitions_time_window
-        dt_format = "%Y%m%d%H%M%S"
-        partition_str = start.strftime(dt_format) + "_" + end.strftime(dt_format)
-
         key = context.asset_key.path[-1]
-        # if local fs path, store all outptus in same directory
-        if "://" not in self._base_path:
-            return os.path.join(self._base_path, f"{key}-{partition_str}.pq")
-        # otherwise seperate into different dirs
-        return os.path.join(self._base_path, key, f"{partition_str}.pq")
+
+        if context.has_asset_partitions:
+            start, end = context.asset_partitions_time_window
+            dt_format = "%Y%m%d%H%M%S"
+            partition_str = start.strftime(dt_format) + "_" + end.strftime(dt_format)
+            return os.path.join(self._base_path, key, f"{partition_str}.pq")
+        else:
+            return os.path.join(self._base_path, f"{key}.pq")
 
 
 @io_manager(
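The rewritten _get_path keys the layout on whether the asset is partitioned, rather than on whether the base path is local. An illustration of the resulting paths (the base path and asset names are made up):

    import os
    from datetime import datetime

    dt_format = "%Y%m%d%H%M%S"
    start, end = datetime(2022, 1, 1, 16), datetime(2022, 1, 1, 17)
    partition_str = start.strftime(dt_format) + "_" + end.strftime(dt_format)

    # partitioned asset: one file per time window, grouped under a per-asset directory
    print(os.path.join("/tmp/warehouse", "comments", f"{partition_str}.pq"))
    # -> /tmp/warehouse/comments/20220101160000_20220101170000.pq

    # unpartitioned asset: a single top-level file
    print(os.path.join("/tmp/warehouse", "comment_stories.pq"))

This per-asset directory is what the DuckDB IO manager globs over, and what the updated test below asserts.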
(tests for the parquet IO manager; file name not rendered in this view)
@@ -37,7 +37,9 @@ def spark_input_asset(pandas_df_asset: SparkDF):
         },
     )
 
-    expected_path = os.path.join(temp_dir, "pandas_df_asset-20220101160000_20220101170000.pq")
+    expected_path = os.path.join(
+        temp_dir, "pandas_df_asset", "20220101160000_20220101170000.pq"
+    )
     res = io_manager_test_job.execute_in_process(partition_key="2022-01-01-16:00")
     assert res.success
     assert os.path.exists(expected_path)
examples/hacker_news_assets/hacker_news_dbt/config/profiles.yml
@@ -1,7 +1,12 @@
 hacker_news_dbt:
   target: dev
   outputs:
-    dev:
+    local:
+      type: duckdb
+      path: hackernews.duckdb
+      schema: hackernews
+
+    staging:
       type: snowflake
       account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
 
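With the local target, dbt-duckdb builds the project's models into the hackernews schema of the hackernews.duckdb file. A small sketch (assuming a prior dbt run against the local target) for inspecting what was built:

    import duckdb

    con = duckdb.connect("hackernews.duckdb", read_only=True)
    # list the tables and views dbt created in the file
    print(con.execute("select table_schema, table_name from information_schema.tables").fetchdf())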
(dbt model over the comments source; file name not rendered in this view)
@@ -1,4 +1,4 @@
-SELECT date_trunc("day", to_timestamp(time::int)) as date,
+SELECT date_trunc('day', to_timestamp(time::int)) as date,
        count(DISTINCT user_id) AS commenting_users,
        count(*) AS num_comments
 FROM {{ source('hackernews', 'comments') }}
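The quoting change matters because DuckDB follows the SQL standard: double quotes delimit identifiers and single quotes delimit string literals, so date_trunc("day", ...) would be parsed as a reference to a column named day. A one-line check of the corrected form (the epoch value is arbitrary):

    -- single quotes make 'day' a string literal, as date_trunc expects
    SELECT date_trunc('day', to_timestamp(1647302400));

The same fix is applied to the stories model below.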
(dbt model over the stories source; file name not rendered in this view)
@@ -1,4 +1,4 @@
-SELECT date_trunc("day", to_timestamp(time::int)) as date,
+SELECT date_trunc('day', to_timestamp(time::int)) as date,
        count(DISTINCT user_id) AS posting_users,
        count(*) AS num_stories
 FROM {{ source('hackernews', 'stories') }}

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions examples/hacker_news_assets/setup.py
@@ -24,7 +24,9 @@
         "dagster-slack",
         "dagster-postgres",
         "dbt-core",
+        "dbt-duckdb",
         "dbt-snowflake",
+        "duckdb",
         "mock",
         # DataFrames were not written to Snowflake, causing errors
         "pandas<1.4.0",
