From 82c60fa396639131c4be9f8d85a5852f12439be6 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Wed, 11 Dec 2024 16:57:42 -0600 Subject: [PATCH 1/3] Add AzureSQL adapter --- docs/integrations/engines/azuresql.md | 30 +++++++++++++++++++++++++ docs/integrations/overview.md | 1 + mkdocs.yml | 1 + setup.py | 1 + sqlmesh/core/config/__init__.py | 2 ++ sqlmesh/core/config/connection.py | 8 +++++++ sqlmesh/core/engine_adapter/__init__.py | 3 ++- sqlmesh/core/engine_adapter/azuresql.py | 8 +++++++ 8 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 docs/integrations/engines/azuresql.md create mode 100644 sqlmesh/core/engine_adapter/azuresql.py diff --git a/docs/integrations/engines/azuresql.md b/docs/integrations/engines/azuresql.md new file mode 100644 index 0000000000..e9b97abaa1 --- /dev/null +++ b/docs/integrations/engines/azuresql.md @@ -0,0 +1,30 @@ +# Azure SQL + +[Azure SQL](https://azure.microsoft.com/en-us/products/azure-sql) is "a family of managed, secure, and intelligent products that use the SQL Server database engine in the Azure cloud." + +The Azure SQL adapter only supports authentication with a username and password. It does not support authentication with Microsoft Entra or Azure Active Directory. + +## Local/Built-in Scheduler +**Engine Adapter Type**: `azuresql` + +### Installation +``` +pip install "sqlmesh[azuresql]" +``` + +### Connection options + +| Option | Description | Type | Required | +| ----------------- | ---------------------------------------------------------------- | :----------: | :------: | +| `type` | Engine type name - must be `azuresql` | string | Y | +| `host` | The hostname of the Azure SQL server | string | Y | +| `user` | The username to use for authentication with the Azure SQL server | string | N | +| `password` | The password to use for authentication with the Azure SQL server | string | N | +| `port` | The port number of the Azure SQL server | int | N | +| `database` | The target database | string | N | +| `charset` | The character set used for the connection | string | N | +| `timeout` | The query timeout in seconds. Default: no timeout | int | N | +| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N | +| `appname` | The application name to use for the connection | string | N | +| `conn_properties` | The list of connection properties | list[string] | N | +| `autocommit` | Is autocommit mode enabled. Default: false | bool | N | \ No newline at end of file diff --git a/docs/integrations/overview.md b/docs/integrations/overview.md index edb018c7df..07a01da56c 100644 --- a/docs/integrations/overview.md +++ b/docs/integrations/overview.md @@ -13,6 +13,7 @@ SQLMesh supports integrations with the following tools: SQLMesh supports the following execution engines for running SQLMesh projects: * [Athena](./engines/athena.md) +* [Azure SQL](./engines/azuresql.md) * [BigQuery](./engines/bigquery.md) * [ClickHouse](./engines/clickhouse.md) * [Databricks](./engines/databricks.md) diff --git a/mkdocs.yml b/mkdocs.yml index 285ff4c625..8da5541250 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -73,6 +73,7 @@ nav: - integrations/github.md - Execution engines: - integrations/engines/athena.md + - integrations/engines/azuresql.md - integrations/engines/bigquery.md - integrations/engines/clickhouse.md - integrations/engines/databricks.md diff --git a/setup.py b/setup.py index cb6db7f560..d45480c13e 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ ], extras_require={ "athena": ["PyAthena[Pandas]"], + "azuresql": ["pymssql"], "bigquery": [ "google-cloud-bigquery[pandas]", "google-cloud-bigquery-storage", diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py index 4ce402c71d..ccf38e84fa 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -4,6 +4,8 @@ ) from sqlmesh.core.config.common import EnvironmentSuffixTarget as EnvironmentSuffixTarget from sqlmesh.core.config.connection import ( + AthenaConnectionConfig as AthenaConnectionConfig, + AzureSQLConnectionConfig as AzureSQLConnectionConfig, BaseDuckDBConnectionConfig as BaseDuckDBConnectionConfig, BigQueryConnectionConfig as BigQueryConnectionConfig, ConnectionConfig as ConnectionConfig, diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 8e30e58d42..7e546c7e3f 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1290,6 +1290,14 @@ def _connection_factory(self) -> t.Callable: return pymssql.connect +class AzureSQLConnectionConfig(MSSQLConnectionConfig): + type_: t.Literal["azuresql"] = Field(alias="type", default="azuresql") # type: ignore + + @property + def _engine_adapter(self) -> t.Type[EngineAdapter]: + return engine_adapter.AzureSQLEngineAdapter + + class SparkConnectionConfig(ConnectionConfig): """ Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks. diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index 25c45d2e19..c376325658 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -6,6 +6,8 @@ EngineAdapter, EngineAdapterWithIndexSupport, ) +from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter +from sqlmesh.core.engine_adapter.azuresql import AzureSQLEngineAdapter # noqa: F401 from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter @@ -17,7 +19,6 @@ from sqlmesh.core.engine_adapter.snowflake import SnowflakeEngineAdapter from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter -from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter DIALECT_TO_ENGINE_ADAPTER = { "hive": SparkEngineAdapter, diff --git a/sqlmesh/core/engine_adapter/azuresql.py b/sqlmesh/core/engine_adapter/azuresql.py new file mode 100644 index 0000000000..0cfd4438b2 --- /dev/null +++ b/sqlmesh/core/engine_adapter/azuresql.py @@ -0,0 +1,8 @@ +"""Contains AzureSQLEngineAdapter.""" + +from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter +from sqlmesh.core.engine_adapter.shared import CatalogSupport + + +class AzureSQLEngineAdapter(MSSQLEngineAdapter): + CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY From 79f01e294b06afe840d4fb2d1fe59bb241ada98c Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 12 Dec 2024 10:32:21 -0600 Subject: [PATCH 2/3] Remove azure adapter, make catalog support a property --- .circleci/continue_config.yml | 20 +++++++++---------- setup.py | 1 - sqlmesh/core/config/__init__.py | 1 - sqlmesh/core/config/connection.py | 9 +++++++-- sqlmesh/core/context.py | 2 +- sqlmesh/core/engine_adapter/__init__.py | 3 +-- sqlmesh/core/engine_adapter/athena.py | 11 ++++++---- sqlmesh/core/engine_adapter/azuresql.py | 8 -------- sqlmesh/core/engine_adapter/base.py | 9 ++++++--- sqlmesh/core/engine_adapter/base_postgres.py | 5 ++++- sqlmesh/core/engine_adapter/bigquery.py | 5 ++++- sqlmesh/core/engine_adapter/databricks.py | 5 ++++- sqlmesh/core/engine_adapter/duckdb.py | 5 ++++- sqlmesh/core/engine_adapter/mssql.py | 8 +++++++- sqlmesh/core/engine_adapter/shared.py | 2 +- sqlmesh/core/engine_adapter/snowflake.py | 5 ++++- sqlmesh/core/engine_adapter/spark.py | 5 ++++- sqlmesh/core/engine_adapter/trino.py | 5 ++++- tests/conftest.py | 8 +++++++- .../integration/test_integration.py | 8 ++++---- 20 files changed, 79 insertions(+), 46 deletions(-) delete mode 100644 sqlmesh/core/engine_adapter/azuresql.py diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index a23dc3dd53..7da9560ddf 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -313,10 +313,10 @@ workflows: - airflow_docker_tests: requires: - style_and_slow_tests - filters: - branches: - only: - - main + # filters: + # branches: + # only: + # - main - engine_tests_docker: name: engine_<< matrix.engine >> matrix: @@ -334,8 +334,8 @@ workflows: name: cloud_engine_<< matrix.engine >> context: - sqlmesh_cloud_database_integration - requires: - - engine_tests_docker + # requires: + # - engine_tests_docker matrix: parameters: engine: @@ -345,10 +345,10 @@ workflows: - bigquery - clickhouse-cloud - athena - filters: - branches: - only: - - main + # filters: + # branches: + # only: + # - main - trigger_private_tests: requires: - style_and_slow_tests diff --git a/setup.py b/setup.py index d45480c13e..cb6db7f560 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,6 @@ ], extras_require={ "athena": ["PyAthena[Pandas]"], - "azuresql": ["pymssql"], "bigquery": [ "google-cloud-bigquery[pandas]", "google-cloud-bigquery-storage", diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py index ccf38e84fa..b72a271f9d 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -5,7 +5,6 @@ from sqlmesh.core.config.common import EnvironmentSuffixTarget as EnvironmentSuffixTarget from sqlmesh.core.config.connection import ( AthenaConnectionConfig as AthenaConnectionConfig, - AzureSQLConnectionConfig as AzureSQLConnectionConfig, BaseDuckDBConnectionConfig as BaseDuckDBConnectionConfig, BigQueryConnectionConfig as BigQueryConnectionConfig, ConnectionConfig as ConnectionConfig, diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 7e546c7e3f..39515cc553 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -20,6 +20,7 @@ concurrent_tasks_validator, http_headers_validator, ) +from sqlmesh.core.engine_adapter.shared import CatalogSupport from sqlmesh.core.engine_adapter import EngineAdapter from sqlmesh.utils.errors import ConfigError from sqlmesh.utils.pydantic import ( @@ -1289,13 +1290,17 @@ def _connection_factory(self) -> t.Callable: return pymssql.connect + @property + def _extra_engine_config(self) -> t.Dict[str, t.Any]: + return {"catalog_support": CatalogSupport.REQUIRES_SET_CATALOG} + class AzureSQLConnectionConfig(MSSQLConnectionConfig): type_: t.Literal["azuresql"] = Field(alias="type", default="azuresql") # type: ignore @property - def _engine_adapter(self) -> t.Type[EngineAdapter]: - return engine_adapter.AzureSQLEngineAdapter + def _extra_engine_config(self) -> t.Dict[str, t.Any]: + return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY} class SparkConnectionConfig(ConnectionConfig): diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 521f397e6d..85fb54db29 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -396,7 +396,7 @@ def __init__( if ( self.config.environment_catalog_mapping - and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported + and not self.engine_adapter.catalog_support.is_multi_catalog_supported ): raise SQLMeshError( "Environment catalog mapping is only supported for engine adapters that support multiple catalogs" diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index c376325658..25c45d2e19 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -6,8 +6,6 @@ EngineAdapter, EngineAdapterWithIndexSupport, ) -from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter -from sqlmesh.core.engine_adapter.azuresql import AzureSQLEngineAdapter # noqa: F401 from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter @@ -19,6 +17,7 @@ from sqlmesh.core.engine_adapter.snowflake import SnowflakeEngineAdapter from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter +from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter DIALECT_TO_ENGINE_ADAPTER = { "hive": SparkEngineAdapter, diff --git a/sqlmesh/core/engine_adapter/athena.py b/sqlmesh/core/engine_adapter/athena.py index f257b6a5d9..5dc67d5b16 100644 --- a/sqlmesh/core/engine_adapter/athena.py +++ b/sqlmesh/core/engine_adapter/athena.py @@ -33,10 +33,6 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin): DIALECT = "athena" SUPPORTS_TRANSACTIONS = False SUPPORTS_REPLACE_TABLE = False - # Athena has the concept of catalogs but the current catalog is set in the connection parameters with no way to query or change it after that - # It also cant create new catalogs, you have to configure them in AWS. Typically, catalogs that are not "awsdatacatalog" - # are pointers to the "awsdatacatalog" of other AWS accounts - CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY # Athena's support for table and column comments is too patchy to consider "supported" # Hive tables: Table + Column comments are supported # Iceberg tables: Column comments only @@ -74,6 +70,13 @@ def s3_warehouse_location_or_raise(self) -> str: raise SQLMeshError("s3_warehouse_location was expected to be populated; it isnt") + @property + def catalog_support(self) -> CatalogSupport: + # Athena has the concept of catalogs but the current catalog is set in the connection parameters with no way to query or change it after that + # It also cant create new catalogs, you have to configure them in AWS. Typically, catalogs that are not "awsdatacatalog" + # are pointers to the "awsdatacatalog" of other AWS accounts + return CatalogSupport.SINGLE_CATALOG_ONLY + def create_state_table( self, table_name: str, diff --git a/sqlmesh/core/engine_adapter/azuresql.py b/sqlmesh/core/engine_adapter/azuresql.py deleted file mode 100644 index 0cfd4438b2..0000000000 --- a/sqlmesh/core/engine_adapter/azuresql.py +++ /dev/null @@ -1,8 +0,0 @@ -"""Contains AzureSQLEngineAdapter.""" - -from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter -from sqlmesh.core.engine_adapter.shared import CatalogSupport - - -class AzureSQLEngineAdapter(MSSQLEngineAdapter): - CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index c9bdefbf52..b03999dc44 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -97,7 +97,6 @@ class EngineAdapter: SUPPORTS_MANAGED_MODELS = False SCHEMA_DIFFER = SchemaDiffer() SUPPORTS_TUPLE_IN = True - CATALOG_SUPPORT = CatalogSupport.UNSUPPORTED HAS_VIEW_BINDING = False SUPPORTS_REPLACE_TABLE = True DEFAULT_CATALOG_TYPE = DIALECT @@ -165,6 +164,10 @@ def snowpark(self) -> t.Optional[SnowparkSession]: def comments_enabled(self) -> bool: return self._register_comments and self.COMMENT_CREATION_TABLE.is_supported + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.UNSUPPORTED + @classmethod def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[exp.Alias]: return [ @@ -174,7 +177,7 @@ def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[ @property def default_catalog(self) -> t.Optional[str]: - if self.CATALOG_SUPPORT.is_unsupported: + if self.catalog_support.is_unsupported: return None default_catalog = self._default_catalog or self.get_current_catalog() if not default_catalog: @@ -293,7 +296,7 @@ def get_catalog_type(self, catalog: t.Optional[str]) -> str: """Intended to be overridden for data virtualization systems like Trino that, depending on the target catalog, require slightly different properties to be set when creating / updating tables """ - if self.CATALOG_SUPPORT.is_unsupported: + if self.catalog_support.is_unsupported: raise UnsupportedCatalogOperationError( f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}" ) diff --git a/sqlmesh/core/engine_adapter/base_postgres.py b/sqlmesh/core/engine_adapter/base_postgres.py index 51840d755e..c1026c91df 100644 --- a/sqlmesh/core/engine_adapter/base_postgres.py +++ b/sqlmesh/core/engine_adapter/base_postgres.py @@ -22,7 +22,6 @@ class BasePostgresEngineAdapter(EngineAdapter): DEFAULT_BATCH_SIZE = 400 - CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY @@ -60,6 +59,10 @@ def columns( for column_name, data_type in resp } + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.SINGLE_CATALOG_ONLY + def table_exists(self, table_name: TableName) -> bool: """ Postgres doesn't support describe so I'm using what the redshift cursor does to check if a table diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index a599f030f4..98256c0b2c 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -59,7 +59,6 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, Row SUPPORTS_TRANSACTIONS = False SUPPORTS_MATERIALIZED_VIEWS = True SUPPORTS_CLONING = True - CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT MAX_TABLE_COMMENT_LENGTH = 1024 MAX_COLUMN_COMMENT_LENGTH = 1024 @@ -120,6 +119,10 @@ def _job_params(self) -> t.Dict[str, t.Any]: params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed") return params + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.FULL_SUPPORT + def _df_to_source_queries( self, df: DF, diff --git a/sqlmesh/core/engine_adapter/databricks.py b/sqlmesh/core/engine_adapter/databricks.py index 3f1df68dc2..184e1e3194 100644 --- a/sqlmesh/core/engine_adapter/databricks.py +++ b/sqlmesh/core/engine_adapter/databricks.py @@ -46,7 +46,6 @@ class DatabricksEngineAdapter(SparkEngineAdapter): exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(10, 0), (0,)], }, ) - CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT def __init__(self, *args: t.Any, **kwargs: t.Any): super().__init__(*args, **kwargs) @@ -139,6 +138,10 @@ def spark(self) -> PySparkSession: self.set_current_catalog(catalog) return self._spark + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.FULL_SUPPORT + def _df_to_source_queries( self, df: DF, diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py index b81d463f85..003c873b6d 100644 --- a/sqlmesh/core/engine_adapter/duckdb.py +++ b/sqlmesh/core/engine_adapter/duckdb.py @@ -30,7 +30,6 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, RowDiffMixin): DIALECT = "duckdb" SUPPORTS_TRANSACTIONS = False - CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT SCHEMA_DIFFER = SchemaDiffer( parameterized_type_defaults={ exp.DataType.build("DECIMAL", dialect=DIALECT).this: [(18, 3), (0,)], @@ -44,6 +43,10 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, else (CommentCreationTable.COMMENT_COMMAND_ONLY, CommentCreationView.COMMENT_COMMAND_ONLY) ) + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.FULL_SUPPORT + def set_current_catalog(self, catalog: str) -> None: """Sets the catalog name of the current connection.""" self.execute(exp.Use(this=exp.to_identifier(catalog))) diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 51ca27888c..f09e5b8a44 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -46,7 +46,6 @@ class MSSQLEngineAdapter( DIALECT: str = "tsql" SUPPORTS_TUPLE_IN = False SUPPORTS_MATERIALIZED_VIEWS = False - CATALOG_SUPPORT = CatalogSupport.REQUIRES_SET_CATALOG CURRENT_CATALOG_EXPRESSION = exp.func("db_name") COMMENT_CREATION_TABLE = CommentCreationTable.UNSUPPORTED COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED @@ -72,6 +71,13 @@ class MSSQLEngineAdapter( ) VARIABLE_LENGTH_DATA_TYPES = {"binary", "varbinary", "char", "varchar", "nchar", "nvarchar"} + @property + def catalog_support(self) -> CatalogSupport: + # MSSQL and AzureSQL both use this engine adapter, but they differ in catalog support. + # Therefore, we specify the catalog support in the connection config `_extra_engine_config` + # instead of in the adapter itself. + return self._extra_config["catalog_support"] + def columns( self, table_name: TableName, diff --git a/sqlmesh/core/engine_adapter/shared.py b/sqlmesh/core/engine_adapter/shared.py index 0dada4162f..fb860754fb 100644 --- a/sqlmesh/core/engine_adapter/shared.py +++ b/sqlmesh/core/engine_adapter/shared.py @@ -296,7 +296,7 @@ def internal_wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any: # Need to convert args to list in order to later do assignment to the object list_args = list(args) engine_adapter = list_args[0] - catalog_support = override or engine_adapter.CATALOG_SUPPORT + catalog_support = override or engine_adapter.catalog_support # If there is full catalog support then we have nothing to do if catalog_support.is_full_support: return func(*list_args, **kwargs) diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 5494861e2a..a09aa54ae2 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -50,7 +50,6 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True SUPPORTS_CLONING = True SUPPORTS_MANAGED_MODELS = True - CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT CURRENT_CATALOG_EXPRESSION = exp.func("current_database") SCHEMA_DIFFER = SchemaDiffer( parameterized_type_defaults={ @@ -109,6 +108,10 @@ def snowpark(self) -> t.Optional[SnowparkSession]: ).getOrCreate() return None + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.FULL_SUPPORT + def create_managed_table( self, table_name: TableName, diff --git a/sqlmesh/core/engine_adapter/spark.py b/sqlmesh/core/engine_adapter/spark.py index a6aa046848..da55d21d19 100644 --- a/sqlmesh/core/engine_adapter/spark.py +++ b/sqlmesh/core/engine_adapter/spark.py @@ -51,7 +51,6 @@ class SparkEngineAdapter( DIALECT = "spark" SUPPORTS_TRANSACTIONS = False INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.INSERT_OVERWRITE - CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS # Note: Some formats (like Delta and Iceberg) support REPLACE TABLE but since we don't @@ -84,6 +83,10 @@ def _use_spark_session(self) -> bool: def use_serverless(self) -> bool: return False + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.FULL_SUPPORT + @classproperty def _sqlglot_to_spark_primitive_mapping(self) -> t.Dict[t.Any, t.Any]: from pyspark.sql import types as spark_types diff --git a/sqlmesh/core/engine_adapter/trino.py b/sqlmesh/core/engine_adapter/trino.py index 45286d1701..48bc6ac446 100644 --- a/sqlmesh/core/engine_adapter/trino.py +++ b/sqlmesh/core/engine_adapter/trino.py @@ -41,7 +41,6 @@ class TrinoEngineAdapter( ): DIALECT = "trino" INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.INTO_IS_OVERWRITE - CATALOG_SUPPORT = CatalogSupport.FULL_SUPPORT # Trino does technically support transactions but it doesn't work correctly with partition overwrite so we # disable transactions. If we need to get them enabled again then we would need to disable auto commit on the # connector and then figure out how to get insert/overwrite to work correctly without it. @@ -64,6 +63,10 @@ class TrinoEngineAdapter( # and even if you have a TIMESTAMP(6) the date formatting functions still only support millisecond precision MAX_TIMESTAMP_PRECISION = 3 + @property + def catalog_support(self) -> CatalogSupport: + return CatalogSupport.FULL_SUPPORT + def set_current_catalog(self, catalog: str) -> None: """Sets the catalog name of the current connection.""" self.execute(exp.Use(this=schema_(db="information_schema", catalog=catalog))) diff --git a/tests/conftest.py b/tests/conftest.py index c41a616600..d5b370d3e6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,7 @@ from sqlmesh.core.config import DuckDBConnectionConfig from sqlmesh.core.context import Context -from sqlmesh.core.engine_adapter import SparkEngineAdapter +from sqlmesh.core.engine_adapter import MSSQLEngineAdapter, SparkEngineAdapter from sqlmesh.core.engine_adapter.base import EngineAdapter from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core import lineage @@ -38,6 +38,7 @@ ) from sqlmesh.utils import random_id from sqlmesh.utils.date import TimeLike, to_date +from sqlmesh.core.engine_adapter.shared import CatalogSupport pytest_plugins = ["tests.common_fixtures"] @@ -441,6 +442,11 @@ def _make_function( "sqlmesh.engines.spark.db_api.spark_session.SparkSessionConnection._spark_major_minor", new_callable=PropertyMock(return_value=(3, 5)), ) + if isinstance(adapter, MSSQLEngineAdapter): + mocker.patch( + "sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.catalog_support", + new_callable=PropertyMock(return_value=CatalogSupport.REQUIRES_SET_CATALOG), + ) return adapter return _make_function diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index bc0d32a2e4..1308e4a790 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -249,8 +249,8 @@ def test_connection(ctx: TestContext): def test_catalog_operations(ctx: TestContext): if ( - ctx.engine_adapter.CATALOG_SUPPORT.is_unsupported - or ctx.engine_adapter.CATALOG_SUPPORT.is_single_catalog_only + ctx.engine_adapter.catalog_support.is_unsupported + or ctx.engine_adapter.catalog_support.is_single_catalog_only ): pytest.skip( f"Engine adapter {ctx.engine_adapter.dialect} doesn't support catalog operations" @@ -306,7 +306,7 @@ def create_objects_and_validate(schema_name: str): assert len(results.materialized_views) == 0 assert len(results.non_temp_tables) == 2 - if ctx.engine_adapter.CATALOG_SUPPORT.is_unsupported: + if ctx.engine_adapter.catalog_support.is_unsupported: pytest.skip( f"Engine adapter {ctx.engine_adapter.dialect} doesn't support catalog operations" ) @@ -326,7 +326,7 @@ def create_objects_and_validate(schema_name: str): ctx.create_catalog(catalog_name) schema = ctx.schema("drop_schema_catalog_test", catalog_name) - if ctx.engine_adapter.CATALOG_SUPPORT.is_single_catalog_only: + if ctx.engine_adapter.catalog_support.is_single_catalog_only: drop_schema_and_validate(schema) assert "requires that all catalog operations be against a single catalog" in caplog.text return From 4d808027257de81e7362db33c1a6b54f2824b2b6 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 12 Dec 2024 10:49:41 -0600 Subject: [PATCH 3/3] Add azuresql extra --- .circleci/continue_config.yml | 20 ++++++++++---------- setup.py | 1 + 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index 7da9560ddf..a23dc3dd53 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -313,10 +313,10 @@ workflows: - airflow_docker_tests: requires: - style_and_slow_tests - # filters: - # branches: - # only: - # - main + filters: + branches: + only: + - main - engine_tests_docker: name: engine_<< matrix.engine >> matrix: @@ -334,8 +334,8 @@ workflows: name: cloud_engine_<< matrix.engine >> context: - sqlmesh_cloud_database_integration - # requires: - # - engine_tests_docker + requires: + - engine_tests_docker matrix: parameters: engine: @@ -345,10 +345,10 @@ workflows: - bigquery - clickhouse-cloud - athena - # filters: - # branches: - # only: - # - main + filters: + branches: + only: + - main - trigger_private_tests: requires: - style_and_slow_tests diff --git a/setup.py b/setup.py index cb6db7f560..d45480c13e 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ ], extras_require={ "athena": ["PyAthena[Pandas]"], + "azuresql": ["pymssql"], "bigquery": [ "google-cloud-bigquery[pandas]", "google-cloud-bigquery-storage",