Skip to content

Commit

Permalink
[dagster-dbt] small tweaks (#7967)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 18, 2022
1 parent 0424134 commit 2a56fa2
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@
"your dbt project configuration file."
),
),
"docs_url": Field(
config=StringSource,
is_required=False,
description="The url for where dbt docs are being served for this project.",
),
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ def __init__(
ignore_handled_error: bool,
target_path: str,
logger: Optional[Any] = None,
docs_url: Optional[str] = None,
):
self._default_flags = default_flags
self._executable = executable
self._warn_error = warn_error
self._ignore_handled_error = ignore_handled_error
self._target_path = target_path
self._docs_url = docs_url
super().__init__(logger)

@property
Expand Down Expand Up @@ -87,6 +89,7 @@ def cli(self, command: str, **kwargs) -> DbtCliOutput:
warn_error=self._warn_error,
ignore_handled_error=self._ignore_handled_error,
target_path=self._target_path,
docs_url=self._docs_url,
)

def compile(
Expand Down Expand Up @@ -352,4 +355,5 @@ def dbt_cli_pipeline():
ignore_handled_error=context.resource_config["ignore_handled_error"],
target_path=context.resource_config["target_path"],
logger=context.log,
docs_url=context.resource_config.get("docs_url"),
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

import dagster._check as check

Expand All @@ -20,6 +20,7 @@ class DbtCliOutput(DbtOutput):
result (Optional[Dict[str, Any]]): Dictionary containing dbt-reported result information
contained in run_results.json. Some dbt commands do not produce results, and will
therefore have result = None.
docs_url (Optional[str]): Hostname where dbt docs are being served for this project.
"""

def __init__(
Expand All @@ -29,11 +30,13 @@ def __init__(
raw_output: str,
logs: List[Dict[str, Any]],
result: Dict[str, Any],
docs_url: Optional[str] = None,
):
self._command = check.str_param(command, "command")
self._return_code = check.int_param(return_code, "return_code")
self._raw_output = check.str_param(raw_output, "raw_output")
self._logs = check.list_param(logs, "logs", of_type=dict)
self._docs_url = check.opt_str_param(docs_url, "docs_url")
super().__init__(result)

@property
Expand All @@ -51,3 +54,7 @@ def raw_output(self) -> str:
@property
def logs(self) -> List[Dict[str, Any]]:
return self._logs

@property
def docs_url(self) -> Optional[str]:
return self._docs_url
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import os
import subprocess
from typing import Any, Dict
from typing import Any, Dict, Optional

import dagster._check as check
from dagster.core.utils import coerce_valid_log_level
Expand All @@ -23,6 +23,7 @@ def execute_cli(
warn_error: bool,
ignore_handled_error: bool,
target_path: str,
docs_url: Optional[str] = None,
) -> DbtCliOutput:
"""Executes a command on the dbt CLI in a subprocess."""
check.str_param(executable, "executable")
Expand Down Expand Up @@ -111,6 +112,7 @@ def execute_cli(
raw_output=raw_output,
logs=logs,
result=run_results,
docs_url=docs_url,
)


Expand Down
4 changes: 2 additions & 2 deletions python_modules/libraries/dagster-dbt/dagster_dbt/cloud/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ def dbt_cloud_run_op(context):
.. code-block:: python
from dagster import job
from dagster_dbt import dbt_cloud_resource, run_dbt_cloud_op
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op
my_dbt_cloud_resource = dbt_cloud_resource.configured(
{"auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"}, "account_id": 77777}
)
run_dbt_nightly_sync = run_dbt_cloud_op.configured(
run_dbt_nightly_sync = dbt_cloud_run_op.configured(
{"job_id": 54321}, name="run_dbt_nightly_sync"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,11 @@ def run_job_and_poll(
default_value=0.25,
description="Time (in seconds) to wait between each request retry.",
),
"dbt_cloud_host": Field(
config=StringSource,
default_value=DBT_DEFAULT_HOST,
description="The hostname where dbt cloud is being hosted (e.g. https://my_org.cloud.getdbt.com/).",
),
},
description="This resource helps interact with dbt Cloud connectors",
)
Expand Down Expand Up @@ -530,4 +535,5 @@ def my_dbt_cloud_job():
request_max_retries=context.resource_config["request_max_retries"],
request_retry_delay=context.resource_config["request_retry_delay"],
log=context.log,
dbt_cloud_host=context.resource_config["dbt_cloud_host"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ def my_dbt_solid(context):
assert len(dbt_result.result["results"]) == 1


def test_docs_url_run(
dbt_seed, conn_string, test_project_dir, dbt_config_dir
): # pylint: disable=unused-argument
@solid(required_resource_keys={"dbt"})
def my_dbt_solid(context):
return context.resources.dbt.run()

context = get_dbt_solid_context(test_project_dir, dbt_config_dir, docs_url="foo.com")
dbt_result = my_dbt_solid(context)
assert len(dbt_result.result["results"]) == 4
assert dbt_result.docs_url == "foo.com"


def test_models_override_run(
dbt_seed, conn_string, test_project_dir, dbt_config_dir
): # pylint: disable=unused-argument
Expand Down

0 comments on commit 2a56fa2

Please sign in to comment.