Skip to content

Commit

Permalink
[3/n] Subsetting Stack: dbt assets can be subset (#7798)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 25, 2022
1 parent 9122968 commit 407e698
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 33 deletions.
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
required_resource_keys.add(key)

op = _Op(
name="__".join(out_asset_key.path),
name="__".join(out_asset_key.path).replace("-", "_"),
description=self.description,
ins=dict(asset_ins.values()),
out=out,
Expand Down Expand Up @@ -431,12 +431,12 @@ def build_asset_ins(
)

ins_by_asset_key[asset_key] = (
input_name,
input_name.replace("-", "_"),
In(metadata=metadata, root_manager_key="root_manager"),
)

for asset_key in non_argument_deps:
stringified_asset_key = "_".join(asset_key.path)
stringified_asset_key = "_".join(asset_key.path).replace("-", "_")
# mypy doesn't realize that Nothing is a valid type here
ins_by_asset_key[asset_key] = (stringified_asset_key, In(cast(type, Nothing)))

Expand Down
87 changes: 62 additions & 25 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
from dagster import (
AssetKey,
AssetMaterialization,
AssetsDefinition,
In,
MetadataValue,
Nothing,
Out,
Output,
SolidExecutionContext,
TableColumn,
TableSchema,
)
from dagster import _check as check
from dagster import get_dagster_logger
from dagster.core.asset_defs import AssetsDefinition, multi_asset
from dagster import get_dagster_logger, op
from dagster.core.definitions.metadata import RawMetadataValue


Expand Down Expand Up @@ -63,6 +65,15 @@ def _get_node_asset_key(node_info):
return AssetKey(components)


def _get_node_description(node_info):
code_block = textwrap.indent(node_info["raw_sql"], " ")
description_sections = [
node_info["description"],
f"#### Raw SQL:\n```\n{code_block}\n```",
]
return "\n\n".join(filter(None, description_sections))


def _dbt_nodes_to_assets(
dbt_nodes: Mapping[str, Any],
select: str,
Expand All @@ -74,64 +85,79 @@ def _dbt_nodes_to_assets(
node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey] = _get_node_asset_key,
use_build_command: bool = False,
) -> AssetsDefinition:

outs: Dict[str, Out] = {}
sources: Set[AssetKey] = set()
asset_ins: Dict[AssetKey, Tuple[str, In]] = {}

asset_deps: Dict[AssetKey, Set[AssetKey]] = {}

out_name_to_node_info: Dict[str, Mapping[str, Any]] = {}
internal_asset_deps: Dict[str, Set[AssetKey]] = {}

package_name = None
for unique_id in selected_unique_ids:
asset_deps = set()
cur_asset_deps = set()
node_info = dbt_nodes[unique_id]
package_name = node_info.get("package_name", package_name)

for dep_name in node_info["depends_on"]["nodes"]:
dep_type = dbt_nodes[dep_name]["resource_type"]

# ignore seeds/snapshots
if dep_type not in ["source", "model"]:
continue
dep_asset_key = node_info_to_asset_key(dbt_nodes[dep_name])

# if it's a source, it will be used as an input to this multi-asset
if dep_type == "source":
sources.add(dep_asset_key)
asset_ins[dep_asset_key] = (dep_name.replace(".", "_"), In(Nothing))

# regardless of type, list this as a dependency for the current asset
asset_deps.add(dep_asset_key)
code_block = textwrap.indent(node_info["raw_sql"], " ")
description_sections = [
node_info["description"],
f"#### Raw SQL:\n```\n{code_block}\n```",
]
description = "\n\n".join(filter(None, description_sections))
cur_asset_deps.add(dep_asset_key)

# generate the Out that corresponds to this model
node_name = node_info["name"]
outs[node_name] = Out(
asset_key=node_info_to_asset_key(node_info),
description=description,
description=_get_node_description(node_info),
io_manager_key=io_manager_key,
metadata=_columns_to_metadata(node_info["columns"]),
is_required=False,
)
out_name_to_node_info[node_name] = node_info
internal_asset_deps[node_name] = asset_deps

# set the asset dependencies for this asset
asset_deps[node_info_to_asset_key(node_info)] = cur_asset_deps

# prevent op name collisions between multiple dbt multi-assets
op_name = f"run_dbt_{package_name}"
if select != "*":
op_name += "_" + hashlib.md5(select.encode()).hexdigest()[-5:]

@multi_asset(
@op(
name=op_name,
non_argument_deps=sources,
outs=outs,
tags={"kind": "dbt"},
ins=dict(asset_ins.values()),
out=outs,
required_resource_keys={"dbt"},
compute_kind="dbt",
internal_asset_deps=internal_asset_deps,
)
def _dbt_project_multi_assset(context):
def dbt_op(context):
dbt_output = None
try:
# in the case that we're running everything, opt for the cleaner selection string
if len(context.selected_output_names) == len(outs):
subselect = select
else:
# for each output that we want to emit, translate to a dbt select string by converting
# the out to its corresponding fqn
subselect = [
".".join(out_name_to_node_info[out_name]["fqn"])
for out_name in context.selected_output_names
]

if use_build_command:
dbt_output = context.resources.dbt.build(select=select)
dbt_output = context.resources.dbt.build(select=subselect)
else:
dbt_output = context.resources.dbt.run(select=select)
dbt_output = context.resources.dbt.run(select=subselect)

finally:
# in the case that the project only partially runs successfully, still attempt to generate
# events for the parts that were successful
Expand Down Expand Up @@ -165,7 +191,18 @@ def _dbt_project_multi_assset(context):
else:
yield event

return _dbt_project_multi_assset
return AssetsDefinition(
asset_keys_by_input_name={
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
},
asset_keys_by_output_name={
output_name: node_info_to_asset_key(out_name_to_node_info[output_name])
for output_name in outs.keys()
},
node_def=dbt_op,
can_subset=True,
asset_deps=asset_deps,
)


def _columns_to_metadata(columns: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,11 @@ def dbt_seed(
prepare_dbt_cli, dbt_executable, dbt_config_dir
): # pylint: disable=unused-argument, redefined-outer-name
subprocess.run([dbt_executable, "seed", "--profiles-dir", dbt_config_dir], check=True)


@pytest.fixture(scope="class")
def dbt_build(
    prepare_dbt_cli, dbt_executable, dbt_config_dir
):  # pylint: disable=unused-argument, redefined-outer-name
    """Class-scoped fixture that runs ``dbt seed`` followed by ``dbt run``.

    Both commands are invoked with ``--profiles-dir`` pointing at
    ``dbt_config_dir``. ``check=True`` makes a non-zero exit status raise
    ``subprocess.CalledProcessError``.
    """
    # order matters: seed first, then run
    for dbt_command in ("seed", "run"):
        subprocess.run(
            [dbt_executable, dbt_command, "--profiles-dir", dbt_config_dir],
            check=True,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dagster_dbt.errors import DagsterDbtCliFatalRuntimeError
from dagster_dbt.types import DbtOutput

from dagster import AssetGroup, AssetKey, MetadataEntry, ResourceDefinition, repository
from dagster import AssetGroup, AssetKey, MetadataEntry, ResourceDefinition, asset, repository
from dagster.core.asset_defs import build_assets_job
from dagster.utils import file_relative_path

Expand All @@ -25,6 +25,7 @@ def test_load_from_manifest_json():
assert_assets_match_project(dbt_assets)

dbt = MagicMock()
dbt.get_run_results_json.return_value = run_results_json
dbt.run.return_value = DbtOutput(run_results_json)
dbt.build.return_value = DbtOutput(run_results_json)
dbt.get_manifest_json.return_value = manifest_json
Expand Down Expand Up @@ -93,14 +94,16 @@ def assert_assets_match_project(dbt_assets):
"sort_hot_cereals_by_calories",
"sort_cold_cereals_by_calories",
]:
out_def = assets_op.output_dict.get(model_name)
assert out_def.hardcoded_asset_key == AssetKey(["test-schema", model_name])
assert dbt_assets[0].asset_keys_by_output_name[model_name] == AssetKey(
["test-schema", model_name]
)
assert dbt_assets[0].asset_deps[AssetKey(["test-schema", model_name])] == {
AssetKey(["test-schema", "sort_by_calories"])
}

root_out_def = assets_op.output_dict.get("sort_by_calories")
assert root_out_def.hardcoded_asset_key == AssetKey(["test-schema", "sort_by_calories"])
assert dbt_assets[0].asset_keys_by_output_name["sort_by_calories"] == AssetKey(
["test-schema", "sort_by_calories"]
)
assert not dbt_assets[0].asset_deps[AssetKey(["test-schema", "sort_by_calories"])]


Expand Down Expand Up @@ -304,3 +307,63 @@ def test_node_info_to_asset_key(
assert len(observations) == 17
else:
assert len(observations) == 0


@pytest.mark.parametrize(
    "job_selection,expected_asset_names",
    [
        (
            "*",
            "sort_by_calories,sort_cold_cereals_by_calories,sort_hot_cereals_by_calories,least_caloric,hanger1,hanger2",
        ),
        (
            "test-schema>sort_by_calories+",
            "sort_by_calories,least_caloric,sort_cold_cereals_by_calories,sort_hot_cereals_by_calories,hanger1",
        ),
        ("*test-schema>hanger2", "hanger2,least_caloric,sort_by_calories"),
        (
            ["test-schema>sort_cold_cereals_by_calories", "test-schema>least_caloric"],
            "sort_cold_cereals_by_calories,least_caloric",
        ),
    ],
)
def test_subsetting(
    dbt_build,
    conn_string,
    test_project_dir,
    dbt_config_dir,
    job_selection,
    expected_asset_names,
):  # pylint: disable=unused-argument
    """Check that a job built from an asset selection materializes exactly the expected assets.

    The asset group combines the assets loaded from the dbt test project with
    two plain ``@asset`` definitions (``hanger1`` and ``hanger2``) that take
    dbt models as inputs. Each parametrized case pairs a selection with the
    comma-separated names of the assets expected to be materialized.
    """
    dbt_assets = load_assets_from_dbt_project(test_project_dir, dbt_config_dir)

    # plain assets whose inputs are named after dbt models
    @asset(namespace="test-schema")
    def hanger1(sort_by_calories):
        return None

    @asset(namespace="test-schema")
    def hanger2(least_caloric):
        return None

    dbt_resource = dbt_cli_resource.configured(
        {"project_dir": test_project_dir, "profiles_dir": dbt_config_dir}
    )
    asset_group = AssetGroup(
        dbt_assets + [hanger1, hanger2],
        resource_defs={"dbt": dbt_resource},
    )
    subset_job = asset_group.build_job(name="dbt_job", selection=job_selection)
    result = subset_job.execute_in_process()

    assert result.success

    # collect the asset key of every materialization event emitted by the run
    materialized_keys = set()
    for event in result.all_events:
        if event.event_type_value == "ASSET_MATERIALIZATION":
            materialized_keys.add(event.event_specific_data.materialization.asset_key)

    wanted_keys = {
        AssetKey(["test-schema", asset_name]) for asset_name in expected_asset_names.split(",")
    }
    assert materialized_keys == wanted_keys

0 comments on commit 407e698

Please sign in to comment.