Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/integrations/engines/azuresql.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Azure SQL

[Azure SQL](https://azure.microsoft.com/en-us/products/azure-sql) is "a family of managed, secure, and intelligent products that use the SQL Server database engine in the Azure cloud."

The Azure SQL adapter only supports authentication with a username and password. It does not support authentication with Microsoft Entra ID (formerly known as Azure Active Directory).

## Local/Built-in Scheduler
**Engine Adapter Type**: `azuresql`

### Installation
```
pip install "sqlmesh[azuresql]"
```

### Connection options

| Option | Description | Type | Required |
| ----------------- | ---------------------------------------------------------------- | :----------: | :------: |
| `type` | Engine type name - must be `azuresql` | string | Y |
| `host` | The hostname of the Azure SQL server | string | Y |
| `user` | The username to use for authentication with the Azure SQL server | string | N |
| `password` | The password to use for authentication with the Azure SQL server | string | N |
| `port` | The port number of the Azure SQL server | int | N |
| `database` | The target database | string | N |
| `charset` | The character set used for the connection | string | N |
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
| `appname` | The application name to use for the connection | string | N |
| `conn_properties` | The list of connection properties | list[string] | N |
| `autocommit`      | Whether autocommit mode is enabled. Default: false               | bool         | N        |
1 change: 1 addition & 0 deletions docs/integrations/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ SQLMesh supports integrations with the following tools:
SQLMesh supports the following execution engines for running SQLMesh projects:

* [Athena](./engines/athena.md)
* [Azure SQL](./engines/azuresql.md)
* [BigQuery](./engines/bigquery.md)
* [ClickHouse](./engines/clickhouse.md)
* [Databricks](./engines/databricks.md)
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ nav:
- integrations/github.md
- Execution engines:
- integrations/engines/athena.md
- integrations/engines/azuresql.md
- integrations/engines/bigquery.md
- integrations/engines/clickhouse.md
- integrations/engines/databricks.md
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
],
extras_require={
"athena": ["PyAthena[Pandas]"],
"azuresql": ["pymssql"],
"bigquery": [
"google-cloud-bigquery[pandas]",
"google-cloud-bigquery-storage",
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
)
from sqlmesh.core.config.common import EnvironmentSuffixTarget as EnvironmentSuffixTarget
from sqlmesh.core.config.connection import (
AthenaConnectionConfig as AthenaConnectionConfig,
BaseDuckDBConnectionConfig as BaseDuckDBConnectionConfig,
BigQueryConnectionConfig as BigQueryConnectionConfig,
ConnectionConfig as ConnectionConfig,
Expand Down
13 changes: 13 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
concurrent_tasks_validator,
http_headers_validator,
)
from sqlmesh.core.engine_adapter.shared import CatalogSupport
from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.pydantic import (
Expand Down Expand Up @@ -1289,6 +1290,18 @@ def _connection_factory(self) -> t.Callable:

return pymssql.connect

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
    """Extra config forwarded to the engine adapter.

    MSSQL requires issuing a `USE` statement to switch catalogs, hence
    REQUIRES_SET_CATALOG.
    """
    return {"catalog_support": CatalogSupport.REQUIRES_SET_CATALOG}


class AzureSQLConnectionConfig(MSSQLConnectionConfig):
    """Connection config for Azure SQL.

    Inherits all connection options from the MSSQL config; only the engine
    type name and the catalog support level differ.
    """

    # Engine type name exposed in config files; aliased so users write `type: azuresql`.
    type_: t.Literal["azuresql"] = Field(alias="type", default="azuresql") # type: ignore

    @property
    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
        """Extra config forwarded to the engine adapter.

        Unlike MSSQL, Azure SQL connections are scoped to a single database
        (catalog), hence SINGLE_CATALOG_ONLY.
        """
        return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY}


class SparkConnectionConfig(ConnectionConfig):
"""
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def __init__(

if (
self.config.environment_catalog_mapping
and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
and not self.engine_adapter.catalog_support.is_multi_catalog_supported
):
raise SQLMeshError(
"Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
Expand Down
11 changes: 7 additions & 4 deletions sqlmesh/core/engine_adapter/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin):
DIALECT = "athena"
SUPPORTS_TRANSACTIONS = False
SUPPORTS_REPLACE_TABLE = False
# Athena has the concept of catalogs but the current catalog is set in the connection parameters with no way to query or change it after that
# It also cant create new catalogs, you have to configure them in AWS. Typically, catalogs that are not "awsdatacatalog"
# are pointers to the "awsdatacatalog" of other AWS accounts
CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY
# Athena's support for table and column comments is too patchy to consider "supported"
# Hive tables: Table + Column comments are supported
# Iceberg tables: Column comments only
Expand Down Expand Up @@ -74,6 +70,13 @@ def s3_warehouse_location_or_raise(self) -> str:

raise SQLMeshError("s3_warehouse_location was expected to be populated; it isnt")

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for Athena: a single catalog only."""
    # Athena has the concept of catalogs but the current catalog is set in the connection parameters
    # with no way to query or change it after that.
    # It also can't create new catalogs; you have to configure them in AWS. Typically, catalogs that
    # are not "awsdatacatalog" are pointers to the "awsdatacatalog" of other AWS accounts.
    return CatalogSupport.SINGLE_CATALOG_ONLY

def create_state_table(
self,
table_name: str,
Expand Down
9 changes: 6 additions & 3 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class EngineAdapter:
SUPPORTS_MANAGED_MODELS = False
SCHEMA_DIFFER = SchemaDiffer()
SUPPORTS_TUPLE_IN = True
CATALOG_SUPPORT = CatalogSupport.UNSUPPORTED
HAS_VIEW_BINDING = False
SUPPORTS_REPLACE_TABLE = True
DEFAULT_CATALOG_TYPE = DIALECT
Expand Down Expand Up @@ -165,6 +164,10 @@ def snowpark(self) -> t.Optional[SnowparkSession]:
def comments_enabled(self) -> bool:
return self._register_comments and self.COMMENT_CREATION_TABLE.is_supported

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for this engine.

    Defaults to no catalog support; engine-specific adapters override this
    property to declare their actual level of support.
    """
    return CatalogSupport.UNSUPPORTED

@classmethod
def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[exp.Alias]:
return [
Expand All @@ -174,7 +177,7 @@ def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[

@property
def default_catalog(self) -> t.Optional[str]:
if self.CATALOG_SUPPORT.is_unsupported:
if self.catalog_support.is_unsupported:
return None
default_catalog = self._default_catalog or self.get_current_catalog()
if not default_catalog:
Expand Down Expand Up @@ -293,7 +296,7 @@ def get_catalog_type(self, catalog: t.Optional[str]) -> str:
"""Intended to be overridden for data virtualization systems like Trino that,
depending on the target catalog, require slightly different properties to be set when creating / updating tables
"""
if self.CATALOG_SUPPORT.is_unsupported:
if self.catalog_support.is_unsupported:
raise UnsupportedCatalogOperationError(
f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
)
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/engine_adapter/base_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

class BasePostgresEngineAdapter(EngineAdapter):
DEFAULT_BATCH_SIZE = 400
CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY

Expand Down Expand Up @@ -60,6 +59,10 @@ def columns(
for column_name, data_type in resp
}

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for Postgres-based engines: a single catalog (database) per connection."""
    return CatalogSupport.SINGLE_CATALOG_ONLY

def table_exists(self, table_name: TableName) -> bool:
"""
Postgres doesn't support describe so I'm using what the redshift cursor does to check if a table
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/engine_adapter/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, Row
SUPPORTS_TRANSACTIONS = False
SUPPORTS_MATERIALIZED_VIEWS = True
SUPPORTS_CLONING = True
CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT
MAX_TABLE_COMMENT_LENGTH = 1024
MAX_COLUMN_COMMENT_LENGTH = 1024

Expand Down Expand Up @@ -120,6 +119,10 @@ def _job_params(self) -> t.Dict[str, t.Any]:
params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed")
return params

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for BigQuery: full multi-catalog support."""
    return CatalogSupport.FULL_SUPPORT

def _df_to_source_queries(
self,
df: DF,
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/engine_adapter/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class DatabricksEngineAdapter(SparkEngineAdapter):
exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(10, 0), (0,)],
},
)
CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT

def __init__(self, *args: t.Any, **kwargs: t.Any):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -139,6 +138,10 @@ def spark(self) -> PySparkSession:
self.set_current_catalog(catalog)
return self._spark

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for Databricks: full multi-catalog support."""
    return CatalogSupport.FULL_SUPPORT

def _df_to_source_queries(
self,
df: DF,
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/engine_adapter/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, RowDiffMixin):
DIALECT = "duckdb"
SUPPORTS_TRANSACTIONS = False
CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT
SCHEMA_DIFFER = SchemaDiffer(
parameterized_type_defaults={
exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(18, 3), (0,)],
Expand All @@ -44,6 +43,10 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
else (CommentCreationTable.COMMENT_COMMAND_ONLY, CommentCreationView.COMMENT_COMMAND_ONLY)
)

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for DuckDB: full multi-catalog support."""
    return CatalogSupport.FULL_SUPPORT

def set_current_catalog(self, catalog: str) -> None:
"""Sets the catalog name of the current connection."""
self.execute(exp.Use(this=exp.to_identifier(catalog)))
Expand Down
8 changes: 7 additions & 1 deletion sqlmesh/core/engine_adapter/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class MSSQLEngineAdapter(
DIALECT: str = "tsql"
SUPPORTS_TUPLE_IN = False
SUPPORTS_MATERIALIZED_VIEWS = False
CATALOG_SUPPORT = CatalogSupport.REQUIRES_SET_CATALOG
CURRENT_CATALOG_EXPRESSION = exp.func("db_name")
COMMENT_CREATION_TABLE = CommentCreationTable.UNSUPPORTED
COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
Expand All @@ -72,6 +71,13 @@ class MSSQLEngineAdapter(
)
VARIABLE_LENGTH_DATA_TYPES = {"binary", "varbinary", "char", "varchar", "nchar", "nvarchar"}

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level, taken from the connection config.

    MSSQL and AzureSQL both use this engine adapter, but they differ in catalog support.
    Therefore, we specify the catalog support in the connection config `_extra_engine_config`
    instead of in the adapter itself.
    """
    return self._extra_config["catalog_support"]

def columns(
self,
table_name: TableName,
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/engine_adapter/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def internal_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
# Need to convert args to list in order to later do assignment to the object
list_args = list(args)
engine_adapter = list_args[0]
catalog_support = override or engine_adapter.CATALOG_SUPPORT
catalog_support = override or engine_adapter.catalog_support
# If there is full catalog support then we have nothing to do
if catalog_support.is_full_support:
return func(*list_args, **kwargs)
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/engine_adapter/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi
SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True
SUPPORTS_CLONING = True
SUPPORTS_MANAGED_MODELS = True
CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT
CURRENT_CATALOG_EXPRESSION = exp.func("current_database")
SCHEMA_DIFFER = SchemaDiffer(
parameterized_type_defaults={
Expand Down Expand Up @@ -109,6 +108,10 @@ def snowpark(self) -> t.Optional[SnowparkSession]:
).getOrCreate()
return None

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for Snowflake: full multi-catalog (database) support."""
    return CatalogSupport.FULL_SUPPORT

def create_managed_table(
self,
table_name: TableName,
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/engine_adapter/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class SparkEngineAdapter(
DIALECT = "spark"
SUPPORTS_TRANSACTIONS = False
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.INSERT_OVERWRITE
CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT
COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS
# Note: Some formats (like Delta and Iceberg) support REPLACE TABLE but since we don't
Expand Down Expand Up @@ -84,6 +83,10 @@ def _use_spark_session(self) -> bool:
def use_serverless(self) -> bool:
return False

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for Spark: full multi-catalog support."""
    return CatalogSupport.FULL_SUPPORT

@classproperty
def _sqlglot_to_spark_primitive_mapping(self) -> t.Dict[t.Any, t.Any]:
from pyspark.sql import types as spark_types
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/engine_adapter/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class TrinoEngineAdapter(
):
DIALECT = "trino"
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.INTO_IS_OVERWRITE
CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT
# Trino does technically support transactions but it doesn't work correctly with partition overwrite so we
# disable transactions. If we need to get them enabled again then we would need to disable auto commit on the
# connector and then figure out how to get insert/overwrite to work correctly without it.
Expand All @@ -64,6 +63,10 @@ class TrinoEngineAdapter(
# and even if you have a TIMESTAMP(6) the date formatting functions still only support millisecond precision
MAX_TIMESTAMP_PRECISION = 3

@property
def catalog_support(self) -> CatalogSupport:
    """Catalog support level for Trino: full multi-catalog support."""
    return CatalogSupport.FULL_SUPPORT

def set_current_catalog(self, catalog: str) -> None:
"""Sets the catalog name of the current connection."""
self.execute(exp.Use(this=schema_(db="information_schema", catalog=catalog)))
Expand Down
8 changes: 7 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from sqlmesh.core.config import DuckDBConnectionConfig
from sqlmesh.core.context import Context
from sqlmesh.core.engine_adapter import SparkEngineAdapter
from sqlmesh.core.engine_adapter import MSSQLEngineAdapter, SparkEngineAdapter
from sqlmesh.core.engine_adapter.base import EngineAdapter
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core import lineage
Expand All @@ -38,6 +38,7 @@
)
from sqlmesh.utils import random_id
from sqlmesh.utils.date import TimeLike, to_date
from sqlmesh.core.engine_adapter.shared import CatalogSupport

pytest_plugins = ["tests.common_fixtures"]

Expand Down Expand Up @@ -441,6 +442,11 @@ def _make_function(
"sqlmesh.engines.spark.db_api.spark_session.SparkSessionConnection._spark_major_minor",
new_callable=PropertyMock(return_value=(3, 5)),
)
if isinstance(adapter, MSSQLEngineAdapter):
mocker.patch(
"sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.catalog_support",
new_callable=PropertyMock(return_value=CatalogSupport.REQUIRES_SET_CATALOG),
)
return adapter

return _make_function
Expand Down
8 changes: 4 additions & 4 deletions tests/core/engine_adapter/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ def test_connection(ctx: TestContext):

def test_catalog_operations(ctx: TestContext):
if (
ctx.engine_adapter.CATALOG_SUPPORT.is_unsupported
or ctx.engine_adapter.CATALOG_SUPPORT.is_single_catalog_only
ctx.engine_adapter.catalog_support.is_unsupported
or ctx.engine_adapter.catalog_support.is_single_catalog_only
):
pytest.skip(
f"Engine adapter {ctx.engine_adapter.dialect} doesn't support catalog operations"
Expand Down Expand Up @@ -306,7 +306,7 @@ def create_objects_and_validate(schema_name: str):
assert len(results.materialized_views) == 0
assert len(results.non_temp_tables) == 2

if ctx.engine_adapter.CATALOG_SUPPORT.is_unsupported:
if ctx.engine_adapter.catalog_support.is_unsupported:
pytest.skip(
f"Engine adapter {ctx.engine_adapter.dialect} doesn't support catalog operations"
)
Expand All @@ -326,7 +326,7 @@ def create_objects_and_validate(schema_name: str):
ctx.create_catalog(catalog_name)

schema = ctx.schema("drop_schema_catalog_test", catalog_name)
if ctx.engine_adapter.CATALOG_SUPPORT.is_single_catalog_only:
if ctx.engine_adapter.catalog_support.is_single_catalog_only:
drop_schema_and_validate(schema)
assert "requires that all catalog operations be against a single catalog" in caplog.text
return
Expand Down