Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/concepts/macros/macro_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Postfixes:
* date - A Python date object that converts into a native SQL Date.
* ds - A date string with the format: '%Y-%m-%d'
* ts - An ISO 8601 datetime formatted string: '%Y-%m-%d %H:%M:%S'.
* tstz - An ISO 8601 datetime formatted string with timezone: '%Y-%m-%d %H:%M:%S%z', with the UTC offset written with a colon (for example '2020-01-01 00:00:00+00:00').
* epoch - An integer representing seconds since Unix epoch.
* millis - An integer representing milliseconds since Unix epoch.

Expand All @@ -77,6 +78,11 @@ All predefined macro variables:
* @end_ts
* @execution_ts

* tstz
* @start_tstz
* @end_tstz
* @execution_tstz

* epoch
* @start_epoch
* @end_epoch
Expand Down
30 changes: 4 additions & 26 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import logging
import sys
import typing as t
from datetime import datetime, timezone
from functools import partial

import pandas as pd
Expand Down Expand Up @@ -42,7 +41,7 @@
from sqlmesh.core.schema_diff import SchemaDiffer
from sqlmesh.utils import columns_to_types_all_known, double_escape, random_id
from sqlmesh.utils.connection_pool import create_connection_pool
from sqlmesh.utils.date import TimeLike, make_inclusive, to_ts
from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column
from sqlmesh.utils.errors import SQLMeshError, UnsupportedCatalogOperationError
from sqlmesh.utils.pandas import columns_to_types_from_df

Expand Down Expand Up @@ -151,25 +150,6 @@ def comments_enabled(self) -> bool:
def is_pandas_df(cls, value: t.Any) -> bool:
return isinstance(value, pd.DataFrame)

@classmethod
def _to_utc_timestamp(
cls, col: t.Union[str, exp.Literal, exp.Column, exp.Null], time_data_type: exp.DataType
) -> exp.Cast:
# Wraps `col` in two casts: first to TIMESTAMP, then to `time_data_type`.
# str / Literal inputs are first rewritten by ensure_utc_exp into a UTC
# 'YYYY-MM-DD HH:MM:SS' string literal; any other input is cast unchanged.
def ensure_utc_exp(
ts: t.Union[str, exp.Literal, exp.Column, exp.Null]
) -> t.Union[exp.Literal, exp.Column, exp.Null]:
# Anything that is neither a str nor a Literal is returned untouched.
if not isinstance(ts, (str, exp.Literal)):
return ts
if isinstance(ts, exp.Literal):
# A non-string literal cannot be parsed as a timestamp, so fail loudly.
if not ts.is_string:
raise SQLMeshError("Timestamp literal must be a string")
# Presumably `.name` is the literal's raw text; from here on `ts` is a plain str.
ts = ts.name
# Parse the ISO text, convert to UTC, and format without any offset suffix.
# NOTE(review): datetime.astimezone() treats a naive datetime as system local
# time, so an input without an offset would give a machine-dependent result;
# confirm callers always pass an explicit offset.
return exp.Literal.string(
datetime.fromisoformat(ts).astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
)

# Inner cast to TIMESTAMP, outer cast to the requested type.
return exp.cast(exp.cast(ensure_utc_exp(col), "TIMESTAMP"), time_data_type)

@classmethod
def _casted_columns(cls, columns_to_types: t.Dict[str, exp.DataType]) -> t.List[exp.Alias]:
return [
Expand Down Expand Up @@ -1305,7 +1285,7 @@ def _scd_type_2(
# column names and then remove them from the unmanaged_columns
if check_columns and check_columns == exp.Star():
check_columns = [exp.column(col) for col in unmanaged_columns]
execution_ts = self._to_utc_timestamp(to_ts(execution_time), time_data_type)
execution_ts = to_time_column(execution_time, time_data_type)
if updated_at_as_valid_from:
if not updated_at_name:
raise SQLMeshError(
Expand All @@ -1315,9 +1295,7 @@ def _scd_type_2(
elif execution_time_as_valid_from:
update_valid_from_start = execution_ts
else:
update_valid_from_start = self._to_utc_timestamp(
"1970-01-01 00:00:00+00:00", time_data_type
)
update_valid_from_start = to_time_column("1970-01-01 00:00:00+00:00", time_data_type)
insert_valid_from_start = execution_ts if check_columns else exp.column(updated_at_name) # type: ignore
if check_columns:
row_check_conditions = []
Expand Down Expand Up @@ -1521,7 +1499,7 @@ def _scd_type_2(
exp.select(
*unmanaged_columns,
insert_valid_from_start.as_(valid_from_name),
self._to_utc_timestamp(exp.null(), time_data_type).as_(valid_to_name),
to_time_column(exp.null(), time_data_type).as_(valid_to_name),
)
.from_("joined")
.where(updated_row_filter),
Expand Down
12 changes: 12 additions & 0 deletions sqlmesh/utils/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
)
DAY_SHORTCUT_EXPRESSIONS = {"today", "yesterday", "tomorrow"}
TIME_UNITS = {"hours", "minutes", "seconds"}
TEMPORAL_TZ_TYPES = {
exp.DataType.Type.TIMETZ,
exp.DataType.Type.TIMESTAMPTZ,
exp.DataType.Type.TIMESTAMPLTZ,
}


def now(minute_floor: bool = True) -> datetime:
Expand Down Expand Up @@ -228,6 +233,11 @@ def to_ds(obj: TimeLike) -> str:

def to_ts(obj: TimeLike) -> str:
    """Render a TimeLike value as a timezone-less 'YYYY-MM-DD HH:MM:SS' string.

    The value is normalized through ``to_datetime``, its ``tzinfo`` is then
    dropped (the clock fields are kept as-is; no offset conversion happens in
    this function), and the result is ISO-formatted with a space separating
    the date and time parts. ``datetime.isoformat`` appends fractional seconds
    when the microsecond component is non-zero.
    """
    naive_dt = to_datetime(obj).replace(tzinfo=None)
    return naive_dt.isoformat(sep=" ")


def to_tstz(obj: TimeLike) -> str:
    """Render a TimeLike value as a 'YYYY-MM-DD HH:MM:SS+00:00' style string.

    The value is normalized through ``to_datetime`` and ISO-formatted with a
    space separating the date and time parts. Unlike ``to_ts`` the ``tzinfo``
    is left in place, so ``datetime.isoformat`` includes the UTC offset
    whenever the normalized datetime carries one.
    """
    moment = to_datetime(obj)
    return moment.isoformat(sep=" ")


Expand Down Expand Up @@ -314,6 +324,8 @@ def to_time_column(
return exp.cast(time_column, to=time_column_type)
if time_column_type.is_type(exp.DataType.Type.DATE):
return exp.cast(exp.Literal.string(to_ds(time_column)), to="date")
if time_column_type.this in TEMPORAL_TZ_TYPES:
return exp.cast(exp.Literal.string(to_tstz(time_column)), to=time_column_type.this)
if time_column_type.this in exp.DataType.TEMPORAL_TYPES:
return exp.cast(exp.Literal.string(to_ts(time_column)), to=time_column_type.this)

Expand Down
28 changes: 14 additions & 14 deletions tests/core/engine_adapter/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1102,14 +1102,14 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable):
ELSE "test_updated_at"
END
WHEN "t_test_valid_from" IS NULL
THEN CAST(CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)
THEN CAST('1970-01-01 00:00:00' AS TIMESTAMP)
ELSE "t_test_valid_from"
END AS "test_valid_from",
CASE
WHEN "test_updated_at" > "t_test_updated_at"
THEN "test_updated_at"
WHEN "joined"."_exists" IS NULL
THEN CAST(CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)
THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP)
ELSE "t_test_valid_to"
END AS "test_valid_to"
FROM "joined"
Expand All @@ -1122,7 +1122,7 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable):
"price",
"test_updated_at",
"test_updated_at" AS "test_valid_from",
CAST(CAST(NULL AS TIMESTAMP) AS TIMESTAMP) AS "test_valid_to"
CAST(NULL AS TIMESTAMP) AS "test_valid_to"
FROM "joined"
WHERE
"test_updated_at" > "t_test_updated_at"
Expand Down Expand Up @@ -1300,14 +1300,14 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable):
ELSE "test_updated_at"
END
WHEN "t_test_valid_from" IS NULL
THEN CAST(CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMPTZ)
THEN CAST('1970-01-01 00:00:00+00:00' AS TIMESTAMPTZ)
ELSE "t_test_valid_from"
END AS "test_valid_from",
CASE
WHEN "test_updated_at" > "t_test_updated_at"
THEN "test_updated_at"
WHEN "joined"."_exists" IS NULL
THEN CAST(CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMPTZ)
THEN CAST('2020-01-01 00:00:00+00:00' AS TIMESTAMPTZ)
ELSE "t_test_valid_to"
END AS "test_valid_to"
FROM "joined"
Expand All @@ -1322,7 +1322,7 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable):
"price",
"test_updated_at",
"test_updated_at" AS "test_valid_from",
CAST(CAST(NULL AS TIMESTAMP) AS TIMESTAMPTZ) AS "test_valid_to"
CAST(NULL AS TIMESTAMPTZ) AS "test_valid_to"
FROM "joined"
WHERE
"test_updated_at" > "t_test_updated_at"
Expand Down Expand Up @@ -1454,7 +1454,7 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
COALESCE("joined"."t_id", "joined"."id") AS "id",
COALESCE("joined"."t_name", "joined"."name") AS "name",
COALESCE("joined"."t_price", "joined"."price") AS "price",
COALESCE("t_test_valid_from", CAST(CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)) AS "test_valid_from",
COALESCE("t_test_valid_from", CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AS "test_valid_from",
CASE
WHEN "joined"."_exists" IS NULL
OR (
Expand All @@ -1478,7 +1478,7 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
)
)
)
THEN CAST(CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)
THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP)
ELSE "t_test_valid_to"
END AS "test_valid_to"
FROM "joined"
Expand All @@ -1489,8 +1489,8 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
"id",
"name",
"price",
CAST(CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP) AS "test_valid_from",
CAST(CAST(NULL AS TIMESTAMP) AS TIMESTAMP) AS "test_valid_to"
CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_valid_from",
CAST(NULL AS TIMESTAMP) AS "test_valid_to"
FROM "joined"
WHERE
(
Expand Down Expand Up @@ -1640,7 +1640,7 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable)
COALESCE("joined"."t_id", "joined"."id") AS "id",
COALESCE("joined"."t_name", "joined"."name") AS "name",
COALESCE("joined"."t_price", "joined"."price") AS "price",
COALESCE("t_test_valid_from", CAST(CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)) AS "test_valid_from",
COALESCE("t_test_valid_from", CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AS "test_valid_from",
CASE
WHEN "joined"."_exists" IS NULL
OR (
Expand Down Expand Up @@ -1671,7 +1671,7 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable)
)
)
)
THEN CAST(CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)
THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP)
ELSE "t_test_valid_to"
END AS "test_valid_to"
FROM "joined"
Expand All @@ -1682,8 +1682,8 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable)
"id",
"name",
"price",
CAST(CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP) AS "test_valid_from",
CAST(CAST(NULL AS TIMESTAMP) AS TIMESTAMP) AS "test_valid_to"
CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_valid_from",
CAST(NULL AS TIMESTAMP) AS "test_valid_to"
FROM "joined"
WHERE
(
Expand Down
99 changes: 97 additions & 2 deletions tests/core/engine_adapter/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pathlib
import sys
import typing as t
from datetime import timedelta
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
Expand All @@ -17,7 +17,7 @@
from sqlmesh.core.dialect import normalize_model_name
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
from sqlmesh.utils import random_id
from sqlmesh.utils.date import now, to_date, to_ds, yesterday
from sqlmesh.utils.date import now, to_date, to_ds, to_time_column, yesterday
from sqlmesh.utils.errors import UnsupportedCatalogOperationError
from sqlmesh.utils.pydantic import PydanticModel
from tests.conftest import SushiDataValidator
Expand Down Expand Up @@ -1898,3 +1898,98 @@ def test_dialects(ctx: TestContext):
pd.testing.assert_frame_equal(
df, pd.DataFrame([[1, 1, 1, 1]], columns=expected_columns), check_dtype=False
)


@pytest.mark.parametrize(
    "time_column, time_column_type, time_column_format, result",
    [
        (
            exp.null(),
            exp.DataType.build("TIMESTAMP"),
            None,
            {
                "default": None,
                "bigquery": pd.NaT,
                "databricks": pd.NaT,
                "duckdb": pd.NaT,
                "motherduck": pd.NaT,
                "snowflake": pd.NaT,
                "spark": pd.NaT,
            },
        ),
        (
            "2020-01-01 00:00:00+00:00",
            exp.DataType.build("DATE"),
            None,
            {
                "default": datetime(2020, 1, 1).date(),
                "duckdb": pd.Timestamp("2020-01-01"),
            },
        ),
        (
            "2020-01-01 00:00:00+00:00",
            exp.DataType.build("TIMESTAMPTZ"),
            None,
            {
                "default": pd.Timestamp("2020-01-01 00:00:00+00:00"),
                # https://github.com/pymssql/pymssql/issues/649
                "tsql": b"\x00\x00\x00\x00\x00\x00\x00\x005\xab\x00\x00\x00\x00\x07\xe0",
                "mysql": pd.Timestamp("2020-01-01 00:00:00"),
                "spark": pd.Timestamp("2020-01-01 00:00:00"),
            },
        ),
        (
            "2020-01-01 00:00:00+00:00",
            exp.DataType.build("TIMESTAMP"),
            None,
            {
                "default": pd.Timestamp("2020-01-01 00:00:00"),
                # Databricks' timestamp type is tz-aware:
                # "Represents values comprising values of fields year, month, day, hour, minute, and second,
                # with the session local time-zone.
                # The timestamp value represents an absolute point in time."
                # https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-type.html
                #
                # They are adding a non-aware version TIMESTAMP_NTZ that's currently in public preview -
                # you have to specify a table option to use it:
                # "Feature support is enabled automatically when you create a new Delta table with a column of
                # TIMESTAMP_NTZ type. It is not enabled automatically when you add a column of
                # TIMESTAMP_NTZ type to an existing table.
                # To enable support for TIMESTAMP_NTZ columns, support for the feature must be explicitly enabled for
                # the existing table."
                # https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html
                "databricks": pd.Timestamp("2020-01-01 00:00:00+00:00"),
            },
        ),
        (
            "2020-01-01 00:00:00+00:00",
            exp.DataType.build("TEXT"),
            "%Y-%m-%dT%H:%M:%S%z",
            {
                "default": "2020-01-01T00:00:00+0000",
            },
        ),
        (
            "2020-01-01 00:00:00+00:00",
            exp.DataType.build("INT"),
            "%Y%m%d",
            {
                "default": 20200101,
            },
        ),
    ],
)
def test_to_time_column(
    ctx: TestContext, time_column, time_column_type, time_column_format, result
):
    """Run the expression built by ``to_time_column`` on the engine and check the fetched value.

    Each parametrized case supplies the input value, the target column type,
    an optional format string, and a mapping from dialect name to the value
    expected back from that engine, with ``"default"`` as the fallback entry.
    """
    if ctx.test_type != "query":
        pytest.skip("Time column tests only need to run for query")

    casted = to_time_column(time_column, time_column_type, time_column_format)
    query = exp.select(casted).as_("the_col")
    frame = ctx.engine_adapter.fetchdf(query)

    # Prefer the dialect-specific expectation and fall back to the shared default.
    fallback = result.get("default")
    expected = result.get(ctx.dialect, fallback)

    # The result column is looked up in upper case for Snowflake.
    col_name = "THE_COL" if ctx.dialect == "snowflake" else "the_col"
    actual = frame[col_name][0]

    # None and NaT are checked by identity, everything else by equality.
    if expected is None or expected is pd.NaT:
        assert actual is expected
    else:
        assert actual == expected
6 changes: 3 additions & 3 deletions tests/core/engine_adapter/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,14 +719,14 @@ def check_table_exists(table_name: exp.Table) -> bool:
ELSE `test_updated_at`
END
WHEN `t_test_valid_from` IS NULL
THEN CAST(CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)
THEN CAST('1970-01-01 00:00:00' AS TIMESTAMP)
ELSE `t_test_valid_from`
END AS `test_valid_from`,
CASE
WHEN `test_updated_at` > `t_test_updated_at`
THEN `test_updated_at`
WHEN `joined`.`_exists` IS NULL
THEN CAST(CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS TIMESTAMP)
THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP)
ELSE `t_test_valid_to`
END AS `test_valid_to`
FROM `joined`
Expand All @@ -739,7 +739,7 @@ def check_table_exists(table_name: exp.Table) -> bool:
`price`,
`test_updated_at`,
`test_updated_at` AS `test_valid_from`,
CAST(CAST(NULL AS TIMESTAMP) AS TIMESTAMP) AS `test_valid_to`
CAST(NULL AS TIMESTAMP) AS `test_valid_to`
FROM `joined`
WHERE
`test_updated_at` > `t_test_updated_at`
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ def test_convert_to_time_column():
)
model = load_sql_based_model(expressions)
assert model.convert_to_time_column("2022-01-01") == d.parse_one(
"CAST('2022-01-01 00:00:00+00:00' AS TIMESTAMP)"
"CAST('2022-01-01 00:00:00' AS TIMESTAMP)"
)


Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_state_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ def test_environment_start_as_timestamp(

stored_env = state_sync.get_environment(env.name)
assert stored_env
assert stored_env.start_at == to_datetime(now_ts).isoformat(sep=" ")
assert stored_env.start_at == to_datetime(now_ts).replace(tzinfo=None).isoformat(sep=" ")


def test_unpause_snapshots(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable):
Expand Down
Loading