Skip to content

Commit

Permalink
Drop Connection.schema use in DbtCloudHook (#29166)
Browse files Browse the repository at this point in the history
* Correct tenant eval within async logic of DbtCloudHook

Related: #28890 #29014

There was a recent enhancement of DbtCloudRunJobOperator to include deferrable/async functionality. Unfortunately the `tenant` evaluation in the DbtCloudHook was outdated and didn't include the most recent change to properly handle domain specification.

This PR consolidates the tenant eval logic to a common method to be used by both sync and async methods in the hook.

* Remove Connection.schema use

* Update provider.yaml and changelog
  • Loading branch information
josh-fell authored Feb 2, 2023
1 parent 6ec97dc commit 91c0ce7
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
16 changes: 16 additions & 0 deletions airflow/providers/dbt/cloud/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@
Changelog
---------

3.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

Beginning with version 2.0.0, users could specify single-tenant dbt Cloud domains via the ``schema`` parameter
in an Airflow connection. Subsequently in version 2.3.1, users could also connect to the dbt Cloud instances
outside of the US region as well as private instances by using the ``host`` parameter of their Airflow
connection to specify the entire tenant domain. Backwards compatibility for using ``schema`` was left in
place. Version 3.0.0 removes support for using ``schema`` to specify the tenant domain of a dbt Cloud
instance. If you wish to connect to a single-tenant, instance outside of the US, or a private instance, you
must use the ``host`` parameter to specify the _entire_ tenant domain name in your Airflow connection.

* ``Drop Connection.schema use in DbtCloudHook (#29166)``

2.3.1
.....

Expand Down
29 changes: 8 additions & 21 deletions airflow/providers/dbt/cloud/hooks/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import json
import time
import warnings
from enum import Enum
from functools import wraps
from inspect import signature
Expand Down Expand Up @@ -181,33 +180,33 @@ def __init__(self, dbt_cloud_conn_id: str = default_conn_name, *args, **kwargs)
super().__init__(auth_type=TokenAuth)
self.dbt_cloud_conn_id = dbt_cloud_conn_id

@staticmethod
def _get_tenant_domain(conn: Connection) -> str:
return conn.host or "cloud.getdbt.com"

@staticmethod
def get_request_url_params(
tenant: str, endpoint: str, include_related: list[str] | None = None
) -> tuple[str, dict[str, Any]]:
"""
Form URL from base url and endpoint url
:param tenant: The tenant name which is need to be replaced in base url.
:param tenant: The tenant domain name which is need to be replaced in base url.
:param endpoint: Endpoint url to be requested.
:param include_related: Optional. List of related fields to pull with the run.
Valid values are "trigger", "job", "repository", and "environment".
"""
data: dict[str, Any] = {}
base_url = f"https://{tenant}.getdbt.com/api/v2/accounts/"
if include_related:
data = {"include_related": include_related}
if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
url = base_url + "/" + endpoint
else:
url = (base_url or "") + (endpoint or "")
url = f"https://{tenant}/api/v2/accounts/{endpoint or ''}"
return url, data

async def get_headers_tenants_from_connection(self) -> tuple[dict[str, Any], str]:
"""Get Headers, tenants from the connection details"""
headers: dict[str, Any] = {}
connection: Connection = await sync_to_async(self.get_connection)(self.dbt_cloud_conn_id)
tenant: str = connection.schema if connection.schema else "cloud"
tenant = self._get_tenant_domain(connection)
package_name, provider_version = _get_provider_info()
headers["User-Agent"] = f"{package_name}-v{provider_version}"
headers["Content-Type"] = "application/json"
Expand Down Expand Up @@ -267,19 +266,7 @@ def connection(self) -> Connection:
return _connection

def get_conn(self, *args, **kwargs) -> Session:
if self.connection.schema:
warnings.warn(
"The `schema` parameter is deprecated and use within a dbt Cloud connection will be removed "
"in a future version. Please use `host` instead and specify the entire tenant domain name.",
category=DeprecationWarning,
stacklevel=2,
)
# Prior to deprecation, the connection.schema value could _only_ modify the third-level
# domain value while '.getdbt.com' was always used as the remainder of the domain name.
tenant = f"{self.connection.schema}.getdbt.com"
else:
tenant = self.connection.host or "cloud.getdbt.com"

tenant = self._get_tenant_domain(self.connection)
self.base_url = f"https://{tenant}/api/v2/accounts/"

session = Session()
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/dbt/cloud/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ description: |
`dbt Cloud <https://www.getdbt.com/product/what-is-dbt/>`__
versions:
- 3.0.0
- 2.3.1
- 2.3.0
- 2.2.0
Expand Down

0 comments on commit 91c0ce7

Please sign in to comment.