Skip to content

Commit 2d10923

Browse files
authored
Fix dates (#623)
* fix dates * remove leftover code * fixes * fix quotes * fixes * cache interval unit
1 parent ad006c8 commit 2d10923

File tree

5 files changed

+70
-32
lines changed

5 files changed

+70
-32
lines changed

sqlmesh/core/model/meta.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class ModelMeta(PydanticModel):
6161
audits: t.List[AuditReference] = []
6262

6363
_croniter: t.Optional[croniter] = None
64+
_interval_unit: t.Optional[IntervalUnit] = None
6465

6566
_model_kind_validator = model_kind_validator
6667

@@ -271,14 +272,17 @@ def interval_unit(self, sample_size: int = 10) -> IntervalUnit:
271272
Returns:
272273
The IntervalUnit enum.
273274
"""
274-
schedule = croniter(self.cron)
275-
samples = [schedule.get_next() for _ in range(sample_size)]
276-
min_interval = min(b - a for a, b in zip(samples, samples[1:]))
277-
if min_interval >= 86400:
278-
return IntervalUnit.DAY
279-
elif min_interval >= 3600:
280-
return IntervalUnit.HOUR
281-
return IntervalUnit.MINUTE
275+
if not self._interval_unit:
276+
schedule = croniter(self.cron)
277+
samples = [schedule.get_next() for _ in range(sample_size)]
278+
min_interval = min(b - a for a, b in zip(samples, samples[1:]))
279+
if min_interval >= 86400:
280+
self._interval_unit = IntervalUnit.DAY
281+
elif min_interval >= 3600:
282+
self._interval_unit = IntervalUnit.HOUR
283+
else:
284+
self._interval_unit = IntervalUnit.MINUTE
285+
return self._interval_unit
282286

283287
def normalized_cron(self) -> str:
284288
"""Returns the UTC normalized cron based on sampling heuristics.

sqlmesh/core/plan/definition.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from sqlmesh.core.config import CategorizerConfig
99
from sqlmesh.core.context_diff import ContextDiff
1010
from sqlmesh.core.environment import Environment
11+
from sqlmesh.core.model.meta import IntervalUnit
1112
from sqlmesh.core.snapshot import (
1213
Intervals,
1314
Snapshot,
@@ -23,6 +24,7 @@
2324
TimeLike,
2425
make_inclusive,
2526
now,
27+
to_date,
2628
to_ds,
2729
to_timestamp,
2830
validate_date_range,
@@ -139,6 +141,8 @@ def uncategorized(self) -> t.List[Snapshot]:
139141
@property
140142
def start(self) -> TimeLike:
141143
"""Returns the start of the plan or the earliest date of all snapshots."""
144+
if not self.override_start and not self._missing_intervals:
145+
return scheduler.earliest_start_date(self.snapshots)
142146
return self._start or (
143147
min(
144148
start
@@ -153,6 +157,7 @@ def start(self) -> TimeLike:
153157
def start(self, new_start: TimeLike) -> None:
154158
self._ensure_valid_date_range(new_start, self._end)
155159
self.set_start(new_start)
160+
self.override_start = True
156161

157162
def set_start(self, new_start: TimeLike) -> None:
158163
self._start = new_start
@@ -162,14 +167,20 @@ def set_start(self, new_start: TimeLike) -> None:
162167
def end(self) -> TimeLike:
163168
"""Returns the end of the plan or now."""
164169
if not self._end or not self.override_end:
165-
if self.missing_intervals:
166-
return max(
167-
end
168-
for intervals_per_model in self._missing_intervals.values()
169-
for _, end in intervals_per_model
170+
if self._missing_intervals:
171+
end, interval_unit = max(
172+
[
173+
(end, snapshot.model.interval_unit())
174+
for snapshot in self.snapshots
175+
if snapshot.version_get_or_generate() in self._missing_intervals
176+
for _, end in self._missing_intervals[snapshot.version_get_or_generate()]
177+
]
170178
)
179+
if interval_unit == IntervalUnit.DAY:
180+
return to_date(make_inclusive(self.start, end)[1])
181+
return end
171182
else:
172-
return self._end or now()
183+
return scheduler.latest_end_date(self.snapshots)
173184
return self._end
174185

175186
@end.setter

sqlmesh/core/scheduler.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import logging
44
import typing as t
5-
from datetime import datetime
5+
from datetime import date, datetime
66

77
from sqlmesh.core.console import Console, get_console
8+
from sqlmesh.core.model.meta import IntervalUnit
89
from sqlmesh.core.snapshot import (
910
Snapshot,
1011
SnapshotEvaluator,
@@ -17,7 +18,9 @@
1718
from sqlmesh.utils.dag import DAG
1819
from sqlmesh.utils.date import (
1920
TimeLike,
21+
make_inclusive,
2022
now,
23+
to_date,
2124
to_datetime,
2225
validate_date_range,
2326
yesterday,
@@ -370,13 +373,35 @@ def earliest_start_date(snapshots: t.Iterable[Snapshot]) -> datetime:
370373
Args:
371374
snapshots: Snapshots to find earliest start date.
372375
Returns:
373-
The earliest start date or yesterday if none is found."""
376+
The earliest start date or yesterday if none is found.
377+
"""
374378
snapshots = list(snapshots)
375379
if snapshots:
376380
return min(start_date(snapshot, snapshots) or yesterday() for snapshot in snapshots)
377381
return yesterday()
378382

379383

384+
def latest_end_date(snapshots: t.Iterable[Snapshot]) -> datetime | date:
385+
"""Get the latest end date from a collection of snapshots.
386+
387+
Args:
388+
snapshots: Snapshots to find latest end date.
389+
Returns:
390+
The latest end date or now if none is found.
391+
"""
392+
end_date_and_intervals = [
393+
(snapshot.intervals[-1][1], snapshot.model.interval_unit())
394+
for snapshot in snapshots
395+
if snapshot.intervals
396+
]
397+
if end_date_and_intervals:
398+
end_date, interval = max(end_date_and_intervals)
399+
if interval == IntervalUnit.DAY:
400+
return to_date(make_inclusive(end_date, end_date)[1])
401+
return to_datetime(end_date)
402+
return now()
403+
404+
380405
def _batched_intervals(params: SnapshotToBatches) -> SnapshotToBatches:
381406
batches = {}
382407

tests/core/test_plan.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from datetime import timedelta
2+
13
import pytest
24
from pytest_mock.plugin import MockerFixture
35
from sqlglot import parse_one
@@ -12,7 +14,7 @@
1214
SnapshotFingerprint,
1315
)
1416
from sqlmesh.utils.dag import DAG
15-
from sqlmesh.utils.date import to_datetime, to_timestamp
17+
from sqlmesh.utils.date import to_date, to_datetime, to_timestamp
1618
from sqlmesh.utils.errors import PlanError
1719

1820

@@ -60,7 +62,7 @@ def test_forward_only_dev(make_snapshot, mocker: MockerFixture):
6062
)
6163
)
6264

63-
expected_start = "2022-01-01"
65+
expected_start = to_datetime("2022-01-01")
6466
expected_end = to_datetime("2022-01-02")
6567

6668
dag = DAG[str]({"a": set()})
@@ -73,10 +75,10 @@ def test_forward_only_dev(make_snapshot, mocker: MockerFixture):
7375

7476
state_reader_mock = mocker.Mock()
7577

76-
yesterday_ds_mock = mocker.patch("sqlmesh.core.plan.definition.yesterday_ds")
78+
yesterday_ds_mock = mocker.patch("sqlmesh.core.scheduler.yesterday")
7779
yesterday_ds_mock.return_value = expected_start
7880

79-
now_ds_mock = mocker.patch("sqlmesh.core.plan.definition.now")
81+
now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now")
8082
now_ds_mock.return_value = expected_end
8183
state_reader_mock.missing_intervals.return_value = {}
8284

@@ -389,7 +391,8 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture):
389391
)
390392

391393
expected_start = to_datetime("2022-01-01")
392-
expected_end = to_datetime("2022-01-03")
394+
end_date = to_datetime("2022-01-03")
395+
expected_end = to_date(end_date) - timedelta(days=1)
393396
now = to_datetime("2022-01-30")
394397

395398
dag = DAG[str]({"a": set()})
@@ -402,13 +405,13 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture):
402405

403406
state_reader_mock = mocker.Mock()
404407

405-
now_ds_mock = mocker.patch("sqlmesh.core.plan.definition.now")
408+
now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now")
406409
now_ds_mock.return_value = now
407410
state_reader_mock.missing_intervals.return_value = {
408-
snapshot_a: [(to_timestamp(expected_start), to_timestamp(expected_end))]
411+
snapshot_a: [(to_timestamp(expected_start), to_timestamp(end_date))]
409412
}
410413

411414
plan = Plan(context_diff_mock, dag, state_reader_mock, is_dev=True)
412415

413416
assert plan.start == to_timestamp(expected_start)
414-
assert plan.end == to_timestamp(expected_end)
417+
assert plan.end == expected_end

web/server/api/endpoints/plan.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY
99

1010
from sqlmesh.core.context import Context
11-
from sqlmesh.utils.date import make_inclusive, to_date, to_ds, to_timestamp
11+
from sqlmesh.utils.date import make_inclusive, to_ds
1212
from sqlmesh.utils.errors import PlanError
1313
from web.server import models
1414
from web.server.settings import get_loaded_context
@@ -62,15 +62,10 @@ async def run_plan(
6262
detail=str(e),
6363
)
6464

65-
# TODO: This needs to be removed to support time in the UI
66-
# We need to figure out how to communicate dates between the UI and API while maintaining
67-
# precision in order to do inclusivity ranges correctly.
68-
start, end = make_inclusive(to_timestamp(to_date(plan.start)), to_timestamp(to_date(plan.end)))
69-
7065
payload = models.ContextEnvironment(
7166
environment=plan.environment.name,
72-
start=start,
73-
end=end,
67+
start=plan.start,
68+
end=plan.end,
7469
)
7570

7671
if plan.context_diff.has_changes:

0 commit comments

Comments
 (0)