feat(dbt-cloud): cache compile run id as env var for job (#11691)
### Summary & Motivation
When generating software-defined assets from a dbt Cloud project, most
of the latency comes from compiling the project. Rather than compiling
the dbt project ad hoc, compile it only when either:

- the project changes (via a git commit), or
- the dbt Cloud job's commands change

If either condition is met, compile the project and save (or overwrite) the
cached reference to the resulting compile run, as sketched below.
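
As a rough sketch (not part of this diff), a CI step could refresh the cached
reference by invoking the new Typer command as a plain function; the
environment lookups below are placeholders for CI secrets:

```python
import os

from dagster_dbt.cloud.cli import cache_compile_references

# Hypothetical CI step: recompile each job in the project and cache the
# compile run id as the job's DBT_DAGSTER_COMPILE_RUN_ID environment variable.
cache_compile_references(
    auth_token=os.environ["DBT_CLOUD_API_KEY"],
    account_id=int(os.environ["DBT_CLOUD_ACCOUNT_ID"]),
    project_id=int(os.environ["DBT_CLOUD_PROJECT_ID"]),
)
```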

### How I Tested These Changes
pytest, local
rexledesma committed Jan 18, 2023
1 parent 48bb79a commit 6d1c659
Showing 6 changed files with 251 additions and 77 deletions.
@@ -1,5 +1,6 @@
import json
import shlex
from argparse import Namespace
from typing import (
Any,
Callable,
@@ -90,6 +91,42 @@ def build_definitions(
{"dbt_cloud": self._dbt_cloud_resource_def},
)

@staticmethod
def parse_dbt_command(dbt_command: str) -> Namespace:
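# e.g. "dbt run --select my_model" is split and parsed (minus the leading
# `dbt` token) into a Namespace with, roughly, which="run", select=["my_model"].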
return dbt_parse_args(args=shlex.split(dbt_command)[1:])

@staticmethod
def get_job_materialization_command_step(execute_steps: List[str]) -> int:
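# Enforce that exactly one step materializes assets. For example,
# ["dbt deps", "dbt build --select tag:daily", "dbt docs generate"]
# yields index 1, the position of the single `dbt build` step.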
materialization_command_filter = [
DbtCloudCacheableAssetsDefinition.parse_dbt_command(command).which in ["run", "build"]
for command in execute_steps
]

if sum(materialization_command_filter) != 1:
raise DagsterDbtCloudJobInvariantViolationError(
"The dbt Cloud job must have a single `dbt run` or `dbt build` in its commands. "
f"Received commands: {execute_steps}."
)

return materialization_command_filter.index(True)

@staticmethod
def get_compile_filters(parsed_args: Namespace) -> List[str]:
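# Forward only the node-selection filters to the compile step. For example,
# a parsed `dbt run --select tag:daily --exclude staging` yields, roughly,
# ["--select tag:daily", "--exclude staging"].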
dbt_compile_options: List[str] = []

selected_models = parsed_args.select or []
if selected_models:
dbt_compile_options.append(f"--select {' '.join(selected_models)}")

excluded_models = parsed_args.exclude or []
if excluded_models:
dbt_compile_options.append(f"--exclude {' '.join(excluded_models)}")

if parsed_args.selector_name:
dbt_compile_options.append(f"--selector {parsed_args.selector_name}")

return dbt_compile_options

def _get_dbt_nodes_and_dependencies(
self,
) -> Tuple[Mapping[str, Any], Mapping[str, FrozenSet[str]]]:
@@ -113,35 +150,20 @@ def _get_dbt_nodes_and_dependencies(
# These commands that execute before and after the job's configured commands do not count
# towards the single command constraint.
self._job_commands = job["execute_steps"]
- materialization_command_filter = [
-     dbt_parse_args(shlex.split(command)[1:]).which in ["run", "build"]
-     for command in self._job_commands
- ]
- if sum(materialization_command_filter) != 1:
-     raise DagsterDbtCloudJobInvariantViolationError(
-         f"The dbt Cloud job '{job['name']}' ({job['id']}) must have a single `dbt run` "
-         f"or `dbt build` in its commands. Received commands: {self._job_commands}."
-     )
-
- self._job_materialization_command_step = materialization_command_filter.index(True)
+ self._job_materialization_command_step = (
+     DbtCloudCacheableAssetsDefinition.get_job_materialization_command_step(
+         execute_steps=self._job_commands
+     )
+ )

# Retrieve the filter options from the dbt Cloud job's materialization command.
#
# There are three filters: `--select`, `--exclude`, and `--selector`.
materialization_command = self._job_commands[self._job_materialization_command_step]
- parsed_args = dbt_parse_args(args=shlex.split(materialization_command)[1:])
- dbt_compile_options: List[str] = []
-
- selected_models = parsed_args.select or []
- if selected_models:
-     dbt_compile_options.append(f"--select {' '.join(selected_models)}")
-
- excluded_models = parsed_args.exclude or []
- if excluded_models:
-     dbt_compile_options.append(f"--exclude {' '.join(excluded_models)}")
-
- if parsed_args.selector_name:
-     dbt_compile_options.append(f"--selector {parsed_args.selector_name}")
+ parsed_args = DbtCloudCacheableAssetsDefinition.parse_dbt_command(materialization_command)
+ dbt_compile_options = DbtCloudCacheableAssetsDefinition.get_compile_filters(
+     parsed_args=parsed_args
+ )

# Add the partition variable as a variable to the dbt Cloud job command.
#
@@ -174,11 +196,11 @@ def _get_dbt_nodes_and_dependencies(
# By always doing a compile step, we can always get the latest dependency structure.
# This incurs some latency, but at least it doesn't run through the entire materialization
# process.
dbt_compile_command = f"dbt compile {' '.join(dbt_compile_options)}"
compile_run_dbt_output = self._dbt_cloud.run_job_and_poll(
job_id=self._job_id,
cause="Generating software-defined assets for Dagster.",
# Pass the filters from the run/build step to the compile step.
steps_override=[f"dbt compile {' '.join(dbt_compile_options)}"],
steps_override=[dbt_compile_command],
)

# Target the compile execution step when retrieving run artifacts, rather than assuming
85 changes: 85 additions & 0 deletions python_modules/libraries/dagster-dbt/dagster_dbt/cloud/cli.py
@@ -0,0 +1,85 @@
from typing import List

import typer

from dagster_dbt.cloud.asset_defs import DbtCloudCacheableAssetsDefinition
from dagster_dbt.cloud.resources import DbtCloudResource

app = typer.Typer()

DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID"


@app.command()
def cache_compile_references(
auth_token: str = typer.Argument(..., envvar="DBT_CLOUD_API_KEY"),
account_id: int = typer.Argument(..., envvar="DBT_CLOUD_ACCOUNT_ID"),
project_id: int = typer.Argument(..., envvar="DBT_CLOUD_PROJECT_ID"),
) -> None:
"""
Cache the latest dbt cloud compile run id for a given project.
"""
dbt_cloud_resource = DbtCloudResource(
auth_token=auth_token, account_id=account_id, disable_schedule_on_trigger=False
)

# List the jobs from the project
dbt_cloud_jobs = dbt_cloud_resource.list_jobs(project_id=project_id)

# Compile each job with an override
for dbt_cloud_job in dbt_cloud_jobs:
job_id: int = dbt_cloud_job["id"]
job_commands: List[str] = dbt_cloud_job["execute_steps"]

# Retrieve the filters for the compile override step
job_materialization_command_step = (
DbtCloudCacheableAssetsDefinition.get_job_materialization_command_step(
execute_steps=job_commands
)
)
dbt_materialization_command = job_commands[job_materialization_command_step]
parsed_args = DbtCloudCacheableAssetsDefinition.parse_dbt_command(
dbt_materialization_command
)
dbt_compile_options: List[str] = DbtCloudCacheableAssetsDefinition.get_compile_filters(
parsed_args=parsed_args
)
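# e.g. a job step of `dbt build --select tag:daily` becomes the override
# `dbt compile --select tag:daily`, preserving the job's filters.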
dbt_compile_command = f"dbt compile {' '.join(dbt_compile_options)}"

# Run the compile command
dbt_cloud_compile_run = dbt_cloud_resource.run_job(
job_id=job_id,
cause="Generating software-defined assets for Dagster.",
steps_override=[dbt_compile_command],
)

# Cache the compile run as a reference in the dbt Cloud job's env var
dbt_cloud_compile_run_id = str(dbt_cloud_compile_run["id"])
dbt_cloud_job_env_vars = dbt_cloud_resource.get_job_environment_variables(
project_id=project_id, job_id=job_id
)
compile_run_environment_variable_id = dbt_cloud_job_env_vars[
DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR
]["job"]["id"]

typer.echo(
f"Updating the value of environment variable `{DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR}`"
f" with id `{compile_run_environment_variable_id}` for job id `{job_id}`. Setting new"
f" value to `{dbt_cloud_compile_run_id}`."
)

dbt_cloud_resource.set_job_environment_variable(
project_id=project_id,
job_id=job_id,
environment_variable_id=compile_run_environment_variable_id,
name=DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR,
value=dbt_cloud_compile_run_id,
)

typer.echo("Update complete.")


# https://typer.tiangolo.com/tutorial/commands/one-or-multiple/#one-command-and-one-callback
@app.callback()
def callback() -> None:
pass
@@ -0,0 +1,47 @@
import responses
from dagster_dbt.cloud.cli import DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR, app
from typer.testing import CliRunner

from .utils import (
SAMPLE_ACCOUNT_ID,
SAMPLE_API_PREFIX,
SAMPLE_API_V3_PREFIX,
SAMPLE_JOB_ID,
SAMPLE_PROJECT_ID,
sample_get_environment_variables,
sample_list_job_details,
sample_run_details,
sample_set_environment_variable,
)

runner = CliRunner()


@responses.activate
def test_cache_compile_references(monkeypatch):
monkeypatch.setenv("DBT_CLOUD_API_KEY", "test")
monkeypatch.setenv("DBT_CLOUD_ACCOUNT_ID", SAMPLE_ACCOUNT_ID)
monkeypatch.setenv("DBT_CLOUD_PROJECT_ID", SAMPLE_PROJECT_ID)
compile_run_environment_variable_id = 3

responses.get(f"{SAMPLE_API_PREFIX}/jobs", json=sample_list_job_details())
responses.post(f"{SAMPLE_API_PREFIX}/jobs/{SAMPLE_JOB_ID}/run/", json=sample_run_details())
responses.get(
f"{SAMPLE_API_V3_PREFIX}/projects/{SAMPLE_PROJECT_ID}/environment-variables/job",
json=sample_get_environment_variables(
environment_variable_id=compile_run_environment_variable_id,
name=DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR,
),
)
responses.post(
f"{SAMPLE_API_V3_PREFIX}/projects/{SAMPLE_PROJECT_ID}/environment-variables/{compile_run_environment_variable_id}",
json=sample_set_environment_variable(
environment_variable_id=compile_run_environment_variable_id,
name=DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR,
value="500000",
),
)

result = runner.invoke(app, ["cache-compile-references"])

assert result.exit_code == 0
@@ -9,15 +9,18 @@
from .utils import (
SAMPLE_ACCOUNT_ID,
SAMPLE_API_PREFIX,
SAMPLE_API_V3_PREFIX,
SAMPLE_JOB_ID,
SAMPLE_PROJECT_ID,
SAMPLE_RUN_ID,
sample_get_environment_variables,
sample_job_details,
sample_list_artifacts,
sample_list_job_details,
sample_run_details,
sample_run_results,
sample_runs_details,
sample_set_environment_variable,
)


@@ -303,69 +306,39 @@ def test_no_run_results_job()
@responses.activate
def test_get_environment_variables():
dc_resource = get_dbt_cloud_resource()
- project_id = 1000

responses.add(
responses.GET,
f"{dc_resource.api_v3_base_url}{SAMPLE_ACCOUNT_ID}/projects/{project_id}/environment-variables/job",
json={
"status": {
"code": 200,
"is_success": True,
"user_message": "Success!",
"developer_message": "",
},
"data": {
"DBT_DAGSTER_ENV_VAR": {
"project": {"id": 1, "value": "-1"},
"environment": {"id": 2, "value": "-1"},
"job": {"id": 3, "value": "100"},
},
},
},
f"{SAMPLE_API_V3_PREFIX}/projects/{SAMPLE_PROJECT_ID}/environment-variables/job",
json=sample_get_environment_variables(
environment_variable_id=3, name="DBT_DAGSTER_ENV_VAR"
),
)

- dc_resource.get_job_environment_variables(project_id=project_id, job_id=SAMPLE_JOB_ID)
+ assert dc_resource.get_job_environment_variables(
+     project_id=SAMPLE_PROJECT_ID, job_id=SAMPLE_JOB_ID
+ )


@responses.activate
def test_set_environment_variable():
dc_resource = get_dbt_cloud_resource()
- project_id = 1000
environment_variable_id = 1
name = "DBT_DAGSTER_ENV_VAR"
value = "2000"

responses.add(
responses.POST,
f"{dc_resource.api_v3_base_url}{SAMPLE_ACCOUNT_ID}/projects/{project_id}/environment-variables/{environment_variable_id}",
json={
"status": {
"code": 200,
"is_success": True,
"user_message": "Success!",
"developer_message": "",
},
"data": {
"account_id": SAMPLE_ACCOUNT_ID,
"project_id": project_id,
"name": "DBT_DAGSTER_ENV_VAR",
"type": "job",
"state": 1,
"user_id": None,
"environment_id": None,
"job_definition_id": SAMPLE_JOB_ID,
"environment": None,
"display_value": "2000",
"id": 1,
"created_at": "2023-01-01 10:00:00.000000+00:00",
"updated_at": "2023-01-02 10:00:00.000000+00:00",
},
},
f"{SAMPLE_API_V3_PREFIX}/projects/{SAMPLE_PROJECT_ID}/environment-variables/{environment_variable_id}",
json=sample_set_environment_variable(
environment_variable_id=environment_variable_id, name=name, value=value
),
)

- dc_resource.set_job_environment_variable(
-     project_id=project_id,
+ assert dc_resource.set_job_environment_variable(
+     project_id=SAMPLE_PROJECT_ID,
      job_id=SAMPLE_JOB_ID,
      environment_variable_id=environment_variable_id,
-     name="DBT_DAGSTER_ENV_VAR",
-     value=2000,
+     name=name,
+     value=value,
)
