87 changes: 5 additions & 82 deletions docs/guides/connections.md
@@ -38,85 +38,8 @@ sqlmesh --test-connection local_db plan

## Supported engines

### BigQuery
TBD

See the [engine configuration reference](../integrations/engines.md#bigquery---localbuilt-in-scheduler) for more details.

### Databricks

A Databricks connection should be configured as follows:
```yaml linenums="1"
connections:
my_databricks_connection:
type: databricks
server_hostname: [server hostname]
access_token: [access token]
http_headers: [optional, list of key-value pairs]
session_configuration: [optional, key-value mapping]
concurrent_tasks: [optional, should be greater than 0]
```

See the [engine configuration reference](../integrations/engines.md#databricks---localbuilt-in-scheduler) for more details.

### DuckDB

A DuckDB connection should be configured as follows:
```yaml linenums="1"
connections:
my_duckdb_connection:
type: duckdb
database: [optional, path to the database file]
```

See the [engine configuration reference](../reference/configuration.md#duckdb) for more details.

### Redshift

A Redshift connection should be configured as follows:
```yaml linenums="1"
connections:
my_redshift_connection:
type: redshift
user: [optional, username]
password: [optional, password]
database: [optional, database]
host: [optional, hostname]
port: [optional, port]
ssl: [optional, boolean flag which determines whether SSL is enabled]
sslmode: [optional, the security of the connection to the Amazon Redshift cluster]
timeout: [optional, connection timeout]
tcp_keepalive: [optional, boolean flag which determines whether to use TCP Keepalives]
application_name: [optional, the application name]
preferred_role: [optional, the IAM role]
principal_arn: [optional, the ARN for the IAM entity (user or role)]
credentials_provider: [optional, the class name of the IdP that will be used for authentication]
region: [optional, the AWS region]
cluster_identifier: [optional, the cluster identifier]
iam: [optional, boolean flag which determines whether the IAM authentication should be used]
is_serverless: [optional, whether the Redshift endpoint is serverless or provisional]
serverless_acct_id: [optional, serverless account ID]
serverless_work_group: [optional, serverless work group]
concurrent_tasks: [optional, should be greater than 0]
```

See the [engine configuration reference](../integrations/engines.md#redshift---localbuilt-in-scheduler) for more details.

### Snowflake

A Snowflake connection should be configured as follows:
```yaml linenums="1"
connections:
my_snowflake_connection:
type: snowflake
user: [required if not using Okta, username]
password: [required if using password]
authenticator: [required if using externalbrowser]
account: [required, account ID]
warehouse: [optional, warehouse name]
database: [optional, database name]
role: [optional, user role]
concurrent_tasks: [optional, should be greater than 0]
```

See the [engine configuration reference](../integrations/engines.md#snowflake---localbuilt-in-scheduler) for more details.
* [BigQuery](../integrations/engines.md#bigquery---localbuilt-in-scheduler)
* [Databricks](../integrations/engines.md#databricks---localbuilt-in-scheduler)
* [Redshift](../integrations/engines.md#redshift---localbuilt-in-scheduler)
* [Snowflake](../integrations/engines.md#snowflake---localbuilt-in-scheduler)
* [Spark](../integrations/engines.md#spark---localbuilt-in-scheduler)
15 changes: 13 additions & 2 deletions docs/integrations/engines.md
@@ -2,8 +2,19 @@

# BigQuery
## BigQuery - Local/Built-in Scheduler
Currently relies on local configuration of `gcloud` CLI to be authenticated in order to connect.
[Github issue to expand supported methods](https://github.com/TobikoData/sqlmesh/issues/270).
| Option | Description | Type | Required |
|-----------------|-----------------------------------------------------------------------------------------------|:------:|:--------:|
| `method` | Connection method; one of `oauth` (default), `oauth-secrets`, `service-account`, or `service-account-json` | string | N |
| `project` | The name of the GCP project | string | N |
| `location` | The location of the datasets (can be regional or multi-regional) | string | N |
| `keyfile` | Path to the keyfile; required when using the `service-account` method | string | N |
| `keyfile_json` | Keyfile information provided inline (not recommended) | dict | N |
| `token` | OAuth secrets auth token | string | N |
| `refresh_token` | OAuth secrets auth refresh token | string | N |
| `client_id` | OAuth secrets auth client ID | string | N |
| `client_secret` | OAuth secrets auth client secret | string | N |
| `token_uri` | OAuth secrets auth token URI | string | N |
| `scopes` | OAuth secrets auth scopes | list | N |
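
As a rough illustration of how these options fit together, the sketch below builds the new `BigQueryConnectionConfig` (added in `sqlmesh/core/config/connection.py` later in this PR) directly in Python; it assumes sqlmesh is installed, and every value is a placeholder:

```python
# Sketch only: placeholder project/location/keyfile values; assumes sqlmesh and
# its google-cloud-bigquery dependency are installed.
from sqlmesh.core.config.connection import (
    BigQueryConnectionConfig,
    BigQueryConnectionMethod,
)

# Keyfile-based auth: `keyfile` is only consulted by the service-account method.
service_account_conn = BigQueryConnectionConfig(
    method=BigQueryConnectionMethod.SERVICE_ACCOUNT,
    project="my-gcp-project",          # placeholder
    location="US",                     # placeholder
    keyfile="/path/to/keyfile.json",   # placeholder
)

# With the default `oauth` method no keyfile or secrets are passed;
# google.auth.default() resolves application-default credentials at connect time.
adc_conn = BigQueryConnectionConfig(project="my-gcp-project")  # placeholder project
```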

## BigQuery - Airflow Scheduler
**Engine Name:** `bigquery`
1 change: 1 addition & 0 deletions examples/airflow/Dockerfile.template
@@ -59,3 +59,4 @@ RUN mkdir /opt/sqlmesh/sqlmesh
RUN chown -R airflow /opt/sqlmesh
USER airflow
RUN cd /opt/sqlmesh && pip install -e .
RUN pip install dbt-core
18 changes: 13 additions & 5 deletions examples/sushi_dbt/profiles.yml
@@ -8,13 +8,21 @@ sushi:
path: 'local.duckdb'
schema: sushi
snowflake:
account: "{{ env_var('SNOWFLAKE_ACCOUNT', '') }}"
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
database: sushi
password: "{{ env_var('SNOWFLAKE_PASSWORD', '') }}"
role: "{{ env_var('SNOWFLAKE_ROLE', '') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
role: "{{ env_var('SNOWFLAKE_ROLE') }}"
schema: sushi
threads: 1
type: snowflake
user: "{{ env_var('SNOWFLAKE_USER', '') }}"
warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE', '') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}"
bigquery:
type: bigquery
method: service-account
project: "{{ env_var('BQ_PROJECT') }}"
dataset: "{{ env_var('BQ_SCHEMA') }}"
threads: 1
keyfile: "{{ env_var('BQ_KEYFILE') }}"
location: "{{ env_var('BQ_LOCATION') }}"
target: in_memory
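
Because the `env_var()` calls above no longer carry `''` fallbacks, those variables must be set whenever their target is rendered. With the profile changes later in this PR only the selected target is rendered, so a run against the `bigquery` target needs just its four variables. A minimal sketch for local testing, with placeholder values:

```python
# Sketch for local testing only: the variable names come from the bigquery
# target above; every value here is a placeholder.
import os

os.environ["BQ_PROJECT"] = "my-gcp-project"         # placeholder GCP project
os.environ["BQ_SCHEMA"] = "sushi"                    # placeholder dataset/schema
os.environ["BQ_KEYFILE"] = "/path/to/keyfile.json"   # placeholder service-account keyfile
os.environ["BQ_LOCATION"] = "US"                     # placeholder dataset location
```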
64 changes: 62 additions & 2 deletions sqlmesh/core/config/connection.py
@@ -3,6 +3,7 @@
import abc
import sys
import typing as t
from enum import Enum

from pydantic import Field, root_validator

@@ -336,13 +337,33 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
return {}


class BigQueryConnectionMethod(str, Enum):
OAUTH = "oauth"
OAUTH_SECRETS = "oauth-secrets"
SERVICE_ACCOUNT = "service-account"
SERVICE_ACCOUNT_JSON = "service-account-json"


class BigQueryConnectionConfig(_ConnectionConfig):
"""
BigQuery Connection Configuration.

TODO: Need to update to support all the different authentication options
"""

method: BigQueryConnectionMethod = BigQueryConnectionMethod.OAUTH

project: t.Optional[str] = None
location: t.Optional[str] = None
# Keyfile Auth
keyfile: t.Optional[str] = None
keyfile_json: t.Optional[t.Dict[str, t.Any]] = None
# OAuth secrets auth
token: t.Optional[str] = None
refresh_token: t.Optional[str] = None
client_id: t.Optional[str] = None
client_secret: t.Optional[str] = None
token_uri: t.Optional[str] = None
scopes: t.Tuple[str, ...] = ("https://www.googleapis.com/auth/bigquery",)

concurrent_tasks: int = 4

type_: Literal["bigquery"] = Field(alias="type", default="bigquery")
@@ -355,6 +376,45 @@ def _connection_kwargs_keys(self) -> t.Set[str]:
def _engine_adapter(self) -> t.Type[EngineAdapter]:
return engine_adapter.BigQueryEngineAdapter

@property
def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
"""The static connection kwargs for this connection"""
import google.auth
from google.api_core import client_info
from google.cloud import bigquery
from google.oauth2 import credentials, service_account

if self.method == BigQueryConnectionMethod.OAUTH:
creds, _ = google.auth.default(scopes=self.scopes)
elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT:
creds = service_account.Credentials.from_service_account_file(
self.keyfile, scopes=self.scopes
)
elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT_JSON:
creds = service_account.Credentials.from_service_account_info(
self.keyfile_json, scopes=self.scopes
)
elif self.method == BigQueryConnectionMethod.OAUTH_SECRETS:
creds = credentials.Credentials(
token=self.token,
refresh_token=self.refresh_token,
client_id=self.client_id,
client_secret=self.client_secret,
token_uri=self.token_uri,
scopes=self.scopes,
)
else:
raise ConfigError("Invalid BigQuery Connection Method")
client = bigquery.Client(
project=self.project,
credentials=creds,
location=self.location,
client_info=client_info.ClientInfo(user_agent="sqlmesh"),
)

return {
"client": client,
}

@property
def _connection_factory(self) -> t.Callable:
from google.cloud.bigquery.dbapi import connect
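For the `oauth-secrets` path specifically, here is a hedged sketch of what the new `_static_connection_kwargs` property wires up; all secret values are placeholders and nothing below comes from the PR itself:

```python
# Sketch only: placeholder secrets; constructing the config performs no network I/O.
from sqlmesh.core.config.connection import (
    BigQueryConnectionConfig,
    BigQueryConnectionMethod,
)

config = BigQueryConnectionConfig(
    method=BigQueryConnectionMethod.OAUTH_SECRETS,
    project="my-gcp-project",                         # placeholder
    token="placeholder-access-token",                 # placeholder
    refresh_token="placeholder-refresh-token",        # placeholder
    client_id="placeholder-client-id",                # placeholder
    client_secret="placeholder-client-secret",        # placeholder
    token_uri="https://oauth2.googleapis.com/token",  # assumed standard Google endpoint
)

# Reading config._static_connection_kwargs builds google.oauth2 credentials from
# these fields, wraps them in a bigquery.Client, and returns {"client": client},
# which the DB-API factory (google.cloud.bigquery.dbapi.connect) accepts as its
# `client` argument. It is not evaluated here because the placeholder secrets
# are not real credentials.
```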
4 changes: 2 additions & 2 deletions sqlmesh/dbt/loader.py
@@ -22,8 +22,8 @@ def sqlmesh_config(project_root: t.Optional[Path] = None, **kwargs: t.Any) -> Co
profile = Profile.load(context)

return Config(
default_connection=profile.default_target,
connections=profile.to_sqlmesh(),
default_connection=profile.target_name,
connections={profile.target_name: profile.target.to_sqlmesh()},
loader=DbtLoader,
**kwargs,
)
47 changes: 23 additions & 24 deletions sqlmesh/dbt/profile.py
@@ -3,10 +3,10 @@
import typing as t
from pathlib import Path

from sqlmesh.core.config.connection import ConnectionConfig
from sqlmesh.dbt.common import PROJECT_FILENAME, DbtContext, load_yaml
from sqlmesh.dbt.target import TargetConfig
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.yaml import dumps as dump_yaml


class Profile:
@@ -19,21 +19,21 @@ class Profile:
def __init__(
self,
path: Path,
targets: t.Dict[str, TargetConfig],
default_target: str,
target_name: str,
target: TargetConfig,
):
"""
Args:
path: Path to the profile file
targets: Dict of targets defined for the project
default_target: Name of the default target for the proejct
target_name: Name of the target loaded
target: TargetConfig for target_name
"""
self.path = path
self.targets = targets
self.default_target = default_target
self.target_name = target_name
self.target = target

@classmethod
def load(cls, context: DbtContext) -> Profile:
def load(cls, context: DbtContext, target_name: t.Optional[str] = None) -> Profile:
"""
Loads the profile for the specified project

@@ -59,8 +59,8 @@ def load(cls, context: DbtContext) -> Profile:
if not profile_filepath:
raise ConfigError(f"{cls.PROFILE_FILE} not found.")

targets, default_target = cls._read_profile(profile_filepath, context)
return Profile(profile_filepath, targets, default_target)
target_name, target = cls._read_profile(profile_filepath, context, target_name)
return Profile(profile_filepath, target_name, target)

@classmethod
def _find_profile(cls, project_root: Path) -> t.Optional[Path]:
@@ -77,28 +77,27 @@ def _find_profile(cls, project_root: Path) -> t.Optional[Path]:

@classmethod
def _read_profile(
cls, path: Path, context: DbtContext
) -> t.Tuple[t.Dict[str, TargetConfig], str]:
with open(path, "r", encoding="utf-8") as file:
source = file.read()
contents = load_yaml(context.render(source))

project_data = contents.get(context.profile_name)
cls, path: Path, context: DbtContext, target_name: t.Optional[str] = None
) -> t.Tuple[str, TargetConfig]:
project_data = load_yaml(path).get(context.profile_name)
if not project_data:
raise ConfigError(f"Profile '{context.profile_name}' not found in profiles.")

outputs = project_data.get("outputs")
if not outputs:
raise ConfigError(f"No outputs exist in profiles for '{context.profile_name}'.")

targets = {name: TargetConfig.load(name, output) for name, output in outputs.items()}
default_target = context.render(project_data.get("target"))
if default_target not in targets:
if not target_name:
if "target" not in project_data:
raise ConfigError(f"No target specified for '{context.profile_name}'.")
target_name = context.render(project_data.get("target"))

if target_name not in outputs:
raise ConfigError(
f"Default target '{default_target}' not specified in profiles for '{context.profile_name}'."
f"Target '{target_name}' not specified in profiles for '{context.profile_name}'."
)

return (targets, default_target)
target_fields = load_yaml(context.render(dump_yaml(outputs[target_name])))
target = TargetConfig.load(target_name, target_fields)

def to_sqlmesh(self) -> t.Dict[str, ConnectionConfig]:
return {self.default_target: self.targets[self.default_target].to_sqlmesh()}
return (target_name, target)
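To make the new flow concrete, a hedged sketch (not part of the diff) of loading a single named target; it assumes `context` is a `DbtContext` the project loader has already populated, since that setup happens outside this file:

```python
# Sketch only: `context` is assumed to be an already-populated DbtContext (its
# construction is handled by sqlmesh.dbt.project and is not shown here);
# "bigquery" is the example output name from profiles.yml earlier in this PR.
from sqlmesh.dbt.profile import Profile

profile = Profile.load(context, target_name="bigquery")  # renders only this output

# sqlmesh_config in loader.py then registers exactly one connection under that name:
connections = {profile.target_name: profile.target.to_sqlmesh()}
default_connection = profile.target_name
```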
8 changes: 2 additions & 6 deletions sqlmesh/dbt/project.py
@@ -60,12 +60,8 @@ def load(cls, context: DbtContext) -> Project:
context.render(project_yaml.get("profile", "")) or context.project_name
)

profile = Profile.load(context)
context.target = (
profile.targets[context.target_name]
if context.target_name
else profile.targets[profile.default_target]
)
profile = Profile.load(context, context.target_name)
context.target = profile.target

packages = {}
root_loader = PackageLoader(context, ProjectConfig())