diff --git a/docs/guides/models.md b/docs/guides/models.md index 67503d95bb..c43a172d60 100644 --- a/docs/guides/models.md +++ b/docs/guides/models.md @@ -91,9 +91,13 @@ Enter the backfill start date (eg. '1 year', '2020-01-01') or blank for the begi Enter the backfill end date (eg. '1 month ago', '2020-01-01') or blank to backfill up until now: Apply - Backfill Tables [y/n]: y +sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00 + All model batches have been executed successfully -sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00 +Virtually Updating 'dev' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 0:00:00 + +The target environment has been updated successfully ``` For more information, refer to [plans](../concepts/plans.md). @@ -220,7 +224,7 @@ To delete a model: All model batches have been executed successfully - sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00 + sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00 ``` **Note:** If you have other files that reference the model you wish to delete, an error message will note the file(s) containing the reference. You will need to also delete these files in order to apply the change. diff --git a/docs/quick_start.md b/docs/quick_start.md index d72a92c035..e5343002ec 100644 --- a/docs/quick_start.md +++ b/docs/quick_start.md @@ -141,7 +141,8 @@ Models needing backfill (missing dates): Enter the backfill start date (eg. '1 year', '2020-01-01') or blank for the beginning of history: Enter the backfill end date (eg. '1 month ago', '2020-01-01') or blank to backfill up until now: Apply - Backfill Tables [y/n]: y -sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00 + +sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00 All model batches have been executed successfully diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 519a980556..bd74f44363 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -40,7 +40,9 @@ class Console(abc.ABC): with them when their input is needed""" @abc.abstractmethod - def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None: + def start_snapshot_progress( + self, snapshot: Snapshot, total_batches: int, environment: str + ) -> None: """Indicates that a new load progress has begun.""" @abc.abstractmethod @@ -138,11 +140,13 @@ def _prompt(self, message: str, **kwargs: t.Any) -> t.Any: def _confirm(self, message: str, **kwargs: t.Any) -> bool: return Confirm.ask(message, console=self.console, **kwargs) - def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None: + def start_snapshot_progress( + self, snapshot: Snapshot, total_batches: int, environment: str + ) -> None: """Indicates that a new load progress has begun.""" if not self.evaluation_progress: self.evaluation_progress = Progress( - TextColumn("[bold blue]{task.fields[snapshot_name]}", justify="right"), + TextColumn("[bold blue]{task.fields[view_name]}", justify="right"), BarColumn(bar_width=40), "[progress.percentage]{task.percentage:>3.1f}%", "•", @@ -153,10 +157,11 @@ def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> Non ) self.evaluation_progress.start() self.evaluation_tasks = {} - self.evaluation_tasks[snapshot_name] = ( + view_name = snapshot.qualified_view_name.for_environment(environment) + self.evaluation_tasks[snapshot.name] = ( self.evaluation_progress.add_task( - f"Running {snapshot_name}...", - snapshot_name=snapshot_name, + f"Running {view_name}...", + view_name=view_name, total=total_batches, ), total_batches, diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index f582341671..f95a86e029 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -399,13 +399,14 @@ def run( """Run the entire dag through the scheduler. Args: - environment: The target environment to source model snapshots from. Default: prod. + environment: The target environment to source model snapshots from and virtually update. Default: prod. start: The start of the interval to render. end: The end of the interval to render. latest: The latest time used for non incremental datasets. - skip_janitor: Whether to skip the jantitor task. + skip_janitor: Whether to skip the janitor task. """ - self.scheduler(environment=environment or c.PROD).run(start, end, latest) + environment = environment or c.PROD + self.scheduler(environment=environment).run(environment, start, end, latest) if not skip_janitor: self._run_janitor() diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index cdb9918985..51006dd355 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -73,7 +73,7 @@ def evaluate(self, plan: Plan) -> None: max_workers=self.backfill_concurrent_tasks, console=self.console, ) - is_run_successful = scheduler.run(plan.start, plan.end, is_dev=plan.is_dev) + is_run_successful = scheduler.run(plan.environment.name, plan.start, plan.end) if not is_run_successful: raise SQLMeshError("Plan application failed.") diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index f404c0f0d1..ce1f38ac8b 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -4,6 +4,7 @@ import typing as t from datetime import datetime +from sqlmesh.core import constants as c from sqlmesh.core.console import Console, get_console from sqlmesh.core.snapshot import ( Snapshot, @@ -138,25 +139,25 @@ def evaluate( def run( self, + environment: str, start: t.Optional[TimeLike] = None, end: t.Optional[TimeLike] = None, latest: t.Optional[TimeLike] = None, - is_dev: bool = False, ) -> bool: """Concurrently runs all snapshots in topological order. Args: + environment: The environment the user is targeting when applying their change. start: The start of the run. Defaults to the min model start date. end: The end of the run. Defaults to now. latest: The latest datetime to use for non-incremental queries. - is_dev: Indicates whether the evaluation happens in the development mode and temporary - tables / table clones should be used where applicable. Returns: True if the execution was successful and False otherwise. """ validate_date_range(start, end) + is_dev = environment != c.PROD latest = latest or now() batches = self.batches(start, end, latest, is_dev=is_dev) dag = self._dag(batches) @@ -167,7 +168,7 @@ def run( continue visited.add(snapshot) intervals = batches[snapshot] - self.console.start_snapshot_progress(snapshot.name, len(intervals)) + self.console.start_snapshot_progress(snapshot, len(intervals), environment) def evaluate_node(node: SchedulingUnit) -> None: assert latest diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index 2040755931..9dbd38325a 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -1,6 +1,7 @@ import pytest from sqlglot import parse_one +from sqlmesh.core import constants as c from sqlmesh.core.context import Context from sqlmesh.core.scheduler import Scheduler from sqlmesh.core.snapshot import Snapshot, SnapshotFingerprint @@ -116,6 +117,7 @@ def test_run(sushi_context_pre_scheduling: Context, scheduler: Scheduler): adapter = sushi_context_pre_scheduling.engine_adapter snapshot = sushi_context_pre_scheduling.snapshots["sushi.items"] scheduler.run( + c.PROD, "2022-01-01", "2022-01-03", "2022-01-30", diff --git a/web/server/console.py b/web/server/console.py index af51f44375..e1b894d322 100644 --- a/web/server/console.py +++ b/web/server/console.py @@ -6,6 +6,7 @@ import unittest from sqlmesh.core.console import TerminalConsole +from sqlmesh.core.snapshot import Snapshot from sqlmesh.core.test import ModelTest from sqlmesh.utils.date import now_timestamp from web.server.sse import Event @@ -14,7 +15,7 @@ class ApiConsole(TerminalConsole): def __init__(self) -> None: super().__init__() - self.current_task_status: t.Dict[str, t.Dict[str, int]] = {} + self.current_task_status: t.Dict[str, t.Dict[str, t.Any]] = {} self.queue: asyncio.Queue = asyncio.Queue() def _make_event( @@ -33,12 +34,16 @@ def _make_event( data=json.dumps(payload), ) - def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None: + def start_snapshot_progress( + self, snapshot: Snapshot, total_batches: int, environment: str + ) -> None: """Indicates that a new load progress has begun.""" - self.current_task_status[snapshot_name] = { + view_name = snapshot.qualified_view_name.for_environment(environment) + self.current_task_status[snapshot.name] = { "completed": 0, "total": total_batches, "start": now_timestamp(), + "view_name": view_name, } def update_snapshot_progress(self, snapshot_name: str, num_batches: int) -> None: