feat(dagster-dbt): use cached run id to fetch artifacts (#11744)
### Summary & Motivation
Start threading through the cached run id when generating the
software-defined assets in dbt Cloud.

### How I Tested These Changes
pytest, local
rexledesma authored and alangenfeld committed Jan 19, 2023
1 parent 6c41bf4 commit 609b3e7
Showing 6 changed files with 212 additions and 54 deletions.
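
For orientation, here is a minimal sketch of the mechanism this commit introduces, using simplified stand-ins for the surrounding class; the two resource calls (`get_job_environment_variables` and `get_run`) are the ones the diff itself uses, but everything else is an assumption for illustration:

DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID"

def resolve_compile_run_id(dbt_cloud, project_id: int, job_id: int):
    # Look for a compile run id cached in the dbt Cloud job's environment
    # variables (written there by the `cache_compile_references` CLI command).
    env_vars = dbt_cloud.get_job_environment_variables(
        project_id=project_id, job_id=job_id
    )
    cached_run_id = (
        env_vars.get(DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR, {})
        .get("job", {})
        .get("value")
    )
    if cached_run_id:
        # Reuse the cached compile run's artifacts rather than re-compiling.
        return int(cached_run_id)
    # Otherwise the project is compiled ad hoc (see `_compile_dbt_cloud_job`).
    return None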
@@ -49,6 +49,8 @@
from ..utils import ASSET_RESOURCE_TYPES, result_to_events
from .resources import DbtCloudResource

DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID"


class DbtCloudCacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(
@@ -127,35 +129,19 @@ def get_compile_filters(parsed_args: Namespace) -> List[str]:

return dbt_compile_options

def _get_dbt_nodes_and_dependencies(
self,
) -> Tuple[Mapping[str, Any], Mapping[str, FrozenSet[str]]]:
"""
For a given dbt Cloud job, fetch the latest run's dependency structure of executed nodes.
"""
# Fetch information about the job.
job = self._dbt_cloud.get_job(job_id=self._job_id)
self._project_id = job["project_id"]
self._has_generate_docs = job["generate_docs"]

# We constrain the kinds of dbt Cloud jobs that we support running.
#
# A simple constraint: we support jobs that run multiple steps, but each job
# must contain exactly one `dbt run` or `dbt build` command.
#
# As a reminder, `dbt deps` automatically runs before the job's configured commands.
# If the corresponding settings are enabled, `dbt docs generate` and
# `dbt source freshness` can automatically run after the job's configured commands.
#
# Commands that execute before and after the job's configured commands do not
# count towards the single-command constraint.
self._job_commands = job["execute_steps"]
self._job_materialization_command_step = (
DbtCloudCacheableAssetsDefinition.get_job_materialization_command_step(
execute_steps=self._job_commands
)
def _get_cached_compile_dbt_cloud_job_run(self, compile_run_id: int) -> Tuple[int, int]:
compile_run = self._dbt_cloud.get_run(
run_id=compile_run_id, include_related=["trigger", "run_steps"]
)
compile_run_has_generate_docs = compile_run["trigger"]["generate_docs_override"]

compile_job_materialization_command_step = len(compile_run["run_steps"])
if compile_run_has_generate_docs:
compile_job_materialization_command_step -= 1

return compile_run_id, compile_job_materialization_command_step

def _compile_dbt_cloud_job(self, dbt_cloud_job: Mapping[str, Any]) -> Tuple[int, int]:
# Retrieve the filters options from the dbt Cloud job's materialization command.
#
# There are three filters: `--select`, `--exclude`, and `--selector`.
@@ -176,9 +162,10 @@ def _get_dbt_nodes_and_dependencies(
dbt_vars = json.loads(parsed_args.vars or "{}")
if dbt_vars:
raise DagsterDbtCloudJobInvariantViolationError(
f"The dbt Cloud job '{job['name']}' ({job['id']}) must not have variables defined "
"from `--vars` in its `dbt run` or `dbt build` command. Instead, declare the "
f"variables in the `dbt_project.yml` file. Received commands: {self._job_commands}."
f"The dbt Cloud job '{dbt_cloud_job['name']}' ({dbt_cloud_job['id']}) must not have"
" variables defined from `--vars` in its `dbt run` or `dbt build` command."
" Instead, declare the variables in the `dbt_project.yml` file. Received commands:"
f" {self._job_commands}."
)

if self._partitions_def and self._partition_key_to_vars_fn:
@@ -213,8 +200,55 @@ def _get_dbt_nodes_and_dependencies(
if self._has_generate_docs:
compile_job_materialization_command_step -= 1

# Fetch the compilation run's manifest and run results.
compile_run_id = compile_run_dbt_output.run_id
return compile_run_dbt_output.run_id, compile_job_materialization_command_step

def _get_dbt_nodes_and_dependencies(
self,
) -> Tuple[Mapping[str, Any], Mapping[str, FrozenSet[str]]]:
"""
For a given dbt Cloud job, fetch the latest run's dependency structure of executed nodes.
"""
# Fetch information about the job.
job = self._dbt_cloud.get_job(job_id=self._job_id)
self._project_id = job["project_id"]
self._has_generate_docs = job["generate_docs"]

# We constrain the kinds of dbt Cloud jobs that we support running.
#
# A simple constraint: we support jobs that run multiple steps, but each job
# must contain exactly one `dbt run` or `dbt build` command.
#
# As a reminder, `dbt deps` automatically runs before the job's configured commands.
# If the corresponding settings are enabled, `dbt docs generate` and
# `dbt source freshness` can automatically run after the job's configured commands.
#
# Commands that execute before and after the job's configured commands do not
# count towards the single-command constraint.
self._job_commands = job["execute_steps"]
self._job_materialization_command_step = (
DbtCloudCacheableAssetsDefinition.get_job_materialization_command_step(
execute_steps=self._job_commands
)
)

# Determine whether to use a cached compile run. This should only be set up if the user is
# using a GitHub Action along with their dbt project.
dbt_cloud_job_env_vars = self._dbt_cloud.get_job_environment_variables(
project_id=self._project_id, job_id=self._job_id
)
compile_run_id = (
dbt_cloud_job_env_vars.get(DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR, {})
.get("job", {})
.get("value")
)

compile_run_id, compile_job_materialization_command_step = (
# If a compile run is cached, then use it.
self._get_cached_compile_dbt_cloud_job_run(compile_run_id=int(compile_run_id))
if compile_run_id
# Otherwise, compile the dbt Cloud project in an ad-hoc manner.
else self._compile_dbt_cloud_job(dbt_cloud_job=job)
)

manifest_json = self._dbt_cloud.get_manifest(
run_id=compile_run_id, step=compile_job_materialization_command_step
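
The step-index bookkeeping in `_get_cached_compile_dbt_cloud_job_run` above relies on dbt Cloud storing run artifacts per step, with `dbt docs generate` appended as an extra trailing step when docs generation is enabled. A hedged illustration follows; the step list is hypothetical, and the payload shape is assumed from the `trigger` and `run_steps` keys the diff reads:

def materialization_step(run: dict) -> int:
    # Artifacts are fetched by step number; the materialization (compile) step
    # is the last step unless a trailing `dbt docs generate` step follows it.
    step = len(run["run_steps"])
    if run["trigger"]["generate_docs_override"]:
        step -= 1
    return step

# Hypothetical run with docs generation enabled: the artifact-bearing step is
# the second of three.
example_run = {
    "run_steps": ["dbt deps", "dbt compile", "dbt docs generate"],
    "trigger": {"generate_docs_override": True},
}
assert materialization_step(example_run) == 2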
12 changes: 9 additions & 3 deletions python_modules/libraries/dagster-dbt/dagster_dbt/cloud/cli.py
@@ -2,13 +2,14 @@

import typer

from dagster_dbt.cloud.asset_defs import DbtCloudCacheableAssetsDefinition
from dagster_dbt.cloud.asset_defs import (
DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR,
DbtCloudCacheableAssetsDefinition,
)
from dagster_dbt.cloud.resources import DbtCloudResource

app = typer.Typer()

DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID"


@app.command()
def cache_compile_references(
@@ -51,6 +52,7 @@ def cache_compile_references(
job_id=job_id,
cause="Generating software-defined assets for Dagster.",
steps_override=[dbt_compile_command],
generate_docs_override=True,
)

# Cache the compile run as a reference in the dbt Cloud job's env var
@@ -83,3 +85,7 @@ def cache_compile_references(
@app.callback()
def callback() -> None:
pass


if __name__ == "__main__":
app()
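
Putting the two files together: `cache_compile_references` triggers a compile-only run (now forcing docs generation on, so the run's step layout matches what `_get_cached_compile_dbt_cloud_job_run` expects when it subtracts the trailing docs step) and records the run id under `DBT_DAGSTER_COMPILE_RUN_ID` in the job's environment variables. A rough sketch under those assumptions; the env-var write is truncated in this diff, so `set_compile_run_env_var` below is a hypothetical stand-in, and the return shape of `run_job` is assumed to include the run's `id`:

def cache_compile_run(dbt_cloud, project_id: int, job_id: int) -> None:
    # Trigger a run that only compiles the project; `generate_docs_override`
    # keeps the run's step layout consistent with the artifact lookup.
    run = dbt_cloud.run_job(
        job_id=job_id,
        cause="Generating software-defined assets for Dagster.",
        steps_override=["dbt compile"],  # the diff builds this command dynamically
        generate_docs_override=True,
    )

    # Hypothetical stand-in for the env-var write that this hunk truncates.
    set_compile_run_env_var(
        project_id=project_id,
        job_id=job_id,
        name=DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR,
        value=str(run["id"]),
    )

With the `__main__` guard added at the end of the hunk, the Typer app can also be invoked directly, e.g. `python -m dagster_dbt.cloud.cli --help`, independent of any console-script entry point the package may define.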
