Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,17 @@ def create(
deployability_index: Determines snapshots that are deployable in the context of this creation.
on_complete: A callback to call on each successfully created snapshot.
"""
snapshots_with_table_names: t.List[t.Tuple[Snapshot, str]] = []
snapshots_with_table_names = defaultdict(set)
tables_by_schema = defaultdict(set)
for snapshot in target_snapshots:
if not snapshot.is_model or snapshot.is_symbolic:
continue
table = exp.to_table(snapshot.table_name(False), dialect=snapshot.model.dialect)
snapshots_with_table_names.append((snapshot, table.name))
tables_by_schema[d.schema_(table.db, catalog=table.catalog)].add(table.name)
for is_deployable in (True, False):
table = exp.to_table(
snapshot.table_name(is_deployable), dialect=snapshot.model.dialect
)
snapshots_with_table_names[snapshot].add(table.name)
tables_by_schema[d.schema_(table.db, catalog=table.catalog)].add(table.name)

existing_objects: t.Set[str] = set()
for schema, object_names in tables_by_schema.items():
Expand All @@ -248,8 +251,8 @@ def create(
existing_objects.update(obj.name for obj in objs)

snapshots_to_create = []
for snapshot, table_name in snapshots_with_table_names:
if table_name not in existing_objects:
for snapshot, table_names in snapshots_with_table_names.items():
if table_names - existing_objects:
snapshots_to_create.append(snapshot)
elif on_complete:
on_complete(snapshot)
Expand Down
69 changes: 66 additions & 3 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,7 @@ def test_evaluate_incremental_unmanaged(
)


def test_create_object_exists(mocker: MockerFixture, adapter_mock, make_snapshot):

def test_create_tables_exists(mocker: MockerFixture, adapter_mock, make_snapshot):
model = load_sql_based_model(
parse( # type: ignore
"""
Expand All @@ -547,13 +546,77 @@ def test_create_object_exists(mocker: MockerFixture, adapter_mock, make_snapshot
schema="sqlmesh__test_schema",
type=DataObjectType.VIEW,
),
DataObject(
name=f"test_schema__test_model__{snapshot.version}",
schema="sqlmesh__test_schema",
type=DataObjectType.VIEW,
),
]
evaluator = SnapshotEvaluator(adapter_mock)

evaluator.create([snapshot], {})
adapter_mock.create_view.assert_not_called()
adapter_mock.get_data_objects.assert_called_once_with(
schema_("sqlmesh__test_schema"), {f"test_schema__test_model__{snapshot.version}__temp"}
schema_("sqlmesh__test_schema"),
{
f"test_schema__test_model__{snapshot.version}__temp",
f"test_schema__test_model__{snapshot.version}",
},
)


def test_create_only_dev_table_exists(mocker: MockerFixture, adapter_mock, make_snapshot):
model = load_sql_based_model(
parse( # type: ignore
"""
MODEL (
name test_schema.test_model,
kind VIEW
);

SELECT a::int FROM tbl;
"""
),
)

snapshot = make_snapshot(model)
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

adapter_mock.get_data_objects.return_value = [
DataObject(
name=f"test_schema__test_model__{snapshot.version}__temp",
schema="sqlmesh__test_schema",
type=DataObjectType.VIEW,
),
]
evaluator = SnapshotEvaluator(adapter_mock)

evaluator.create([snapshot], {})

common_kwargs = dict(
materialized=False,
table_properties={},
table_description=None,
)
rendered_query = model.render_query()
adapter_mock.create_view.assert_has_calls(
[
call(
snapshot.table_name(False),
rendered_query,
column_descriptions=None,
**common_kwargs,
),
call(snapshot.table_name(), rendered_query, column_descriptions={}, **common_kwargs),
]
)

adapter_mock.get_data_objects.assert_called_once_with(
schema_("sqlmesh__test_schema"),
{
f"test_schema__test_model__{snapshot.version}__temp",
f"test_schema__test_model__{snapshot.version}",
},
)


Expand Down