diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py index e64a3092b9..e643c2e3d5 100644 --- a/sqlmesh/core/model/definition.py +++ b/sqlmesh/core/model/definition.py @@ -491,10 +491,11 @@ def render_audit_query( pass if self.time_column: - where = self.time_column.column.between( - self.convert_to_time_column(start or c.EPOCH, columns_to_types), - self.convert_to_time_column(end or c.EPOCH, columns_to_types), - ) + low, high = [ + self.convert_to_time_column(dt, columns_to_types) + for dt in make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect) + ] + where = self.time_column.column.between(low, high) else: where = None diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 75aa66c8b8..810e27ee52 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -555,7 +555,7 @@ def audit( if wap_id is not None: logger.info( - "Publishing evalaution results for snapshot %s, WAP ID '%s'", + "Publishing evaluation results for snapshot %s, WAP ID '%s'", snapshot.snapshot_id, wap_id, ) diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 4593e2ca33..c3ddc4618c 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1922,3 +1922,73 @@ def create_log_view(evaluator, view_name): # Validate the schema is retrieved using resolve_template for the environment-specific schema assert log_schema["my_schema"][0] == "db__dev" + + +def test_plan_audit_intervals(tmp_path: pathlib.Path, capsys, caplog): + ctx = Context( + paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) + ) + + ctx.upsert_model( + load_sql_based_model( + parse( + """ + MODEL ( + name sqlmesh_audit.date_example, + kind INCREMENTAL_BY_TIME_RANGE( + time_column(date_id, '%Y-%m-%d') + ), + cron '@daily', + partitioned_by (date_id), + audits [unique_combination_of_columns(columns=(date_id))] + ); + + WITH sample_table AS ( + SELECT + DATE('2025-02-01') as date_id, + ) + SELECT date_id FROM sample_table WHERE date_id BETWEEN @start_ds AND @end_ds + """ + ) + ) + ) + + ctx.upsert_model( + load_sql_based_model( + parse( + """ + MODEL ( + name sqlmesh_audit.timestamp_example, + kind INCREMENTAL_BY_TIME_RANGE( + time_column(timestamp_id, '%Y-%m-%d %H:%M:%S') + ), + cron '@daily', + partitioned_by (timestamp_id), + audits [unique_combination_of_columns(columns=(timestamp_id))] + ); + + WITH sample_table AS ( + SELECT + TIMESTAMP('2025-02-01') as timestamp_id, + ) + SELECT timestamp_id FROM sample_table WHERE timestamp_id BETWEEN @start_ts AND @end_ts + """ + ) + ) + ) + + ctx.plan( + environment="dev", auto_apply=True, no_prompts=True, start="2025-02-01", end="2025-02-01" + ) + + # Case 1: The timestamp audit should be in the inclusive range ['2025-02-01 00:00:00', '2025-02-01 23:59:59.999999'] + assert ( + """SELECT COUNT(*) FROM (SELECT ("timestamp_id") AS "timestamp_id" FROM (SELECT * FROM "sqlmesh__sqlmesh_audit"."sqlmesh_audit__timestamp_example__2797548448" AS "sqlmesh_audit__timestamp_example__2797548448" WHERE "timestamp_id" BETWEEN CAST('2025-02-01 00:00:00' AS TIMESTAMP) AND CAST('2025-02-01 23:59:59.999999' AS TIMESTAMP)) AS "_q_0" WHERE TRUE GROUP BY ("timestamp_id") HAVING COUNT(*) > 1) AS "audit\"""" + in caplog.text + ) + + # Case 2: The date audit should be in the inclusive range ['2025-02-01', '2025-02-01'] + assert ( + """SELECT COUNT(*) FROM (SELECT ("date_id") AS "date_id" FROM (SELECT * FROM "sqlmesh__sqlmesh_audit"."sqlmesh_audit__date_example__4100277424" AS "sqlmesh_audit__date_example__4100277424" WHERE "date_id" BETWEEN CAST('2025-02-01' AS DATE) AND CAST('2025-02-01' AS DATE)) AS "_q_0" WHERE TRUE GROUP BY ("date_id") HAVING COUNT(*) > 1) AS "audit\"""" + in caplog.text + )