Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ def _scd_type_2(
select_source_columns: t.List[t.Union[str, exp.Alias]] = [
col for col in unmanaged_columns if col != updated_at_name
]
table_columns = [exp.column(c, quoted=True) for c in columns_to_types]
if updated_at_name:
select_source_columns.append(
exp.cast(updated_at_name, time_data_type).as_(updated_at_name)
Expand Down Expand Up @@ -1410,14 +1411,14 @@ def _scd_type_2(
# Historical Records that Do Not Change
.with_(
"static",
self._select_columns(columns_to_types)
exp.select(*table_columns)
.from_(target_table)
.where(f"{valid_to_name} IS NOT NULL"),
)
# Latest Records that can be updated
.with_(
"latest",
self._select_columns(columns_to_types)
exp.select(*table_columns)
.from_(target_table)
.where(f"{valid_to_name} IS NULL"),
)
Expand Down Expand Up @@ -1537,14 +1538,14 @@ def _scd_type_2(
.from_("joined")
.where(updated_row_filter),
)
.select("*")
.select(*table_columns)
.from_("static")
.union(
"SELECT * FROM updated_rows",
exp.select(*table_columns).from_("updated_rows"),
distinct=False,
)
.union(
"SELECT * FROM inserted_rows",
exp.select(*table_columns).from_("inserted_rows"),
distinct=False,
)
)
Expand Down
120 changes: 102 additions & 18 deletions tests/core/engine_adapter/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,15 +1128,30 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable):
"test_updated_at" > "t_test_updated_at"
)
SELECT
*
"id",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "static"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "updated_rows"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "inserted_rows"
"""
).sql()
Expand Down Expand Up @@ -1300,15 +1315,30 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte
"test_updated_at" > "t_test_updated_at"
)
SELECT
*
"id",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "static"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "updated_rows"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "inserted_rows"
"""
).sql()
Expand Down Expand Up @@ -1500,15 +1530,33 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable):
"test_updated_at" > "t_test_updated_at"
)
SELECT
*
"id1",
"id2",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "static"
UNION ALL
SELECT
*
"id1",
"id2",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "updated_rows"
UNION ALL
SELECT
*
"id1",
"id2",
"name",
"price",
"test_updated_at",
"test_valid_from",
"test_valid_to"
FROM "inserted_rows"
"""
).sql()
Expand Down Expand Up @@ -1686,15 +1734,27 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
)
)
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "static"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "updated_rows"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "inserted_rows"
"""
).sql()
Expand Down Expand Up @@ -1886,15 +1946,27 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable)
)
)
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "static"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "updated_rows"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "inserted_rows"
"""
).sql()
Expand Down Expand Up @@ -2071,15 +2143,27 @@ def test_scd_type_2_by_column_no_invalidate_hard_deletes(make_mocked_engine_adap
)
)
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "static"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "updated_rows"
UNION ALL
SELECT
*
"id",
"name",
"price",
"test_valid_from",
"test_valid_to"
FROM "inserted_rows"
"""
).sql()
Expand Down
32 changes: 19 additions & 13 deletions tests/core/engine_adapter/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,19 +759,25 @@ def check_table_exists(table_name: exp.Table) -> bool:
`test_updated_at`,
`test_valid_from`,
`test_valid_to`
FROM (
SELECT
*
FROM `static`
UNION ALL
SELECT
*
FROM `updated_rows`
UNION ALL
SELECT
*
FROM `inserted_rows`
) AS `_subquery`
FROM `static`
UNION ALL
SELECT
`id`,
`name`,
`price`,
`test_updated_at`,
`test_valid_from`,
`test_valid_to`
FROM `updated_rows`
UNION ALL
SELECT
`id`,
`name`,
`price`,
`test_updated_at`,
`test_valid_from`,
`test_valid_to`
FROM `inserted_rows`
""",
dialect="spark",
).sql(dialect="spark"),
Expand Down