[dagster-dbt] fix issues with dbt asset integration (#6777)
OwenKephart committed Feb 24, 2022
1 parent a360bbd commit 5cec1c8
Showing 9 changed files with 82 additions and 44 deletions.
67 changes: 45 additions & 22 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_defs.py
@@ -3,7 +3,16 @@
import textwrap
from typing import AbstractSet, Any, Callable, Dict, Mapping, Optional, Sequence, Set, Tuple

from dagster import AssetKey, Out, Output, SolidExecutionContext, check, get_dagster_logger
from dagster import (
AssetKey,
Out,
Output,
SolidExecutionContext,
TableColumn,
TableSchema,
check,
get_dagster_logger,
)
from dagster.core.asset_defs import AssetsDefinition, multi_asset
from dagster_dbt.cli.types import DbtCliOutput
from dagster_dbt.cli.utils import execute_cli
@@ -46,7 +55,7 @@ def _get_node_asset_key(node_info):
def _dbt_nodes_to_assets(
dbt_nodes: Mapping[str, Any],
select: str,
selected_unique_ids: AbstractSet[str] = None,
selected_unique_ids: AbstractSet[str],
runtime_metadata_fn: Optional[
Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]
] = None,
@@ -57,22 +66,24 @@ def _dbt_nodes_to_assets(
sources: Set[AssetKey] = set()
out_name_to_node_info: Dict[str, Mapping[str, Any]] = {}
internal_asset_deps: Dict[str, Set[AssetKey]] = {}
for unique_id, node_info in dbt_nodes.items():
if unique_id not in selected_unique_ids:
continue
for unique_id in selected_unique_ids:
asset_deps = set()
node_info = dbt_nodes[unique_id]
for dep_name in node_info["depends_on"]["nodes"]:
dep_type = dbt_nodes[dep_name]["resource_type"]
# ignore seeds/snapshots
if dep_type not in ["source", "model"]:
continue
dep_asset_key = node_info_to_asset_key(dbt_nodes[dep_name])

# if it's a source, it will be used as an input to this multi-asset
if dep_type == "source":
sources.add(dep_asset_key)
# regardless of type, list this as a dependency for the current asset
asset_deps.add(dep_asset_key)
code_block = textwrap.indent(node_info["raw_sql"], " ")
description_sections = [
node_info["description"],
"#### Columns:\n" + _columns_to_markdown(node_info["columns"])
if len(node_info["columns"]) > 0
else None,
f"#### Raw SQL:\n```\n{code_block}\n```",
]
description = "\n\n".join(filter(None, description_sections))
@@ -83,6 +94,7 @@ def _dbt_nodes_to_assets(
asset_key=node_info_to_asset_key(node_info),
description=description,
io_manager_key=io_manager_key,
metadata=_columns_to_metadata(node_info["columns"]),
)
out_name_to_node_info[node_name] = node_info
internal_asset_deps[node_name] = asset_deps
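
To make the dependency walk above concrete, here is a small self-contained sketch with a hypothetical two-node manifest slice; the asset-key function is an assumption standing in for node_info_to_asset_key, not the library's actual implementation.

from dagster import AssetKey

# Hypothetical manifest slice: one source and one model that depends on it.
dbt_nodes = {
    "source.proj.raw.events": {
        "resource_type": "source",
        "name": "events",
        "depends_on": {"nodes": []},
    },
    "model.proj.daily_events": {
        "resource_type": "model",
        "name": "daily_events",
        "depends_on": {"nodes": ["source.proj.raw.events"]},
    },
}

def to_asset_key(node_info):
    # Assumed stand-in for node_info_to_asset_key.
    return AssetKey(node_info["name"])

sources, asset_deps = set(), set()
for dep_name in dbt_nodes["model.proj.daily_events"]["depends_on"]["nodes"]:
    dep_info = dbt_nodes[dep_name]
    if dep_info["resource_type"] not in ["source", "model"]:
        continue  # seeds/snapshots are ignored, as in the loop above
    dep_key = to_asset_key(dep_info)
    if dep_info["resource_type"] == "source":
        sources.add(dep_key)  # sources become inputs to the multi-asset
    asset_deps.add(dep_key)  # regardless of type, record it as an upstream dep

# Both sets now contain the asset key for "events".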
@@ -116,15 +128,22 @@ def _dbt_project_multi_assset(context):
return _dbt_project_multi_assset


def _columns_to_markdown(columns: Mapping[str, Any]) -> str:
def _columns_to_metadata(columns: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
return (
textwrap.dedent(
"""
| Name | Description |
| - | - |
"""
)
+ "\n".join([f"| {name} | {metadata['description']}" for name, metadata in columns.items()])
{
"schema": TableSchema(
columns=[
TableColumn(
name=name,
type=metadata.get("data_type") or "?",
description=metadata.get("description"),
)
for name, metadata in columns.items()
]
)
}
if len(columns) > 0
else None
)
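
For reference, a minimal sketch of the metadata the new _columns_to_metadata helper attaches to each output, built from a hypothetical manifest "columns" mapping (the field names mirror the code above):

from dagster import TableColumn, TableSchema

# Hypothetical excerpt of a dbt manifest "columns" mapping for one model.
columns = {
    "id": {"data_type": "integer", "description": "Primary key"},
    "name": {"description": "Customer display name"},  # no data_type declared
}

metadata = {
    "schema": TableSchema(
        columns=[
            TableColumn(
                name=name,
                type=col.get("data_type") or "?",  # "?" when dbt declares no type
                description=col.get("description"),
            )
            for name, col in columns.items()
        ]
    )
}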


@@ -221,17 +240,21 @@ def load_assets_from_dbt_manifest(
"""
check.dict_param(manifest_json, "manifest_json", key_type=str)
dbt_nodes = {**manifest_json["nodes"], **manifest_json["sources"]}

def _unique_id_to_selector(uid):
# take the fully-qualified node name and use it to select the model
return ".".join(dbt_nodes[uid]["fqn"])

select = (
"*"
if selected_unique_ids is None
else " ".join(_unique_id_to_selector(uid) for uid in selected_unique_ids)
)
selected_unique_ids = selected_unique_ids or set(
unique_id
for unique_id, node_info in dbt_nodes.items()
if node_info["resource_type"] == "model"
)
# create a selection string by converting the unique ids to model names
select = (
"*"
if selected_unique_ids is None
else " ".join((uid.split(".")[-1] for uid in selected_unique_ids))
)
return [
_dbt_nodes_to_assets(
dbt_nodes,
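
As a quick illustration of the selection string built above, using hypothetical unique ids (dbt model unique ids take the form model.<project>.<model_name>, and only the trailing model name is passed to dbt's selector):

# Hypothetical unique ids for two models in the same project.
selected_unique_ids = {
    "model.my_project.customers",
    "model.my_project.orders",
}
select = " ".join(uid.split(".")[-1] for uid in selected_unique_ids)
# -> "customers orders" (ordering may vary, since the input is a set)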
@@ -51,7 +51,7 @@ def my_dbt_solid(context):

context = get_dbt_solid_context(test_project_dir, dbt_config_dir)
dbt_result = my_dbt_solid(context)
assert len(dbt_result.raw_output.split("\n\n")) == 20
assert len(dbt_result.raw_output.split("\n\n")) == 22


def test_ls_resource_type(
@@ -63,7 +63,7 @@ def my_dbt_solid(context):

context = get_dbt_solid_context(test_project_dir, dbt_config_dir)
dbt_result = my_dbt_solid(context)
assert len(dbt_result.raw_output.split("\n\n")) == 4
assert len(dbt_result.raw_output.split("\n\n")) == 6


def test_test(
@@ -26,7 +26,11 @@ def test_dbt_cli_with_unset_env_var_in_profile(
execute_solid(test_solid)

failure: DagsterDbtCliFatalRuntimeError = exc.value
assert "Env var required but not provided:" in failure.metadata_entries[1].entry_data.text
expected_str = "Env var required but not provided:"
assert (
expected_str in str(failure.metadata_entries[0].entry_data.data)
or expected_str in failure.metadata_entries[1].entry_data.text
)

def test_dbt_cli_run(
self, dbt_seed, test_project_dir, dbt_config_dir
@@ -145,7 +149,7 @@ def test_dbt_cli_run_operation(
result = execute_solid(test_solid)
assert result.success
assert any(
"Log macro: <<test succeded!>>" in log["message"]
"Log macro: <<test succeded!>>" in log.get("message", log.get("msg", []))
for log in result.output_value("dbt_cli_output").logs
)

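A note on the log-message change above: the key holding the log text differs between dbt versions, so the assertion now accepts either "message" or "msg". A hedged, self-contained sketch of the same lookup:

# The same log line may arrive under either key depending on the dbt version
# (hypothetical dicts; the lookup mirrors the assertion above).
expected = "Log macro: <<test succeded!>>"
for log in ({"message": expected}, {"msg": expected}):
    assert expected in log.get("message", log.get("msg", []))
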
@@ -1,13 +1,12 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'dagster_dbt_test_project'
version: '1.0.0'
name: "dagster_dbt_test_project"
version: "1.0.0"
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'dagster_dbt_test_project'
profile: "dagster_dbt_test_project"

# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
@@ -19,11 +18,10 @@ data-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "{{ env_var('DBT_TARGET_PATH') }}" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "{{ env_var('DBT_TARGET_PATH') }}"
- "dbt_modules"

target-path: "{{ env_var('DBT_TARGET_PATH') }}" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "{{ env_var('DBT_TARGET_PATH') }}"
- "dbt_modules"

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
@@ -36,4 +34,4 @@ models:
materialized: view

seeds:
quote_columns: false
quote_columns: false
@@ -47,8 +47,8 @@ def dbt_rpc_server(
): # pylint: disable=unused-argument, redefined-outer-name
proc = subprocess.Popen(
[
dbt_executable,
"rpc",
"dbt-rpc",
"serve",
"--host",
TEST_HOSTNAME,
"--port",
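The fixture change above swaps dbt's built-in "dbt rpc" invocation for the standalone dbt-rpc executable, which recent dbt releases distribute as a separate package (see the dbt-rpc requirement added to setup.py below). A minimal sketch of launching it the same way, with hypothetical host/port values:

import subprocess
import time

# Hypothetical host/port; mirrors the fixture above.
proc = subprocess.Popen(["dbt-rpc", "serve", "--host", "127.0.0.1", "--port", "8580"])
time.sleep(5)  # crude wait for the server to come up before running tests against it
proc.terminate()  # shut the server down when finished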

Large diffs are not rendered by default.

@@ -1,12 +1,14 @@
import json
from unittest.mock import MagicMock

import pytest
from dagster import AssetKey, MetadataEntry, ResourceDefinition
from dagster.core.asset_defs import build_assets_job
from dagster.core.asset_defs.decorators import ASSET_DEPENDENCY_METADATA_KEY
from dagster.utils import file_relative_path
from dagster_dbt import dbt_cli_resource
from dagster_dbt.asset_defs import load_assets_from_dbt_manifest, load_assets_from_dbt_project
from dagster_dbt.errors import DagsterDbtCliFatalRuntimeError
from dagster_dbt.types import DbtOutput


@@ -126,7 +128,7 @@ def test_select_from_project(
): # pylint: disable=unused-argument

dbt_assets = load_assets_from_dbt_project(
test_project_dir, dbt_config_dir, select="sort_by_calories"
test_project_dir, dbt_config_dir, select="sort_by_calories subdir.least_caloric"
)

result = build_assets_job(
@@ -145,7 +147,12 @@ def test_select_from_project(
for event in result.events_for_node(dbt_assets[0].op.name)
if event.event_type_value == "ASSET_MATERIALIZATION"
]
assert len(materializations) == 1
assert len(materializations) == 2


def test_dbt_ls_fail_fast():
with pytest.raises(DagsterDbtCliFatalRuntimeError):
load_assets_from_dbt_project("bad_project_dir", "bad_config_dir")


def test_select_from_manifest(
@@ -156,7 +163,11 @@ def test_select_from_manifest(
with open(manifest_path, "r") as f:
manifest_json = json.load(f)
dbt_assets = load_assets_from_dbt_manifest(
manifest_json, selected_unique_ids={"model.dagster_dbt_test_project.sort_by_calories"}
manifest_json,
selected_unique_ids={
"model.dagster_dbt_test_project.sort_by_calories",
"model.dagster_dbt_test_project.least_caloric",
},
)

result = build_assets_job(
@@ -175,7 +186,7 @@ def test_select_from_manifest(
for event in result.events_for_node(dbt_assets[0].op.name)
if event.event_type_value == "ASSET_MATERIALIZATION"
]
assert len(materializations) == 1
assert len(materializations) == 2


def test_node_info_to_asset_key(
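For readers trying these loaders out, here is a hedged end-to-end sketch of wiring the loaded assets into a job, along the lines of the tests above; the paths and the "dbt" resource configuration are illustrative assumptions, not taken verbatim from this diff:

from dagster.core.asset_defs import build_assets_job
from dagster_dbt import dbt_cli_resource
from dagster_dbt.asset_defs import load_assets_from_dbt_project

# Hypothetical project/profiles locations.
PROJECT_DIR = "path/to/dbt_project"
PROFILES_DIR = "path/to/profiles"

dbt_assets = load_assets_from_dbt_project(PROJECT_DIR, PROFILES_DIR)

assets_job = build_assets_job(
    "my_dbt_assets_job",
    dbt_assets,
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {"project_dir": PROJECT_DIR, "profiles_dir": PROFILES_DIR}
        )
    },
)
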
4 changes: 3 additions & 1 deletion python_modules/libraries/dagster-dbt/setup.py
@@ -44,7 +44,9 @@ def get_version() -> str:
"test": [
# https://github.com/dagster-io/dagster/issues/4167
"Jinja2<3.0",
"dbt>=0.17.0",
"dbt-core",
"dbt-rpc",
"dbt-postgres",
"matplotlib",
]
},
