From b57f7cc640bb4bd90c67d58a11bec0c8c2748d75 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 13 Jan 2025 20:01:36 -0800 Subject: [PATCH] Fix: The inference of the default plan start --- sqlmesh/core/context.py | 36 ++++++++++++++++------- tests/core/test_context.py | 2 +- tests/core/test_integration.py | 54 ++++++++++++++++++++++++++++++---- 3 files changed, 76 insertions(+), 16 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index d597e18a26..0f7c218b55 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -41,7 +41,6 @@ import traceback import typing as t import unittest.result -from datetime import date, timedelta from functools import cached_property from io import StringIO from pathlib import Path @@ -112,7 +111,7 @@ from sqlmesh.core.user import User from sqlmesh.utils import UniqueKeyDict, sys_path from sqlmesh.utils.dag import DAG -from sqlmesh.utils.date import TimeLike, now_ds, to_date +from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp from sqlmesh.utils.errors import ( CircuitBreakerError, ConfigError, @@ -1266,6 +1265,10 @@ def plan_builder( or (backfill_models is not None and not backfill_models), ensure_finalized_snapshots=self.config.plan.use_finalized_state, ) + modified_model_names = { + *context_diff.modified_snapshots, + *[s.name for s in context_diff.added], + } if ( is_dev @@ -1275,15 +1278,12 @@ def plan_builder( ): # Only backfill modified and added models. # This ensures that no models outside the impacted sub-DAG(s) will be backfilled unexpectedly. - backfill_models = { - *context_diff.modified_snapshots, - *[s.name for s in context_diff.added], - } or None + backfill_models = modified_model_names or None # If no end date is specified, use the max interval end from prod # to prevent unintended evaluation of the entire DAG. default_end: t.Optional[int] = None - default_start: t.Optional[date] = None + default_start: t.Optional[int] = None max_interval_end_per_model: t.Optional[t.Dict[str, int]] = None if not run and not end: models_for_interval_end: t.Optional[t.Set[str]] = None @@ -1308,9 +1308,25 @@ def plan_builder( ) if max_interval_end_per_model: default_end = max(max_interval_end_per_model.values()) - default_start = to_date(min(max_interval_end_per_model.values())) - timedelta( - days=1 - ) + # Infer the default start by finding the smallest interval start that corresponds to the default end. + for model_name in ( + backfill_models or modified_model_names or max_interval_end_per_model + ): + if model_name not in snapshots: + continue + interval_unit = snapshots[model_name].node.interval_unit + default_start = min( + default_start or sys.maxsize, + to_timestamp( + interval_unit.cron_prev( + interval_unit.cron_floor( + max_interval_end_per_model.get(model_name, default_end), + estimate=True, + ), + estimate=True, + ) + ), + ) return self.PLAN_BUILDER_TYPE( context_diff=context_diff, diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 17702f6181..49f98b704a 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -852,7 +852,7 @@ def test_plan_default_end(sushi_context_pre_scheduling: Context): ).build() assert forward_only_dev_plan.end is not None assert to_date(make_inclusive_end(forward_only_dev_plan.end)) == plan_end - assert forward_only_dev_plan.start == plan_end + assert to_timestamp(forward_only_dev_plan.start) == to_timestamp(plan_end) @pytest.mark.slow diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 9c0929d31c..f7e22fb076 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -100,7 +100,7 @@ def test_forward_only_plan_with_effective_date(context_fixture: Context, request plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category == SnapshotChangeCategory.FORWARD_ONLY ) - assert plan.start == to_date("2023-01-07") + assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ SnapshotIntervals( snapshot_id=top_waiters_snapshot.snapshot_id, @@ -335,7 +335,7 @@ def test_forward_only_model_regular_plan_preview_enabled(init_and_plan_context: plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category == SnapshotChangeCategory.FORWARD_ONLY ) - assert plan.start == to_date("2023-01-07") + assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ SnapshotIntervals( snapshot_id=top_waiters_snapshot.snapshot_id, @@ -472,7 +472,7 @@ def test_full_history_restatement_model_regular_plan_preview_enabled( == SnapshotChangeCategory.FORWARD_ONLY ) - assert plan.start == to_date("2023-01-07") + assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ SnapshotIntervals( snapshot_id=active_customers_snapshot.snapshot_id, @@ -690,6 +690,50 @@ def test_cron_not_aligned_with_day_boundary( ] +@time_machine.travel("2023-01-08 00:00:00 UTC") +def test_forward_only_monthly_model(init_and_plan_context: t.Callable): + context, _ = init_and_plan_context("examples/sushi") + + model = context.get_model("sushi.waiter_revenue_by_day") + model = SqlModel.parse_obj( + { + **model.dict(), + "kind": model.kind.copy(update={"forward_only": True}), + "cron": "0 0 1 * *", + "start": "2022-01-01", + "audits": [], + } + ) + context.upsert_model(model) + + plan = context.plan_builder("prod", skip_tests=True).build() + context.apply(plan) + + waiter_revenue_by_day_snapshot = context.get_snapshot(model.name, raise_if_missing=True) + assert waiter_revenue_by_day_snapshot.intervals == [ + (to_timestamp("2022-01-01"), to_timestamp("2023-01-01")) + ] + + model = add_projection_to_model(t.cast(SqlModel, model), literal=True) + context.upsert_model(model) + + waiter_revenue_by_day_snapshot = context.get_snapshot( + "sushi.waiter_revenue_by_day", raise_if_missing=True + ) + + plan = context.plan_builder( + "dev", select_models=[model.name], skip_tests=True, enable_preview=True + ).build() + assert to_timestamp(plan.start) == to_timestamp("2022-12-01") + assert to_timestamp(plan.end) == to_timestamp("2023-01-08") + assert plan.missing_intervals == [ + SnapshotIntervals( + snapshot_id=waiter_revenue_by_day_snapshot.snapshot_id, + intervals=[(to_timestamp("2022-12-01"), to_timestamp("2023-01-01"))], + ), + ] + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_forward_only_parent_created_in_dev_child_created_in_prod( init_and_plan_context: t.Callable, @@ -915,7 +959,7 @@ def test_non_breaking_change_after_forward_only_in_dev( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category == SnapshotChangeCategory.FORWARD_ONLY ) - assert plan.start == to_date("2023-01-07") + assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ SnapshotIntervals( snapshot_id=top_waiters_snapshot.snapshot_id, @@ -947,7 +991,7 @@ def test_non_breaking_change_after_forward_only_in_dev( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING ) - assert plan.start == to_timestamp("2023-01-01") + assert to_timestamp(plan.start) == to_timestamp("2023-01-01") assert plan.missing_intervals == [ SnapshotIntervals( snapshot_id=top_waiters_snapshot.snapshot_id,