Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import pandas as pd
from sqlglot import Dialect, exp, parse_one
from sqlglot.errors import ErrorLevel

from sqlmesh.core.dialect import pandas_to_sql
from sqlmesh.core.engine_adapter._typing import (
Expand Down Expand Up @@ -617,12 +618,20 @@ def rename_table(
) -> None:
self.execute(exp.rename_table(old_table_name, new_table_name))

def fetchone(self, query: t.Union[exp.Expression, str]) -> t.Tuple:
self.execute(query)
def fetchone(
self,
query: t.Union[exp.Expression, str],
ignore_unsupported_errors: bool = False,
) -> t.Tuple:
self.execute(query, ignore_unsupported_errors=ignore_unsupported_errors)
return self.cursor.fetchone()

def fetchall(self, query: t.Union[exp.Expression, str]) -> t.List[t.Tuple]:
self.execute(query)
def fetchall(
self,
query: t.Union[exp.Expression, str],
ignore_unsupported_errors: bool = False,
) -> t.List[t.Tuple]:
self.execute(query, ignore_unsupported_errors=ignore_unsupported_errors)
return self.cursor.fetchall()

def _fetch_native_df(self, query: t.Union[exp.Expression, str]) -> DF:
Expand Down Expand Up @@ -669,9 +678,23 @@ def supports_transactions(self, transaction_type: TransactionType) -> bool:
"""Whether or not the engine adapter supports transactions for the given transaction type."""
return True

def execute(self, sql: t.Union[str, exp.Expression], **kwargs: t.Any) -> None:
def execute(
self,
sql: t.Union[str, exp.Expression],
ignore_unsupported_errors: bool = False,
**kwargs: t.Any,
) -> None:
"""Execute a sql query."""
sql = self._to_sql(sql) if isinstance(sql, exp.Expression) else sql
to_sql_kwargs = (
{"unsupported_level": ErrorLevel.IGNORE}
if ignore_unsupported_errors
else {}
)
sql = (
self._to_sql(sql, **to_sql_kwargs)
if isinstance(sql, exp.Expression)
else sql
)
logger.debug(f"Executing SQL:\n{sql}")
self.cursor.execute(sql, **kwargs)

Expand Down
13 changes: 11 additions & 2 deletions sqlmesh/core/engine_adapter/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ def _fetch_native_df(self, query: t.Union[exp.Expression, str]) -> DF:
self.execute(query)
return self.cursor._query_job.to_dataframe()

def execute(self, sql: t.Union[str, exp.Expression], **kwargs: t.Any) -> None:
def execute(
self,
sql: t.Union[str, exp.Expression],
ignore_unsupported_errors: bool = False,
**kwargs: t.Any,
) -> None:
from google.cloud import bigquery # type: ignore

create_session = isinstance(sql, exp.Transaction) and self._session_id is None
Expand All @@ -122,7 +127,11 @@ def execute(self, sql: t.Union[str, exp.Expression], **kwargs: t.Any) -> None:
)
],
)
super().execute(sql, **{**kwargs, "job_config": job_config})
super().execute(
sql,
ignore_unsupported_errors=ignore_unsupported_errors,
**{**kwargs, "job_config": job_config},
)

def supports_transactions(self, transaction_type: TransactionType) -> bool:
if transaction_type.is_dml:
Expand Down
13 changes: 9 additions & 4 deletions sqlmesh/core/state_sync/engine_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ def get_environments(self) -> t.List[Environment]:
"""
return [
self._environment_from_row(row)
for row in self.engine_adapter.fetchall(self._environments_query())
for row in self.engine_adapter.fetchall(
self._environments_query(), ignore_unsupported_errors=True
)
]

def _environment_from_row(self, row: t.Tuple[str, ...]) -> Environment:
Expand Down Expand Up @@ -268,7 +270,7 @@ def _get_snapshots(
snapshots: t.Dict[SnapshotId, Snapshot] = {}
duplicates: t.Dict[SnapshotId, Snapshot] = {}

for row in self.engine_adapter.fetchall(query):
for row in self.engine_adapter.fetchall(query, ignore_unsupported_errors=True):
snapshot = Snapshot.parse_raw(row[0])
snapshot_id = snapshot.snapshot_id
if snapshot_id in snapshots:
Expand Down Expand Up @@ -313,7 +315,9 @@ def _get_snapshots_with_same_version(
if lock_for_update:
query = query.lock(copy=False)

snapshot_rows = self.engine_adapter.fetchall(query)
snapshot_rows = self.engine_adapter.fetchall(
query, ignore_unsupported_errors=True
)
return [Snapshot(**json.loads(row[0])) for row in snapshot_rows]

def _get_environment(
Expand All @@ -335,7 +339,8 @@ def _get_environment(
expression=exp.Literal.string(environment),
),
lock_for_update=lock_for_update,
)
),
ignore_unsupported_errors=True,
)

if not row:
Expand Down