diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 10796d3f7d..e8e999504e 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -785,17 +785,17 @@ def missing_intervals( interval_unit = self.node.interval_unit - if end_bounded: - upper_bound_ts = min(end_ts, to_timestamp(execution_time)) - end_ts = upper_bound_ts - elif allow_partials: - upper_bound_ts = to_timestamp(execution_time) - end_ts = min(end_ts, upper_bound_ts) - else: + if not allow_partials: upper_bound_ts = to_timestamp( self.node.cron_floor(execution_time) if not ignore_cron else execution_time ) end_ts = min(end_ts, to_timestamp(interval_unit.cron_floor(upper_bound_ts))) + else: + upper_bound_ts = to_timestamp(execution_time) + end_ts = min(end_ts, upper_bound_ts) + + if end_bounded: + upper_bound_ts = end_ts lookback = self.model.lookback if self.is_model else 0 diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index b864e5022b..34a9a2fe0e 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -286,6 +286,66 @@ def test_missing_intervals_partial(make_snapshot): (to_timestamp(start), to_timestamp("2023-01-02")), ] assert snapshot.missing_intervals(start, start, execution_time=start, ignore_cron=True) == [] + assert snapshot.missing_intervals(start, start, execution_time=end_ts, end_bounded=True) == [] + + +def test_missing_intervals_end_bounded_with_lookback(make_snapshot): + snapshot = make_snapshot( + SqlModel( + name="test_model", + kind=IncrementalByTimeRangeKind(time_column=TimeColumn(column="ds"), lookback=1), + owner="owner", + cron="@daily", + query=parse_one("SELECT 1, ds FROM name"), + ) + ) + + start = "2023-01-01" + end = "2023-01-02" + + snapshot.intervals = [(to_timestamp(start), to_timestamp(end))] + + execution_ts = to_timestamp("2023-01-03") + assert snapshot.missing_intervals(start, start, execution_time=execution_ts) == [ + (to_timestamp(start), to_timestamp(end)), + ] + assert ( + snapshot.missing_intervals(start, start, execution_time=execution_ts, end_bounded=True) + == [] + ) + + +def test_missing_intervals_end_bounded_with_ignore_cron(make_snapshot): + snapshot = make_snapshot( + SqlModel( + name="test_model", + kind=IncrementalByTimeRangeKind(time_column=TimeColumn(column="ds")), + owner="owner", + cron="1 0 * * *", + query=parse_one("SELECT 1, ds FROM name"), + ) + ) + + start = "2023-01-01" + end = "2023-01-03" + + snapshot.intervals = [(to_timestamp(start), to_timestamp("2023-01-02"))] + + execution_ts = to_timestamp(end) + assert snapshot.missing_intervals(start, end, execution_time=execution_ts) == [] + assert snapshot.missing_intervals( + start, end, execution_time=execution_ts, ignore_cron=True + ) == [ + (to_timestamp("2023-01-02"), to_timestamp(end)), + ] + assert ( + snapshot.missing_intervals(start, end, execution_time=execution_ts, end_bounded=True) == [] + ) + assert snapshot.missing_intervals( + start, end, execution_time=execution_ts, ignore_cron=True, end_bounded=True + ) == [ + (to_timestamp("2023-01-02"), to_timestamp(end)), + ] def test_incremental_time_self_reference(make_snapshot):