Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,26 @@ def _get_data_objects(schema: exp.Table, gateway: t.Optional[str] = None) -> t.S

if not snapshots_to_create:
return

self._create_schemas(tables_by_schema, gateway_by_schema)
self._create_snapshots(
snapshots_to_create,
snapshots,
target_deployability_flags,
deployability_index,
on_complete,
allow_destructive_snapshots,
)

def _create_snapshots(
self,
snapshots_to_create: t.Iterable[Snapshot],
snapshots: t.Dict[SnapshotId, Snapshot],
target_deployability_flags: t.Dict[str, t.List[bool]],
deployability_index: t.Optional[DeployabilityIndex],
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
allow_destructive_snapshots: t.Set[str],
) -> None:
"""Internal method to create tables in parrallel."""
with self.concurrent_context():
concurrent_apply_to_snapshots(
snapshots_to_create,
Expand Down Expand Up @@ -365,7 +383,10 @@ def migrate(
concurrent_apply_to_snapshots(
target_snapshots,
lambda s: self._migrate_snapshot(
s, snapshots, allow_destructive_snapshots, self._get_adapter(s.model_gateway)
s,
snapshots,
allow_destructive_snapshots,
self._get_adapter(s.model_gateway),
),
self.ddl_concurrent_tasks,
)
Expand Down Expand Up @@ -446,7 +467,9 @@ def audit(
)
wap_table_name = adapter.wap_table_name(original_table_name, wap_id)
logger.info(
"Auditing WAP table '%s', snapshot %s", wap_table_name, snapshot.snapshot_id
"Auditing WAP table '%s', snapshot %s",
wap_table_name,
snapshot.snapshot_id,
)

table_mapping = kwargs.get("table_mapping") or {}
Expand Down Expand Up @@ -659,7 +682,10 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
# and not SQL expressions.
elif (
adapter.INSERT_OVERWRITE_STRATEGY
in (InsertOverwriteStrategy.INSERT_OVERWRITE, InsertOverwriteStrategy.REPLACE_WHERE)
in (
InsertOverwriteStrategy.INSERT_OVERWRITE,
InsertOverwriteStrategy.REPLACE_WHERE,
)
and snapshot.is_incremental_by_time_range
):
query_or_df = reduce(
Expand Down Expand Up @@ -1712,7 +1738,9 @@ def migrate(
self.adapter.create_view(
target_table_name,
model.render_query_or_raise(
execution_time=now(), snapshots=kwargs["snapshots"], engine_adapter=self.adapter
execution_time=now(),
snapshots=kwargs["snapshots"],
engine_adapter=self.adapter,
),
model.columns_to_types,
materialized=self._is_materialized_view(model),
Expand Down Expand Up @@ -1864,7 +1892,9 @@ def insert(
# Snapshot isnt deployable; update the preview table instead
# If the snapshot was deployable, then data would have already been loaded in create() because a managed table would have been created
logger.info(
"Updating preview table: %s (for managed model: %s)", table_name, model.name
"Updating preview table: %s (for managed model: %s)",
table_name,
model.name,
)
self._replace_query_for_model(model=model, name=table_name, query_or_df=query_or_df)

Expand Down