Skip to content

Commit

Permalink
[examples] Modern Data Stack + SDA Example (#6862)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Mar 2, 2022
1 parent 6092f3e commit 42d2d72
Show file tree
Hide file tree
Showing 25 changed files with 10,569 additions and 10 deletions.
2 changes: 2 additions & 0 deletions examples/modern_data_stack_assets/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
This is an example of how to use the Software-Defined Asset APIs alongside Modern Data Stack tools
(specifically, Airbyte and dbt).
4 changes: 4 additions & 0 deletions examples/modern_data_stack_assets/mds_dbt/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/
15 changes: 15 additions & 0 deletions examples/modern_data_stack_assets/mds_dbt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Welcome to your new dbt project!

### Using the starter project

Try running the following commands:
- dbt run
- dbt test


### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
Empty file.
23 changes: 23 additions & 0 deletions examples/modern_data_stack_assets/mds_dbt/config/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
mds_dbt:
target: prod
outputs:
prod:
type: postgres
host: localhost
port: 5432
user: postgres
pass: password
dbname: postgres_replica
schema: public
threads: 2
keepalives_idle: 0
skip_airbyte:
type: postgres
host: localhost
port: 5432
user: postgres
pass: password
dbname: postgres
schema: public
threads: 2
keepalives_idle: 0
26 changes: 26 additions & 0 deletions examples/modern_data_stack_assets/mds_dbt/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "mds_dbt"
version: "1.0.0"
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: "mds_dbt"

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
analysis-paths: ["analyses"]
test-paths: ["tests"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"

models:
mds_dbt:
+materialized: "table"
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
select
date_trunc('d', oc.order_time::timestamp) as order_date,
sum(oc.order_value) as total_value,
count(*) as num_orders
from
{{ ref("orders_cleaned") }} oc
join
{{ ref("users_augmented") }} ua
on oc.user_id = ua.user_id
where not ua.is_bot
group by 1 order by 1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from {{ source('postgres_replica', 'orders') }}
36 changes: 36 additions & 0 deletions examples/modern_data_stack_assets/mds_dbt/models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version: 2

models:
- name: daily_order_summary
description: "Daily metrics for orders placed on this platform."
columns:
- name: order_date
description: "The UTC day for which these orders were aggregated."
data_type: "date"
- name: total_value
description: "The total value of all orders placed on this day."
data_type: "float"
- name: num_orders
description: "The total number of orders placed on this day."
data_type: "int"
- name: orders_cleaned
description: "Filtered version of the raw orders data."
columns:
- name: "user_id"
description: "Platform id of the user that placed this order."
data_type: "int"
- name: "order_time"
description: "The timestamp (in UTC) that this order was placed."
data_type: "timestamp"
- name: "order_value"
description: "The dollar amount that this order was placed for."
data_type: "float"
- name: users_augmented
description: "Raw users data augmented with backend data."
columns:
- name: "user_id"
description: "Platform id for this user."
data_type: "int"
- name: "is_spam"
description: "True if this user has been marked as a fraudulent account."
data_type: "bool"
9 changes: 9 additions & 0 deletions examples/modern_data_stack_assets/mds_dbt/models/sources.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: 2

sources:
- name: postgres_replica
database: postgres_replica
schema: public
tables:
- name: users
- name: orders
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from {{ source('postgres_replica', 'users') }}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .assets import analytics_assets
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Any, Tuple

import numpy as np
import pandas as pd
from dagster_airbyte import airbyte_resource, build_airbyte_assets
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from scipy import optimize

from dagster import AssetGroup, Output, asset

from .constants import *
from .pandas_io_manager import pandas_io_manager

airbyte_assets = build_airbyte_assets(
connection_id=AIRBYTE_CONNECTION_ID, destination_tables=["orders", "users"]
)

dbt_assets = load_assets_from_dbt_project(
project_dir=DBT_PROJECT_DIR, io_manager_key="pandas_io_manager"
)


@asset(compute_kind="python")
def order_forecast_model(daily_order_summary: pd.DataFrame) -> Any:
"""Model parameters that best fit the observed data"""
df = daily_order_summary
return tuple(
optimize.curve_fit(
f=model_func, xdata=df.order_date.astype(np.int64), ydata=df.num_orders, p0=[10, 100]
)[0]
)


@asset(compute_kind="python", io_manager_key="pandas_io_manager")
def predicted_orders(
daily_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
) -> pd.DataFrame:
"""Predicted orders for the next 30 days based on the fit paramters"""
a, b = order_forecast_model
start_date = daily_order_summary.order_date.max()
future_dates = pd.date_range(start=start_date, end=start_date + pd.DateOffset(days=30))
predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b)
return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data})


analytics_assets = AssetGroup(
airbyte_assets + dbt_assets + [order_forecast_model, predicted_orders],
resource_defs={
"airbyte": airbyte_resource.configured(AIRBYTE_CONFIG),
"dbt": dbt_cli_resource.configured(DBT_CONFIG),
"pandas_io_manager": pandas_io_manager.configured(PG_CONFIG),
},
).build_job("Assets")
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import numpy as np
from dagster_postgres.utils import get_conn_string

from dagster.utils import file_relative_path


def model_func(x, a, b):
return a * np.exp(b * (x / 10**18 - 1.6095))


AIRBYTE_CONNECTION_ID = "your_airbyte_connection_id"
AIRBYTE_CONFIG = {"host": "localhost", "port": "8000"}
DBT_PROJECT_DIR = file_relative_path(__file__, "../mds_dbt")
DBT_PROFILES_DIR = file_relative_path(__file__, "../mds_dbt/config")
DBT_CONFIG = {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}
PG_CONFIG = {
"con_string": get_conn_string(
username="postgres", password="password", hostname="localhost", db_name="postgres_replica"
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import random

import numpy as np
import pandas as pd
from dagster_postgres.utils import get_conn_string

START_DATE = pd.to_datetime("2021-01-01")
END_DATE = pd.to_datetime("2022-01-01")

N_USERS = 100
N_ORDERS = 10000


def random_dates(start, end):

start_u = start.value // 10**9
end_u = end.value // 10**9

dist = np.random.standard_exponential(size=N_ORDERS) / 10

clipped_flipped_dist = 1 - dist[dist <= 1]

return pd.to_datetime((clipped_flipped_dist * (end_u - start_u)) + start_u, unit="s")


con_string = get_conn_string(
username="postgres", password="password", hostname="localhost", db_name="postgres"
)

users = pd.DataFrame(
{"user_id": range(N_USERS), "is_bot": [random.choice([True, False]) for _ in range(N_USERS)]}
)

users.to_sql("users", con=con_string, if_exists="replace")

orders = pd.DataFrame(
{
"user_id": [random.randint(0, N_USERS) for _ in range(N_ORDERS)],
"order_time": random_dates(START_DATE, END_DATE),
"order_value": np.random.normal(loc=100.0, scale=15.0, size=N_ORDERS),
}
)

orders.to_sql("orders", con=con_string, if_exists="replace")
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Any, Tuple

import numpy as np
import pandas as pd
from scipy import optimize

from .constants import model_func


def order_forecast_model(daily_order_summary: pd.DataFrame) -> Tuple[Any, Any]:
"""Model parameters that best fit the observed data"""
df = daily_order_summary
return optimize.curve_fit(
f=model_func, xdata=df.order_date.astype(np.int64), ydata=df.num_orders, p0=[10, 100]
)[0]


def predicted_orders(
daily_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
) -> pd.DataFrame:
"""Predicted orders for the next 30 days based on the fit paramters"""
a, b = order_forecast_model
start_date = daily_order_summary.order_date.max()
future_dates = pd.date_range(start=start_date, end=start_date + pd.DateOffset(days=30))
predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b)
return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data})
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pandas as pd

from dagster import IOManager, check, io_manager


class PandasIOManager(IOManager):
"""Sample IOManager to handle loading the contents of tables as pandas DataFrames.
Does not handle cases where data is written to different schemas for different outputs, and
uses the name of the asset key as the table name.
"""

def __init__(self, con_string: str):
self._con = con_string

def handle_output(self, context, obj):
if isinstance(obj, pd.DataFrame):
# write df to table
obj.to_sql(name=context.asset_key.path[-1], con=self._con, if_exists="replace")
elif obj is None:
# dbt has already written the data to this table
pass
else:
raise check.CheckError(f"Unsupported object type {type(obj)} for PandasIOManager.")

def load_input(self, context) -> pd.DataFrame:
"""Load the contents of a table as a pandas DataFrame."""
model_name = context.upstream_output.asset_key.path[-1]
return pd.read_sql(f"SELECT * FROM {model_name}", con=self._con)


@io_manager(config_schema={"con_string": str})
def pandas_io_manager(context):
return PandasIOManager(context.resource_config["con_string"])
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def test_repo_loads():
# placeholder for future testing
assert True

0 comments on commit 42d2d72

Please sign in to comment.