From 799992bc02ed4a31401a800aba617e8149760d6d Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:04:21 -0700 Subject: [PATCH 1/6] fix dates --- sqlmesh/core/plan/definition.py | 30 ++++++++++++--- sqlmesh/core/scheduler.py | 63 +++++++++++++++++++++++++++++++- tests/core/test_plan.py | 14 ++++--- web/server/api/endpoints/plan.py | 11 ++---- 4 files changed, 98 insertions(+), 20 deletions(-) diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index fbb6e7c52b..2fe543f7fa 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -8,6 +8,7 @@ from sqlmesh.core.config import CategorizerConfig from sqlmesh.core.context_diff import ContextDiff from sqlmesh.core.environment import Environment +from sqlmesh.core.model.meta import IntervalUnit from sqlmesh.core.snapshot import ( Intervals, Snapshot, @@ -23,6 +24,7 @@ TimeLike, make_inclusive, now, + to_date, to_ds, to_timestamp, validate_date_range, @@ -139,6 +141,8 @@ def uncategorized(self) -> t.List[Snapshot]: @property def start(self) -> TimeLike: """Returns the start of the plan or the earliest date of all snapshots.""" + if not self.override_start and not self._missing_intervals: + return scheduler.earliest_start_date(self.snapshots) return self._start or ( min( start @@ -153,6 +157,7 @@ def start(self) -> TimeLike: def start(self, new_start: TimeLike) -> None: self._ensure_valid_date_range(new_start, self._end) self.set_start(new_start) + self.override_start = True def set_start(self, new_start: TimeLike) -> None: self._start = new_start @@ -162,14 +167,29 @@ def set_start(self, new_start: TimeLike) -> None: def end(self) -> TimeLike: """Returns the end of the plan or now.""" if not self._end or not self.override_end: - if self.missing_intervals: - return max( - end - for intervals_per_model in self._missing_intervals.values() + if self._missing_intervals: + end, snapshot_version = max( + (end, snapshot_version) + for snapshot_version, intervals_per_model in self._missing_intervals.items() for _, end in intervals_per_model ) + end_and_interval: t.Tuple[int, t.Optional[IntervalUnit]] = (-1, None) + for snapshot in self.snapshots: + missing_interval = self._missing_intervals.get( + snapshot.version_get_or_generate() + ) + if missing_interval: + end_and_interval = max( + [ + end_and_interval, + (missing_interval[-1][1], snapshot.model.interval_unit()), + ] + ) + if end_and_interval[1] == IntervalUnit.DAY: + return to_date(make_inclusive(self.start, end)[1]) + return end_and_interval[0] else: - return self._end or now() + return scheduler.latest_end_date(self.snapshots) return self._end @end.setter diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 604afd8d54..09872cce1c 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -2,9 +2,10 @@ import logging import typing as t -from datetime import datetime +from datetime import date, datetime from sqlmesh.core.console import Console, get_console +from sqlmesh.core.model.meta import IntervalUnit from sqlmesh.core.snapshot import ( Snapshot, SnapshotEvaluator, @@ -17,7 +18,9 @@ from sqlmesh.utils.dag import DAG from sqlmesh.utils.date import ( TimeLike, + make_inclusive, now, + to_date, to_datetime, validate_date_range, yesterday, @@ -364,6 +367,44 @@ def start_date( return earliest +def end_date( + snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot] | t.Iterable[Snapshot] +) -> t.Optional[datetime]: + """Get the effective/inferred end date for a snapshot. + + Not all snapshots define a start date. In those cases, the model's start date + can be inferred from its parent's start date. + + Args: + snapshot: snapshot to infer start date. + snapshots: a catalog of available snapshots. + + Returns: + Start datetime object. + """ + + if snapshot.model.start: + return to_datetime(snapshot.model.start) + + if not isinstance(snapshots, dict): + snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots} + + earliest = None + + for parent in snapshot.parents: + if parent not in snapshots: + continue + + start_dt = start_date(snapshots[parent], snapshots) + + if not earliest: + earliest = start_dt + elif start_dt: + earliest = min(earliest, start_dt) + + return earliest + + def earliest_start_date(snapshots: t.Iterable[Snapshot]) -> datetime: """Get the earliest start date from a collection of snapshots. @@ -377,6 +418,26 @@ def earliest_start_date(snapshots: t.Iterable[Snapshot]) -> datetime: return yesterday() +def latest_end_date(snapshots: t.Iterable[Snapshot]) -> datetime | date: + """Get the latest end date from a collection of snapshots. + + Args: + snapshots: Snapshots to find latest end date. + Returns: + The earliest start date or now if none is found.""" + snapshots = list(snapshots) + if not any(snapshot.intervals for snapshot in snapshots): + return now() + if snapshots: + end_date, interval = max( + (snapshot.intervals[-1][1], snapshot.model.interval_unit()) for snapshot in snapshots + ) + if interval == IntervalUnit.DAY: + return to_date(make_inclusive(end_date, end_date)[1]) + return to_datetime(end_date) + return now() + + def _batched_intervals(params: SnapshotToBatches) -> SnapshotToBatches: batches = {} diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index dd91b81b5b..8a94d25015 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -1,3 +1,5 @@ +from datetime import timedelta + import pytest from pytest_mock.plugin import MockerFixture from sqlglot import parse_one @@ -12,7 +14,7 @@ SnapshotFingerprint, ) from sqlmesh.utils.dag import DAG -from sqlmesh.utils.date import to_datetime, to_timestamp +from sqlmesh.utils.date import to_date, to_datetime, to_timestamp from sqlmesh.utils.errors import PlanError @@ -60,7 +62,7 @@ def test_forward_only_dev(make_snapshot, mocker: MockerFixture): ) ) - expected_start = "2022-01-01" + expected_start = to_datetime("2022-01-01") expected_end = to_datetime("2022-01-02") dag = DAG[str]({"a": set()}) @@ -73,10 +75,10 @@ def test_forward_only_dev(make_snapshot, mocker: MockerFixture): state_reader_mock = mocker.Mock() - yesterday_ds_mock = mocker.patch("sqlmesh.core.plan.definition.yesterday_ds") + yesterday_ds_mock = mocker.patch("sqlmesh.core.scheduler.yesterday") yesterday_ds_mock.return_value = expected_start - now_ds_mock = mocker.patch("sqlmesh.core.plan.definition.now") + now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now") now_ds_mock.return_value = expected_end state_reader_mock.missing_intervals.return_value = {} @@ -402,7 +404,7 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture): state_reader_mock = mocker.Mock() - now_ds_mock = mocker.patch("sqlmesh.core.plan.definition.now") + now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now") now_ds_mock.return_value = now state_reader_mock.missing_intervals.return_value = { snapshot_a: [(to_timestamp(expected_start), to_timestamp(expected_end))] @@ -411,4 +413,4 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture): plan = Plan(context_diff_mock, dag, state_reader_mock, is_dev=True) assert plan.start == to_timestamp(expected_start) - assert plan.end == to_timestamp(expected_end) + assert plan.end == to_date(expected_end) - timedelta(days=1) diff --git a/web/server/api/endpoints/plan.py b/web/server/api/endpoints/plan.py index 08c2bde88e..e095151e68 100644 --- a/web/server/api/endpoints/plan.py +++ b/web/server/api/endpoints/plan.py @@ -8,7 +8,7 @@ from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY from sqlmesh.core.context import Context -from sqlmesh.utils.date import make_inclusive, to_date, to_ds, to_timestamp +from sqlmesh.utils.date import make_inclusive, to_ds from sqlmesh.utils.errors import PlanError from web.server import models from web.server.settings import get_loaded_context @@ -62,15 +62,10 @@ async def run_plan( detail=str(e), ) - # TODO: This needs to be removed to support time in the UI - # We need to figure out how to communicate dates between the UI and API while maintaining - # precision in order to do inclusivity ranges correctly. - start, end = make_inclusive(to_timestamp(to_date(plan.start)), to_timestamp(to_date(plan.end))) - payload = models.ContextEnvironment( environment=plan.environment.name, - start=start, - end=end, + start=plan.start, + end=plan.end, ) if plan.context_diff.has_changes: From 1d80469754da24cd7e4fab0c63db1f4970810d83 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:13:12 -0700 Subject: [PATCH 2/6] remove leftover code --- sqlmesh/core/plan/definition.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 2fe543f7fa..20d8d28d31 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -168,11 +168,6 @@ def end(self) -> TimeLike: """Returns the end of the plan or now.""" if not self._end or not self.override_end: if self._missing_intervals: - end, snapshot_version = max( - (end, snapshot_version) - for snapshot_version, intervals_per_model in self._missing_intervals.items() - for _, end in intervals_per_model - ) end_and_interval: t.Tuple[int, t.Optional[IntervalUnit]] = (-1, None) for snapshot in self.snapshots: missing_interval = self._missing_intervals.get( @@ -186,7 +181,7 @@ def end(self) -> TimeLike: ] ) if end_and_interval[1] == IntervalUnit.DAY: - return to_date(make_inclusive(self.start, end)[1]) + return to_date(make_inclusive(self.start, end_and_interval[0])[1]) return end_and_interval[0] else: return scheduler.latest_end_date(self.snapshots) From 65b8baee4db1a2f16c3cc6b7942b1e8368cbb399 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:29:55 -0700 Subject: [PATCH 3/6] fixes --- sqlmesh/core/plan/definition.py | 26 +++++++++------------ sqlmesh/core/scheduler.py | 40 +-------------------------------- 2 files changed, 12 insertions(+), 54 deletions(-) diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 20d8d28d31..86c4ef6749 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -168,21 +168,17 @@ def end(self) -> TimeLike: """Returns the end of the plan or now.""" if not self._end or not self.override_end: if self._missing_intervals: - end_and_interval: t.Tuple[int, t.Optional[IntervalUnit]] = (-1, None) - for snapshot in self.snapshots: - missing_interval = self._missing_intervals.get( - snapshot.version_get_or_generate() - ) - if missing_interval: - end_and_interval = max( - [ - end_and_interval, - (missing_interval[-1][1], snapshot.model.interval_unit()), - ] - ) - if end_and_interval[1] == IntervalUnit.DAY: - return to_date(make_inclusive(self.start, end_and_interval[0])[1]) - return end_and_interval[0] + end, interval_unit = max( + [ + (end, snapshot.model.interval_unit()) + for snapshot in self.snapshots + if snapshot.version_get_or_generate() in self._missing_intervals + for _, end in self._missing_intervals[snapshot.version_get_or_generate()] + ] + ) + if interval_unit == IntervalUnit.DAY: + return to_date(make_inclusive(self.start, end)[1]) + return end else: return scheduler.latest_end_date(self.snapshots) return self._end diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 09872cce1c..05812e8a24 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -367,44 +367,6 @@ def start_date( return earliest -def end_date( - snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot] | t.Iterable[Snapshot] -) -> t.Optional[datetime]: - """Get the effective/inferred end date for a snapshot. - - Not all snapshots define a start date. In those cases, the model's start date - can be inferred from its parent's start date. - - Args: - snapshot: snapshot to infer start date. - snapshots: a catalog of available snapshots. - - Returns: - Start datetime object. - """ - - if snapshot.model.start: - return to_datetime(snapshot.model.start) - - if not isinstance(snapshots, dict): - snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots} - - earliest = None - - for parent in snapshot.parents: - if parent not in snapshots: - continue - - start_dt = start_date(snapshots[parent], snapshots) - - if not earliest: - earliest = start_dt - elif start_dt: - earliest = min(earliest, start_dt) - - return earliest - - def earliest_start_date(snapshots: t.Iterable[Snapshot]) -> datetime: """Get the earliest start date from a collection of snapshots. @@ -424,7 +386,7 @@ def latest_end_date(snapshots: t.Iterable[Snapshot]) -> datetime | date: Args: snapshots: Snapshots to find latest end date. Returns: - The earliest start date or now if none is found.""" + The latest end date or now if none is found.""" snapshots = list(snapshots) if not any(snapshot.intervals for snapshot in snapshots): return now() From 43befe86544f9d7614bc03e7915f08a6c8ee1a17 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:34:29 -0700 Subject: [PATCH 4/6] fix quotes --- sqlmesh/core/scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 05812e8a24..c6fca7f186 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -373,7 +373,8 @@ def earliest_start_date(snapshots: t.Iterable[Snapshot]) -> datetime: Args: snapshots: Snapshots to find earliest start date. Returns: - The earliest start date or yesterday if none is found.""" + The earliest start date or yesterday if none is found. + """ snapshots = list(snapshots) if snapshots: return min(start_date(snapshot, snapshots) or yesterday() for snapshot in snapshots) @@ -386,7 +387,8 @@ def latest_end_date(snapshots: t.Iterable[Snapshot]) -> datetime | date: Args: snapshots: Snapshots to find latest end date. Returns: - The latest end date or now if none is found.""" + The latest end date or now if none is found. + """ snapshots = list(snapshots) if not any(snapshot.intervals for snapshot in snapshots): return now() From 739c5ba86e3c9e75f9f76f3f6f8954723036faa3 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:53:44 -0700 Subject: [PATCH 5/6] fixes --- sqlmesh/core/scheduler.py | 14 +++++++------- tests/core/test_plan.py | 7 ++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index c6fca7f186..5fe465bd7c 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -389,13 +389,13 @@ def latest_end_date(snapshots: t.Iterable[Snapshot]) -> datetime | date: Returns: The latest end date or now if none is found. """ - snapshots = list(snapshots) - if not any(snapshot.intervals for snapshot in snapshots): - return now() - if snapshots: - end_date, interval = max( - (snapshot.intervals[-1][1], snapshot.model.interval_unit()) for snapshot in snapshots - ) + end_date_and_intervals = [ + (snapshot.intervals[-1][1], snapshot.model.interval_unit()) + for snapshot in snapshots + if snapshot.intervals + ] + if end_date_and_intervals: + end_date, interval = max(end_date_and_intervals) if interval == IntervalUnit.DAY: return to_date(make_inclusive(end_date, end_date)[1]) return to_datetime(end_date) diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 8a94d25015..547bdb5d24 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -391,7 +391,8 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture): ) expected_start = to_datetime("2022-01-01") - expected_end = to_datetime("2022-01-03") + end_date = to_datetime("2022-01-03") + expected_end = to_date(end_date) - timedelta(days=1) now = to_datetime("2022-01-30") dag = DAG[str]({"a": set()}) @@ -407,10 +408,10 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture): now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now") now_ds_mock.return_value = now state_reader_mock.missing_intervals.return_value = { - snapshot_a: [(to_timestamp(expected_start), to_timestamp(expected_end))] + snapshot_a: [(to_timestamp(expected_start), to_timestamp(end_date))] } plan = Plan(context_diff_mock, dag, state_reader_mock, is_dev=True) assert plan.start == to_timestamp(expected_start) - assert plan.end == to_date(expected_end) - timedelta(days=1) + assert plan.end == expected_end From 4ab6a5e11fdb90f302611561e77f09f49d25fe27 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:57:30 -0700 Subject: [PATCH 6/6] cache interval unit --- sqlmesh/core/model/meta.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/sqlmesh/core/model/meta.py b/sqlmesh/core/model/meta.py index 2b33581c5a..1445fbc43e 100644 --- a/sqlmesh/core/model/meta.py +++ b/sqlmesh/core/model/meta.py @@ -61,6 +61,7 @@ class ModelMeta(PydanticModel): audits: t.List[AuditReference] = [] _croniter: t.Optional[croniter] = None + _interval_unit: t.Optional[IntervalUnit] = None _model_kind_validator = model_kind_validator @@ -271,14 +272,17 @@ def interval_unit(self, sample_size: int = 10) -> IntervalUnit: Returns: The IntervalUnit enum. """ - schedule = croniter(self.cron) - samples = [schedule.get_next() for _ in range(sample_size)] - min_interval = min(b - a for a, b in zip(samples, samples[1:])) - if min_interval >= 86400: - return IntervalUnit.DAY - elif min_interval >= 3600: - return IntervalUnit.HOUR - return IntervalUnit.MINUTE + if not self._interval_unit: + schedule = croniter(self.cron) + samples = [schedule.get_next() for _ in range(sample_size)] + min_interval = min(b - a for a, b in zip(samples, samples[1:])) + if min_interval >= 86400: + self._interval_unit = IntervalUnit.DAY + elif min_interval >= 3600: + self._interval_unit = IntervalUnit.HOUR + else: + self._interval_unit = IntervalUnit.MINUTE + return self._interval_unit def normalized_cron(self) -> str: """Returns the UTC normalized cron based on sampling heuristics.