Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions sqlmesh/core/model/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class ModelMeta(PydanticModel):
audits: t.List[AuditReference] = []

_croniter: t.Optional[croniter] = None
_interval_unit: t.Optional[IntervalUnit] = None

_model_kind_validator = model_kind_validator

Expand Down Expand Up @@ -271,14 +272,17 @@ def interval_unit(self, sample_size: int = 10) -> IntervalUnit:
Returns:
The IntervalUnit enum.
"""
schedule = croniter(self.cron)
samples = [schedule.get_next() for _ in range(sample_size)]
min_interval = min(b - a for a, b in zip(samples, samples[1:]))
if min_interval >= 86400:
return IntervalUnit.DAY
elif min_interval >= 3600:
return IntervalUnit.HOUR
return IntervalUnit.MINUTE
if not self._interval_unit:
schedule = croniter(self.cron)
samples = [schedule.get_next() for _ in range(sample_size)]
min_interval = min(b - a for a, b in zip(samples, samples[1:]))
if min_interval >= 86400:
self._interval_unit = IntervalUnit.DAY
elif min_interval >= 3600:
self._interval_unit = IntervalUnit.HOUR
else:
self._interval_unit = IntervalUnit.MINUTE
return self._interval_unit

def normalized_cron(self) -> str:
"""Returns the UTC normalized cron based on sampling heuristics.
Expand Down
23 changes: 17 additions & 6 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sqlmesh.core.config import CategorizerConfig
from sqlmesh.core.context_diff import ContextDiff
from sqlmesh.core.environment import Environment
from sqlmesh.core.model.meta import IntervalUnit
from sqlmesh.core.snapshot import (
Intervals,
Snapshot,
Expand All @@ -23,6 +24,7 @@
TimeLike,
make_inclusive,
now,
to_date,
to_ds,
to_timestamp,
validate_date_range,
Expand Down Expand Up @@ -139,6 +141,8 @@ def uncategorized(self) -> t.List[Snapshot]:
@property
def start(self) -> TimeLike:
"""Returns the start of the plan or the earliest date of all snapshots."""
if not self.override_start and not self._missing_intervals:
return scheduler.earliest_start_date(self.snapshots)
return self._start or (
min(
start
Expand All @@ -153,6 +157,7 @@ def start(self) -> TimeLike:
def start(self, new_start: TimeLike) -> None:
self._ensure_valid_date_range(new_start, self._end)
self.set_start(new_start)
self.override_start = True

def set_start(self, new_start: TimeLike) -> None:
self._start = new_start
Expand All @@ -162,14 +167,20 @@ def set_start(self, new_start: TimeLike) -> None:
def end(self) -> TimeLike:
"""Returns the end of the plan or now."""
if not self._end or not self.override_end:
if self.missing_intervals:
return max(
end
for intervals_per_model in self._missing_intervals.values()
for _, end in intervals_per_model
if self._missing_intervals:
end, interval_unit = max(
[
(end, snapshot.model.interval_unit())
for snapshot in self.snapshots
if snapshot.version_get_or_generate() in self._missing_intervals
for _, end in self._missing_intervals[snapshot.version_get_or_generate()]
]
)
if interval_unit == IntervalUnit.DAY:
return to_date(make_inclusive(self.start, end)[1])
return end
else:
return self._end or now()
return scheduler.latest_end_date(self.snapshots)
return self._end

@end.setter
Expand Down
29 changes: 27 additions & 2 deletions sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import logging
import typing as t
from datetime import datetime
from datetime import date, datetime

from sqlmesh.core.console import Console, get_console
from sqlmesh.core.model.meta import IntervalUnit
from sqlmesh.core.snapshot import (
Snapshot,
SnapshotEvaluator,
Expand All @@ -17,7 +18,9 @@
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import (
TimeLike,
make_inclusive,
now,
to_date,
to_datetime,
validate_date_range,
yesterday,
Expand Down Expand Up @@ -370,13 +373,35 @@ def earliest_start_date(snapshots: t.Iterable[Snapshot]) -> datetime:
Args:
snapshots: Snapshots to find earliest start date.
Returns:
The earliest start date or yesterday if none is found."""
The earliest start date or yesterday if none is found.
"""
snapshots = list(snapshots)
if snapshots:
return min(start_date(snapshot, snapshots) or yesterday() for snapshot in snapshots)
return yesterday()


def latest_end_date(snapshots: t.Iterable[Snapshot]) -> datetime | date:
    """Find the most recent interval end across a collection of snapshots.

    Args:
        snapshots: Snapshots to inspect. Snapshots without any intervals are ignored.

    Returns:
        The end of the last interval of the winning snapshot. It is returned as a
        date (after being made inclusive) when that snapshot's model has a DAY
        interval unit, and as a datetime otherwise. If no snapshot has intervals,
        now is returned.
    """
    # NOTE(review): a reviewer on the pull request flagged that this logic looks
    # duplicated elsewhere and can probably be DRYed up.
    candidates = []
    for snapshot in snapshots:
        if snapshot.intervals:
            # Only the end of the last recorded interval is considered.
            candidates.append((snapshot.intervals[-1][1], snapshot.model.interval_unit()))

    if not candidates:
        return now()

    # Tuples are compared element-wise, so the interval unit breaks ties between equal ends.
    latest_end, unit = max(candidates)
    if unit != IntervalUnit.DAY:
        return to_datetime(latest_end)
    return to_date(make_inclusive(latest_end, latest_end)[1])


def _batched_intervals(params: SnapshotToBatches) -> SnapshotToBatches:
batches = {}

Expand Down
19 changes: 11 additions & 8 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import timedelta

import pytest
from pytest_mock.plugin import MockerFixture
from sqlglot import parse_one
Expand All @@ -12,7 +14,7 @@
SnapshotFingerprint,
)
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import to_datetime, to_timestamp
from sqlmesh.utils.date import to_date, to_datetime, to_timestamp
from sqlmesh.utils.errors import PlanError


Expand Down Expand Up @@ -60,7 +62,7 @@ def test_forward_only_dev(make_snapshot, mocker: MockerFixture):
)
)

expected_start = "2022-01-01"
expected_start = to_datetime("2022-01-01")
expected_end = to_datetime("2022-01-02")

dag = DAG[str]({"a": set()})
Expand All @@ -73,10 +75,10 @@ def test_forward_only_dev(make_snapshot, mocker: MockerFixture):

state_reader_mock = mocker.Mock()

yesterday_ds_mock = mocker.patch("sqlmesh.core.plan.definition.yesterday_ds")
yesterday_ds_mock = mocker.patch("sqlmesh.core.scheduler.yesterday")
yesterday_ds_mock.return_value = expected_start

now_ds_mock = mocker.patch("sqlmesh.core.plan.definition.now")
now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now")
now_ds_mock.return_value = expected_end
state_reader_mock.missing_intervals.return_value = {}

Expand Down Expand Up @@ -389,7 +391,8 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture):
)

expected_start = to_datetime("2022-01-01")
expected_end = to_datetime("2022-01-03")
end_date = to_datetime("2022-01-03")
expected_end = to_date(end_date) - timedelta(days=1)
now = to_datetime("2022-01-30")

dag = DAG[str]({"a": set()})
Expand All @@ -402,13 +405,13 @@ def test_end_from_missing_instead_of_now(make_snapshot, mocker: MockerFixture):

state_reader_mock = mocker.Mock()

now_ds_mock = mocker.patch("sqlmesh.core.plan.definition.now")
now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now")
now_ds_mock.return_value = now
state_reader_mock.missing_intervals.return_value = {
snapshot_a: [(to_timestamp(expected_start), to_timestamp(expected_end))]
snapshot_a: [(to_timestamp(expected_start), to_timestamp(end_date))]
}

plan = Plan(context_diff_mock, dag, state_reader_mock, is_dev=True)

assert plan.start == to_timestamp(expected_start)
assert plan.end == to_timestamp(expected_end)
assert plan.end == expected_end
11 changes: 3 additions & 8 deletions web/server/api/endpoints/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY

from sqlmesh.core.context import Context
from sqlmesh.utils.date import make_inclusive, to_date, to_ds, to_timestamp
from sqlmesh.utils.date import make_inclusive, to_ds
from sqlmesh.utils.errors import PlanError
from web.server import models
from web.server.settings import get_loaded_context
Expand Down Expand Up @@ -62,15 +62,10 @@ async def run_plan(
detail=str(e),
)

# TODO: This needs to be removed to support time in the UI
# We need to figure out how to communicate dates between the UI and API while maintaining
# precision in order to do inclusivity ranges correctly.
start, end = make_inclusive(to_timestamp(to_date(plan.start)), to_timestamp(to_date(plan.end)))

payload = models.ContextEnvironment(
environment=plan.environment.name,
start=start,
end=end,
start=plan.start,
end=plan.end,
)

if plan.context_diff.has_changes:
Expand Down