From dc19c5825c45b34a82df49f07f1c06378ea32273 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 29 Dec 2022 13:33:53 -0800 Subject: [PATCH 1/3] Introduce forward-only plans --- sqlmesh/cli/main.py | 5 ++ sqlmesh/core/console.py | 85 ++++++++-------------------- sqlmesh/core/context.py | 11 ++-- sqlmesh/core/plan/definition.py | 57 ++++++++++++++++--- sqlmesh/core/plan/evaluator.py | 11 +--- sqlmesh/core/snapshot/definition.py | 6 +- sqlmesh/magics.py | 6 ++ tests/core/test_integration.py | 22 ++++---- tests/core/test_plan.py | 86 +++++++++++++++++++++++++++++ tests/core/test_plan_evaluator.py | 1 + 10 files changed, 196 insertions(+), 94 deletions(-) create mode 100644 tests/core/test_plan.py diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index ceea2f2743..a1fe93ee5a 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -169,6 +169,11 @@ def diff(ctx, environment: t.Optional[str] = None) -> None: is_flag=True, help="Skip the backfill step.", ) +@click.option( + "--forward-only", + is_flag=True, + help="Create a plan for forward-only changes.", +) @click.option( "--no-prompts", is_flag=True, diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index e49600cd8f..ce837bca7e 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -13,7 +13,6 @@ from rich.syntax import Syntax from rich.tree import Tree -from sqlmesh.core import constants as c from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory from sqlmesh.core.test import ModelTest from sqlmesh.utils import rich as srich @@ -31,7 +30,7 @@ SNAPSHOT_CHANGE_CATEGORY_STR = { SnapshotChangeCategory.BREAKING: "Breaking", SnapshotChangeCategory.NON_BREAKING: "Non-breaking", - SnapshotChangeCategory.NO_CHANGE: "No change", + SnapshotChangeCategory.FORWARD_ONLY: "Forward-only", } @@ -231,9 +230,7 @@ def plan(self, plan: Plan, auto_apply: bool) -> None: plan: The plan to make choices for. auto_apply: Whether to automatically apply the plan after all choices have been made. """ - unbounded_end = ( - plan.context_diff.environment == c.PROD and plan.is_unbounded_end - ) + unbounded_end = not plan.is_dev and plan.is_unbounded_end self._prompt_categorize(plan, auto_apply) self._show_options_after_categorization( plan, auto_apply, unbounded_end=unbounded_end @@ -379,29 +376,16 @@ def loading_stop(self, id: uuid.UUID) -> None: def _get_snapshot_change_category( self, snapshot: Snapshot, plan: Plan, auto_apply: bool ) -> None: - if plan.indirectly_modified[snapshot.name]: - choices = self._snapshot_change_choices(snapshot) - response = Prompt.ask( - "\n".join( - [f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())] - ), - console=self.console, - show_choices=False, - choices=[f"{i+1}" for i in range(len(choices))], - ) - choice = list(choices)[int(response) - 1] - elif not snapshot.is_materialized: - choice = ( - SnapshotChangeCategory.BREAKING - if Confirm.ask( - f"Does this change require a backfill of [direct]{snapshot.name}[/direct]?", - console=self.console, - ) - else SnapshotChangeCategory.NO_CHANGE - ) - else: - choice = SnapshotChangeCategory.NON_BREAKING - + choices = self._snapshot_change_choices(snapshot) + response = Prompt.ask( + "\n".join( + [f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())] + ), + console=self.console, + show_choices=False, + choices=[f"{i+1}" for i in range(len(choices))], + ) + choice = list(choices)[int(response) - 1] plan.set_choice(snapshot, choice) def _snapshot_change_choices( @@ -421,13 +405,12 @@ def _snapshot_change_choices( elif snapshot.is_embedded_kind: choices = { SnapshotChangeCategory.BREAKING: f"Backfill {indirect}", - SnapshotChangeCategory.NO_CHANGE: f"Don't backfill {indirect}", + SnapshotChangeCategory.NON_BREAKING: f"Don't backfill {indirect}", } else: choices = { SnapshotChangeCategory.BREAKING: f"Backfill {direct} and {indirect}", SnapshotChangeCategory.NON_BREAKING: f"Backfill {direct} but not {indirect}", - SnapshotChangeCategory.NO_CHANGE: f"Don't backfill {direct} or {indirect}", } labeled_choices = { k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}" for k, v in choices.items() @@ -561,7 +544,7 @@ def unbounded_end_callback(change): unbounded_end_date_widget = ( [_checkbox("Unbounded End Date", unbounded_end, unbounded_end_callback)] - if plan.environment.name == c.PROD + if not plan.is_dev else [] ) @@ -618,41 +601,21 @@ def radio_button_selected(change): plan.set_choice(snapshot, choices[change["owner"].index]) self._show_options_after_categorization(plan) - if plan.indirectly_modified[snapshot.name]: - choice_mapping = self._snapshot_change_choices( - snapshot, use_rich_formatting=False - ) - radio = widgets.RadioButtons( - options=choice_mapping.values(), - layout={"width": "max-content"}, - disabled=False, - ) - else: - if snapshot.is_view_kind or snapshot.is_embedded_kind: - choice_mapping = { - SnapshotChangeCategory.NON_BREAKING: f"Update {snapshot.name}" - } - else: - choice_mapping = { - SnapshotChangeCategory.NON_BREAKING: f"Backfill {snapshot.name}", - SnapshotChangeCategory.NO_CHANGE: f"Don't backfill {snapshot.name}", - } - choice_mapping = { - k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}" - for k, v in choice_mapping.items() - } - radio = widgets.RadioButtons( - options=choice_mapping.values(), - layout={"width": "max-content"}, - disables=False, - ) - + choice_mapping = self._snapshot_change_choices( + snapshot, use_rich_formatting=False + ) choices = list(choice_mapping) + plan.set_choice(snapshot, choices[0]) + + radio = widgets.RadioButtons( + options=choice_mapping.values(), + layout={"width": "max-content"}, + disabled=False, + ) radio.observe( radio_button_selected, "value", ) - plan.set_choice(snapshot, choices[0]) self.display(radio) def log_test_results( diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 2099bc1a1f..83fc7ad536 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -67,7 +67,7 @@ from sqlmesh.core.user import User from sqlmesh.utils import UniqueKeyDict, sys_path from sqlmesh.utils.dag import DAG -from sqlmesh.utils.date import TimeLike, now, yesterday_ds +from sqlmesh.utils.date import TimeLike, yesterday_ds from sqlmesh.utils.errors import ( ConfigError, MissingDependencyError, @@ -572,6 +572,7 @@ def plan( restate_from: t.Optional[t.Iterable[str]] = None, no_gaps: bool = False, skip_backfill: bool = False, + forward_only: bool = False, no_prompts: bool = False, auto_apply: bool = False, ) -> Plan: @@ -595,6 +596,7 @@ def plan( part of the target environment have no data gaps when compared against previous snapshots for same models. skip_backfill: Whether to skip the backfill step. Default: False. + forward_only: Whether the purpose of the plan is to make forward only changes. no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that if this flag is set to true and there are uncategorized changes the plan creation will fail. Default: False. @@ -637,13 +639,10 @@ def plan( restate_from=restate_from, no_gaps=no_gaps, skip_backfill=skip_backfill, + is_dev=environment != c.PROD, + forward_only=forward_only, ) - if environment != c.PROD and not end: - # Set default end after plan creation to make sure the prompt for the end date - # still shows up. - plan.end = now() - if not no_prompts: self.console.plan(plan, auto_apply) elif auto_apply: diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 82ffb8bc6b..052e5a881d 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -18,7 +18,7 @@ from sqlmesh.utils import random_id from sqlmesh.utils.dag import DAG from sqlmesh.utils.date import TimeLike, make_inclusive, now, to_ds, validate_date_range -from sqlmesh.utils.errors import SQLMeshError +from sqlmesh.utils.errors import PlanError, SQLMeshError from sqlmesh.utils.pydantic import PydanticModel SnapshotMapping = t.Dict[str, t.Set[str]] @@ -39,6 +39,8 @@ class Plan: part of the target environment have no data gaps when compared against previous snapshots for same models. skip_backfill: Whether to skip the backfill step. + is_dev: Whether this plan is for development purposes. + forward_only: Whether the purpose of the plan is to make forward only changes. """ def __init__( @@ -52,6 +54,8 @@ def __init__( restate_from: t.Optional[t.Iterable[str]] = None, no_gaps: bool = False, skip_backfill: bool = False, + is_dev: bool = False, + forward_only: bool = False, ): self.context_diff = context_diff self.override_start = start is not None @@ -60,8 +64,10 @@ def __init__( self.restatements = set() self.no_gaps = no_gaps self.skip_backfill = skip_backfill + self.is_dev = is_dev + self.forward_only = forward_only self._start = start - self._end = end + self._end = end if end or not is_dev else now() self._apply = apply self._dag = dag self._state_reader = state_reader @@ -205,9 +211,12 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> None """Sets a snapshot version based on the user choice. Args: - snapshot: The snapshot to version. - choice: The user decision on how to version the snapshot and it's children. + snapshot: The target snapshot. + choice: The user decision on how to version the target snapshot and its children. """ + if self.forward_only: + raise PlanError("Choice setting is not supported by a forward-only plan.") + snapshot.change_category = choice if choice in ( SnapshotChangeCategory.BREAKING, @@ -256,11 +265,11 @@ def snapshot_change_category(self, snapshot: Snapshot) -> SnapshotChangeCategory ) if snapshot.name not in self.context_diff.modified_snapshots: - return SnapshotChangeCategory.NO_CHANGE + raise SQLMeshError(f"Snapshot {snapshot.snapshot_id} has not been modified") current, previous = self.context_diff.modified_snapshots[snapshot.name] if current.version == previous.version: - return SnapshotChangeCategory.NO_CHANGE + return SnapshotChangeCategory.FORWARD_ONLY if current.data_hash_matches(previous): return SnapshotChangeCategory.BREAKING @@ -303,8 +312,19 @@ def _categorize_snapshots(self) -> t.Tuple[t.List[Snapshot], SnapshotMapping]: snapshot = self.context_diff.snapshots[model_name] if model_name in self.context_diff.modified_snapshots: + if self.forward_only: + # In case of the forward only plan any modifications result in reuse of the + # previous version. + snapshot.set_version(snapshot.previous_version) + + upstream_model_names = self._dag.upstream(model_name) + if self.context_diff.directly_modified(model_name): added_and_directly_modified.append(snapshot) + if not self.forward_only: + self._ensure_no_paused_forward_only_upstream( + model_name, upstream_model_names + ) else: all_indirectly_modified.add(model_name) @@ -314,11 +334,16 @@ def _categorize_snapshots(self) -> t.Tuple[t.List[Snapshot], SnapshotMapping]: if not snapshot.version and not any( self.context_diff.directly_modified(upstream) and not self.context_diff.snapshots[upstream].version - for upstream in self._dag.upstream(model_name) + for upstream in upstream_model_names ): snapshot.set_version() elif model_name in self.context_diff.added: + if self.forward_only: + raise PlanError( + "New models can't be added as part of the forward-only plan." + ) + snapshot.set_version() added_and_directly_modified.append(snapshot) @@ -334,6 +359,24 @@ def _categorize_snapshots(self) -> t.Tuple[t.List[Snapshot], SnapshotMapping]: indirectly_modified, ) + def _ensure_no_paused_forward_only_upstream( + self, model_name: str, upstream_model_names: t.Iterable[str] + ) -> None: + for upstream in upstream_model_names: + upstream_snapshot = self.context_diff.snapshots[upstream] + if ( + upstream_snapshot.version + and upstream_snapshot.is_forward_only + and upstream_snapshot.is_paused + ): + raise PlanError( + f"Modified model '{model_name}' depends on a paused forward-only snapshot {upstream_snapshot.snapshot_id}. " + "Possible remedies: " + "1) make sure your codebase is up-to-date; " + f"2) promote the snapshot {upstream_snapshot.snapshot_id} in the production environment; " + "3) recreate this plan in a forward-only mode." + ) + class PlanStatus(str, Enum): STARTED = "started" diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 4a30689868..d0f9771103 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -16,7 +16,6 @@ import abc import typing as t -from sqlmesh.core import constants as c from sqlmesh.core._typing import NotificationTarget from sqlmesh.core.console import Console, get_console from sqlmesh.core.plan.definition import Plan @@ -45,10 +44,6 @@ def evaluate(self, plan: Plan) -> None: plan: The plan to evaluate. """ - def _is_dev_plan(self, plan: Plan) -> bool: - """Returns True if the given plan is for development purposes.""" - return plan.environment.name != c.PROD - class BuiltInPlanEvaluator(PlanEvaluator): def __init__( @@ -85,7 +80,7 @@ def evaluate(self, plan: Plan) -> None: max_workers=self.backfill_concurrent_tasks, console=self.console, ) - scheduler.run(plan.start, plan.end, is_dev=self._is_dev_plan(plan)) + scheduler.run(plan.start, plan.end, is_dev=plan.is_dev) self._promote(plan) @@ -132,7 +127,7 @@ def _promote(self, plan: Plan) -> None: self.snapshot_evaluator.promote( added, environment=environment.name, - is_dev=self._is_dev_plan(plan), + is_dev=plan.is_dev, ) self.snapshot_evaluator.demote( removed, @@ -181,7 +176,7 @@ def evaluate(self, plan: Plan) -> None: backfill_concurrent_tasks=self.backfill_concurrent_tasks, ddl_concurrent_tasks=self.ddl_concurrent_tasks, users=self.users, - is_dev=self._is_dev_plan(plan), + is_dev=plan.is_dev, ) if self.blocking: diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 0cb77b6833..35def30a5c 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -36,7 +36,7 @@ class SnapshotChangeCategory(IntEnum): BREAKING = 1 NON_BREAKING = 2 - NO_CHANGE = 3 + FORWARD_ONLY = 3 class FingerprintMixin: @@ -594,6 +594,10 @@ def is_embedded_kind(self) -> bool: def is_materialized(self) -> bool: return self.model.kind.is_materialized + @property + def is_paused(self) -> bool: + return self.unpaused_ts is None + def _ensure_version(self) -> None: if not self.version: raise SQLMeshError( diff --git a/sqlmesh/magics.py b/sqlmesh/magics.py index f7c856d6d0..91bcc08e34 100644 --- a/sqlmesh/magics.py +++ b/sqlmesh/magics.py @@ -191,6 +191,11 @@ def test(self, line: str, test_def_raw: t.Optional[str] = None): action="store_true", help="Skip the backfill step.", ) + @argument( + "--forward-only", + action="store_true", + help="Create a plan for forward-only changes.", + ) @argument( "--no-prompts", action="store_true", @@ -220,6 +225,7 @@ def plan(self, line) -> None: restate_from=args.restate_from, no_gaps=args.no_gaps, skip_backfill=args.skip_backfill, + forward_only=args.forward_only, no_prompts=args.no_prompts, auto_apply=args.auto_apply, ) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 499657359c..5312fcfdcd 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -102,7 +102,7 @@ def test_no_change(sushi_context: Context): environment = "dev" initial_add(sushi_context, environment) validate_query_change( - sushi_context, environment, SnapshotChangeCategory.NO_CHANGE, True + sushi_context, environment, SnapshotChangeCategory.FORWARD_ONLY, True ) @@ -169,7 +169,7 @@ def validate_query_change( if change_category == SnapshotChangeCategory.BREAKING and not logical: models_same = not_modified models_different = directly_modified + indirectly_modified - elif change_category == SnapshotChangeCategory.NO_CHANGE: + elif change_category == SnapshotChangeCategory.FORWARD_ONLY: models_same = not_modified + directly_modified + indirectly_modified models_different = [] else: @@ -353,7 +353,7 @@ def test_environment_promotion(sushi_context: Context): DataType.Type.DOUBLE, DataType.Type.FLOAT, ) - apply_to_environment(sushi_context, "dev", SnapshotChangeCategory.NO_CHANGE) + apply_to_environment(sushi_context, "dev", SnapshotChangeCategory.FORWARD_ONLY) # Promote to prod def _validate_plan(context, plan): @@ -373,7 +373,7 @@ def _validate_plan(context, plan): plan.snapshot_change_category( plan.context_diff.modified_snapshots["sushi.customer_revenue_by_day"][0] ) - == SnapshotChangeCategory.NO_CHANGE + == SnapshotChangeCategory.FORWARD_ONLY ) apply_to_environment( @@ -548,23 +548,23 @@ def setup_rebase( @pytest.mark.parametrize( "change_categories, expected", [ - ([SnapshotChangeCategory.NO_CHANGE], SnapshotChangeCategory.NO_CHANGE), + ([SnapshotChangeCategory.FORWARD_ONLY], SnapshotChangeCategory.FORWARD_ONLY), ([SnapshotChangeCategory.NON_BREAKING], SnapshotChangeCategory.NON_BREAKING), ([SnapshotChangeCategory.BREAKING], SnapshotChangeCategory.BREAKING), ( - [SnapshotChangeCategory.NO_CHANGE, SnapshotChangeCategory.NO_CHANGE], - SnapshotChangeCategory.NO_CHANGE, + [SnapshotChangeCategory.FORWARD_ONLY, SnapshotChangeCategory.FORWARD_ONLY], + SnapshotChangeCategory.FORWARD_ONLY, ), ( - [SnapshotChangeCategory.NO_CHANGE, SnapshotChangeCategory.NON_BREAKING], + [SnapshotChangeCategory.FORWARD_ONLY, SnapshotChangeCategory.NON_BREAKING], SnapshotChangeCategory.NON_BREAKING, ), ( - [SnapshotChangeCategory.NO_CHANGE, SnapshotChangeCategory.BREAKING], + [SnapshotChangeCategory.FORWARD_ONLY, SnapshotChangeCategory.BREAKING], SnapshotChangeCategory.BREAKING, ), ( - [SnapshotChangeCategory.NON_BREAKING, SnapshotChangeCategory.NO_CHANGE], + [SnapshotChangeCategory.NON_BREAKING, SnapshotChangeCategory.FORWARD_ONLY], SnapshotChangeCategory.NON_BREAKING, ), ( @@ -576,7 +576,7 @@ def setup_rebase( SnapshotChangeCategory.BREAKING, ), ( - [SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.NO_CHANGE], + [SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.FORWARD_ONLY], SnapshotChangeCategory.BREAKING, ), ( diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py new file mode 100644 index 0000000000..db9d005b1f --- /dev/null +++ b/tests/core/test_plan.py @@ -0,0 +1,86 @@ +import pytest +from pytest_mock.plugin import MockerFixture +from sqlglot import parse_one + +from sqlmesh.core.model import Model +from sqlmesh.core.plan import Plan +from sqlmesh.core.snapshot import SnapshotChangeCategory, SnapshotDataVersion +from sqlmesh.utils.dag import DAG +from sqlmesh.utils.errors import PlanError + + +def test_forward_only_plan_sets_version(make_snapshot, mocker: MockerFixture): + snapshot_a = make_snapshot(Model(name="a", query=parse_one("select 1, ds"))) + snapshot_a.set_version() + + snapshot_b = make_snapshot(Model(name="b", query=parse_one("select 2, ds"))) + snapshot_b.previous_versions = ( + SnapshotDataVersion(fingerprint="test_fingerprint", version="test_version"), + ) + assert not snapshot_b.version + + dag = DAG[str]({"b": {"a"}}) + + context_diff_mock = mocker.Mock() + context_diff_mock.snapshots = {"a": snapshot_a, "b": snapshot_b} + context_diff_mock.added = {} + context_diff_mock.modified_snapshots = {"b", (snapshot_b, snapshot_b)} + + state_reader_mock = mocker.Mock() + + plan = Plan(context_diff_mock, dag, state_reader_mock, forward_only=True) + + assert snapshot_b.version == "test_version" + + # Make sure that the choice can't be set manually. + with pytest.raises( + PlanError, match="Choice setting is not supported by a forward-only plan." + ): + plan.set_choice(snapshot_b, SnapshotChangeCategory.BREAKING) + + +def test_forward_only_plan_new_models_not_allowed(make_snapshot, mocker: MockerFixture): + snapshot_a = make_snapshot(Model(name="a", query=parse_one("select 1, ds"))) + snapshot_a.set_version() + + dag = DAG[str]({"a": set()}) + + context_diff_mock = mocker.Mock() + context_diff_mock.snapshots = {"a": snapshot_a} + context_diff_mock.added = {"a"} + context_diff_mock.modified_snapshots = {} + + state_reader_mock = mocker.Mock() + + with pytest.raises( + PlanError, match="New models can't be added as part of the forward-only plan." + ): + Plan(context_diff_mock, dag, state_reader_mock, forward_only=True) + + +def test_paused_forward_only_parent(make_snapshot, mocker: MockerFixture): + snapshot_a = make_snapshot(Model(name="a", query=parse_one("select 1, ds"))) + snapshot_a.previous_versions = ( + SnapshotDataVersion( + fingerprint="test_fingerprint", version="test_version", change_category=None + ), + ) + snapshot_a.set_version(snapshot_a.previous_version) + + snapshot_b = make_snapshot(Model(name="b", query=parse_one("select 2, ds"))) + assert not snapshot_b.version + + dag = DAG[str]({"b": {"a"}}) + + context_diff_mock = mocker.Mock() + context_diff_mock.snapshots = {"a": snapshot_a, "b": snapshot_b} + context_diff_mock.added = {} + context_diff_mock.modified_snapshots = {"b", (snapshot_b, snapshot_b)} + + state_reader_mock = mocker.Mock() + + with pytest.raises( + PlanError, + match=r"Modified model 'b' depends on a paused forward-only snapshot.*", + ): + Plan(context_diff_mock, dag, state_reader_mock, forward_only=False) diff --git a/tests/core/test_plan_evaluator.py b/tests/core/test_plan_evaluator.py index b08a9e3321..60c6e1d6da 100644 --- a/tests/core/test_plan_evaluator.py +++ b/tests/core/test_plan_evaluator.py @@ -19,6 +19,7 @@ def sushi_plan(sushi_context: Context, mocker: MockerFixture) -> Plan: sushi_context._context_diff("dev"), dag=sushi_context.dag, state_reader=sushi_context.state_reader, + is_dev=True, ) From c9ccd50da94dd037d7fbac830bf7fc253c836ea2 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 29 Dec 2022 13:45:18 -0800 Subject: [PATCH 2/3] Cosmetic fix --- sqlmesh/core/plan/definition.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 052e5a881d..24169fd82d 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -256,16 +256,18 @@ def snapshot_change_category(self, snapshot: Snapshot) -> SnapshotChangeCategory """ if snapshot not in self.snapshots: raise SQLMeshError( - f"Snapshot {snapshot.snapshot_id} does not exist in this plan" + f"Snapshot {snapshot.snapshot_id} does not exist in this plan." ) if not snapshot.version: raise SQLMeshError( - f"Snapshot {snapshot.snapshot_id} has not be categorized yet" + f"Snapshot {snapshot.snapshot_id} has not be categorized yet." ) if snapshot.name not in self.context_diff.modified_snapshots: - raise SQLMeshError(f"Snapshot {snapshot.snapshot_id} has not been modified") + raise SQLMeshError( + f"Snapshot {snapshot.snapshot_id} has not been modified." + ) current, previous = self.context_diff.modified_snapshots[snapshot.name] if current.version == previous.version: From bd3434909c3be6dd0fefcfd0f243dce4db313054 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 29 Dec 2022 14:13:08 -0800 Subject: [PATCH 3/3] Improve the error message --- sqlmesh/core/plan/definition.py | 4 ++-- tests/core/test_plan.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 24169fd82d..7aa44ba94d 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -372,10 +372,10 @@ def _ensure_no_paused_forward_only_upstream( and upstream_snapshot.is_paused ): raise PlanError( - f"Modified model '{model_name}' depends on a paused forward-only snapshot {upstream_snapshot.snapshot_id}. " + f"Modified model '{model_name}' depends on a paused version of model '{upstream}'. " "Possible remedies: " "1) make sure your codebase is up-to-date; " - f"2) promote the snapshot {upstream_snapshot.snapshot_id} in the production environment; " + f"2) promote the current version of model '{upstream}' in the production environment; " "3) recreate this plan in a forward-only mode." ) diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index db9d005b1f..fdd2c10577 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -81,6 +81,6 @@ def test_paused_forward_only_parent(make_snapshot, mocker: MockerFixture): with pytest.raises( PlanError, - match=r"Modified model 'b' depends on a paused forward-only snapshot.*", + match=r"Modified model 'b' depends on a paused version of model 'a'.*", ): Plan(context_diff_mock, dag, state_reader_mock, forward_only=False)