diff --git a/sqlmesh/cli/example_project.py b/sqlmesh/cli/example_project.py index 86baf2afc2..5754310421 100644 --- a/sqlmesh/cli/example_project.py +++ b/sqlmesh/cli/example_project.py @@ -141,6 +141,7 @@ def init_example_project( return _create_folders([audits_path, macros_path, models_path, tests_path]) + _create_macros(macros_path) _create_audits(audits_path) _create_models(models_path) _create_tests(tests_path) @@ -159,6 +160,10 @@ def _create_config(config_path: Path, template: ProjectTemplate) -> None: ) +def _create_macros(macros_path: Path) -> None: + (macros_path / "__init__.py").touch() + + def _create_audits(audits_path: Path) -> None: _write_file(audits_path / "example_full_model.sql", EXAMPLE_AUDIT) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index e695c336ce..22fc9f7065 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -155,6 +155,15 @@ def create_table( table_name, query_or_columns_to_types, exists, **kwargs ) + def create_snapshots_table( + self, snapshots_table_name: str, snapshots_columns: t.Dict[str, exp.DataType] + ) -> None: + """Create a table to store snapshots.""" + self.create_table( + snapshots_table_name, + snapshots_columns, + ) + def _create_table_from_columns( self, table_name: TableName, diff --git a/sqlmesh/core/engine_adapter/base_databricks.py b/sqlmesh/core/engine_adapter/base_databricks.py new file mode 100644 index 0000000000..ab6a504257 --- /dev/null +++ b/sqlmesh/core/engine_adapter/base_databricks.py @@ -0,0 +1,19 @@ +import typing as t + +from sqlglot import exp + +from sqlmesh.core.engine_adapter.base_spark import BaseSparkEngineAdapter + + +class BaseDatabricks(BaseSparkEngineAdapter): + DIALECT = "databricks" + + def create_snapshots_table( + self, snapshots_table_name: str, snapshots_columns: t.Dict[str, exp.DataType] + ) -> None: + """Create a table to store snapshots.""" + self.create_table( + snapshots_table_name, + snapshots_columns, + partitioned_by=["name", "identifier"], + ) diff --git a/sqlmesh/core/engine_adapter/databricks.py b/sqlmesh/core/engine_adapter/databricks.py index 5d51c91cc5..defdab3f88 100644 --- a/sqlmesh/core/engine_adapter/databricks.py +++ b/sqlmesh/core/engine_adapter/databricks.py @@ -1,7 +1,8 @@ from __future__ import annotations +from sqlmesh.core.engine_adapter.base_databricks import BaseDatabricks from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter -class DatabricksSparkSessionEngineAdapter(SparkEngineAdapter): - DIALECT = "databricks" +class DatabricksSparkSessionEngineAdapter(BaseDatabricks, SparkEngineAdapter): + pass diff --git a/sqlmesh/core/engine_adapter/databricks_api.py b/sqlmesh/core/engine_adapter/databricks_api.py index 862fe05853..6a005fecc6 100644 --- a/sqlmesh/core/engine_adapter/databricks_api.py +++ b/sqlmesh/core/engine_adapter/databricks_api.py @@ -4,15 +4,13 @@ from sqlglot import exp -from sqlmesh.core.engine_adapter.base_spark import BaseSparkEngineAdapter +from sqlmesh.core.engine_adapter.base_databricks import BaseDatabricks if t.TYPE_CHECKING: from sqlmesh.core.engine_adapter._typing import DF -class DatabricksAPIEngineAdapter(BaseSparkEngineAdapter): - DIALECT = "databricks" - +class DatabricksAPIEngineAdapter(BaseDatabricks): def _fetch_native_df(self, query: t.Union[exp.Expression, str]) -> DF: """ Returns a Pandas DataFrame from a query or expression. diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index fe50165c1c..dec2259614 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -425,9 +425,10 @@ def _ensure_no_paused_forward_only_upstream( self, model_name: str, upstream_model_names: t.Iterable[str] ) -> None: for upstream in upstream_model_names: - upstream_snapshot = self.context_diff.snapshots[upstream] + upstream_snapshot = self.context_diff.snapshots.get(upstream) if ( - upstream_snapshot.version + upstream_snapshot + and upstream_snapshot.version and upstream_snapshot.is_forward_only and upstream_snapshot.is_paused ): diff --git a/sqlmesh/core/state_sync/engine_adapter.py b/sqlmesh/core/state_sync/engine_adapter.py index 4be31b805e..896296a33a 100644 --- a/sqlmesh/core/state_sync/engine_adapter.py +++ b/sqlmesh/core/state_sync/engine_adapter.py @@ -85,7 +85,7 @@ def init_schema(self) -> None: """Creates the schema and table to store state.""" self.engine_adapter.create_schema(self.snapshots_table) - self.engine_adapter.create_table( + self.engine_adapter.create_snapshots_table( self.snapshots_table, self.snapshot_columns_to_types ) self.engine_adapter.create_table(