Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/concepts/plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ To create a forward-only plan, the `--forward-only` option has to be added to th
sqlmesh plan --forward-only
```

### Effective date
Changes that are part of the forward-only plan can also be applied retroactively to the production environment by specifying the effective date:
```bash
sqlmesh plan --forward-only --effective-from 2023-01-01
```
This way SQLMesh will know to recompute data intervals starting from the specified date once forward-only changes are deployed to production.

## Restatement plans
There are cases when models need to be re-evaluated for a given time range, even though changes may not have been made to those model definitions. This could be due to an upstream issue with a dataset defined outside the SQLMesh platform, or when a [forward-only plan](#forward-only-plans) change needs to be applied retroactively to a bounded interval of historical data.

Expand Down
6 changes: 6 additions & 0 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
is_flag=True,
help="Create a plan for forward-only changes.",
)
@click.option(
"--effective-from",
type=str,
required=False,
help="The effective date from which to apply forward-only changes on production.",
)
@click.option(
"--no-prompts",
is_flag=True,
Expand Down
89 changes: 72 additions & 17 deletions sqlmesh/core/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
from sqlmesh.core.test import ModelTest
from sqlmesh.utils import rich as srich
from sqlmesh.utils.date import to_date
from sqlmesh.utils.date import to_date, yesterday_ds

if t.TYPE_CHECKING:
import ipywidgets as widgets
Expand Down Expand Up @@ -292,6 +292,9 @@ def plan(self, plan: Plan, auto_apply: bool) -> None:
plan.apply()

def _show_options_after_categorization(self, plan: Plan, auto_apply: bool) -> None:
if plan.forward_only and plan.new_snapshots:
self._prompt_effective_from(plan, auto_apply)

if plan.requires_backfill:
self._show_missing_dates(plan)
self._prompt_backfill(plan, auto_apply)
Expand Down Expand Up @@ -348,22 +351,43 @@ def _show_missing_dates(self, plan: Plan) -> None:
)
self._print(backfill)

def _prompt_effective_from(self, plan: Plan, auto_apply: bool) -> None:
    """Ask the user for an effective date when the plan does not have one yet.

    A non-blank answer is stored on ``plan.effective_from``. Afterwards, if the
    plan is a development plan and has an effective date, that date is passed
    to ``plan.set_start``.

    Args:
        plan: The plan whose effective date is being collected.
        auto_apply: Accepted for signature parity; not read by this method.
    """
    if not plan.effective_from:
        message = (
            "Enter the effective date (eg. '1 year', '2020-01-01') to apply forward-only changes "
            "retroactively or blank to only apply them going forward once changes are deployed to prod"
        )
        answer = self._prompt(message)
        if answer:
            plan.effective_from = answer

    # Nothing more to do unless this is a dev plan with an effective date.
    if not (plan.is_dev and plan.effective_from):
        return

    plan.set_start(plan.effective_from)

def _prompt_backfill(self, plan: Plan, auto_apply: bool) -> None:
is_forward_only_dev = plan.is_dev and plan.forward_only
backfill_or_preview = "preview" if is_forward_only_dev else "backfill"

if plan.is_start_and_end_allowed:
if not plan.override_start:
blank_meaning = (
"to preview starting from yesterday"
if is_forward_only_dev
else "for the beginning of history"
)
if is_forward_only_dev:
if plan.effective_from:
blank_meaning = (
f"to preview starting from the effective date ('{plan.effective_from}')"
)
default_start = plan.effective_from
else:
blank_meaning = "to preview starting from yesterday"
default_start = yesterday_ds()
else:
blank_meaning = "to backfill from the beginning of history"
default_start = None

start = self._prompt(
f"Enter the {backfill_or_preview} start date (eg. '1 year', '2020-01-01') or blank {blank_meaning}",
)
if start:
plan.start = start
elif default_start:
plan.start = default_start

if not plan.override_end:
end = self._prompt(
Expand Down Expand Up @@ -533,6 +557,48 @@ def _prompt_promote(self, plan: Plan) -> None:
button.on_click(self._apply)
button.output = output

def _prompt_effective_from(self, plan: Plan, auto_apply: bool) -> None:
    """Render the effective-date controls for a plan in the notebook UI.

    Builds a date picker and an "apply going forward" checkbox, wires both to
    callbacks that update ``plan.effective_from`` and re-render the options,
    and adds the resulting row to the dynamic options area.

    Args:
        plan: The plan whose effective date is being edited.
        auto_apply: Forwarded to ``_show_options_after_categorization`` when a
            widget value changes.
    """
    import ipywidgets as widgets

    container = widgets.VBox()

    def on_date_picked(change: t.Dict[str, datetime.datetime]) -> None:
        # The picker's new value becomes the plan's effective date.
        plan.effective_from = change["new"]
        self._show_options_after_categorization(plan, auto_apply)

    def on_going_forward_toggled(change: t.Dict[str, bool]) -> None:
        # Checked clears the effective date; unchecked sets it to yesterday.
        if change["new"]:
            plan.effective_from = None
        else:
            plan.effective_from = yesterday_ds()
        self._show_options_after_categorization(plan, auto_apply=auto_apply)

    current = plan.effective_from
    no_effective_date = current is None

    picker = widgets.DatePicker(
        disabled=no_effective_date,
        value=to_date(current or yesterday_ds()),
        layout={"width": "auto"},
    )
    picker.observe(on_date_picked, "value")

    checkbox = widgets.Checkbox(
        value=no_effective_date,
        description="Apply Going Forward Once Deployed To Prod",
        disabled=False,
        indent=False,
    )
    checkbox.observe(on_going_forward_toggled, "value")

    label = widgets.Label("Effective From Date:", layout={"width": "8rem"})
    row = widgets.HBox([label, picker, checkbox])
    add_to_layout_widget(container, row)

    self._add_to_dynamic_options(container)

def _prompt_backfill(self, plan: Plan, auto_apply: bool) -> None:
import ipywidgets as widgets

Expand All @@ -552,17 +618,6 @@ def _date_picker(
picker.observe(on_change, "value")
return picker

def _checkbox(description: str, value: bool, on_change: t.Callable) -> widgets.Checkbox:
checkbox = widgets.Checkbox(
value=value,
description=description,
disabled=False,
indent=False,
)

checkbox.observe(on_change, "value")
return checkbox

def start_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
plan.start = change["new"]
self._show_options_after_categorization(plan, auto_apply)
Expand Down
3 changes: 3 additions & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ def plan(
no_prompts: bool = False,
auto_apply: bool = False,
no_auto_categorization: t.Optional[bool] = None,
effective_from: t.Optional[TimeLike] = None,
) -> Plan:
"""Interactively create a migration plan.

Expand Down Expand Up @@ -616,6 +617,7 @@ def plan(
no_auto_categorization: Indicates whether to disable automatic categorization of model
changes (breaking / non-breaking). If not provided, then the corresponding configuration
option determines the behavior.
effective_from: The effective date from which to apply forward-only changes on production.

Returns:
The populated Plan object.
Expand Down Expand Up @@ -645,6 +647,7 @@ def plan(
environment_ttl=self.environment_ttl,
categorizer_config=self.auto_categorize_changes,
auto_categorization_enabled=not no_auto_categorization,
effective_from=effective_from,
)

if not no_prompts:
Expand Down
40 changes: 40 additions & 0 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class Plan:
environment_ttl: The period of time that a development environment should exist before being deleted.
categorizer_config: Auto categorization settings.
auto_categorization_enabled: Whether to apply auto categorization.
effective_from: The effective date from which to apply forward-only changes on production.
"""

def __init__(
Expand All @@ -78,6 +79,7 @@ def __init__(
environment_ttl: t.Optional[str] = None,
categorizer_config: t.Optional[CategorizerConfig] = None,
auto_categorization_enabled: bool = True,
effective_from: t.Optional[TimeLike] = None,
):
self.context_diff = context_diff
self.override_start = start is not None
Expand All @@ -90,6 +92,7 @@ def __init__(
self.environment_ttl = environment_ttl
self.categorizer_config = categorizer_config or CategorizerConfig()
self.auto_categorization_enabled = auto_categorization_enabled
self._effective_from: t.Optional[TimeLike] = None
self._start = start if start or not (is_dev and forward_only) else yesterday_ds()
self._end = end if end or not is_dev else now()
self._latest = latest or now()
Expand Down Expand Up @@ -133,6 +136,9 @@ def __init__(
self._categorized: t.Optional[t.List[Snapshot]] = None
self._uncategorized: t.Optional[t.List[Snapshot]] = None

if effective_from:
self._set_effective_from(effective_from)

@property
def categorized(self) -> t.List[Snapshot]:
"""Returns the already categorized snapshots."""
Expand Down Expand Up @@ -319,6 +325,40 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> None
self._categorized = None
self._uncategorized = None

@property
def effective_from(self) -> t.Optional[TimeLike]:
    """The effective date for all new snapshots in the plan.

    Note: this is only applicable for forward-only plans.

    Returns:
        The value currently stored in ``_effective_from``, or None when no
        effective date has been set.
    """
    return self._effective_from

@effective_from.setter
def effective_from(self, effective_from: t.Optional[TimeLike]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why have this and both _set_effective_from? why not just have the setter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that it doesn't look confusing when we set in the constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think it will be confusing?

if effective_from:
self.effective_from = effective_from is fine right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be more like:

self._effective_from = None
if effective_from:
    self.effective_from = effective_from is fine right?

So it looks like we're setting it twice, which is a confusing part.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need to initialize the first one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do, because plan.effective_from will return an attribute error if I never set it.

"""Sets the effective date for all new snapshots in the plan.

Note: this is only applicable for forward-only plans.

Args:
effective_from: The effective date to set.
"""
self._set_effective_from(effective_from)

def _set_effective_from(self, effective_from: t.Optional[TimeLike]) -> None:
    """Validate and store the effective date, propagating it to new snapshots.

    Args:
        effective_from: The effective date to set, or a falsy value to clear it.

    Raises:
        PlanError: If the plan is not forward-only, or if the given date is
            later than the current time.
    """
    if not self.forward_only:
        raise PlanError("Effective date can only be set for a forward-only plan.")

    if effective_from:
        if to_datetime(effective_from) > now():
            raise PlanError("Effective date cannot be in the future.")

    # Drop the cached missing intervals so they are recomputed on next access.
    self.__missing_intervals = None
    self._effective_from = effective_from

    for new_snapshot in self.new_snapshots:
        new_snapshot.effective_from = effective_from

@property
def _missing_intervals(self) -> t.Dict[str, Intervals]:
if self.__missing_intervals is None:
Expand Down
13 changes: 12 additions & 1 deletion sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
change_category: User specified change category indicating which models require backfill from model changes made in this snapshot.
unpaused_ts: The timestamp which indicates when this snapshot was unpaused. Unpaused means that
this snapshot is evaluated on a recurring basis. None indicates that this snapshot is paused.
effective_from: The timestamp which indicates when this snapshot should be considered effective.
Applicable for forward-only snapshots only.
"""

name: str
Expand All @@ -315,6 +317,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
temp_version: t.Optional[str] = None
change_category: t.Optional[SnapshotChangeCategory] = None
unpaused_ts: t.Optional[int] = None
effective_from: t.Optional[TimeLike] = None

@validator("ttl")
@classmethod
Expand Down Expand Up @@ -499,8 +502,16 @@ def merge_intervals(self, other: Snapshot) -> None:
Args:
other: The target snapshot to inherit intervals from.
"""
effective_from_ts = to_timestamp(self.effective_from) if self.effective_from else 0
apply_effective_from = effective_from_ts > 0 and self.fingerprint != other.fingerprint

for start, end in other.intervals:
self.add_interval(start, end)
# If the effective_from is set, then intervals that come after it must come from
# the current snapshot.
if apply_effective_from and start < effective_from_ts:
end = min(end, effective_from_ts)
if not apply_effective_from or end <= effective_from_ts:
self.add_interval(start, end)

def missing_intervals(
self, start: TimeLike, end: TimeLike, latest: t.Optional[TimeLike] = None
Expand Down
6 changes: 6 additions & 0 deletions sqlmesh/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ def test(self, line: str, test_def_raw: t.Optional[str] = None) -> None:
action="store_true",
help="Create a plan for forward-only changes.",
)
@argument(
"--effective-from",
type=str,
help="The effective date from which to apply forward-only changes on production.",
)
@argument(
"--no-prompts",
action="store_true",
Expand Down Expand Up @@ -253,6 +258,7 @@ def plan(self, line: str) -> None:
no_prompts=args.no_prompts,
auto_apply=args.auto_apply,
no_auto_categorization=args.no_auto_categorization,
effective_from=args.effective_from,
)
self._context.console = console

Expand Down
7 changes: 6 additions & 1 deletion sqlmesh/schedulers/airflow/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,18 @@ def create_plan_dag_spec(
snapshots_for_intervals.pop(sid)

if request.restatements:
snapshots_for_restatement = (
snapshots_for_intervals.values()
if not request.is_dev
else [snapshots_for_intervals[s.snapshot_id] for s in request.environment.snapshots]
)
state_sync.remove_interval(
[],
start=request.environment.start_at,
end=end,
all_snapshots=(
snapshot
for snapshot in snapshots_for_intervals.values()
for snapshot in snapshots_for_restatement
if snapshot.name in request.restatements
and snapshot.snapshot_id not in new_snapshots
),
Expand Down
1 change: 1 addition & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ def apply_to_environment(
plan = context.plan(
environment,
forward_only=choice == SnapshotChangeCategory.FORWARD_ONLY,
no_prompts=True,
)
plan.set_start(start(context))

Expand Down
42 changes: 41 additions & 1 deletion tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
SnapshotDataVersion,
SnapshotFingerprint,
)
from sqlmesh.utils.date import to_date, to_datetime, to_timestamp
from sqlmesh.utils.date import now, to_date, to_datetime, to_timestamp
from sqlmesh.utils.errors import PlanError


Expand Down Expand Up @@ -423,3 +423,43 @@ def test_broken_references(make_snapshot, mocker: MockerFixture):
match=r"Removed models {'a'} are referenced in model 'b'.*",
):
Plan(context_diff_mock, state_reader_mock)


def test_effective_from(make_snapshot, mocker: MockerFixture):
    """Check validation and propagation of the plan's effective date.

    Covers: rejection on a plan that is not forward-only, rejection of a
    future date, and setting / clearing the date on both the plan and its
    new snapshot.
    """
    snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds FROM a")))
    snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)

    context_diff_mock = mocker.Mock(
        snapshots={"a": snapshot},
        added=set(),
        removed=set(),
        modified_snapshots={},
        new_snapshots={snapshot.snapshot_id: snapshot},
    )
    state_reader_mock = mocker.Mock()

    # A plan that is not forward-only must refuse an effective date.
    with pytest.raises(
        PlanError,
        match="Effective date can only be set for a forward-only plan.",
    ):
        regular_plan = Plan(context_diff_mock, state_reader_mock)
        regular_plan.effective_from = "2023-01-01"

    plan = Plan(context_diff_mock, state_reader_mock, forward_only=True)

    # A date in the future is rejected and leaves the state untouched.
    tomorrow = now() + timedelta(days=1)
    with pytest.raises(
        PlanError,
        match="Effective date cannot be in the future.",
    ):
        plan.effective_from = tomorrow

    assert plan.effective_from is None
    assert snapshot.effective_from is None

    # Setting, then clearing, is mirrored on the new snapshot.
    for value in ("2023-01-01", None):
        plan.effective_from = value
        assert plan.effective_from == value
        assert snapshot.effective_from == value
Loading