Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ Usage: sqlmesh invalidate [OPTIONS] ENVIRONMENT
of the janitor process.

Options:
--help Show this message and exit.
-s, --sync Wait for the environment to be deleted before returning. If not
specified, the environment will be deleted asynchronously by the
janitor process. This option requires a connection to the data
warehouse.
--help Show this message and exit.
```

## migrate
Expand Down
10 changes: 8 additions & 2 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,18 @@ def run(ctx: click.Context, environment: t.Optional[str] = None, **kwargs: t.Any

@cli.command("invalidate")
@click.argument("environment", required=True)
@click.option(
"--sync",
"-s",
is_flag=True,
help="Wait for the environment to be deleted before returning. If not specified, the environment will be deleted asynchronously by the janitor process. This option requires a connection to the data warehouse.",
)
@click.pass_context
@error_handler
def invalidate(ctx: click.Context, environment: str) -> None:
def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
"""Invalidate the target environment, forcing its removal during the next run of the janitor process."""
context = ctx.obj
context.invalidate_environment(environment)
context.invalidate_environment(environment, **kwargs)


@cli.command("dag")
Expand Down
17 changes: 13 additions & 4 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,14 +1063,20 @@ def apply(
NotificationEvent.APPLY_END, environment=plan.environment_naming_info.name
)

def invalidate_environment(self, name: str) -> None:
def invalidate_environment(self, name: str, sync: bool = False) -> None:
"""Invalidates the target environment by setting its expiration timestamp to now.

Args:
name: The name of the environment to invalidate.
sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
be deleted asynchronously by the janitor process.
"""
self.state_sync.invalidate_environment(name)
self.console.log_success(f"Environment '{name}' has been invalidated.")
if sync:
self._cleanup_environments()
self.console.log_success(f"Environment '{name}' has been deleted.")
else:
self.console.log_success(f"Environment '{name}' has been invalidated.")

def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> None:
"""Show a diff of the current context with a given environment.
Expand Down Expand Up @@ -1670,15 +1676,18 @@ def _context_diff(
)

def _run_janitor(self) -> None:
expired_environments = self.state_sync.delete_expired_environments()
cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)
self._cleanup_environments()
expired_snapshots = self.state_sync.delete_expired_snapshots()
self.snapshot_evaluator.cleanup(
expired_snapshots, on_complete=self.console.update_cleanup_progress
)

self.state_sync.compact_intervals()

def _cleanup_environments(self) -> None:
expired_environments = self.state_sync.delete_expired_environments()
cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)

def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
connection_name = connection_name.capitalize()
try:
Expand Down