From f8a4f7078f19a45385b95ada5b36d5f7af9504dc Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Sun, 28 Jan 2024 17:01:16 -0800 Subject: [PATCH 1/3] fix: context close on crud operations --- sqlmesh/cli/main.py | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index e4716e3b4a..29610e858b 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -320,13 +320,16 @@ def plan( select_models = kwargs.pop("select_model") or None backfill_models = kwargs.pop("backfill_model") or None context.console.verbose = verbose - context.plan( - environment, - restate_models=restate_models, - select_models=select_models, - backfill_models=backfill_models, - **kwargs, - ) + try: + context.plan( + environment, + restate_models=restate_models, + select_models=select_models, + backfill_models=backfill_models, + **kwargs, + ) + finally: + context.close() @cli.command("run") @@ -344,7 +347,10 @@ def plan( def run(ctx: click.Context, environment: t.Optional[str] = None, **kwargs: t.Any) -> None: """Evaluate missing intervals for the target environment.""" context = ctx.obj - success = context.run(environment, **kwargs) + try: + success = context.run(environment, **kwargs) + finally: + context.close() if not success: raise click.ClickException("Run DAG Failed. See output for details.") @@ -356,7 +362,10 @@ def run(ctx: click.Context, environment: t.Optional[str] = None, **kwargs: t.Any def invalidate(ctx: click.Context, environment: str) -> None: """Invalidate the target environment, forcing its removal during the next run of the janitor process.""" context = ctx.obj - context.invalidate_environment(environment) + try: + context.invalidate_environment(environment) + finally: + context.close() @cli.command("dag") @@ -550,7 +559,10 @@ def ui( @error_handler def migrate(ctx: click.Context) -> None: """Migrate SQLMesh to the current running version.""" - ctx.obj.migrate() + try: + ctx.obj.migrate() + finally: + ctx.obj.close() @cli.command("rollback") @@ -558,7 +570,10 @@ def migrate(ctx: click.Context) -> None: @error_handler def rollback(obj: Context) -> None: """Rollback SQLMesh to the previous migration.""" - obj.rollback() + try: + obj.rollback() + finally: + obj.close() @cli.command("create_external_models") From e51d6ea409ac1205ce9d8e6352af07619763048f Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 29 Jan 2024 16:07:03 -0800 Subject: [PATCH 2/3] Use the existing error handler decorator to close the context --- sqlmesh/cli/__init__.py | 23 +++++++++++++++++++---- sqlmesh/cli/main.py | 27 +++++++++------------------ 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/sqlmesh/cli/__init__.py b/sqlmesh/cli/__init__.py index 0167b2185c..fd9c4e9e10 100644 --- a/sqlmesh/cli/__init__.py +++ b/sqlmesh/cli/__init__.py @@ -5,6 +5,7 @@ import click from sqlglot.errors import SqlglotError +from sqlmesh.core.context import Context from sqlmesh.utils import debug_mode_enabled from sqlmesh.utils.concurrency import NodeExecutionFailedError from sqlmesh.utils.errors import SQLMeshError @@ -19,15 +20,21 @@ def error_handler( func: t.Callable[..., DECORATOR_RETURN_TYPE] ) -> t.Callable[..., DECORATOR_RETURN_TYPE]: @wraps(func) - def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE: + def wrapper(*args: t.List[t.Any], **kwargs: t.Any) -> DECORATOR_RETURN_TYPE: + context_or_obj = args[0] + sqlmesh_context = ( + context_or_obj.obj if isinstance(context_or_obj, click.Context) else context_or_obj + ) + if not isinstance(sqlmesh_context, Context): + sqlmesh_context = None handler = _debug_exception_handler if debug_mode_enabled() else _default_exception_handler - return handler(lambda: func(*args, **kwargs)) + return handler(sqlmesh_context, lambda: func(*args, **kwargs)) return wrapper def _default_exception_handler( - func: t.Callable[[], DECORATOR_RETURN_TYPE] + context: t.Optional[Context], func: t.Callable[[], DECORATOR_RETURN_TYPE] ) -> DECORATOR_RETURN_TYPE: try: return func() @@ -36,11 +43,19 @@ def _default_exception_handler( raise click.ClickException(f"Failed processing {ex.node}. {cause}") except (SQLMeshError, SqlglotError, ValueError) as ex: raise click.ClickException(str(ex)) + finally: + if context: + context.close() -def _debug_exception_handler(func: t.Callable[[], DECORATOR_RETURN_TYPE]) -> DECORATOR_RETURN_TYPE: +def _debug_exception_handler( + context: t.Optional[Context], func: t.Callable[[], DECORATOR_RETURN_TYPE] +) -> DECORATOR_RETURN_TYPE: try: return func() except Exception: logger.exception("Unhandled exception") raise + finally: + if context: + context.close() diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 29610e858b..0dd0c097c5 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -320,16 +320,13 @@ def plan( select_models = kwargs.pop("select_model") or None backfill_models = kwargs.pop("backfill_model") or None context.console.verbose = verbose - try: - context.plan( - environment, - restate_models=restate_models, - select_models=select_models, - backfill_models=backfill_models, - **kwargs, - ) - finally: - context.close() + context.plan( + environment, + restate_models=restate_models, + select_models=select_models, + backfill_models=backfill_models, + **kwargs, + ) @cli.command("run") @@ -347,10 +344,7 @@ def plan( def run(ctx: click.Context, environment: t.Optional[str] = None, **kwargs: t.Any) -> None: """Evaluate missing intervals for the target environment.""" context = ctx.obj - try: - success = context.run(environment, **kwargs) - finally: - context.close() + success = context.run(environment, **kwargs) if not success: raise click.ClickException("Run DAG Failed. See output for details.") @@ -362,10 +356,7 @@ def run(ctx: click.Context, environment: t.Optional[str] = None, **kwargs: t.Any def invalidate(ctx: click.Context, environment: str) -> None: """Invalidate the target environment, forcing its removal during the next run of the janitor process.""" context = ctx.obj - try: - context.invalidate_environment(environment) - finally: - context.close() + context.invalidate_environment(environment) @cli.command("dag") From 6bfca9e1db60b6a7d5922b80c6bf86d89f7e0f08 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 29 Jan 2024 16:08:58 -0800 Subject: [PATCH 3/3] Remaining cleanup --- sqlmesh/cli/main.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 0dd0c097c5..e4716e3b4a 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -550,10 +550,7 @@ def ui( @error_handler def migrate(ctx: click.Context) -> None: """Migrate SQLMesh to the current running version.""" - try: - ctx.obj.migrate() - finally: - ctx.obj.close() + ctx.obj.migrate() @cli.command("rollback") @@ -561,10 +558,7 @@ def migrate(ctx: click.Context) -> None: @error_handler def rollback(obj: Context) -> None: """Rollback SQLMesh to the previous migration.""" - try: - obj.rollback() - finally: - obj.close() + obj.rollback() @cli.command("create_external_models")