Skip to content

Commit

Permalink
RFC: include schema in default dbt asset keys (#7645)
Browse files: browse the repository at this point in the history
  • Loading branch information
sryza committed May 4, 2022
1 parent ed04dab commit fbb1abb
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 11 deletions.
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,8 @@
from pandas import DataFrame

from dagster import asset
from dagster import AssetIn, asset


@asset
@asset(ins={"activity_daily_stats": AssetIn(namespace="hackernews")})
def activity_forecast(activity_daily_stats: DataFrame) -> DataFrame:
return activity_daily_stats.head(100)
Original file line number | Diff line number | Diff line change
Expand Up @@ -7,9 +7,9 @@ def make_activity_stats_job(asset_group: AssetGroup) -> JobDefinition:
return asset_group.build_job(
name="activity_stats",
selection=[
"comment_daily_stats",
"story_daily_stats",
"activity_daily_stats",
"hackernews>comment_daily_stats",
"hackernews>story_daily_stats",
"hackernews>activity_daily_stats",
"activity_forecast",
],
)
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -52,7 +52,12 @@ def _get_node_name(node_info: Mapping[str, Any]):


def _get_node_asset_key(node_info):
return AssetKey(node_info["name"])
if node_info.get("schema") is not None:
components = [node_info["schema"], node_info["name"]]
else:
components = [node_info["name"]]

return AssetKey(components)


def _dbt_nodes_to_assets(
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -69,7 +69,7 @@ def runtime_metadata_fn(context, node_info):
assert len(materializations) == 4
assert materializations[0].metadata_entries == [
MetadataEntry("op_name", value=dbt_assets[0].op.name),
MetadataEntry("dbt_model", value=materializations[0].asset_key.path[0]),
MetadataEntry("dbt_model", value=materializations[0].asset_key.path[-1]),
]


Expand All @@ -90,12 +90,14 @@ def assert_assets_match_project(dbt_assets):
"sort_cold_cereals_by_calories",
]:
out_def = assets_op.output_dict.get(model_name)
assert out_def.hardcoded_asset_key == AssetKey([model_name])
assert dbt_assets[0].asset_deps[AssetKey([model_name])] == {AssetKey("sort_by_calories")}
assert out_def.hardcoded_asset_key == AssetKey(["test-schema", model_name])
assert dbt_assets[0].asset_deps[AssetKey(["test-schema", model_name])] == {
AssetKey(["test-schema", "sort_by_calories"])
}

root_out_def = assets_op.output_dict.get("sort_by_calories")
assert root_out_def.hardcoded_asset_key == AssetKey(["sort_by_calories"])
assert not dbt_assets[0].asset_deps[AssetKey(["sort_by_calories"])]
assert root_out_def.hardcoded_asset_key == AssetKey(["test-schema", "sort_by_calories"])
assert not dbt_assets[0].asset_deps[AssetKey(["test-schema", "sort_by_calories"])]


def test_basic(
Expand Down

0 comments on commit fbb1abb

Please sign in to comment.