Skip to content

Commit

Permalink
feat(dbt-cloud): raise exception if cached compile has not completed (#…
Browse files Browse the repository at this point in the history
…11792)

### Summary & Motivation
If the cached compile run is not completed, then we cannot fetch the
manifests. Raise an exception instead.

### How I Tested These Changes
pytest, local
  • Loading branch information
rexledesma authored and alangenfeld committed Jan 19, 2023
1 parent 609b3e7 commit a5411a0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
)
from ..errors import DagsterDbtCloudJobInvariantViolationError
from ..utils import ASSET_RESOURCE_TYPES, result_to_events
from .resources import DbtCloudResource
from .resources import DbtCloudResource, DbtCloudRunStatus

DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID"

Expand Down Expand Up @@ -133,6 +133,18 @@ def _get_cached_compile_dbt_cloud_job_run(self, compile_run_id: int) -> Tuple[in
compile_run = self._dbt_cloud.get_run(
run_id=compile_run_id, include_related=["trigger", "run_steps"]
)

compile_run_status: str = compile_run["status_humanized"]
if compile_run_status != DbtCloudRunStatus.SUCCESS:
raise DagsterDbtCloudJobInvariantViolationError(
f"The cached dbt Cloud job run `{compile_run_id}` must have a status of"
f" `{DbtCloudRunStatus.SUCCESS}`. Received status: `{compile_run_status}. You can"
f" view the full status of your dbt Cloud run at {compile_run['href']}. Once it has"
" successfully completed, reload your Dagster definitions. If your run has failed,"
" you must manually refresh the cache using the `dagster-dbt"
" cache-compile-references` CLI."
)

compile_run_has_generate_docs = compile_run["trigger"]["generate_docs_override"]

compile_job_materialization_command_step = len(compile_run["run_steps"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,13 @@ def test_load_assets_from_cached_compile_run(
)
responses.get(
url=f"{dbt_cloud_service.api_v2_base_url}{DBT_CLOUD_ACCOUNT_ID}/runs/{cached_compile_run_id}/?include_related=trigger,run_steps",
json={"data": {"trigger": {"generate_docs_override": False}, "run_steps": [{}]}},
json={
"data": {
"trigger": {"generate_docs_override": False},
"run_steps": [{}],
"status_humanized": "Success",
}
},
)
responses.get(
url=f"{dbt_cloud_service.api_v2_base_url}{DBT_CLOUD_ACCOUNT_ID}/runs/{cached_compile_run_id}/artifacts/manifest.json",
Expand Down Expand Up @@ -320,6 +326,44 @@ def test_load_assets_from_cached_compile_run(
)


@responses.activate
def test_invalid_cached_compile_run(dbt_cloud, dbt_cloud_service):
environment_variable_id = 1
cached_compile_run_id = "12345"

_add_dbt_cloud_job_responses(
dbt_cloud_service=dbt_cloud_service,
dbt_commands=["dbt build"],
)
responses.replace(
responses.GET,
url=f"{dbt_cloud_service.api_v3_base_url}{DBT_CLOUD_ACCOUNT_ID}/projects/{DBT_CLOUD_PROJECT_ID}/environment-variables/job?job_definition_id={DBT_CLOUD_JOB_ID}",
json=sample_get_environment_variables(
environment_variable_id=environment_variable_id,
name=DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR,
value=cached_compile_run_id,
),
)
responses.get(
url=f"{dbt_cloud_service.api_v2_base_url}{DBT_CLOUD_ACCOUNT_ID}/runs/{cached_compile_run_id}/?include_related=trigger,run_steps",
json={
"data": {
"trigger": {"generate_docs_override": False},
"run_steps": [{}],
"status_humanized": "Running",
"href": "https://cloud.getdbt.com",
}
},
)

dbt_cloud_cacheable_assets = load_assets_from_dbt_cloud_job(
dbt_cloud=dbt_cloud, job_id=DBT_CLOUD_JOB_ID
)

with pytest.raises(DagsterDbtCloudJobInvariantViolationError):
dbt_cloud_cacheable_assets.compute_cacheable_data()


@responses.activate
@pytest.mark.parametrize(
"invalid_dbt_commands",
Expand Down

0 comments on commit a5411a0

Please sign in to comment.