Skip to content

Commit

Permalink
[dagster-dbt] update dbt keys (#8228)
Browse files: browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jun 8, 2022
1 parent 198053a commit 8e6b70f
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@
io_manager_key="warehouse_io_manager",
)

activity_analytics_assets_prod = AssetGroup.from_package_module(
package_module=assets, resource_defs=RESOURCES_PROD
).prefixed("activity_analytics") + AssetGroup(dbt_assets, resource_defs=RESOURCES_PROD)

activity_analytics_assets_staging = AssetGroup.from_package_module(
package_module=assets, resource_defs=RESOURCES_STAGING
).prefixed("activity_analytics") + AssetGroup(dbt_assets, resource_defs=RESOURCES_STAGING)

activity_analytics_assets_local = AssetGroup.from_package_module(
package_module=assets, resource_defs=RESOURCES_LOCAL
).prefixed("activity_analytics") + AssetGroup(dbt_assets, resource_defs=RESOURCES_LOCAL)
activity_analytics_assets_prod = (
AssetGroup.from_package_module(package_module=assets, resource_defs=RESOURCES_PROD)
+ AssetGroup(dbt_assets, resource_defs=RESOURCES_PROD)
).prefixed("activity_analytics")

activity_analytics_assets_staging = (
AssetGroup.from_package_module(package_module=assets, resource_defs=RESOURCES_STAGING)
+ AssetGroup(dbt_assets, resource_defs=RESOURCES_STAGING)
).prefixed("activity_analytics")

activity_analytics_assets_local = (
AssetGroup.from_package_module(package_module=assets, resource_defs=RESOURCES_LOCAL)
+ AssetGroup(dbt_assets, resource_defs=RESOURCES_LOCAL)
).prefixed("activity_analytics")


activity_analytics_assets_sensor_prod = make_hn_tables_updated_sensor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)


@asset(compute_kind="python", namespace="public")
@asset(compute_kind="python")
def order_forecast_model(daily_order_summary: pd.DataFrame) -> Any:
"""Model parameters that best fit the observed data"""
df = daily_order_summary
Expand All @@ -33,7 +33,7 @@ def order_forecast_model(daily_order_summary: pd.DataFrame) -> Any:
)


@asset(compute_kind="python", io_manager_key="pandas_io_manager", namespace="public")
@asset(compute_kind="python", io_manager_key="pandas_io_manager")
def predicted_orders(
daily_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
) -> pd.DataFrame:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
def test_repo_loads():
# placeholder for future testing
assert True
# placeholder for future tests
return True
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ def _get_node_name(node_info: Mapping[str, Any]):


def _get_node_asset_key(node_info):
if node_info.get("schema") is not None:
components = [node_info["schema"], node_info["name"]]
"""By default, a dbt node's key is the union of its model name and any schema configured on
the model itself.
"""
configured_schema = node_info["config"].get("schema")
if configured_schema is not None:
components = [configured_schema, node_info["name"]]
else:
components = [node_info["name"]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ clean-targets: # directories to be removed by `dbt clean`
# using the `{{ config(...) }}` macro.
models:
dagster_dbt_test_project:
subdir:
+schema: "subdir_schema"
materialized: view

seeds:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{ config(tags=["foo"]) }}
{{ config(tags=["foo"], schema="cold_schema") }}
SELECT *
FROM {{ ref('sort_by_calories') }}
WHERE type='C'

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from dagster import (
AssetGroup,
AssetIn,
AssetKey,
IOManager,
MetadataEntry,
Expand Down Expand Up @@ -99,22 +100,20 @@ def assert_assets_match_project(dbt_assets):
"sort_hot_cereals_by_calories",
"sort_cold_cereals_by_calories",
}
for model_name in [
"least_caloric",
for asset_name in [
"subdir_schema/least_caloric",
"sort_hot_cereals_by_calories",
"sort_cold_cereals_by_calories",
"cold_schema/sort_cold_cereals_by_calories",
]:
assert dbt_assets[0].asset_keys_by_output_name[model_name] == AssetKey(
["test-schema", model_name]
)
assert dbt_assets[0].asset_deps[AssetKey(["test-schema", model_name])] == {
AssetKey(["test-schema", "sort_by_calories"])
}
asset_key = AssetKey(asset_name.split("/"))
output_name = asset_key.path[-1]
assert dbt_assets[0].asset_keys_by_output_name[output_name] == asset_key
assert dbt_assets[0].asset_deps[asset_key] == {AssetKey(["sort_by_calories"])}

assert dbt_assets[0].asset_keys_by_output_name["sort_by_calories"] == AssetKey(
["test-schema", "sort_by_calories"]
["sort_by_calories"]
)
assert not dbt_assets[0].asset_deps[AssetKey(["test-schema", "sort_by_calories"])]
assert not dbt_assets[0].asset_deps[AssetKey(["sort_by_calories"])]


@pytest.mark.parametrize("use_build, fail_test", [(True, False), (True, True), (False, False)])
Expand Down Expand Up @@ -151,7 +150,7 @@ def test_basic(
if fail_test:
# the test will fail after the first model is completed, so others will not be emitted
assert len(materializations) == 1
assert materializations[0].asset_key == AssetKey(["test-schema", "sort_by_calories"])
assert materializations[0].asset_key == AssetKey(["sort_by_calories"])
else:
assert len(materializations) == 4
observations = [
Expand Down Expand Up @@ -324,16 +323,18 @@ def test_node_info_to_asset_key(
[
(
"*",
"sort_by_calories,sort_cold_cereals_by_calories,sort_hot_cereals_by_calories,least_caloric,hanger1,hanger2",
"sort_by_calories,cold_schema/sort_cold_cereals_by_calories,"
"sort_hot_cereals_by_calories,subdir_schema/least_caloric,hanger1,hanger2",
),
(
"test-schema/sort_by_calories+",
"sort_by_calories,least_caloric,sort_cold_cereals_by_calories,sort_hot_cereals_by_calories,hanger1",
"sort_by_calories+",
"sort_by_calories,subdir_schema/least_caloric,cold_schema/sort_cold_cereals_by_calories,"
"sort_hot_cereals_by_calories,hanger1",
),
("*test-schema/hanger2", "hanger2,least_caloric,sort_by_calories"),
("*hanger2", "hanger2,subdir_schema/least_caloric,sort_by_calories"),
(
["test-schema/sort_cold_cereals_by_calories", "test-schema/least_caloric"],
"sort_cold_cereals_by_calories,least_caloric",
["cold_schema/sort_cold_cereals_by_calories", "subdir_schema/least_caloric"],
"cold_schema/sort_cold_cereals_by_calories,subdir_schema/least_caloric",
),
],
)
Expand All @@ -348,11 +349,11 @@ def test_subsetting(

dbt_assets = load_assets_from_dbt_project(test_project_dir, dbt_config_dir)

@asset(namespace="test-schema")
@asset
def hanger1(sort_by_calories):
return None

@asset(namespace="test-schema")
@asset(ins={"least_caloric": AssetIn(namespace="subdir_schema")})
def hanger2(least_caloric):
return None

Expand All @@ -375,7 +376,7 @@ def hanger2(least_caloric):
for event in result.all_events
if event.event_type_value == "ASSET_MATERIALIZATION"
}
expected_keys = {AssetKey(["test-schema", name]) for name in expected_asset_names.split(",")}
expected_keys = {AssetKey(name.split("/")) for name in expected_asset_names.split(",")}
assert all_keys == expected_keys


Expand All @@ -387,31 +388,31 @@ def hanger2(least_caloric):
"*",
{
"sort_by_calories",
"sort_cold_cereals_by_calories",
"least_caloric",
"cold_schema/sort_cold_cereals_by_calories",
"subdir_schema/least_caloric",
"sort_hot_cereals_by_calories",
},
),
(
"+least_caloric",
{"sort_by_calories", "least_caloric"},
{"sort_by_calories", "subdir_schema/least_caloric"},
),
(
"sort_by_calories least_caloric",
{"sort_by_calories", "least_caloric"},
{"sort_by_calories", "subdir_schema/least_caloric"},
),
(
"tag:bar+",
{
"sort_by_calories",
"sort_cold_cereals_by_calories",
"least_caloric",
"cold_schema/sort_cold_cereals_by_calories",
"subdir_schema/least_caloric",
"sort_hot_cereals_by_calories",
},
),
(
"tag:foo",
{"sort_by_calories", "sort_cold_cereals_by_calories"},
{"sort_by_calories", "cold_schema/sort_cold_cereals_by_calories"},
),
(
"tag:foo,tag:bar",
Expand Down Expand Up @@ -439,7 +440,7 @@ def test_dbt_selects(
project_dir=test_project_dir, profiles_dir=dbt_config_dir, select=select
)

expected_asset_keys = {AssetKey(["test-schema", key]) for key in expected_asset_names}
expected_asset_keys = {AssetKey(key.split("/")) for key in expected_asset_names}
assert dbt_assets[0].asset_keys == expected_asset_keys

result = (
Expand Down Expand Up @@ -489,14 +490,16 @@ def handle_output(self, context, obj):
# handling dbt output
if obj is None:
return
schema, table = context.asset_key.path
table = context.asset_key.path[-1]
try:
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
cur.execute(
f'CREATE TABLE IF NOT EXISTS "{schema}"."{table}" (user_id integer, is_bot bool)'
f'CREATE TABLE IF NOT EXISTS "test-python-schema"."{table}" (user_id integer, is_bot bool)'
)
cur.executemany(
f'INSERT INTO "test-python-schema"."{table}"' + " VALUES(%s,%s)", obj
)
cur.executemany(f'INSERT INTO "{schema}"."{table}"' + " VALUES(%s,%s)", obj)
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
Expand All @@ -506,13 +509,13 @@ def handle_output(self, context, obj):
conn.close()

def load_input(self, context):
schema, table = context.asset_key.path
table = context.asset_key.path[-1]
result = None
conn = None
try:
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
cur.execute(f'SELECT * FROM "{schema}"."{table}"')
cur.execute(f'SELECT * FROM "test-python-schema"."{table}"')
result = cur.fetchall()
except (Exception, psycopg2.DatabaseError) as error:
raise error
Expand All @@ -523,7 +526,7 @@ def load_input(self, context):

return TestIOManager()

@asset(namespace="test-python-schema")
@asset
def bot_labeled_users(cleaned_users):
# super advanced bot labeling algorithm
return [(uid, uid % 5 == 0) for _, uid in cleaned_users]
Expand Down Expand Up @@ -553,5 +556,5 @@ def bot_labeled_users(cleaned_users):
"bot_labeled_users",
"bot_labeled_events",
]
expected_keys = {AssetKey(["test-python-schema", name]) for name in expected_asset_names}
expected_keys = {AssetKey([name]) for name in expected_asset_names}
assert all_keys == expected_keys

0 comments on commit 8e6b70f

Please sign in to comment.