Skip to content

Fix async endpoints when using multiple workers#725

Merged
rapids-bot[bot] merged 161 commits intoNVIDIA:developfrom
dagardner-nv:david-async-generate-223-dask-313-sqa
Sep 10, 2025
Merged

Fix async endpoints when using multiple workers#725
rapids-bot[bot] merged 161 commits intoNVIDIA:developfrom
dagardner-nv:david-async-generate-223-dask-313-sqa

Conversation

@dagardner-nv
Copy link
Contributor

@dagardner-nv dagardner-nv commented Aug 27, 2025

Description

  • Replace Starlette background tasks with Dask.
  • Persist the JobStore datastore to a database (SQLite by default but can be specified as something else).
  • Dask and SQLAlchemy are optional dependencies (async_endpoints extra), however if uninstalled, the async endpoints are disabled.
  • Work-around nest_asyncio package creates incompatibilities with newer versions of Python #770 by using an older version of Dask.

Closes #313

By Submitting this PR I confirm:

  • I am familiar with the Contributing Guidelines.
  • We require that all contributors "sign-off" on their commits. This certifies that the contribution is your original work, or you have rights to submit it under the same license, or a compatible license.
    • Any contribution which contains commits that are not Signed-Off will not be accepted.
  • When the PR is ready for review, new or existing tests cover these changes.
  • When the PR is ready for review, the documentation is up to date with these changes.

Summary by CodeRabbit

  • New Features

    • Optional "async_endpoints" install adds Dask-backed async job processing: automatic LocalCluster fallback, persistent DB-backed job store with background cleanup, new front-end config options for scheduler/db URL, worker env wiring, conditional async endpoints, and optional sync-timeout for immediate job completion.
  • Tests

    • Expanded async test suite, fixtures, and helpers covering job lifecycle, expiry, cleanup, Dask/DB integration, and added test data.
  • Chores

    • Improved startup validation, clearer import/config error handling, utilities for config/class loading, and packaging of optional dependencies.

Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
…g dir, instead simply require a yaml file extension

Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
…nc-generate-223

Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
…nc-generate-223

Signed-off-by: David Gardner <dagardner@nvidia.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py (1)

188-201: Fix logging filter leak; remove filter instance correctly.

You create a new LogFilter on removal, so the added filter is never removed. Memory leak and persistent log suppression.

Apply:

-        logging.getLogger("uvicorn.access").addFilter(LogFilter(logs_to_suppress))
+        access_logger = logging.getLogger("uvicorn.access")
+        filt = LogFilter(logs_to_suppress)
+        access_logger.addFilter(filt)
         try:
             response = await call_next(request)
         finally:
-            logging.getLogger("uvicorn.access").removeFilter(LogFilter(logs_to_suppress))
+            access_logger.removeFilter(filt)
♻️ Duplicate comments (15)
tests/nat/front_ends/fastapi/test_job_store.py (3)

243-248: Same Enum vs string concern here.

Apply the same casting as above for RUNNING comparisons.

-    assert job.status == JobStatus.RUNNING
+    assert JobStatus(job.status) == JobStatus.RUNNING

264-268: Same Enum vs string concern for FAILURE.

-    assert job.status == JobStatus.FAILURE
+    assert JobStatus(job.status) == JobStatus.FAILURE

284-292: Same Enum vs string concern for SUCCESS and JSON decode.

-    assert job.status == JobStatus.SUCCESS
+    assert JobStatus(job.status) == JobStatus.SUCCESS
src/nat/front_ends/fastapi/main.py (1)

31-33: Add a concise docstring for get_app.

Public APIs in src/ require docstrings.

-def get_app() -> "FastAPI":
+def get_app() -> "FastAPI":
+    """Create and return the NAT FastAPI application configured from environment variables."""
src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py (1)

739-747: Don’t log raw outputs; use logger.exception and redact body.
Avoid leaking potentially sensitive output content; TRY400 prefers exception().

-                except json.JSONDecodeError:
-                    logger.error("Failed to parse job output as JSON: %s", job_output)
-                    job_output = {"error": "Output parsing failed"}
+                except json.JSONDecodeError:
+                    logger.exception("Failed to parse job output as JSON for job %s", job.job_id)
+                    job_output = None
src/nat/front_ends/fastapi/job_store.py (10)

31-35: Catch Dask’s TimeoutError.
Import and use distributed TimeoutError.

 from dask.distributed import Client as DaskClient
 from dask.distributed import Future
 from dask.distributed import Variable
 from dask.distributed import fire_and_forget
+from dask.distributed import TimeoutError as DaskTimeoutError

36-38: Use Text for potentially large output column.
Also prep import for status type change below.

-from sqlalchemy import DateTime
-from sqlalchemy import String
+from sqlalchemy import DateTime
+from sqlalchemy import String, Text

121-133: Fix ORM typing mismatch for status and prevent output truncation.
Status column is String; map as str and store enum.value. Output should be Text.

-    status: Mapped[JobStatus] = mapped_column(String(11))
+    status: Mapped[str] = mapped_column(String(11))
@@
-    output: Mapped[str] = mapped_column(nullable=True)
+    output: Mapped[str] = mapped_column(Text, nullable=True)

159-160: Annotate ACTIVE_STATUS as ClassVar and keep enums.
Prevents instance attribute confusion and clarifies intent.

-    ACTIVE_STATUS = {JobStatus.RUNNING, JobStatus.SUBMITTED}
+    ACTIVE_STATUS: typing.ClassVar[set[JobStatus]] = {JobStatus.RUNNING, JobStatus.SUBMITTED}

314-321: Handle Dask timeout correctly and remove assert.
Catch DaskTimeoutError; avoid assert in production.

-                try:
-                    _ = await future.result(timeout=sync_timeout)
-                    job = await self.get_job(job_id)
-                    assert job is not None, "Job should exist after future result"
-                    return (job_id, job)
-                except TimeoutError:
-                    pass
+                try:
+                    await future.result(timeout=sync_timeout)
+                    job = await self.get_job(job_id)
+                    if job is None:
+                        logger.error("Job %s completed but not found in store", job_id)
+                        return (job_id, None)
+                    return (job_id, job)
+                except DaskTimeoutError:
+                    pass

470-475: Compare against stored string value in get_jobs_by_status.
Current where clause compares Enum to String column.

-        stmt = select(JobInfo).where(JobInfo.status == status)
+        stmt = select(JobInfo).where(JobInfo.status == status.value)

495-503: Normalize status before ACTIVE check.
Otherwise active jobs may expire improperly.

-        if job.status in self.ACTIVE_STATUS:
+        if JobStatus(job.status) in self.ACTIVE_STATUS:
             return None

514-517: Use notin_ and compare to enum values.
SQLAlchemy API is notin_(), and column stores strings.

-        stmt = select(JobInfo).where(
-            and_(JobInfo.is_expired == sa_expr.false(),
-                 JobInfo.status.not_in(self.ACTIVE_STATUS))).order_by(JobInfo.updated_at.desc())
+        stmt = select(JobInfo).where(
+            and_(
+                JobInfo.is_expired == sa_expr.false(),
+                JobInfo.status.notin_([s.value for s in self.ACTIVE_STATUS]),
+            )
+        ).order_by(JobInfo.updated_at.desc())

549-551: Also catch Dask TimeoutError when reading Variable.

-                        except TimeoutError:
+                        except DaskTimeoutError:
                             pass

575-584: Don’t delete existing SQLite DB by default.
Overwriting breaks persistence across runs and multi-process setups.

             db_file = os.path.join(dot_tmp_dir, "job_store.db")
-            if os.path.exists(db_file):
-                logger.warning("Database file %s already exists, it will be overwritten.", db_file)
-                os.remove(db_file)

Optional: enable WAL to reduce “database is locked” under load.

🧹 Nitpick comments (19)
src/nat/front_ends/fastapi/dask_client_mixin.py (4)

25-25: Add a concise class docstring.

Public classes in src/ need Google-style docstrings.

 class DaskClientMixin(ABC):
+    """Mixin that provides an async context manager for obtaining and closing a Dask `Client`."""

25-25: Drop ABC or add an abstract contract (ruff B024).

This class has no abstract members; either remove ABC or add a small abstract method/property.

-class DaskClientMixin(ABC):
+class DaskClientMixin:

27-29: Fix AsyncGenerator type to include the send type.

Use AsyncGenerator[Client, None] for accuracy.

-    async def client(self, address: str) -> AsyncGenerator["Client"]:
+    async def client(self, address: str) -> AsyncGenerator["Client", None]:

16-20: Log lifecycle events and failures.

Add a module logger for client creation/closure messages.

-import typing
+import typing
+import logging
@@
 if typing.TYPE_CHECKING:
     from dask.distributed import Client
+
+logger = logging.getLogger(__name__)
tests/nat/front_ends/fastapi/test_job_store.py (3)

111-118: Be explicit about Enum/string status comparison.

JobInfo.status column is declared as String in ORM; tests comparing to JobStatus may fail. Cast status in assertions or switch the column to sqlalchemy.Enum in the model.

Example test-side change:

-    assert job.status == JobStatus.SUBMITTED
+    assert JobStatus(job.status) == JobStatus.SUBMITTED

If you prefer model-side fix (outside this file), convert status to sa.Enum(JobStatus, native_enum=False) so ORM returns JobStatus.


589-596: Reduce sleeps or mark as slow for stability.

3s sleeps make tests slow/flaky on CI. Consider marking with @pytest.mark.slow or tightening expiry and sleep values.


759-769: Accessing scheduler address on async Client can vary by Dask version.

If this flakes, consider asserting the address substring is present or that the client is connected (e.g., await client.scheduler_info()).

src/nat/front_ends/fastapi/main.py (1)

47-50: Use TypeError for issubclass check failures.

Signals a type mismatch more clearly; keep the message as-is.

-        if (not issubclass(front_end_worker_class, FastApiFrontEndPluginWorkerBase)):
-            raise ValueError(
+        if not issubclass(front_end_worker_class, FastApiFrontEndPluginWorkerBase):
+            raise TypeError(
                 f"Front end worker {front_end_worker_full_name} is not a subclass of FastApiFrontEndPluginWorker.")
src/nat/front_ends/fastapi/fastapi_front_end_plugin.py (6)

36-45: Add class docstring and return types on new methods.

Document the plugin behavior and annotate methods with -> None.

-class FastApiFrontEndPlugin(DaskClientMixin, FrontEndBase[FastApiFrontEndConfig]):
+class FastApiFrontEndPlugin(DaskClientMixin, FrontEndBase[FastApiFrontEndConfig]):
+    """FastAPI front-end runner that wires Dask and the persistent JobStore for async endpoints."""
@@
-    def __init__(self, full_config: "Config"):
+    def __init__(self, full_config: "Config"):
         super().__init__(full_config)
@@
-        self._cleanup_future = None
+        self._cleanup_future = None  # type: ignore[assignment]

And add return annotations below (see per-method comments).


81-89: Prefer passed argument over instance attribute; add return type.

Use the scheduler_address argument for clarity and annotate return type.

-    async def _submit_cleanup_task(self, scheduler_address: str, db_url: str):
+    async def _submit_cleanup_task(self, scheduler_address: str, db_url: str) -> None:
@@
-        async with self.client(self._scheduler_address) as client:
+        async with self.client(scheduler_address) as client:
             self._cleanup_future = client.submit(self._periodic_cleanup,
-                                                 scheduler_address=self._scheduler_address,
+                                                 scheduler_address=scheduler_address,
                                                  db_url=db_url,
                                                  log_level=logger.getEffectiveLevel())

131-136: Fix typo in comment.

-                # Set environment variabls such that the worker subprocesses will know how to connect to dask and to
+                # Set environment variables such that the worker subprocesses will know how to connect to dask and to

92-101: Add return annotation and a short docstring to run().

-    async def run(self):
+    async def run(self) -> None:
+        """Start the FastAPI server, wiring Dask and persistent JobStore if configured."""

195-206: Close LocalCluster robustly (sync vs async).

Handle both sync and async close methods; keep shutdown resilient.

         finally:
             logger.debug("Shutting down")
             if self._cleanup_future is not None:
                 logger.info("Cancelling periodic cleanup task.")
                 # Use the scheduler address, because self._cluster is None if an external cluster is used
                 async with self.client(self._scheduler_address) as client:
                     await client.cancel([self._cleanup_future], asynchronous=True, force=True)
 
             if self._cluster is not None:
                 # Only shut down the cluster if we created it
                 logger.info("Closing Local Dask cluster.")
-                self._cluster.close()
+                try:
+                    close_fn = getattr(self._cluster, "close", None)
+                    if close_fn is None:
+                        return
+                    result = close_fn()
+                    if asyncio.iscoroutine(result):
+                        await result
+                except Exception:
+                    logger.exception("Failed to close Local Dask cluster")

207-210: Avoid duplicating exception message in logger.exception.

logger.exception includes traceback automatically; format message without f"{e}".

-            except OSError as e:
-                logger.exception(f"Warning: Failed to delete temp file {config_file_name}: {e}")
+            except OSError:
+                logger.exception("Failed to delete temp file %s", config_file_name)
src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py (4)

186-190: Docstring typo.
“supreses” → “suppresses”.

-        Intercepts authentication request and supreses logs that contain sensitive data.
+        Intercepts authentication request and suppresses logs that contain sensitive data.

269-276: Remove TODO or link to tracked issue.
Inline TODOs are disallowed by guidelines; convert to a comment referencing an issue or remove.

-        # TODO: Find another way to limit the number of concurrent evaluations
+        # Note: consider limiting concurrent evaluations via Dask worker resources (tracked in <issue-link>).

549-555: Fix typo in field description.
“sectonds” → “seconds”.

-                    description="Attempt to perform the job synchronously up until `sync_timeout` sectonds, "
+                    description="Attempt to perform the job synchronously up until `sync_timeout` seconds, "

1101-1101: Sort all for RUF022.

-__all__ = ["FastApiFrontEndPluginWorkerBase", "FastApiFrontEndPluginWorker", "RouteInfo"]
+__all__ = ["FastApiFrontEndPluginWorker", "FastApiFrontEndPluginWorkerBase", "RouteInfo"]
src/nat/front_ends/fastapi/job_store.py (1)

602-602: Sort all for RUF022.

-__all__ = ["get_db_engine", "JobInfo", "JobStatus", "JobStore"]
+__all__ = ["JobInfo", "JobStatus", "JobStore", "get_db_engine"]
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9722f82 and bef16aa.

📒 Files selected for processing (6)
  • src/nat/front_ends/fastapi/dask_client_mixin.py (1 hunks)
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py (4 hunks)
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py (12 hunks)
  • src/nat/front_ends/fastapi/job_store.py (3 hunks)
  • src/nat/front_ends/fastapi/main.py (2 hunks)
  • tests/nat/front_ends/fastapi/test_job_store.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (8)
src/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

src/**/*.py: All importable Python code must live under src/
All public APIs in src/ require Python 3.11+ type hints on parameters and return values; prefer typing/collections.abc abstractions; use typing.Annotated when useful
Provide Google-style docstrings for every public module, class, function, and CLI command; first line concise with a period; surround code entities with backticks

Files:

  • src/nat/front_ends/fastapi/main.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
  • src/nat/front_ends/fastapi/job_store.py
  • src/nat/front_ends/fastapi/dask_client_mixin.py
src/nat/**/*

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Core functionality under src/nat should prioritize backward compatibility when changed

Files:

  • src/nat/front_ends/fastapi/main.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
  • src/nat/front_ends/fastapi/job_store.py
  • src/nat/front_ends/fastapi/dask_client_mixin.py

⚙️ CodeRabbit configuration file

This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.

Files:

  • src/nat/front_ends/fastapi/main.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
  • src/nat/front_ends/fastapi/job_store.py
  • src/nat/front_ends/fastapi/dask_client_mixin.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bare raise and log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial

Files:

  • src/nat/front_ends/fastapi/main.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
  • src/nat/front_ends/fastapi/job_store.py
  • src/nat/front_ends/fastapi/dask_client_mixin.py
  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks

Files:

  • src/nat/front_ends/fastapi/main.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
  • src/nat/front_ends/fastapi/job_store.py
  • src/nat/front_ends/fastapi/dask_client_mixin.py
  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm

Files:

  • src/nat/front_ends/fastapi/main.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
  • src/nat/front_ends/fastapi/job_store.py
  • src/nat/front_ends/fastapi/dask_client_mixin.py
  • tests/nat/front_ends/fastapi/test_job_store.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. - For Python code, follow
    PEP 20 and
    PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
    Example:
    def my_function(param1: int, param2: str) -> bool:
        pass
  • For Python exception handling, ensure proper stack trace preservation:
    • When re-raising exceptions: use bare raise statements to maintain the original stack trace,
      and use logger.error() (not logger.exception()) to avoid duplicate stack trace output.
    • When catching and logging exceptions without re-raising: always use logger.exception()
      to capture the full stack trace information.

Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any

words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file, words that might appear to be
spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,

and should contain an Apache License 2.0 header comment at the top of each file.

  • Confirm that copyright years are up-to date whenever a file is changed.

Files:

  • src/nat/front_ends/fastapi/main.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
  • src/nat/front_ends/fastapi/job_store.py
  • src/nat/front_ends/fastapi/dask_client_mixin.py
  • tests/nat/front_ends/fastapi/test_job_store.py
tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py

⚙️ CodeRabbit configuration file

tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using the test_ prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using the fixture_ prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
🧬 Code graph analysis (6)
src/nat/front_ends/fastapi/main.py (1)
src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py (2)
  • FastApiFrontEndPluginWorkerBase (85-211)
  • config (119-120)
src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py (4)
src/nat/runtime/loader.py (1)
  • load_workflow (97-117)
src/nat/front_ends/fastapi/job_store.py (12)
  • session (198-217)
  • JobInfo (89-135)
  • JobStatus (58-82)
  • update_status (327-377)
  • get_status (414-432)
  • ensure_job_id (219-234)
  • submit_job (270-325)
  • get_expires_at (477-503)
  • get_job (397-412)
  • get_last_job (434-453)
  • get_all_jobs (379-395)
  • get_jobs_by_status (455-475)
src/nat/front_ends/fastapi/fastapi_front_end_config.py (1)
  • EvaluateRequest (50-94)
src/nat/front_ends/fastapi/response_helpers.py (1)
  • generate_single_response (108-117)
src/nat/front_ends/fastapi/fastapi_front_end_plugin.py (5)
src/nat/front_ends/fastapi/dask_client_mixin.py (2)
  • DaskClientMixin (25-43)
  • client (28-43)
src/nat/front_ends/fastapi/fastapi_front_end_config.py (1)
  • FastApiFrontEndConfig (136-257)
src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py (3)
  • FastApiFrontEndPluginWorkerBase (85-211)
  • config (119-120)
  • front_end_config (123-124)
src/nat/front_ends/fastapi/main.py (1)
  • get_app (31-64)
src/nat/front_ends/fastapi/job_store.py (5)
  • JobStore (138-558)
  • cleanup_expired_jobs (505-558)
  • client (184-195)
  • Base (85-86)
  • get_db_engine (561-598)
src/nat/front_ends/fastapi/job_store.py (1)
src/nat/front_ends/fastapi/dask_client_mixin.py (2)
  • DaskClientMixin (25-43)
  • client (28-43)
src/nat/front_ends/fastapi/dask_client_mixin.py (1)
src/nat/front_ends/fastapi/job_store.py (1)
  • client (184-195)
tests/nat/front_ends/fastapi/test_job_store.py (1)
src/nat/front_ends/fastapi/job_store.py (15)
  • JobStore (138-558)
  • ensure_job_id (219-234)
  • JobStatus (58-82)
  • _create_job (236-268)
  • get_job (397-412)
  • submit_job (270-325)
  • update_status (327-377)
  • JobInfo (89-135)
  • get_status (414-432)
  • get_all_jobs (379-395)
  • get_last_job (434-453)
  • get_jobs_by_status (455-475)
  • get_expires_at (477-503)
  • cleanup_expired_jobs (505-558)
  • get_db_engine (561-598)
🪛 Ruff (0.12.2)
src/nat/front_ends/fastapi/main.py

48-49: Prefer TypeError exception for invalid type

(TRY004)


48-49: Abstract raise to an inner function

(TRY301)


48-49: Avoid specifying long messages outside the exception class

(TRY003)

src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py

103-103: Avoid specifying long messages outside the exception class

(TRY003)


106-107: Avoid specifying long messages outside the exception class

(TRY003)


114-114: Avoid specifying long messages outside the exception class

(TRY003)


745-745: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


957-957: Avoid specifying long messages outside the exception class

(TRY003)


1101-1101: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

src/nat/front_ends/fastapi/job_store.py

159-159: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


171-171: Avoid specifying long messages outside the exception class

(TRY003)


319-319: Consider moving this statement to an else block

(TRY300)


359-359: Avoid specifying long messages outside the exception class

(TRY003)


602-602: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

src/nat/front_ends/fastapi/dask_client_mixin.py

25-25: DaskClientMixin is an abstract base class, but it has no abstract methods or properties

(B024)

tests/nat/front_ends/fastapi/test_job_store.py

43-43: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (2)
tests/nat/front_ends/fastapi/test_job_store.py (1)

371-373: Same Enum vs string concern in get_status test is fine; here you already return JobStatus.

No change needed.

src/nat/front_ends/fastapi/fastapi_front_end_plugin.py (1)

100-117: Minor: import guard message is good; consider documenting LocalCluster creation.

Optionally note in a comment that LocalCluster is only created when Dask is installed and no address is provided.

Signed-off-by: David Gardner <dagardner@nvidia.com>
@dagardner-nv dagardner-nv removed the breaking Breaking change label Sep 9, 2025
…avid-async-generate-223-dask-313-sqa

Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
…avid-async-generate-223-dask-313-sqa

Signed-off-by: David Gardner <dagardner@nvidia.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (8)
tests/nat/front_ends/fastapi/test_job_store.py (8)

19-19: Remove unused import.

tempfile isn't used; drop it to satisfy ruff F401.

-import tempfile

41-44: Silence TRY003 on intentional test exception message.

Keep the descriptive message but ignore TRY003 locally.

 async def failing_job_function() -> None:
     """Function that raises an exception for testing error handling."""
-    raise ValueError("This job is designed to fail")
+    raise ValueError("This job is designed to fail")  # noqa: TRY003 - explicit test message

46-55: Avoid asserting on private attributes; use the public context manager.

This test peeks at _session. Prefer a black‑box check.

     job_store = JobStore(scheduler_address=dask_scheduler_address, db_engine=db_engine)
 
     assert job_store._scheduler_address == dask_scheduler_address
-    assert job_store._session is not None
+    async with job_store.session() as session:
+        assert session is not None

57-66: Avoid asserting on private attributes; use the public context manager.

Same as above for the db_url path.

     job_store = JobStore(scheduler_address=dask_scheduler_address, db_url=db_url)
 
     assert job_store._scheduler_address == dask_scheduler_address
-    assert job_store._session is not None
+    async with job_store.session() as session:
+        assert session is not None

612-669: Use monkeypatch for MIN_EXPIRY and relax brittle assertion on removed artifacts.

  • Don’t mutate a global permanently; patch MIN_EXPIRY per test.
  • Cleanup may remove one or both artifacts depending on timing; assert “>= 1” to avoid flakes.
  • Shorten the sleep to speed up.
-async def test_cleanup_expired_jobs_with_output_files(db_engine: "AsyncEngine",
-                                                      dask_scheduler_address: str,
-                                                      tmp_path: Path):
+async def test_cleanup_expired_jobs_with_output_files(
+    db_engine: "AsyncEngine",
+    dask_scheduler_address: str,
+    tmp_path: Path,
+    monkeypatch: pytest.MonkeyPatch,
+):
@@
-    JobStore.MIN_EXPIRY = 1  # Lower minimum expiry for testing
+    # Lower minimum expiry for this test only
+    monkeypatch.setattr(JobStore, "MIN_EXPIRY", 1, raising=True)
@@
-    # Wait for jobs to expire
-    await asyncio.sleep(3)
+    # Wait for jobs to expire
+    await asyncio.sleep(1.5)
@@
-    assert removed_count == 1  # Files may or may not be removed depending on cleanup behavior
+    assert removed_count >= 1  # At least one expired job's artifacts should be removed

701-709: Avoid writing test DBs into the repo root; use tmp_path.

This prevents residue and CI cross‑test interference.

-def test_get_db_engine_with_url():
+def test_get_db_engine_with_url(tmp_path: Path):
@@
-    db_url = "sqlite:///test.db"
+    db_url = f"sqlite:///{(tmp_path / 'test.db')}"

712-721: Same: isolate async DB file under tmp_path.

-def test_get_db_engine_async():
+def test_get_db_engine_async(tmp_path: Path):
@@
-    db_url = "sqlite+aiosqlite:///test.db"
+    db_url = f"sqlite+aiosqlite:///{(tmp_path / 'test.db')}"

735-756: Isolate default SQLite path by changing CWD to tmp_path.

get_db_engine() writes .tmp/job_store.db under cwd. Change into a temp dir for this test.

-def test_get_db_engine_creates_default_sqlite():
+def test_get_db_engine_creates_default_sqlite(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
@@
-    try:
+    try:
+        # Avoid touching repo root
+        monkeypatch.chdir(tmp_path)
         engine = get_db_engine(use_async=True)
         assert engine is not None
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5fd9745 and fdd0624.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
  • tests/nat/front_ends/fastapi/test_job_store.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (7)
tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py

⚙️ CodeRabbit configuration file

tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using the test_ prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using the fixture_ prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bare raise and log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial

**/*.py: Programmatic use: create TestLLMConfig(response_seq=[...], delay_ms=...), add with builder.add_llm("", cfg).
When retrieving the test LLM wrapper, use builder.get_llm(name, wrapper_type=LLMFrameworkEnum.) and call the framework’s method (e.g., ainvoke, achat, call).

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,yaml,yml}

📄 CodeRabbit inference engine (.cursor/rules/nat-test-llm.mdc)

**/*.{py,yaml,yml}: Configure response_seq as a list of strings; values cycle per call, and [] yields an empty string.
Configure delay_ms to inject per-call artificial latency in milliseconds for nat_test_llm.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. - For Python code, follow
    PEP 20 and
    PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
    Example:
    def my_function(param1: int, param2: str) -> bool:
        pass
  • For Python exception handling, ensure proper stack trace preservation:
    • When re-raising exceptions: use bare raise statements to maintain the original stack trace,
      and use logger.error() (not logger.exception()) to avoid duplicate stack trace output.
    • When catching and logging exceptions without re-raising: always use logger.exception()
      to capture the full stack trace information.

Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any

words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file, words that might appear to be
spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,

and should contain an Apache License 2.0 header comment at the top of each file.

  • Confirm that copyright years are up-to date whenever a file is changed.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
🧬 Code graph analysis (1)
tests/nat/front_ends/fastapi/test_job_store.py (1)
src/nat/front_ends/fastapi/job_store.py (15)
  • JobStore (138-558)
  • ensure_job_id (219-234)
  • JobStatus (58-82)
  • _create_job (236-268)
  • get_job (397-412)
  • submit_job (270-325)
  • update_status (327-377)
  • JobInfo (89-135)
  • get_status (414-432)
  • get_all_jobs (379-395)
  • get_last_job (434-453)
  • get_jobs_by_status (455-475)
  • get_expires_at (477-503)
  • cleanup_expired_jobs (505-558)
  • get_db_engine (561-598)
🪛 Ruff (0.12.2)
tests/nat/front_ends/fastapi/test_job_store.py

43-43: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: CI Pipeline / Check

Signed-off-by: David Gardner <dagardner@nvidia.com>
@coderabbitai coderabbitai bot added the breaking Breaking change label Sep 9, 2025
Signed-off-by: David Gardner <dagardner@nvidia.com>
…avid-async-generate-223-dask-313-sqa

Signed-off-by: David Gardner <dagardner@nvidia.com>
@dagardner-nv dagardner-nv removed the breaking Breaking change label Sep 9, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
tests/nat/front_ends/fastapi/test_job_store.py (1)

572-609: Make expiry cleanup test deterministic by lowering MIN_EXPIRY via monkeypatch and asserting at least one expiration

With MIN_EXPIRY=600 in JobStore, expiry_seconds=1 won’t expire after 3s; current assertions become weak. Patch MIN_EXPIRY and assert at least one expired.

-async def test_cleanup_expired_jobs_with_expired(db_engine: "AsyncEngine", dask_scheduler_address: str):
+async def test_cleanup_expired_jobs_with_expired(
+    db_engine: "AsyncEngine",
+    dask_scheduler_address: str,
+    monkeypatch: pytest.MonkeyPatch,
+):
@@
-    job_store = JobStore(scheduler_address=dask_scheduler_address, db_engine=db_engine)
+    job_store = JobStore(scheduler_address=dask_scheduler_address, db_engine=db_engine)
+    # Honor 1-second expiries in this test
+    from nat.front_ends.fastapi.job_store import JobStore as _JS
+    monkeypatch.setattr(_JS, "MIN_EXPIRY", 1, raising=True)
@@
-    # Wait for jobs to expire (longer wait to be more robust)
-    await asyncio.sleep(3)
+    # Wait for jobs to expire
+    await asyncio.sleep(1.5)
@@
-    # At least one job should remain (most recent), and total should be 2
-    assert kept_count >= 1
-    assert expired_count + kept_count == 2
+    # One kept (most recent), at least one expired
+    assert kept_count >= 1
+    assert expired_count >= 1
+    assert expired_count + kept_count == 2
🧹 Nitpick comments (5)
tests/nat/front_ends/fastapi/test_job_store.py (5)

34-37: Prefer a synchronous job function for Dask workers

Dask workers execute regular callables; submitting an async def returns a coroutine object rather than executing it. Use a sync function to avoid misleading results in submit/result flows.

+import time
@@
-async def simple_job_function(x: int, y: int = 10) -> int:
-    """Simple function for testing job execution."""
-    await asyncio.sleep(0.1)  # Simulate some work
-    return x + y
+def simple_job_function(x: int, y: int = 10) -> int:
+    """Simple function for testing job execution."""
+    time.sleep(0.1)  # Simulate some work
+    return x + y

40-43: Remove unused helper or add a test that exercises it

failing_job_function() is currently unused; drop it or add a failure-path test.

-async def failing_job_function() -> None:
-    """Function that raises an exception for testing error handling."""
-    raise ValueError("This job is designed to fail")

241-247: Compare against the enum’s value for clarity

Avoid relying on StrEnum equality; compare to the string value explicitly.

-    assert job.status == JobStatus.RUNNING
+    assert job.status == JobStatus.RUNNING.value

700-709: Avoid filesystem pollution; use tmp_path for SQLite URL

Write the test DB into pytest’s temp dir.

-def test_get_db_engine_with_url():
+def test_get_db_engine_with_url(tmp_path: Path):
@@
-    db_url = "sqlite:///test.db"
+    db_url = f"sqlite:///{(tmp_path / 'test.db')}"

711-721: Same here: use tmp_path for async SQLite URL

Keep artifacts in a temp directory.

-def test_get_db_engine_async():
+def test_get_db_engine_async(tmp_path: Path):
@@
-    db_url = "sqlite+aiosqlite:///test.db"
+    db_url = f"sqlite+aiosqlite:///{(tmp_path / 'test.db')}"
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fdd0624 and 7b0678c.

📒 Files selected for processing (1)
  • tests/nat/front_ends/fastapi/test_job_store.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (7)
tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py

⚙️ CodeRabbit configuration file

tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using the test_ prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using the fixture_ prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bare raise and log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial

**/*.py: Programmatic use: create TestLLMConfig(response_seq=[...], delay_ms=...), add with builder.add_llm("", cfg).
When retrieving the test LLM wrapper, use builder.get_llm(name, wrapper_type=LLMFrameworkEnum.) and call the framework’s method (e.g., ainvoke, achat, call).

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,yaml,yml}

📄 CodeRabbit inference engine (.cursor/rules/nat-test-llm.mdc)

**/*.{py,yaml,yml}: Configure response_seq as a list of strings; values cycle per call, and [] yields an empty string.
Configure delay_ms to inject per-call artificial latency in milliseconds for nat_test_llm.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. - For Python code, follow
    PEP 20 and
    PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
    Example:
    def my_function(param1: int, param2: str) -> bool:
        pass
  • For Python exception handling, ensure proper stack trace preservation:
    • When re-raising exceptions: use bare raise statements to maintain the original stack trace,
      and use logger.error() (not logger.exception()) to avoid duplicate stack trace output.
    • When catching and logging exceptions without re-raising: always use logger.exception()
      to capture the full stack trace information.

Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any

words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file, words that might appear to be
spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,

and should contain an Apache License 2.0 header comment at the top of each file.

  • Confirm that copyright years are up-to date whenever a file is changed.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
🧬 Code graph analysis (1)
tests/nat/front_ends/fastapi/test_job_store.py (1)
src/nat/front_ends/fastapi/job_store.py (15)
  • JobStore (138-558)
  • ensure_job_id (219-234)
  • JobStatus (58-82)
  • _create_job (236-268)
  • get_job (397-412)
  • submit_job (270-325)
  • update_status (327-377)
  • JobInfo (89-135)
  • get_status (414-432)
  • get_all_jobs (379-395)
  • get_last_job (434-453)
  • get_jobs_by_status (455-475)
  • get_expires_at (477-503)
  • cleanup_expired_jobs (505-558)
  • get_db_engine (561-598)
🪛 Ruff (0.12.2)
tests/nat/front_ends/fastapi/test_job_store.py

42-42: Avoid specifying long messages outside the exception class

(TRY003)

Signed-off-by: David Gardner <dagardner@nvidia.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (1)
tests/nat/front_ends/fastapi/test_job_store.py (1)

596-599: Make expiry/“most recent” ordering deterministic and shorten sleep.

Add a brief sleep so job2 is definitively “most recent,” and trim the wait.

-    await job_store.update_status(job_id1, JobStatus.SUCCESS, output_path=str(output_dir1))
-    await job_store.update_status(job_id2, JobStatus.SUCCESS, output_path=str(output_dir2))
+    await job_store.update_status(job_id1, JobStatus.SUCCESS, output_path=str(output_dir1))
+    await asyncio.sleep(0.05)
+    await job_store.update_status(job_id2, JobStatus.SUCCESS, output_path=str(output_dir2))
@@
-    await asyncio.sleep(3)
+    await asyncio.sleep(1.5)

Also applies to: 605-605, 617-621

🧹 Nitpick comments (5)
tests/nat/front_ends/fastapi/test_job_store.py (5)

40-43: Silence Ruff TRY003 for test helper or use a custom exception.

This is a test utility; simplest is a one-line noqa.

-    raise ValueError("This job is designed to fail")
+    raise ValueError("This job is designed to fail")  # noqa: TRY003

655-664: Avoid writing test DB files to repo root; use tmp_path.

Prevents leftover artifacts and potential CI conflicts.

-def test_get_db_engine_with_url():
+def test_get_db_engine_with_url(tmp_path: Path):
@@
-    db_url = "sqlite:///test.db"
+    db_url = f"sqlite:///{tmp_path / 'test.db'}"
@@
-def test_get_db_engine_async():
+def test_get_db_engine_async(tmp_path: Path):
@@
-    db_url = "sqlite+aiosqlite:///test.db"
+    db_url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"

Also applies to: 667-676


689-705: Isolate default-SQLite test’s working directory.

get_db_engine() creates .tmp/job_store.db under CWD and may delete an existing file; run in a temp CWD.

-def test_get_db_engine_creates_default_sqlite():
+def test_get_db_engine_creates_default_sqlite(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
@@
-    try:
+    try:
+        monkeypatch.chdir(tmp_path)
         engine = get_db_engine(use_async=True)

165-176: Mark Dask-dependent tests as integration to control runtime in CI.

These spin up a client/scheduler; tagging helps selective runs.

-@pytest.mark.asyncio
+@pytest.mark.integration
+@pytest.mark.asyncio
 async def test_submit_job_success(...
@@
-@pytest.mark.asyncio
+@pytest.mark.integration
+@pytest.mark.asyncio
 async def test_submit_job_with_sync_timeout(...
@@
-@pytest.mark.asyncio
+@pytest.mark.integration
+@pytest.mark.asyncio
 async def test_submit_job_with_kwargs(...
@@
-@pytest.mark.asyncio
+@pytest.mark.integration
+@pytest.mark.asyncio
 async def test_client_context_manager(...

Also applies to: 185-200, 204-215, 712-723


40-43: Remove unused failing_job_function or add a test that uses it.

Dead code in tests is noise; either use it to assert failure-path handling or drop it.

I can add a test_submit_job_failure_with_sync_timeout that asserts failure status and error propagation once the worker updates status.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7b0678c and e84dce0.

📒 Files selected for processing (1)
  • tests/nat/front_ends/fastapi/test_job_store.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (7)
tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py

⚙️ CodeRabbit configuration file

tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using the test_ prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using the fixture_ prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bare raise and log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial

**/*.py: Programmatic use: create TestLLMConfig(response_seq=[...], delay_ms=...), add with builder.add_llm("", cfg).
When retrieving the test LLM wrapper, use builder.get_llm(name, wrapper_type=LLMFrameworkEnum.) and call the framework’s method (e.g., ainvoke, achat, call).

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,yaml,yml}

📄 CodeRabbit inference engine (.cursor/rules/nat-test-llm.mdc)

**/*.{py,yaml,yml}: Configure response_seq as a list of strings; values cycle per call, and [] yields an empty string.
Configure delay_ms to inject per-call artificial latency in milliseconds for nat_test_llm.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. - For Python code, follow
    PEP 20 and
    PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
    Example:
    def my_function(param1: int, param2: str) -> bool:
        pass
  • For Python exception handling, ensure proper stack trace preservation:
    • When re-raising exceptions: use bare raise statements to maintain the original stack trace,
      and use logger.error() (not logger.exception()) to avoid duplicate stack trace output.
    • When catching and logging exceptions without re-raising: always use logger.exception()
      to capture the full stack trace information.

Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any

words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file, words that might appear to be
spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,

and should contain an Apache License 2.0 header comment at the top of each file.

  • Confirm that copyright years are up-to date whenever a file is changed.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
🧬 Code graph analysis (1)
tests/nat/front_ends/fastapi/test_job_store.py (1)
src/nat/front_ends/fastapi/job_store.py (15)
  • JobStore (138-558)
  • ensure_job_id (219-234)
  • JobStatus (58-82)
  • _create_job (236-268)
  • get_job (397-412)
  • submit_job (270-325)
  • update_status (327-377)
  • JobInfo (89-135)
  • get_status (414-432)
  • get_all_jobs (379-395)
  • get_last_job (434-453)
  • get_jobs_by_status (455-475)
  • get_expires_at (477-503)
  • cleanup_expired_jobs (505-558)
  • get_db_engine (561-598)
🪛 Ruff (0.12.2)
tests/nat/front_ends/fastapi/test_job_store.py

42-42: Avoid specifying long messages outside the exception class

(TRY003)

Signed-off-by: David Gardner <dagardner@nvidia.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (5)
tests/nat/front_ends/fastapi/test_job_store.py (5)

110-113: Make status assertions explicit (avoid implicit str↔Enum equality)

Cast stored strings to JobStatus for clarity and future-proofing.

-    assert job.status == JobStatus.SUBMITTED
+    assert JobStatus(job.status) == JobStatus.SUBMITTED
@@
-    assert job.status == JobStatus.SUBMITTED
+    assert JobStatus(job.status) == JobStatus.SUBMITTED
@@
-    assert job.status == JobStatus.RUNNING
+    assert JobStatus(job.status) == JobStatus.RUNNING
@@
-    assert job.status == JobStatus.FAILURE
+    assert JobStatus(job.status) == JobStatus.FAILURE
@@
-    assert job.status == JobStatus.SUCCESS
+    assert JobStatus(job.status) == JobStatus.SUCCESS

Also applies to: 176-181, 242-246, 262-266, 283-286


470-483: Pass string values to get_jobs_by_status (aligns with DB String column)

Use .value to avoid relying on implicit coercion; consider updating impl to compare by .value.

-    submitted_jobs = await job_store.get_jobs_by_status(JobStatus.SUBMITTED)
+    submitted_jobs = await job_store.get_jobs_by_status(JobStatus.SUBMITTED.value)
@@
-    running_jobs = await job_store.get_jobs_by_status(JobStatus.RUNNING)
+    running_jobs = await job_store.get_jobs_by_status(JobStatus.RUNNING.value)
@@
-    success_jobs = await job_store.get_jobs_by_status(JobStatus.SUCCESS)
+    success_jobs = await job_store.get_jobs_by_status(JobStatus.SUCCESS.value)
@@
-    failure_jobs = await job_store.get_jobs_by_status(JobStatus.FAILURE)
+    failure_jobs = await job_store.get_jobs_by_status(JobStatus.FAILURE.value)

498-501: Explicit active-status membership check

Avoid implicit str↔Enum membership by casting.

-    assert job is not None
-    assert job.status in job_store.ACTIVE_STATUS
+    assert job is not None
+    assert JobStatus(job.status) in job_store.ACTIVE_STATUS

596-599: Deflake most-recent retention and speed up expiry wait

Insert a small delay to make updated_at ordering deterministic and shorten sleep (MIN_EXPIRY is patched to 1s).

-        await job_store.update_status(job_id1, JobStatus.SUCCESS, output_path=str(output_dir1))
-        await job_store.update_status(job_id2, JobStatus.SUCCESS, output_path=str(output_dir2))
+        await job_store.update_status(job_id1, JobStatus.SUCCESS, output_path=str(output_dir1))
+        await asyncio.sleep(0.05)
+        await job_store.update_status(job_id2, JobStatus.SUCCESS, output_path=str(output_dir2))
@@
-        # Wait for jobs to expire
-        await asyncio.sleep(3)
+        # Wait for jobs to expire
+        await asyncio.sleep(1.5)

Also applies to: 604-606


644-649: Deflake “keeps active” by ensuring most-recent finished ordering

Guarantee job3 is the most recent finished job.

-        await job_store.update_status(job_id2, JobStatus.SUCCESS)
-        await job_store.update_status(job_id3, JobStatus.SUCCESS)
+        await job_store.update_status(job_id2, JobStatus.SUCCESS)
+        await asyncio.sleep(0.05)
+        await job_store.update_status(job_id3, JobStatus.SUCCESS)
🧹 Nitpick comments (2)
tests/nat/front_ends/fastapi/test_job_store.py (2)

40-42: Fix Ruff TRY003 in failing helper

Drop the message to satisfy TRY003 in tests.

-async def failing_job_function() -> None:
-    """Function that raises an exception for testing error handling."""
-    raise ValueError("This job is designed to fail")
+async def failing_job_function() -> None:
+    """Function that raises an exception for testing error handling."""
+    raise ValueError

701-722: Use monkeypatch for env var to avoid leakage/races

Prefer pytest’s monkeypatch over manual env edits.

-def test_get_db_engine_creates_default_sqlite():
+def test_get_db_engine_creates_default_sqlite(monkeypatch: pytest.MonkeyPatch):
     """Test get_db_engine creates default SQLite when no URL provided."""
     from nat.front_ends.fastapi.job_store import get_db_engine
 
-    # Temporarily clear the environment variable
-    original_url = os.environ.get("NAT_JOB_STORE_DB_URL")
-    if original_url:
-        del os.environ["NAT_JOB_STORE_DB_URL"]
-
-    try:
-        engine = get_db_engine(use_async=True)
-        assert engine is not None
-
-        # Should create a SQLite database in .tmp directory
-        assert "sqlite" in str(engine.url)
-        assert ".tmp/job_store.db" in str(engine.url)
-
-    finally:
-        # Restore environment variable
-        if original_url:
-            os.environ["NAT_JOB_STORE_DB_URL"] = original_url
+    # Temporarily clear the environment variable
+    monkeypatch.delenv("NAT_JOB_STORE_DB_URL", raising=False)
+
+    engine = get_db_engine(use_async=True)
+    assert engine is not None
+    # Should create a SQLite database in .tmp directory
+    assert "sqlite" in str(engine.url)
+    assert ".tmp/job_store.db" in str(engine.url)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e84dce0 and dc17c8c.

📒 Files selected for processing (1)
  • tests/nat/front_ends/fastapi/test_job_store.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (7)
tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py

⚙️ CodeRabbit configuration file

tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using the test_ prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using the fixture_ prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bare raise and log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial

**/*.py: Programmatic use: create TestLLMConfig(response_seq=[...], delay_ms=...), add with builder.add_llm("", cfg).
When retrieving the test LLM wrapper, use builder.get_llm(name, wrapper_type=LLMFrameworkEnum.) and call the framework’s method (e.g., ainvoke, achat, call).

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*.{py,yaml,yml}

📄 CodeRabbit inference engine (.cursor/rules/nat-test-llm.mdc)

**/*.{py,yaml,yml}: Configure response_seq as a list of strings; values cycle per call, and [] yields an empty string.
Configure delay_ms to inject per-call artificial latency in milliseconds for nat_test_llm.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. - For Python code, follow
    PEP 20 and
    PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
    Example:
    def my_function(param1: int, param2: str) -> bool:
        pass
  • For Python exception handling, ensure proper stack trace preservation:
    • When re-raising exceptions: use bare raise statements to maintain the original stack trace,
      and use logger.error() (not logger.exception()) to avoid duplicate stack trace output.
    • When catching and logging exceptions without re-raising: always use logger.exception()
      to capture the full stack trace information.

Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any

words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file, words that might appear to be
spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,

and should contain an Apache License 2.0 header comment at the top of each file.

  • Confirm that copyright years are up-to date whenever a file is changed.

Files:

  • tests/nat/front_ends/fastapi/test_job_store.py
🧬 Code graph analysis (1)
tests/nat/front_ends/fastapi/test_job_store.py (1)
src/nat/front_ends/fastapi/job_store.py (15)
  • JobStore (138-558)
  • ensure_job_id (219-234)
  • JobStatus (58-82)
  • _create_job (236-268)
  • get_job (397-412)
  • submit_job (270-325)
  • update_status (327-377)
  • JobInfo (89-135)
  • get_status (414-432)
  • get_all_jobs (379-395)
  • get_last_job (434-453)
  • get_jobs_by_status (455-475)
  • get_expires_at (477-503)
  • cleanup_expired_jobs (505-558)
  • get_db_engine (561-598)
🪛 Ruff (0.12.2)
tests/nat/front_ends/fastapi/test_job_store.py

42-42: Avoid specifying long messages outside the exception class

(TRY003)

Signed-off-by: David Gardner <dagardner@nvidia.com>
…buted variable

Signed-off-by: David Gardner <dagardner@nvidia.com>
Signed-off-by: David Gardner <dagardner@nvidia.com>
@coderabbitai coderabbitai bot added the breaking Breaking change label Sep 10, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (5)
src/nat/front_ends/fastapi/job_store.py (5)

320-321: Import and catch the correct Dask TimeoutError.

The current code catches the built-in TimeoutError, but Dask's Future.result() raises distributed.TimeoutError.

Add the import and fix the exception handling:

-from dask.distributed import Client as DaskClient
-from dask.distributed import Future
-from dask.distributed import Variable
-from dask.distributed import fire_and_forget
+from dask.distributed import Client as DaskClient
+from dask.distributed import Future
+from dask.distributed import TimeoutError as DaskTimeoutError
+from dask.distributed import Variable
+from dask.distributed import fire_and_forget
-                except TimeoutError:
+                except DaskTimeoutError:
                     pass

516-516: Fix SQLAlchemy method name.

The method should be notin_() not not_in().

-                 JobInfo.status.not_in(self.ACTIVE_STATUS))).order_by(JobInfo.updated_at.desc())
+                 JobInfo.status.notin_(self.ACTIVE_STATUS))).order_by(JobInfo.updated_at.desc())

549-550: Catch correct Dask TimeoutError in cleanup.

Similar to the earlier issue, this should catch Dask's TimeoutError, not the built-in one.

-                        except TimeoutError:
+                        except DaskTimeoutError:
                             pass

530-537: Security risk: Unrestricted file system cleanup.

The current code performs arbitrary file deletions based on user-provided output_path values, which poses a security risk for path traversal attacks.

+                    # Guard against path traversal attacks
                     if job.output_path:
-                        logger.info("Cleaning up output directory for job %s at %s", job.job_id, job.output_path)
-                        # If it is a file remove it
-                        if os.path.isfile(job.output_path):
-                            os.remove(job.output_path)
-                        # If it is a directory remove it
-                        elif os.path.isdir(job.output_path):
-                            shutil.rmtree(job.output_path)
+                        output_path = os.path.realpath(job.output_path)
+                        safe_root = os.path.realpath(os.getenv("NAT_OUTPUT_BASE_DIR", os.getcwd()))
+                        if not output_path.startswith(safe_root + os.sep):
+                            logger.warning("Skipping cleanup for %s: path %s outside %s", 
+                                         job.job_id, output_path, safe_root)
+                        else:
+                            logger.info("Cleaning up output for job %s at %s", job.job_id, output_path)
+                            if os.path.isfile(output_path):
+                                os.remove(output_path)
+                            elif os.path.isdir(output_path):
+                                shutil.rmtree(output_path)

581-583: Database persistence issue: Don't delete existing database files.

Deleting existing database files undermines the primary goal of this PR, which is to provide persistent job storage across multiple workers.

-            if os.path.exists(db_file):
-                logger.warning("Database file %s already exists, it will be overwritten.", db_file)
-                os.remove(db_file)
+            # Reuse existing database to preserve job history across restarts
+            if os.path.exists(db_file):
+                logger.info("Using existing database file %s", db_file)
🧹 Nitpick comments (5)
src/nat/front_ends/fastapi/job_store.py (5)

159-159: Fix mutable class attribute annotation.

The static analysis tool correctly identifies that ACTIVE_STATUS should be annotated with typing.ClassVar.

-    ACTIVE_STATUS = {JobStatus.RUNNING, JobStatus.SUBMITTED}
+    ACTIVE_STATUS: typing.ClassVar[set[JobStatus]] = {JobStatus.RUNNING, JobStatus.SUBMITTED}

169-172: Simplify exception message.

Following the coding guidelines, avoid specifying long messages outside the exception class.

-            if db_url is None:
-                raise ValueError("Either db_engine or db_url must be provided")
+            if db_url is None:
+                raise ValueError("Missing required db_engine or db_url")

358-359: Simplify exception message.

Following coding guidelines, avoid long messages outside the exception class.

-            if job is None:
-                raise ValueError(f"Job {job_id} not found in job store")
+            if job is None:
+                raise ValueError(f"Job {job_id} not found")

602-602: Fix all sorting.

The static analysis tool correctly identifies that __all__ should be sorted.

-__all__ = ["get_db_engine", "JobInfo", "JobStatus", "JobStore"]
+__all__ = ["JobInfo", "JobStatus", "JobStore", "get_db_engine"]

594-598: Add SQLite WAL mode for better concurrency.

To improve performance under multiple workers, enable SQLite WAL (Write-Ahead Logging) mode.

    if use_async:
        # This is actually a blocking call, it just returns an AsyncEngine
        from sqlalchemy.ext.asyncio import create_async_engine as create_engine_fn
+        
+        def _setup_sqlite_wal(connection, connection_record):
+            if "sqlite" in str(connection.get_dialect().name):
+                connection.execute("PRAGMA journal_mode=WAL")
+        
+        engine = create_engine_fn(db_url, echo=echo)
+        
+        # Enable WAL mode for better concurrency
+        if "sqlite" in db_url:
+            from sqlalchemy import event
+            event.listen(engine.sync_engine, "connect", _setup_sqlite_wal)
+        
+        return engine
    else:
        from sqlalchemy import create_engine as create_engine_fn
+        engine = create_engine_fn(db_url, echo=echo)
+        
+        # Enable WAL mode for better concurrency
+        if "sqlite" in db_url:
+            def _setup_sqlite_wal(connection, connection_record):
+                connection.execute("PRAGMA journal_mode=WAL")
+            from sqlalchemy import event
+            event.listen(engine, "connect", _setup_sqlite_wal)
+        
+        return engine

-    return create_engine_fn(db_url, echo=echo)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dc17c8c and b8c8399.

📒 Files selected for processing (2)
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py (4 hunks)
  • src/nat/front_ends/fastapi/job_store.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/nat/front_ends/fastapi/fastapi_front_end_plugin.py
🧰 Additional context used
📓 Path-based instructions (7)
src/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

src/**/*.py: All importable Python code must live under src/
All public APIs in src/ require Python 3.11+ type hints on parameters and return values; prefer typing/collections.abc abstractions; use typing.Annotated when useful
Provide Google-style docstrings for every public module, class, function, and CLI command; first line concise with a period; surround code entities with backticks

Files:

  • src/nat/front_ends/fastapi/job_store.py
src/nat/**/*

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Core functionality under src/nat should prioritize backward compatibility when changed

Files:

  • src/nat/front_ends/fastapi/job_store.py

⚙️ CodeRabbit configuration file

This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.

Files:

  • src/nat/front_ends/fastapi/job_store.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bare raise and log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial

**/*.py: Programmatic use: create TestLLMConfig(response_seq=[...], delay_ms=...), add with builder.add_llm("", cfg).
When retrieving the test LLM wrapper, use builder.get_llm(name, wrapper_type=LLMFrameworkEnum.) and call the framework’s method (e.g., ainvoke, achat, call).

Files:

  • src/nat/front_ends/fastapi/job_store.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks

Files:

  • src/nat/front_ends/fastapi/job_store.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm

Files:

  • src/nat/front_ends/fastapi/job_store.py
**/*.{py,yaml,yml}

📄 CodeRabbit inference engine (.cursor/rules/nat-test-llm.mdc)

**/*.{py,yaml,yml}: Configure response_seq as a list of strings; values cycle per call, and [] yields an empty string.
Configure delay_ms to inject per-call artificial latency in milliseconds for nat_test_llm.

Files:

  • src/nat/front_ends/fastapi/job_store.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. - For Python code, follow
    PEP 20 and
    PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
    Example:
    def my_function(param1: int, param2: str) -> bool:
        pass
  • For Python exception handling, ensure proper stack trace preservation:
    • When re-raising exceptions: use bare raise statements to maintain the original stack trace,
      and use logger.error() (not logger.exception()) to avoid duplicate stack trace output.
    • When catching and logging exceptions without re-raising: always use logger.exception()
      to capture the full stack trace information.

Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any

words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file, words that might appear to be
spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,

and should contain an Apache License 2.0 header comment at the top of each file.

  • Confirm that copyright years are up-to date whenever a file is changed.

Files:

  • src/nat/front_ends/fastapi/job_store.py
🧬 Code graph analysis (1)
src/nat/front_ends/fastapi/job_store.py (1)
src/nat/front_ends/fastapi/dask_client_mixin.py (2)
  • DaskClientMixin (25-43)
  • client (28-43)
🪛 Ruff (0.12.2)
src/nat/front_ends/fastapi/job_store.py

159-159: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


171-171: Avoid specifying long messages outside the exception class

(TRY003)


319-319: Consider moving this statement to an else block

(TRY300)


359-359: Avoid specifying long messages outside the exception class

(TRY003)


602-602: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: CI Pipeline / Check
🔇 Additional comments (1)
src/nat/front_ends/fastapi/job_store.py (1)

554-555: Improve exception logging following coding guidelines.

The current implementation correctly uses logger.exception() when catching and not re-raising, which follows the coding guidelines.

…a, and to use the gunicorn extra instead of defining it directly

Signed-off-by: David Gardner <dagardner@nvidia.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
packages/nvidia_nat_all/pyproject.toml (2)

1-1: Add the SPDX header (CI will fail without it).

All repo files must start with the standard SPDX Apache-2.0 header and up-to-date copyright years.

Apply at the very top:

+# SPDX-FileCopyrightText: 2025 NVIDIA CORPORATION & AFFILIATES
+# SPDX-License-Identifier: Apache-2.0
+
 [build-system]

If this file predates 2025, adjust the year range accordingly (e.g., 2023-2025).


24-41: Pin all nvidia-nat- deps to ~=1.3 to satisfy packages/ policy.**

The packages/* rule requires nvidia-nat(-*) dependencies use two-digit ~= pins. Add ~=1.3 to each.

Apply:

   "nvidia-nat[gunicorn, async_endpoints]~=1.3",
-  "nvidia-nat-agno",
-  "nvidia-nat-crewai",
-  "nvidia-nat-langchain",
-  "nvidia-nat-llama-index",
-  "nvidia-nat-mcp",
-  "nvidia-nat-mem0ai",
-  "nvidia-nat-mysql",
-  "nvidia-nat-opentelemetry",
-  "nvidia-nat-phoenix",
-  "nvidia-nat-profiling",
-  "nvidia-nat-ragaai",
-  "nvidia-nat-redis",
-  "nvidia-nat-s3",
-  "nvidia-nat-semantic-kernel",
-  "nvidia-nat-weave",
-  "nvidia-nat-zep-cloud",
-  "nvidia-nat-ingestion",
+  "nvidia-nat-agno~=1.3",
+  "nvidia-nat-crewai~=1.3",
+  "nvidia-nat-langchain~=1.3",
+  "nvidia-nat-llama-index~=1.3",
+  "nvidia-nat-mcp~=1.3",
+  "nvidia-nat-mem0ai~=1.3",
+  "nvidia-nat-mysql~=1.3",
+  "nvidia-nat-opentelemetry~=1.3",
+  "nvidia-nat-phoenix~=1.3",
+  "nvidia-nat-profiling~=1.3",
+  "nvidia-nat-ragaai~=1.3",
+  "nvidia-nat-redis~=1.3",
+  "nvidia-nat-s3~=1.3",
+  "nvidia-nat-semantic-kernel~=1.3",
+  "nvidia-nat-weave~=1.3",
+  "nvidia-nat-zep-cloud~=1.3",
+  "nvidia-nat-ingestion~=1.3",

Note: Workspace overrides in [tool.uv.sources] will still be honored during dev while keeping publish-time constraints correct.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b8c8399 and 2ec606b.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
  • packages/nvidia_nat_all/pyproject.toml (1 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
packages/*/pyproject.toml

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

packages/*/pyproject.toml: Each package under packages/ must include a pyproject.toml
Package pyproject.toml must depend on nvidia-nat or a package starting with nvidia-nat- using ~= versions (e.g., ~=1.0)

Files:

  • packages/nvidia_nat_all/pyproject.toml
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks

Files:

  • packages/nvidia_nat_all/pyproject.toml
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. - For Python code, follow
    PEP 20 and
    PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
    Example:
    def my_function(param1: int, param2: str) -> bool:
        pass
  • For Python exception handling, ensure proper stack trace preservation:
    • When re-raising exceptions: use bare raise statements to maintain the original stack trace,
      and use logger.error() (not logger.exception()) to avoid duplicate stack trace output.
    • When catching and logging exceptions without re-raising: always use logger.exception()
      to capture the full stack trace information.

Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any

words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file, words that might appear to be
spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,

and should contain an Apache License 2.0 header comment at the top of each file.

  • Confirm that copyright years are up-to date whenever a file is changed.

Files:

  • packages/nvidia_nat_all/pyproject.toml
packages/**/*

⚙️ CodeRabbit configuration file

packages/**/*: - This directory contains optional plugin packages for the toolkit, each should contain a pyproject.toml file. - The pyproject.toml file should declare a dependency on nvidia-nat or another package with a name starting
with nvidia-nat-. This dependency should be declared using ~=<version>, and the version should be a two
digit version (ex: ~=1.0).

  • Not all packages contain Python code, if they do they should also contain their own set of tests, in a
    tests/ directory at the same level as the pyproject.toml file.

Files:

  • packages/nvidia_nat_all/pyproject.toml
🧠 Learnings (1)
📚 Learning: 2025-08-28T23:22:41.742Z
Learnt from: CR
PR: NVIDIA/NeMo-Agent-Toolkit#0
File: .cursor/rules/general.mdc:0-0
Timestamp: 2025-08-28T23:22:41.742Z
Learning: Applies to packages/*/pyproject.toml : Package pyproject.toml must depend on nvidia-nat or a package starting with nvidia-nat- using ~=<two-digit> versions (e.g., ~=1.0)

Applied to files:

  • packages/nvidia_nat_all/pyproject.toml
🔇 Additional comments (1)
packages/nvidia_nat_all/pyproject.toml (1)

23-23: Good change: fold gunicorn/async extras into the core meta dep.

Using nvidia-nat[gunicorn, async_endpoints]~=1.3 complies with the packages/* rule and keeps constraints centralized.

…avid-async-generate-223-dask-313-sqa

Signed-off-by: David Gardner <dagardner@nvidia.com>
@dagardner-nv dagardner-nv removed the breaking Breaking change label Sep 10, 2025
@dagardner-nv
Copy link
Contributor Author

/merge

@rapids-bot rapids-bot bot merged commit 8298565 into NVIDIA:develop Sep 10, 2025
17 checks passed
@dagardner-nv dagardner-nv deleted the david-async-generate-223-dask-313-sqa branch September 10, 2025 18:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working non-breaking Non-breaking change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG]: evaluate API returns inconsistent results when workers > 1

2 participants