Skip to content

Commit

Permalink
[dagster-dbt] Fix bug where a dbt invocation that did not successfull…
Browse files Browse the repository at this point in the history
…y start could emit materialization events. (#8293)
  • Loading branch information
OwenKephart committed Jun 9, 2022
1 parent 9fc1b11 commit 3b8c1e6
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 13 deletions.
27 changes: 15 additions & 12 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,23 +191,26 @@ def _dbt_nodes_to_assets(
)
def dbt_op(context):
dbt_output = None
try:
# in the case that we're running everything, opt for the cleaner selection string
if len(context.selected_output_names) == len(outs):
subselect = select
else:
# for each output that we want to emit, translate to a dbt select string by converting
# the out to its corresponding fqn
subselect = [
".".join(out_name_to_node_info[out_name]["fqn"])
for out_name in context.selected_output_names
]

# clean up any run results from the last run
context.resources.dbt.remove_run_results_json()

# in the case that we're running everything, opt for the cleaner selection string
if len(context.selected_output_names) == len(outs):
subselect = select
else:
# for each output that we want to emit, translate to a dbt select string by converting
# the out to its corresponding fqn
subselect = [
".".join(out_name_to_node_info[out_name]["fqn"])
for out_name in context.selected_output_names
]

try:
if use_build_command:
dbt_output = context.resources.dbt.build(select=subselect)
else:
dbt_output = context.resources.dbt.run(select=subselect)

finally:
# in the case that the project only partially runs successfully, still attempt to generate
# events for the parts that were successful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..dbt_resource import DbtResource
from .constants import CLI_COMMON_FLAGS_CONFIG_SCHEMA, CLI_COMMON_OPTIONS_CONFIG_SCHEMA
from .types import DbtCliOutput
from .utils import execute_cli, parse_manifest, parse_run_results
from .utils import execute_cli, parse_manifest, parse_run_results, remove_run_results


class DbtCliResource(DbtResource):
Expand Down Expand Up @@ -279,6 +279,14 @@ def get_run_results_json(self, **kwargs) -> Optional[Dict[str, Any]]:
target_path = kwargs.get("target_path", self._target_path)
return parse_run_results(project_dir, target_path)

def remove_run_results_json(self, **kwargs):
    """
    Delete the run_results.json file left behind by a previous run, if there is one.

    Keyword Args:
        project_dir: overrides the resource's default ``project-dir`` flag.
        target_path: overrides the resource's configured target path.
    """
    remove_run_results(
        kwargs.get("project_dir", self.default_flags["project-dir"]),
        kwargs.get("target_path", self._target_path),
    )

def get_manifest_json(self, **kwargs) -> Optional[Dict[str, Any]]:
"""
Get a parsed version of the manifest.json file for the relevant dbt project.
Expand Down
7 changes: 7 additions & 0 deletions python_modules/libraries/dagster-dbt/dagster_dbt/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ def parse_run_results(path: str, target_path: str = DEFAULT_DBT_TARGET_PATH) ->
raise DagsterDbtCliOutputsNotFoundError(path=run_results_path)


def remove_run_results(path: str, target_path: str = DEFAULT_DBT_TARGET_PATH) -> None:
    """Removes the `target/run_results.json` artifact left behind by a previous dbt process.

    Args:
        path (str): Directory that ``target_path`` is resolved against (callers in this
            package pass the dbt project directory).
        target_path (str): Directory, relative to ``path``, that holds ``run_results.json``.

    This is a no-op when the file does not exist.
    """
    run_results_path = os.path.join(path, target_path, "run_results.json")
    try:
        # attempt the removal directly instead of checking for existence first, so that a
        # file which disappears between the check and the removal cannot raise
        os.remove(run_results_path)
    except (FileNotFoundError, NotADirectoryError):
        # nothing to clean up: no previous run wrote results to this location
        pass


def parse_manifest(path: str, target_path: str = DEFAULT_DBT_TARGET_PATH) -> Dict[str, Any]:
"""Parses the `target/manifest.json` artifact that is produced by a dbt process."""
manifest_path = os.path.join(path, target_path, "manifest.json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,50 @@ def assert_assets_match_project(dbt_assets):
assert not dbt_assets[0].asset_deps[AssetKey(["sort_by_calories"])]


def test_fail_immediately(
    dbt_seed, conn_string, test_project_dir, dbt_config_dir
):  # pylint: disable=unused-argument
    """A dbt invocation that fails to start must not produce materialization events.

    The job is first run with a working dbt resource so that a run_results.json exists,
    then run again with an invalid profiles dir; the second run must fail and yield no
    ASSET_MATERIALIZATION events for the dbt op.
    """
    from dagster import build_init_resource_context

    dbt_assets = load_assets_from_dbt_project(test_project_dir, dbt_config_dir)
    good_dbt = dbt_cli_resource.configured(
        {"project_dir": test_project_dir, "profiles_dir": dbt_config_dir}
    )

    # ensure that there will be a run results json
    working_job = build_assets_job("test_job", dbt_assets, resource_defs={"dbt": good_dbt})
    result = working_job.execute_in_process()

    assert good_dbt(build_init_resource_context()).get_run_results_json()

    # same project, but pointed at a profiles dir that does not exist
    broken_dbt = dbt_cli_resource.configured(
        {"project_dir": test_project_dir, "profiles_dir": "BAD PROFILES DIR"}
    )
    broken_job = build_assets_job("test_job", dbt_assets, resource_defs={"dbt": broken_dbt})
    result = broken_job.execute_in_process(raise_on_error=False)

    assert not result.success

    materializations = []
    for event in result.events_for_node(dbt_assets[0].op.name):
        if event.event_type_value == "ASSET_MATERIALIZATION":
            materializations.append(event.event_specific_data.materialization)
    assert len(materializations) == 0


@pytest.mark.parametrize("use_build, fail_test", [(True, False), (True, True), (False, False)])
def test_basic(
dbt_seed, conn_string, test_project_dir, dbt_config_dir, use_build, fail_test
Expand Down

0 comments on commit 3b8c1e6

Please sign in to comment.