diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index c108656213..0173911d4c 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -15,6 +15,7 @@ import pandas as pd from sqlglot import Dialect, exp, parse_one +from sqlglot.errors import ErrorLevel from sqlmesh.core.dialect import pandas_to_sql from sqlmesh.core.engine_adapter._typing import ( @@ -617,12 +618,20 @@ def rename_table( ) -> None: self.execute(exp.rename_table(old_table_name, new_table_name)) - def fetchone(self, query: t.Union[exp.Expression, str]) -> t.Tuple: - self.execute(query) + def fetchone( + self, + query: t.Union[exp.Expression, str], + ignore_unsupported_errors: bool = False, + ) -> t.Tuple: + self.execute(query, ignore_unsupported_errors=ignore_unsupported_errors) return self.cursor.fetchone() - def fetchall(self, query: t.Union[exp.Expression, str]) -> t.List[t.Tuple]: - self.execute(query) + def fetchall( + self, + query: t.Union[exp.Expression, str], + ignore_unsupported_errors: bool = False, + ) -> t.List[t.Tuple]: + self.execute(query, ignore_unsupported_errors=ignore_unsupported_errors) return self.cursor.fetchall() def _fetch_native_df(self, query: t.Union[exp.Expression, str]) -> DF: @@ -669,9 +678,23 @@ def supports_transactions(self, transaction_type: TransactionType) -> bool: """Whether or not the engine adapter supports transactions for the given transaction type.""" return True - def execute(self, sql: t.Union[str, exp.Expression], **kwargs: t.Any) -> None: + def execute( + self, + sql: t.Union[str, exp.Expression], + ignore_unsupported_errors: bool = False, + **kwargs: t.Any, + ) -> None: """Execute a sql query.""" - sql = self._to_sql(sql) if isinstance(sql, exp.Expression) else sql + to_sql_kwargs = ( + {"unsupported_level": ErrorLevel.IGNORE} + if ignore_unsupported_errors + else {} + ) + sql = ( + self._to_sql(sql, **to_sql_kwargs) + if isinstance(sql, exp.Expression) + else sql + ) logger.debug(f"Executing SQL:\n{sql}") self.cursor.execute(sql, **kwargs) diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 137ad21023..8e65e8f812 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -106,7 +106,12 @@ def _fetch_native_df(self, query: t.Union[exp.Expression, str]) -> DF: self.execute(query) return self.cursor._query_job.to_dataframe() - def execute(self, sql: t.Union[str, exp.Expression], **kwargs: t.Any) -> None: + def execute( + self, + sql: t.Union[str, exp.Expression], + ignore_unsupported_errors: bool = False, + **kwargs: t.Any, + ) -> None: from google.cloud import bigquery # type: ignore create_session = isinstance(sql, exp.Transaction) and self._session_id is None @@ -122,7 +127,11 @@ def execute(self, sql: t.Union[str, exp.Expression], **kwargs: t.Any) -> None: ) ], ) - super().execute(sql, **{**kwargs, "job_config": job_config}) + super().execute( + sql, + ignore_unsupported_errors=ignore_unsupported_errors, + **{**kwargs, "job_config": job_config}, + ) def supports_transactions(self, transaction_type: TransactionType) -> bool: if transaction_type.is_dml: diff --git a/sqlmesh/core/state_sync/engine_adapter.py b/sqlmesh/core/state_sync/engine_adapter.py index 657f5ce31c..539d7b38b7 100644 --- a/sqlmesh/core/state_sync/engine_adapter.py +++ b/sqlmesh/core/state_sync/engine_adapter.py @@ -219,7 +219,9 @@ def get_environments(self) -> t.List[Environment]: """ return [ self._environment_from_row(row) - for row in self.engine_adapter.fetchall(self._environments_query()) + for row in self.engine_adapter.fetchall( + self._environments_query(), ignore_unsupported_errors=True + ) ] def _environment_from_row(self, row: t.Tuple[str, ...]) -> Environment: @@ -268,7 +270,7 @@ def _get_snapshots( snapshots: t.Dict[SnapshotId, Snapshot] = {} duplicates: t.Dict[SnapshotId, Snapshot] = {} - for row in self.engine_adapter.fetchall(query): + for row in self.engine_adapter.fetchall(query, ignore_unsupported_errors=True): snapshot = Snapshot.parse_raw(row[0]) snapshot_id = snapshot.snapshot_id if snapshot_id in snapshots: @@ -313,7 +315,9 @@ def _get_snapshots_with_same_version( if lock_for_update: query = query.lock(copy=False) - snapshot_rows = self.engine_adapter.fetchall(query) + snapshot_rows = self.engine_adapter.fetchall( + query, ignore_unsupported_errors=True + ) return [Snapshot(**json.loads(row[0])) for row in snapshot_rows] def _get_environment( @@ -335,7 +339,8 @@ def _get_environment( expression=exp.Literal.string(environment), ), lock_for_update=lock_for_update, - ) + ), + ignore_unsupported_errors=True, ) if not row: