From 96bf873b1bacc1541462a60e84dee218461f0d29 Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Thu, 21 May 2026 13:43:04 +0300 Subject: [PATCH 1/3] Fix: Make the janitor best effort Signed-off-by: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> --- sqlmesh/core/context.py | 50 ++++++++++------ sqlmesh/core/janitor.py | 63 ++++++++++---------- tests/core/integration/test_aux_commands.py | 64 ++++++++++++++++++++- tests/core/test_janitor.py | 61 ++++++++++++++++++-- 4 files changed, 183 insertions(+), 55 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 2861adbeda..bfe6dcb50c 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -2898,22 +2898,34 @@ def _destroy(self) -> bool: def _run_janitor(self, ignore_ttl: bool = False) -> None: current_ts = now_timestamp() + failures: t.List[str] = [] # Clean up expired environments by removing their views and schemas - self._cleanup_environments(current_ts=current_ts) - - delete_expired_snapshots( - self.state_sync, - self.snapshot_evaluator, - current_ts=current_ts, - ignore_ttl=ignore_ttl, - console=self.console, - batch_size=self.config.janitor.expired_snapshots_batch_size, + failures.extend(self._cleanup_environments(current_ts=current_ts)) + + failures.extend( + delete_expired_snapshots( + self.state_sync, + self.snapshot_evaluator, + current_ts=current_ts, + ignore_ttl=ignore_ttl, + console=self.console, + batch_size=self.config.janitor.expired_snapshots_batch_size, + ) ) self.state_sync.compact_intervals() - def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None: + if failures: + failure_string = "\n - ".join(failures) + summary = f"Janitor completed with failures:\n {failure_string}" + if self.config.janitor.warn_on_delete_failure: + self.console.log_warning(summary) + else: + raise SQLMeshError(summary) + + def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> t.List[str]: current_ts = current_ts or now_timestamp() + failures: t.List[str] = [] expired_environments_summaries = self.state_sync.get_expired_environments( current_ts=current_ts @@ -2923,15 +2935,19 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None: expired_env = self.state_reader.get_environment(expired_env_summary.name) if expired_env: - cleanup_expired_views( - default_adapter=self.engine_adapter, - engine_adapters=self.engine_adapters, - environments=[expired_env], - warn_on_delete_failure=self.config.janitor.warn_on_delete_failure, - console=self.console, + failures.extend( + cleanup_expired_views( + default_adapter=self.engine_adapter, + engine_adapters=self.engine_adapters, + environments=[expired_env], + console=self.console, + ) ) - self.state_sync.delete_expired_environments(current_ts=current_ts) + # we want to retry on the next janitor pass if drops failed + if not failures: + self.state_sync.delete_expired_environments(current_ts=current_ts) + return failures def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None: connection_name = connection_name.capitalize() diff --git a/sqlmesh/core/janitor.py b/sqlmesh/core/janitor.py index e050d6ef6c..3a9241c740 100644 --- a/sqlmesh/core/janitor.py +++ b/sqlmesh/core/janitor.py @@ -16,16 +16,16 @@ RowBoundary, ExpiredBatchRange, ) -from sqlmesh.utils.errors import SQLMeshError def cleanup_expired_views( default_adapter: EngineAdapter, engine_adapters: t.Dict[str, EngineAdapter], environments: t.List[Environment], - warn_on_delete_failure: bool = False, console: t.Optional[Console] = None, -) -> None: +) -> t.List[str]: + failures: t.List[str] = [] + expired_schema_or_catalog_environments = [ environment for environment in environments @@ -85,10 +85,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin console.update_cleanup_progress(expired_view) except Exception as e: message = f"Failed to drop the expired environment view '{expired_view}': {e}" - if warn_on_delete_failure: - logger.warning(message) - else: - raise SQLMeshError(message) from e + logger.warning(message) + failures.append(message) # Drop the schemas for the expired environments for engine_adapter, schema in schemas_to_drop: @@ -102,10 +100,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect)) except Exception as e: message = f"Failed to drop the expired environment schema '{schema}': {e}" - if warn_on_delete_failure: - logger.warning(message) - else: - raise SQLMeshError(message) from e + logger.warning(message) + failures.append(message) # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog' @@ -117,10 +113,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin console.update_cleanup_progress(catalog) except Exception as e: message = f"Failed to drop the expired environment catalog '{catalog}': {e}" - if warn_on_delete_failure: - logger.warning(message) - else: - raise SQLMeshError(message) from e + logger.warning(message) + failures.append(message) + + return failures def delete_expired_snapshots( @@ -131,7 +127,7 @@ def delete_expired_snapshots( ignore_ttl: bool = False, batch_size: t.Optional[int] = None, console: t.Optional[Console] = None, -) -> None: +) -> t.List[str]: """Delete all expired snapshots in batches. This helper function encapsulates the logic for deleting expired snapshots in batches, @@ -146,8 +142,9 @@ def delete_expired_snapshots( console: Optional console for reporting progress. Returns: - The total number of deleted expired snapshots. + List of failure messages so callers can surface them at the end of the janitor run. """ + failures: t.List[str] = [] num_expired_snapshots = 0 for batch in iter_expired_snapshot_batches( state_reader=state_sync, @@ -165,17 +162,23 @@ def delete_expired_snapshots( len(batch.expired_snapshot_ids), end_info, ) - snapshot_evaluator.cleanup( - target_snapshots=batch.cleanup_tasks, - on_complete=console.update_cleanup_progress if console else None, - ) - state_sync.delete_expired_snapshots( - batch_range=ExpiredBatchRange( - start=RowBoundary.lowest_boundary(), - end=batch.batch_range.end, - ), - ignore_ttl=ignore_ttl, - ) - logger.info("Cleaned up expired snapshots batch") - num_expired_snapshots += len(batch.expired_snapshot_ids) + try: + snapshot_evaluator.cleanup( + target_snapshots=batch.cleanup_tasks, + on_complete=console.update_cleanup_progress if console else None, + ) + state_sync.delete_expired_snapshots( + batch_range=ExpiredBatchRange( + start=RowBoundary.lowest_boundary(), + end=batch.batch_range.end, + ), + ignore_ttl=ignore_ttl, + ) + logger.info("Cleaned up expired snapshots batch") + num_expired_snapshots += len(batch.expired_snapshot_ids) + except Exception as e: + message = f"Failed to clean up an expired snapshots batch: {e}" + logger.warning(message) + failures.append(message) logger.info("Cleaned up %s expired snapshots", num_expired_snapshots) + return failures diff --git a/tests/core/integration/test_aux_commands.py b/tests/core/integration/test_aux_commands.py index 326e81e0c1..f8ac2e0f07 100644 --- a/tests/core/integration/test_aux_commands.py +++ b/tests/core/integration/test_aux_commands.py @@ -15,6 +15,7 @@ ModelDefaultsConfig, DuckDBConnectionConfig, ) +from sqlmesh.core.config.janitor import JanitorConfig from sqlmesh.core.context import Context from sqlmesh.core.model import ( SqlModel, @@ -146,7 +147,8 @@ def setup_scenario(): # Case 2: Assume that the view cleanup yields an error, the enviroment # record should still exist mocker.patch( - "sqlmesh.core.context.cleanup_expired_views", side_effect=Exception("view cleanup error") + "sqlmesh.core.context.cleanup_expired_views", + return_value=["view cleanup error"], ) ctx, model1_snapshot = setup_scenario() @@ -157,13 +159,71 @@ def setup_scenario(): assert ctx.state_sync.get_environment("dev") # - Run the janitor again, this time it should succeed - mocker.patch("sqlmesh.core.context.cleanup_expired_views") + mocker.patch("sqlmesh.core.context.cleanup_expired_views", return_value=[]) ctx._run_janitor(ignore_ttl=True) # - Check that the environment record does not exist in the state sync anymore assert not ctx.state_sync.get_environment("dev") +def test_janitor_aggregates_failures_into_single_error(mocker: MockerFixture, tmp_path: Path): + models_dir = tmp_path / "models" + models_dir.mkdir() + (models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col") + + ctx = Context( + paths=[tmp_path], + config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")), + ) + ctx.plan("dev", no_prompts=True, auto_apply=True) + ctx.invalidate_environment("dev") + + mocker.patch( + "sqlmesh.core.context.cleanup_expired_views", + return_value=["view drop error A", "view drop error B"], + ) + mocker.patch( + "sqlmesh.core.janitor.iter_expired_snapshot_batches", + return_value=iter([]), + ) + + with pytest.raises(SQLMeshError, match="Janitor completed with failures"): + ctx._run_janitor(ignore_ttl=True) + + +def test_janitor_warn_on_delete_failure_downgrades_aggregated_error( + mocker: MockerFixture, tmp_path: Path +): + models_dir = tmp_path / "models" + models_dir.mkdir() + (models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col") + + ctx = Context( + paths=[tmp_path], + config=Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + janitor=JanitorConfig(warn_on_delete_failure=True), + ), + ) + ctx.plan("dev", no_prompts=True, auto_apply=True) + ctx.invalidate_environment("dev") + + mocker.patch( + "sqlmesh.core.context.cleanup_expired_views", + return_value=["view drop error"], + ) + mocker.patch( + "sqlmesh.core.janitor.iter_expired_snapshot_batches", + return_value=iter([]), + ) + + warn_spy = mocker.patch.object(ctx.console, "log_warning") + + ctx._run_janitor(ignore_ttl=True) + assert warn_spy.called + assert "Janitor completed with failures" in warn_spy.call_args[0][0] + + @use_terminal_console def test_destroy(copy_to_temp_path): # Testing project with two gateways to verify cleanup is performed across engines diff --git a/tests/core/test_janitor.py b/tests/core/test_janitor.py index e5e209f2cc..282336fb06 100644 --- a/tests/core/test_janitor.py +++ b/tests/core/test_janitor.py @@ -22,7 +22,6 @@ ) from sqlmesh.core.janitor import cleanup_expired_views, delete_expired_snapshots from sqlmesh.utils.date import now_timestamp -from sqlmesh.utils.errors import SQLMeshError pytestmark = pytest.mark.slow @@ -101,7 +100,7 @@ def test_cleanup_expired_views(mocker: MockerFixture, make_snapshot: t.Callable) @pytest.mark.parametrize( "suffix_target", [EnvironmentSuffixTarget.SCHEMA, EnvironmentSuffixTarget.TABLE] ) -def test_cleanup_expired_environment_schema_warn_on_delete_failure( +def test_cleanup_expired_views_collects_failures( mocker: MockerFixture, make_snapshot: t.Callable, suffix_target: EnvironmentSuffixTarget ): adapter = mocker.MagicMock() @@ -124,10 +123,10 @@ def test_cleanup_expired_environment_schema_warn_on_delete_failure( catalog_name_override="catalog_override", ) - with pytest.raises(SQLMeshError, match="Failed to drop the expired environment .*"): - cleanup_expired_views(adapter, {}, [schema_environment], warn_on_delete_failure=False) - - cleanup_expired_views(adapter, {}, [schema_environment], warn_on_delete_failure=True) + # Janitor is now best-effort: failures are returned, not raised. + failures = cleanup_expired_views(adapter, {}, [schema_environment]) + assert len(failures) == 1 + assert "Failed to drop the expired environment" in failures[0] if suffix_target == EnvironmentSuffixTarget.SCHEMA: assert adapter.drop_schema.called @@ -135,6 +134,56 @@ def test_cleanup_expired_environment_schema_warn_on_delete_failure( assert adapter.drop_view.called +def test_cleanup_expired_views_continues_past_failures( + mocker: MockerFixture, make_snapshot: t.Callable +): + adapter = mocker.MagicMock() + adapter.dialect = None + + snapshot_failing = make_snapshot( + SqlModel(name="catalog.schema.failing", query=parse_one("select 1")) + ) + snapshot_failing.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_succeeding = make_snapshot( + SqlModel(name="catalog.schema.succeeding", query=parse_one("select 1")) + ) + snapshot_succeeding.categorize_as(SnapshotChangeCategory.BREAKING) + + failing_env = Environment( + name="failing_env", + suffix_target=EnvironmentSuffixTarget.TABLE, + snapshots=[snapshot_failing.table_info], + start_at="2022-01-01", + end_at="2022-01-01", + plan_id="p", + previous_plan_id="p", + ) + succeeding_env = Environment( + name="succeeding_env", + suffix_target=EnvironmentSuffixTarget.TABLE, + snapshots=[snapshot_succeeding.table_info], + start_at="2022-01-01", + end_at="2022-01-01", + plan_id="p", + previous_plan_id="p", + ) + + def drop_view_side_effect(view, ignore_if_not_exists=True): + if "failing" in str(view): + raise Exception("boom") + + adapter.drop_view.side_effect = drop_view_side_effect + + failures = cleanup_expired_views(adapter, {}, [failing_env, succeeding_env]) + + # Both drops were attempted + assert adapter.drop_view.call_count == 2 + + # Only the failing one is reported + assert len(failures) == 1 + assert "failing" in failures[0] + + def test_delete_expired_snapshots_common_function_batching( state_sync: EngineAdapterStateSync, make_snapshot: t.Callable, mocker: MockerFixture ): From cc82ab0f4ef5285d64089742bb0ac7fb9abcfbb3 Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Thu, 21 May 2026 22:59:11 +0300 Subject: [PATCH 2/3] add force-delete flag to delete from state even on failures Signed-off-by: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> --- sqlmesh/cli/main.py | 10 ++- sqlmesh/core/context.py | 22 +++++-- sqlmesh/core/janitor.py | 33 ++++++---- sqlmesh/core/snapshot/evaluator.py | 6 +- tests/core/integration/test_aux_commands.py | 73 +++++++++++++++++++++ 5 files changed, 123 insertions(+), 21 deletions(-) diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index ec5acbea59..dd3adaa687 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -631,16 +631,22 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None: is_flag=True, help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire", ) +@click.option( + "--force-delete", + is_flag=True, + help="Delete expired environment and snapshot state records even when the physical table or view drops fail. " + "Any objects that could not be dropped become orphaned and must be removed manually.", +) @click.pass_context @error_handler @cli_analytics -def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None: +def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None: """ Run the janitor process on-demand. The janitor cleans up old environments and expired snapshots. """ - ctx.obj.run_janitor(ignore_ttl, **kwargs) + ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs) @cli.command("destroy") diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index bfe6dcb50c..4eb0d3b40b 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -887,12 +887,12 @@ def _has_environment_changed() -> bool: return completion_status @python_api_analytics - def run_janitor(self, ignore_ttl: bool) -> bool: + def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool: success = False if self.console.start_cleanup(ignore_ttl): try: - self._run_janitor(ignore_ttl) + self._run_janitor(ignore_ttl, force_delete=force_delete) success = True finally: self.console.stop_cleanup(success=success) @@ -2896,12 +2896,14 @@ def _destroy(self) -> bool: return True - def _run_janitor(self, ignore_ttl: bool = False) -> None: + def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None: current_ts = now_timestamp() failures: t.List[str] = [] # Clean up expired environments by removing their views and schemas - failures.extend(self._cleanup_environments(current_ts=current_ts)) + failures.extend( + self._cleanup_environments(current_ts=current_ts, force_delete=force_delete) + ) failures.extend( delete_expired_snapshots( @@ -2909,6 +2911,7 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None: self.snapshot_evaluator, current_ts=current_ts, ignore_ttl=ignore_ttl, + force_delete=force_delete, console=self.console, batch_size=self.config.janitor.expired_snapshots_batch_size, ) @@ -2918,12 +2921,16 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None: if failures: failure_string = "\n - ".join(failures) summary = f"Janitor completed with failures:\n {failure_string}" + if force_delete: + summary += "\nState records have been deleted, but the underlying objects may still exist in the database.\nPlease investigate and clean up manually the above if necessary." if self.config.janitor.warn_on_delete_failure: self.console.log_warning(summary) else: raise SQLMeshError(summary) - def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> t.List[str]: + def _cleanup_environments( + self, current_ts: t.Optional[int] = None, force_delete: bool = False + ) -> t.List[str]: current_ts = current_ts or now_timestamp() failures: t.List[str] = [] @@ -2944,8 +2951,9 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> t.List[st ) ) - # we want to retry on the next janitor pass if drops failed - if not failures: + # we want to retry on the next janitor pass if drops failed, unless + # force_delete is set in which case we purge state records regardless + if not failures or force_delete: self.state_sync.delete_expired_environments(current_ts=current_ts) return failures diff --git a/sqlmesh/core/janitor.py b/sqlmesh/core/janitor.py index 3a9241c740..92d889e276 100644 --- a/sqlmesh/core/janitor.py +++ b/sqlmesh/core/janitor.py @@ -125,6 +125,7 @@ def delete_expired_snapshots( *, current_ts: int, ignore_ttl: bool = False, + force_delete: bool = False, batch_size: t.Optional[int] = None, console: t.Optional[Console] = None, ) -> t.List[str]: @@ -138,6 +139,7 @@ def delete_expired_snapshots( snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots. current_ts: Timestamp used to evaluate expiration. ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced). + force_delete: If True, delete snapshot state records even when physical table cleanup fails. batch_size: Maximum number of snapshots to fetch per batch. console: Optional console for reporting progress. @@ -162,23 +164,32 @@ def delete_expired_snapshots( len(batch.expired_snapshot_ids), end_info, ) + cleanup_succeeded = True try: snapshot_evaluator.cleanup( target_snapshots=batch.cleanup_tasks, on_complete=console.update_cleanup_progress if console else None, ) - state_sync.delete_expired_snapshots( - batch_range=ExpiredBatchRange( - start=RowBoundary.lowest_boundary(), - end=batch.batch_range.end, - ), - ignore_ttl=ignore_ttl, - ) - logger.info("Cleaned up expired snapshots batch") - num_expired_snapshots += len(batch.expired_snapshot_ids) - except Exception as e: - message = f"Failed to clean up an expired snapshots batch: {e}" + except Exception as failed_drops: + message = f"Failed to clean up: {failed_drops}" logger.warning(message) failures.append(message) + cleanup_succeeded = False + + if cleanup_succeeded or force_delete: + try: + state_sync.delete_expired_snapshots( + batch_range=ExpiredBatchRange( + start=RowBoundary.lowest_boundary(), + end=batch.batch_range.end, + ), + ignore_ttl=ignore_ttl, + ) + logger.info("Cleaned up expired snapshots batch") + num_expired_snapshots += len(batch.expired_snapshot_ids) + except Exception as e: + message = f"Failed to delete expired snapshot state records: {e}" + logger.warning(message) + failures.append(message) logger.info("Cleaned up %s expired snapshots", num_expired_snapshots) return failures diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 4df9ecb695..497763533b 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -547,7 +547,7 @@ def cleanup( t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets } with self.concurrent_context(): - concurrent_apply_to_snapshots( + errors, _ = concurrent_apply_to_snapshots( [t.snapshot for t in filtered_targets], lambda s: self._cleanup_snapshot( s, @@ -557,7 +557,11 @@ def cleanup( ), self.ddl_concurrent_tasks, reverse_order=True, + raise_on_error=False, ) + if errors: + errored_snapshots = "\n".join(f" {e.node.name}: {e.__cause__}" for e in errors) + raise SQLMeshError(f"\n{errored_snapshots}") def audit( self, diff --git a/tests/core/integration/test_aux_commands.py b/tests/core/integration/test_aux_commands.py index f8ac2e0f07..7de585576d 100644 --- a/tests/core/integration/test_aux_commands.py +++ b/tests/core/integration/test_aux_commands.py @@ -224,6 +224,79 @@ def test_janitor_warn_on_delete_failure_downgrades_aggregated_error( assert "Janitor completed with failures" in warn_spy.call_args[0][0] +def test_janitor_force_delete_removes_environment_state_despite_drop_failure( + mocker: MockerFixture, tmp_path: Path +): + models_dir = tmp_path / "models" + models_dir.mkdir() + (models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col") + + ctx = Context( + paths=[tmp_path], + config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")), + ) + ctx.plan("dev", no_prompts=True, auto_apply=True) + ctx.invalidate_environment("dev") + + mocker.patch( + "sqlmesh.core.context.cleanup_expired_views", + return_value=["view drop error"], + ) + mocker.patch( + "sqlmesh.core.janitor.iter_expired_snapshot_batches", + return_value=iter([]), + ) + + # without force_delete the environment is retained for retry + with pytest.raises(SQLMeshError): + ctx._run_janitor(ignore_ttl=True, force_delete=False) + assert ctx.state_sync.get_environment("dev") is not None + + # with force_delete the environment state is purged even though drops failed + with pytest.raises(SQLMeshError): + ctx._run_janitor(ignore_ttl=True, force_delete=True) + assert ctx.state_sync.get_environment("dev") is None + + +def test_janitor_force_delete_removes_snapshot_state_despite_cleanup_failure( + mocker: MockerFixture, tmp_path: Path +): + models_dir = tmp_path / "models" + models_dir.mkdir() + model1_path = models_dir / "model1.sql" + model1_path.write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col") + + # using warn_on_delete_failure so the janitor completes and we can inspect the state after + ctx = Context( + paths=[tmp_path], + config=Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + janitor=JanitorConfig(warn_on_delete_failure=True), + ), + ) + ctx.plan("dev", no_prompts=True, auto_apply=True) + model1_snapshot = ctx.get_snapshot("test.model1") + + # simulating a zombie snapshot + model1_path.unlink() + ctx.load() + ctx.plan("dev", no_prompts=True, auto_apply=True) + ctx.invalidate_environment("dev") + + mocker.patch( + "sqlmesh.core.snapshot.evaluator.SnapshotEvaluator.cleanup", + side_effect=Exception("table cleanup error"), + ) + + # without force_delete the snapshot state is retained for retry + ctx._run_janitor(ignore_ttl=True, force_delete=False) + assert ctx.state_sync.get_snapshots([model1_snapshot.snapshot_id]) # type: ignore + + # with force_delete the snapshot state record is purged even though cleanup failed + ctx._run_janitor(ignore_ttl=True, force_delete=True) + assert not ctx.state_sync.get_snapshots([model1_snapshot.snapshot_id]) # type: ignore + + @use_terminal_console def test_destroy(copy_to_temp_path): # Testing project with two gateways to verify cleanup is performed across engines From c6f8eb47277ed3aec51da69241ceb2bf6c08381b Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Thu, 21 May 2026 23:41:09 +0300 Subject: [PATCH 3/3] adapt test Signed-off-by: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> --- tests/core/test_snapshot_evaluator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index d7aa9e4a80..2b07e94fd6 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -570,12 +570,12 @@ def test_cleanup_fails(adapter_mock, make_snapshot): snapshot.version = "test_version" evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) - with pytest.raises(NodeExecutionFailedError) as exc_info: + with pytest.raises(SQLMeshError) as exc_info: evaluator.cleanup( [SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)] ) - assert str(exc_info.value.__cause__) == "test_error" + assert "test_error" in str(exc_info.value) def test_cleanup_skip_missing_table(adapter_mock, make_snapshot):