Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions docs/guides/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ Enter the backfill start date (eg. '1 year', '2020-01-01') or blank for the begi
Enter the backfill end date (eg. '1 month ago', '2020-01-01') or blank to backfill up until now:
Apply - Backfill Tables [y/n]: y

sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00

All model batches have been executed successfully

sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
Virtually Updating 'dev' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 0:00:00

The target environment has been updated successfully
```

For more information, refer to [plans](../concepts/plans.md).
Expand Down Expand Up @@ -220,7 +224,7 @@ To delete a model:

All model batches have been executed successfully

sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00
```

**Note:** If you have other files that reference the model you wish to delete, an error message will note the file(s) containing the reference. You will need to also delete these files in order to apply the change.
Expand Down
3 changes: 2 additions & 1 deletion docs/quick_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ Models needing backfill (missing dates):
Enter the backfill start date (eg. '1 year', '2020-01-01') or blank for the beginning of history:
Enter the backfill end date (eg. '1 month ago', '2020-01-01') or blank to backfill up until now:
Apply - Backfill Tables [y/n]: y
sqlmesh_example.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00

sqlmesh_example__dev.example_incremental_model ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:00

All model batches have been executed successfully

Expand Down
17 changes: 11 additions & 6 deletions sqlmesh/core/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class Console(abc.ABC):
with them when their input is needed"""

@abc.abstractmethod
def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None:
def start_snapshot_progress(
self, snapshot: Snapshot, total_batches: int, environment: str
) -> None:
"""Indicates that a new load progress has begun."""

@abc.abstractmethod
Expand Down Expand Up @@ -138,11 +140,13 @@ def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
def _confirm(self, message: str, **kwargs: t.Any) -> bool:
return Confirm.ask(message, console=self.console, **kwargs)

def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None:
def start_snapshot_progress(
self, snapshot: Snapshot, total_batches: int, environment: str
) -> None:
"""Indicates that a new load progress has begun."""
if not self.evaluation_progress:
self.evaluation_progress = Progress(
TextColumn("[bold blue]{task.fields[snapshot_name]}", justify="right"),
TextColumn("[bold blue]{task.fields[view_name]}", justify="right"),
BarColumn(bar_width=40),
"[progress.percentage]{task.percentage:>3.1f}%",
"•",
Expand All @@ -153,10 +157,11 @@ def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> Non
)
self.evaluation_progress.start()
self.evaluation_tasks = {}
self.evaluation_tasks[snapshot_name] = (
view_name = snapshot.qualified_view_name.for_environment(environment)
self.evaluation_tasks[snapshot.name] = (
self.evaluation_progress.add_task(
f"Running {snapshot_name}...",
snapshot_name=snapshot_name,
f"Running {view_name}...",
view_name=view_name,
total=total_batches,
),
total_batches,
Expand Down
7 changes: 4 additions & 3 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,14 @@ def run(
"""Run the entire dag through the scheduler.

Args:
environment: The target environment to source model snapshots from. Default: prod.
environment: The target environment to source model snapshots from and virtually update. Default: prod.
start: The start of the interval to render.
end: The end of the interval to render.
latest: The latest time used for non incremental datasets.
skip_janitor: Whether to skip the jantitor task.
skip_janitor: Whether to skip the janitor task.
"""
self.scheduler(environment=environment or c.PROD).run(start, end, latest)
environment = environment or c.PROD
self.scheduler(environment=environment).run(environment, start, end, latest)

if not skip_janitor:
self._run_janitor()
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def evaluate(self, plan: Plan) -> None:
max_workers=self.backfill_concurrent_tasks,
console=self.console,
)
is_run_successful = scheduler.run(plan.start, plan.end, is_dev=plan.is_dev)
is_run_successful = scheduler.run(plan.environment.name, plan.start, plan.end)
if not is_run_successful:
raise SQLMeshError("Plan application failed.")

Expand Down
9 changes: 5 additions & 4 deletions sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import typing as t
from datetime import datetime

from sqlmesh.core import constants as c
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.snapshot import (
Snapshot,
Expand Down Expand Up @@ -138,25 +139,25 @@ def evaluate(

def run(
self,
environment: str,
start: t.Optional[TimeLike] = None,
end: t.Optional[TimeLike] = None,
latest: t.Optional[TimeLike] = None,
is_dev: bool = False,
) -> bool:
"""Concurrently runs all snapshots in topological order.

Args:
environment: The environment the user is targeting when applying their change.
start: The start of the run. Defaults to the min model start date.
end: The end of the run. Defaults to now.
latest: The latest datetime to use for non-incremental queries.
is_dev: Indicates whether the evaluation happens in the development mode and temporary
tables / table clones should be used where applicable.

Returns:
True if the execution was successful and False otherwise.
"""
validate_date_range(start, end)

is_dev = environment != c.PROD
latest = latest or now()
batches = self.batches(start, end, latest, is_dev=is_dev)
dag = self._dag(batches)
Expand All @@ -167,7 +168,7 @@ def run(
continue
visited.add(snapshot)
intervals = batches[snapshot]
self.console.start_snapshot_progress(snapshot.name, len(intervals))
self.console.start_snapshot_progress(snapshot, len(intervals), environment)

def evaluate_node(node: SchedulingUnit) -> None:
assert latest
Expand Down
2 changes: 2 additions & 0 deletions tests/core/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from sqlglot import parse_one

from sqlmesh.core import constants as c
from sqlmesh.core.context import Context
from sqlmesh.core.scheduler import Scheduler
from sqlmesh.core.snapshot import Snapshot, SnapshotFingerprint
Expand Down Expand Up @@ -116,6 +117,7 @@ def test_run(sushi_context_pre_scheduling: Context, scheduler: Scheduler):
adapter = sushi_context_pre_scheduling.engine_adapter
snapshot = sushi_context_pre_scheduling.snapshots["sushi.items"]
scheduler.run(
c.PROD,
"2022-01-01",
"2022-01-03",
"2022-01-30",
Expand Down
11 changes: 8 additions & 3 deletions web/server/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import unittest

from sqlmesh.core.console import TerminalConsole
from sqlmesh.core.snapshot import Snapshot
from sqlmesh.core.test import ModelTest
from sqlmesh.utils.date import now_timestamp
from web.server.sse import Event
Expand All @@ -14,7 +15,7 @@
class ApiConsole(TerminalConsole):
def __init__(self) -> None:
super().__init__()
self.current_task_status: t.Dict[str, t.Dict[str, int]] = {}
self.current_task_status: t.Dict[str, t.Dict[str, t.Any]] = {}
self.queue: asyncio.Queue = asyncio.Queue()

def _make_event(
Expand All @@ -33,12 +34,16 @@ def _make_event(
data=json.dumps(payload),
)

def start_snapshot_progress(self, snapshot_name: str, total_batches: int) -> None:
def start_snapshot_progress(
self, snapshot: Snapshot, total_batches: int, environment: str
) -> None:
"""Indicates that a new load progress has begun."""
self.current_task_status[snapshot_name] = {
view_name = snapshot.qualified_view_name.for_environment(environment)
self.current_task_status[snapshot.name] = {
"completed": 0,
"total": total_batches,
"start": now_timestamp(),
"view_name": view_name,
}

def update_snapshot_progress(self, snapshot_name: str, num_batches: int) -> None:
Expand Down