Merged
10 changes: 6 additions & 4 deletions docs/integrations/engines/databricks.md
@@ -14,9 +14,9 @@ SQLMesh connects to Databricks with the [Databricks SQL Connector](https://docs.

The SQL Connector is bundled with SQLMesh and automatically installed when you include the `databricks` extra in the command `pip install "sqlmesh[databricks]"`.

The SQL Connector has all the functionality needed for SQLMesh to execute SQL models on Databricks and Python models locally (the default SQLMesh approach).
The SQL Connector has all the functionality needed for SQLMesh to execute SQL models on Databricks and Python models that do not return PySpark DataFrames.

The SQL Connector does not support Databricks Serverless Compute. If you require Serverless Compute then you must use the Databricks Connect library.
If you have Python models returning PySpark DataFrames, check out the [Databricks Connect](#databricks-connect-1) section.

### Databricks Connect

@@ -229,7 +229,9 @@ If you want Databricks to process PySpark DataFrames in SQLMesh Python models, t

SQLMesh **DOES NOT** include/bundle the Databricks Connect library. You must [install the version of Databricks Connect](https://docs.databricks.com/en/dev-tools/databricks-connect/python/install.html) that matches the Databricks Runtime used in your Databricks cluster.

SQLMesh's Databricks Connect implementation supports Databricks Runtime 13.0 or higher. If SQLMesh detects that you have Databricks Connect installed, then it will use it for all Python models (both Pandas and PySpark DataFrames).
If SQLMesh detects that you have Databricks Connect installed, then it will automatically configure the connection and use it for all Python models that return a Pandas or PySpark DataFrame.

To have databricks-connect installed but ignored by SQLMesh, set `disable_databricks_connect` to `true` in the connection configuration.

Databricks Connect can execute SQL and DataFrame operations on different clusters by setting the SQLMesh `databricks_connect_*` connection options. For example, these options could configure SQLMesh to run SQL on a [Databricks SQL Warehouse](https://docs.databricks.com/sql/admin/create-sql-warehouse.html) while still routing DataFrame operations to a normal Databricks Cluster.
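The split routing described above can be expressed in a connection config. A minimal sketch, assuming SQLMesh's programmatic Python config API (`Config`, `GatewayConfig`, `ModelDefaultsConfig`) and placeholder hostnames, tokens, and IDs:

```python
# Sketch only: SQL statements go to a SQL Warehouse via the SQL Connector,
# while DataFrame operations are routed to a normal cluster via
# Databricks Connect. All values below are illustrative placeholders.
from sqlmesh.core.config import Config, GatewayConfig, ModelDefaultsConfig
from sqlmesh.core.config.connection import DatabricksConnectionConfig

config = Config(
    model_defaults=ModelDefaultsConfig(dialect="databricks"),
    gateways={
        "databricks": GatewayConfig(
            connection=DatabricksConnectionConfig(
                server_hostname="dbc-example.cloud.databricks.com",
                http_path="/sql/1.0/warehouses/abcdef1234567890",  # SQL Warehouse
                access_token="<personal-access-token>",
                # DataFrame operations run on a regular cluster instead:
                databricks_connect_cluster_id="0123-456789-abcdefg0",
                catalog="main",
            )
        )
    },
)
```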

@@ -259,7 +261,7 @@ The only relevant SQLMesh configuration parameter is the optional `catalog` para
| `databricks_connect_server_hostname` | Databricks Connect Only: Databricks Connect server hostname. Uses `server_hostname` if not set. | string | N |
| `databricks_connect_access_token` | Databricks Connect Only: Databricks Connect access token. Uses `access_token` if not set. | string | N |
| `databricks_connect_cluster_id` | Databricks Connect Only: Databricks Connect cluster ID. Uses `http_path` if not set. Cannot be a Databricks SQL Warehouse. | string | N |
| `databricks_connect_use_serverless` | Databricks Connect Only: Use a serverless cluster for Databricks Connect. If using serverless then SQL connector is disabled since Serverless is not supported for SQL Connector | bool | N |
| `databricks_connect_use_serverless` | Databricks Connect Only: Use a serverless cluster for Databricks Connect instead of `databricks_connect_cluster_id`. | bool | N |
| `force_databricks_connect` | When running locally, force the use of Databricks Connect for all model operations (so don't use SQL Connector for SQL models) | bool | N |
| `disable_databricks_connect` | When running locally, disable the use of Databricks Connect for all model operations (so use SQL Connector for all models) | bool | N |
| `disable_spark_session` | Do not use SparkSession if it is available (like when running in a notebook). | bool | N |
3 changes: 3 additions & 0 deletions sqlmesh/__init__.py
@@ -141,6 +141,9 @@ def configure_logging(
log_limit: int = c.DEFAULT_LOG_LIMIT,
log_file_dir: t.Optional[t.Union[str, Path]] = None,
) -> None:
# Remove noisy grpc logs that are not useful for users
os.environ["GRPC_VERBOSITY"] = os.environ.get("GRPC_VERBOSITY", "NONE")

logger = logging.getLogger()
debug = force_debug or debug_mode_enabled()

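The `GRPC_VERBOSITY` line above uses a "set only if the user has not already set it" pattern. A minimal stdlib sketch (the function name is illustrative):

```python
import os

def setdefault_env(name: str, default: str) -> str:
    # Respect any value the user already exported; otherwise apply the default.
    os.environ[name] = os.environ.get(name, default)
    return os.environ[name]

os.environ.pop("GRPC_VERBOSITY", None)
setdefault_env("GRPC_VERBOSITY", "NONE")  # unset, so the default applies

os.environ["GRPC_VERBOSITY"] = "DEBUG"
setdefault_env("GRPC_VERBOSITY", "NONE")  # user's value is preserved
```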
21 changes: 12 additions & 9 deletions sqlmesh/core/config/connection.py
@@ -623,6 +623,12 @@ class DatabricksConnectionConfig(ConnectionConfig):

@model_validator(mode="before")
def _databricks_connect_validator(cls, data: t.Any) -> t.Any:
# SQLQueryContextLogger will output any error SQL queries even if they are in a try/except block.
# Disabling this allows SQLMesh to determine what should be shown to the user.
# Ex: We describe a table to see if it exists and therefore that execution can fail but we don't need to show
# the user since it is expected if the table doesn't exist. Without this change the user would see the error.
logging.getLogger("SQLQueryContextLogger").setLevel(logging.CRITICAL)

if not isinstance(data, dict):
return data

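The `SQLQueryContextLogger` change above follows a common pattern: raising the threshold of a chatty third-party logger so expected failures (such as a DESCRIBE on a table that may not exist) are not echoed to the user. A small stdlib sketch:

```python
import logging

# Silence a noisy third-party logger by raising its level; only CRITICAL
# records will pass through after this call.
noisy = logging.getLogger("SQLQueryContextLogger")
noisy.setLevel(logging.CRITICAL)

assert not noisy.isEnabledFor(logging.ERROR)  # ERROR records are now dropped
```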
@@ -641,10 +647,6 @@ def _databricks_connect_validator(cls, data: t.Any) -> t.Any:
data.get("auth_type"),
)

if databricks_connect_use_serverless:
data["force_databricks_connect"] = True
data["disable_databricks_connect"] = False

if (not server_hostname or not http_path or not access_token) and (
not databricks_connect_use_serverless and not auth_type
):
@@ -666,11 +668,12 @@ def _databricks_connect_validator(cls, data: t.Any) -> t.Any:
data["databricks_connect_access_token"] = access_token
if not data.get("databricks_connect_server_hostname"):
data["databricks_connect_server_hostname"] = f"https://{server_hostname}"
if not databricks_connect_use_serverless:
if not data.get("databricks_connect_cluster_id"):
if t.TYPE_CHECKING:
assert http_path is not None
data["databricks_connect_cluster_id"] = http_path.split("/")[-1]
if not databricks_connect_use_serverless and not data.get(
"databricks_connect_cluster_id"
):
if t.TYPE_CHECKING:
assert http_path is not None
data["databricks_connect_cluster_id"] = http_path.split("/")[-1]

if auth_type:
from databricks.sql.auth.auth import AuthType
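The fallback in the validator above derives `databricks_connect_cluster_id` from the final segment of `http_path`. A sketch of that derivation (the example path is illustrative of an all-purpose cluster's `http_path` shape):

```python
def cluster_id_from_http_path(http_path: str) -> str:
    # Mirrors the validator's fallback: the last path segment of an
    # all-purpose cluster's http_path is the cluster ID.
    return http_path.split("/")[-1]

cluster_id_from_http_path("sql/protocolv1/o/1234567890/0123-456789-abcdefg0")
```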
10 changes: 8 additions & 2 deletions sqlmesh/core/engine_adapter/base.py
@@ -43,7 +43,11 @@
from sqlmesh.utils import columns_to_types_all_known, random_id
from sqlmesh.utils.connection_pool import create_connection_pool
from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column
from sqlmesh.utils.errors import SQLMeshError, UnsupportedCatalogOperationError
from sqlmesh.utils.errors import (
SQLMeshError,
UnsupportedCatalogOperationError,
MissingDefaultCatalogError,
)
from sqlmesh.utils.pandas import columns_to_types_from_df

if t.TYPE_CHECKING:
@@ -186,7 +190,9 @@ def default_catalog(self) -> t.Optional[str]:
return None
default_catalog = self._default_catalog or self.get_current_catalog()
if not default_catalog:
raise SQLMeshError("Could not determine a default catalog despite it being supported.")
raise MissingDefaultCatalogError(
"Could not determine a default catalog despite it being supported."
)
return default_catalog

@property
131 changes: 71 additions & 60 deletions sqlmesh/core/engine_adapter/databricks.py
@@ -1,8 +1,8 @@
from __future__ import annotations

import logging
import os
import typing as t
from functools import partial

import pandas as pd
from sqlglot import exp
@@ -17,7 +17,8 @@
from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
from sqlmesh.core.node import IntervalUnit
from sqlmesh.core.schema_diff import SchemaDiffer
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.engines.spark.db_api.spark_session import connection, SparkSessionConnection
from sqlmesh.utils.errors import SQLMeshError, MissingDefaultCatalogError

if t.TYPE_CHECKING:
from sqlmesh.core._typing import SchemaName, TableName
@@ -49,7 +50,7 @@ class DatabricksEngineAdapter(SparkEngineAdapter):

def __init__(self, *args: t.Any, **kwargs: t.Any):
super().__init__(*args, **kwargs)
self._spark: t.Optional[PySparkSession] = None
self._set_spark_engine_adapter_if_needed()

@classmethod
def can_access_spark_session(cls, disable_spark_session: bool) -> bool:
@@ -93,21 +94,42 @@ def _use_spark_session(self) -> bool:
)

@property
def use_serverless(self) -> bool:
from sqlmesh import RuntimeEnv
from sqlmesh.utils import str_to_bool
def is_spark_session_connection(self) -> bool:
return isinstance(self.connection, SparkSessionConnection)

if not self._use_spark_session:
return False
return (
RuntimeEnv.get().is_databricks and str_to_bool(os.environ.get("IS_SERVERLESS", "False"))
) or bool(self._extra_config["databricks_connect_use_serverless"])
def _set_spark_engine_adapter_if_needed(self) -> None:
self._spark_engine_adapter = None

@property
def is_spark_session_cursor(self) -> bool:
from sqlmesh.engines.spark.db_api.spark_session import SparkSessionCursor
if not self._use_spark_session or self.is_spark_session_connection:
return

from databricks.connect import DatabricksSession

connect_kwargs = dict(
host=self._extra_config["databricks_connect_server_hostname"],
token=self._extra_config["databricks_connect_access_token"],
)
if "databricks_connect_use_serverless" in self._extra_config:
connect_kwargs["serverless"] = True
else:
connect_kwargs["cluster_id"] = self._extra_config["databricks_connect_cluster_id"]

catalog = self._extra_config.get("catalog")
spark = (
DatabricksSession.builder.remote(**connect_kwargs).userAgent("sqlmesh").getOrCreate()
)
self._spark_engine_adapter = SparkEngineAdapter(
partial(connection, spark=spark, catalog=catalog)
)

return isinstance(self.cursor, SparkSessionCursor)
@property
def cursor(self) -> t.Any:
if (
self._connection_pool.get_attribute("use_spark_engine_adapter")
and not self.is_spark_session_connection
):
return self._spark_engine_adapter.cursor # type: ignore
return super().cursor

@property
def spark(self) -> PySparkSession:
@@ -117,31 +139,17 @@ def spark(self) -> PySparkSession:
"Either run from a Databricks Notebook or "
"install `databricks-connect` and configure it to connect to your Databricks cluster."
)

if self.is_spark_session_cursor:
return self._connection_pool.get().spark

from databricks.connect import DatabricksSession

if self._spark is None:
self._spark = (
DatabricksSession.builder.remote(
host=self._extra_config["databricks_connect_server_hostname"],
token=self._extra_config["databricks_connect_access_token"],
cluster_id=self._extra_config["databricks_connect_cluster_id"],
)
.userAgent("sqlmesh")
.getOrCreate()
)
catalog = self._extra_config.get("catalog")
if catalog:
self.set_current_catalog(catalog)
return self._spark
if self.is_spark_session_connection:
return self.connection.spark
return self._spark_engine_adapter.spark # type: ignore

@property
def catalog_support(self) -> CatalogSupport:
return CatalogSupport.FULL_SUPPORT

def _end_session(self) -> None:
self._connection_pool.set_attribute("use_spark_engine_adapter", False)

def _df_to_source_queries(
self,
df: DF,
@@ -157,14 +165,8 @@ def _df_to_source_queries(

def query_factory() -> Query:
temp_table = self._get_temp_table(target_table or "spark", table_only=True)
if self.use_serverless:
# Global temp views are not supported on Databricks Serverless
# This also means we can't mix Python SQL Connection and DB Connect since they wouldn't
# share the same temp objects.
df.createOrReplaceTempView(temp_table.sql(dialect=self.dialect)) # type: ignore
else:
df.createOrReplaceGlobalTempView(temp_table.sql(dialect=self.dialect)) # type: ignore
temp_table.set("db", "global_temp")
df.createOrReplaceTempView(temp_table.sql(dialect=self.dialect))
self._connection_pool.set_attribute("use_spark_engine_adapter", True)
return exp.select(*self._casted_columns(columns_to_types)).from_(temp_table)

if self._use_spark_session:
@@ -175,16 +177,12 @@ def _fetch_native_df(
self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
) -> DF:
"""Fetches a DataFrame that can be either Pandas or PySpark from the cursor"""
if self.is_spark_session_cursor:
if self.is_spark_session_connection:
return super()._fetch_native_df(query, quote_identifiers=quote_identifiers)
if self._use_spark_session:
sql = (
self._to_sql(query, quote=quote_identifiers)
if isinstance(query, exp.Expression)
else query
return self._spark_engine_adapter._fetch_native_df( # type: ignore
query, quote_identifiers=quote_identifiers
)
self._log_sql(sql)
return self.spark.sql(sql)
self.execute(query)
return self.cursor.fetchall_arrow().to_pandas()

@@ -200,34 +198,38 @@ def fetchdf(
return df

def get_current_catalog(self) -> t.Optional[str]:
# Update the Dataframe API if we have a spark session
if self._use_spark_session:
pyspark_catalog = None
sql_connector_catalog = None
if self._spark_engine_adapter:
from py4j.protocol import Py4JError
from pyspark.errors.exceptions.connect import SparkConnectGrpcException

try:
# Note: Spark 3.4+ Only API
return self.spark.catalog.currentCatalog()
pyspark_catalog = self._spark_engine_adapter.get_current_catalog()
except (Py4JError, SparkConnectGrpcException):
pass
result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
if result:
return result[0]
return None
if not self.is_spark_session_connection:
result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION))
sql_connector_catalog = result[0] if result else None
if self._spark_engine_adapter and pyspark_catalog != sql_connector_catalog:
logger.warning(
f"Current catalog mismatch between Databricks SQL Connector and Databricks-Connect: `{sql_connector_catalog}` != `{pyspark_catalog}`. Set `catalog` connection property to make them the same."
)
return pyspark_catalog or sql_connector_catalog
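The precedence in `get_current_catalog` above can be sketched in isolation (names are illustrative; the real method also guards on whether a Databricks Connect adapter exists):

```python
import logging
import typing as t

logger = logging.getLogger(__name__)

def resolve_current_catalog(
    pyspark_catalog: t.Optional[str],
    sql_connector_catalog: t.Optional[str],
) -> t.Optional[str]:
    # Prefer the Databricks-Connect answer, fall back to the SQL Connector,
    # and warn when the two connections disagree.
    if pyspark_catalog != sql_connector_catalog:
        logger.warning(
            "Current catalog mismatch: %s != %s",
            sql_connector_catalog,
            pyspark_catalog,
        )
    return pyspark_catalog or sql_connector_catalog
```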

def set_current_catalog(self, catalog_name: str) -> None:
# Since Databricks splits commands across the Dataframe API and the SQL Connector
# (depending if databricks-connect is installed and a Dataframe is used) we need to ensure both
# are set to the same catalog since they maintain their default catalog seperately
# are set to the same catalog since they maintain their default catalog separately
self.execute(exp.Use(this=exp.to_identifier(catalog_name), kind="CATALOG"))
# Update the Dataframe API if we have a spark session
if self._use_spark_session:
from py4j.protocol import Py4JError
from pyspark.errors.exceptions.connect import SparkConnectGrpcException

try:
# Note: Spark 3.4+ Only API
self.spark.catalog.setCurrentCatalog(catalog_name)
self._spark_engine_adapter.set_current_catalog(catalog_name) # type: ignore
except (Py4JError, SparkConnectGrpcException):
pass

@@ -257,6 +259,15 @@ def clone_table(
def wap_supported(self, table_name: TableName) -> bool:
return False

@property
def default_catalog(self) -> t.Optional[str]:
try:
return super().default_catalog
except MissingDefaultCatalogError as e:
raise MissingDefaultCatalogError(
"Could not determine default catalog. Define the connection property `catalog` since it can't be inferred from your connection. See SQLMesh Databricks documentation for details"
) from e

def _build_table_properties_exp(
self,
catalog_name: t.Optional[str] = None,
4 changes: 4 additions & 0 deletions sqlmesh/utils/errors.py
@@ -159,6 +159,10 @@ class PythonModelEvalError(SQLMeshError):
pass


class MissingDefaultCatalogError(SQLMeshError):
pass


def raise_config_error(
msg: str,
location: t.Optional[str | Path] = None,