Conversation

@kartpop (Collaborator) commented Sep 5, 2025

  • Added RabbitMQ and Redis configuration to .env.example and config.py.
  • Implemented Celery application setup in celery_app.py with task routing and queue management.
  • Created high and low priority task execution functions in job_execution.py.
  • Developed utility functions for starting jobs and managing task status in utils.py.
  • Added Celery beat scheduler for periodic tasks in beat.py.
  • Implemented worker management script for starting Celery workers in worker.py.
  • Updated dependencies in pyproject.toml to include Celery and Redis packages.

Summary

Target issue is #PLEASE_TYPE_ISSUE_NUMBER
Explain the motivation for making this change. What existing problem does the pull request solve?

Checklist

Before submitting a pull request, please ensure that you mark these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested the changes.
  • If you've fixed a bug or added code, ensure it is tested and has test cases.

Notes

Please add any other information the reviewer might need here.

Summary by CodeRabbit

  • New Features

    • Adds background job processing with prioritized queues, bound high/low priority tasks, helper APIs to start/check/revoke jobs, and CLIs to run workers and the scheduler.
  • Chores

    • Updates example environment with RabbitMQ, Redis, and Celery settings.
    • Adds Celery and Redis dependencies, a Celery worker image recipe, a Docker venv path tweak, and a development compose setup for local integration.
  • Documentation

    • Adds local dev setup assets and minor README formatting fixes.

coderabbitai bot commented Sep 5, 2025

Walkthrough

Adds Celery integration: environment variables and Docker/compose support for RabbitMQ and Redis, new Settings fields and computed broker/backend URLs, a configured Celery app (queues, routing, worker/task tuning), task modules and utilities, CLI scripts for worker/beat, new dependencies, and minor README/Dockerfile adjustments.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Environment & Example**<br>`.env.example` | Adds RabbitMQ, Redis, and multiple Celery-related environment variables (broker, backend, worker/task tuning, timezone set to Asia/Kolkata). |
| **Config**<br>`backend/app/core/config.py` | Adds RabbitMQ/Redis fields and computed `RABBITMQ_URL`/`REDIS_URL`; adds many Celery settings and a computed worker concurrency default using multiprocessing. |
| **Celery package export**<br>`backend/app/celery/__init__.py` | Re-exports `celery_app` and defines `__all__ = ["celery_app"]`. |
| **Celery app**<br>`backend/app/celery/celery_app.py` | New Celery application configured with broker/backend, queues (high/low/cron/default) and priorities, routing rules, worker/task tuning, serialization, monitoring, broker/backoff settings, and task autodiscovery (include list adjusted). |
| **Tasks**<br>`backend/app/celery/tasks/__init__.py`, `backend/app/celery/tasks/job_execution.py` | Adds `__all__ = []`; adds bound tasks `execute_high_priority_task` and `execute_low_priority_task` that delegate to an internal executor which imports and runs target functions with logging, correlation context, and error handling. |
| **Utilities**<br>`backend/app/celery/utils.py` | Adds helpers `start_high_priority_job`, `start_low_priority_job`, `get_task_status`, and `revoke_task` (wraps `AsyncResult` and `control.revoke`). |
| **CLIs**<br>`backend/app/celery/worker.py`, `backend/app/celery/beat.py` | Adds programmatic/CLI entrypoints to start a Celery worker and Celery Beat (argparse options for queues, concurrency, loglevel). |
| **Docker / Build**<br>`backend/Dockerfile`, `backend/Dockerfile.celery`, `docker-compose.dev.yml` | Updates the runtime venv path in the backend Dockerfile; adds Dockerfile.celery for a Celery worker image; adds docker-compose.dev.yml provisioning Redis, RabbitMQ, Postgres, the backend, and a Celery worker for development. |
| **Dependencies**<br>`backend/pyproject.toml` | Adds `celery>=5.3.0,<6.0.0` and `redis>=5.0.0,<6.0.0` to backend dependencies. |
| **Docs**<br>`backend/README.md` | Minor formatting adjustment (trailing newline) in the Email Templates section. |
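
To make the pieces above concrete, here is a minimal sketch of the queue/routing layout the walkthrough describes. This is an illustration, not the PR's actual celery_app.py: the broker/backend URLs, priority caps, and queue arguments are assumptions.

```python
from celery import Celery
from kombu import Queue

# In the PR these URLs come from settings.RABBITMQ_URL / settings.REDIS_URL.
celery_app = Celery(
    "app",
    broker="amqp://guest:guest@localhost:5672//",
    backend="redis://localhost:6379/0",
    include=["app.celery.tasks.job_execution"],
)

# Four queues matching the high/low/cron/default layout; x-max-priority values
# are assumed here, not taken from the PR.
celery_app.conf.task_queues = (
    Queue("high_priority", queue_arguments={"x-max-priority": 10}),
    Queue("low_priority", queue_arguments={"x-max-priority": 5}),
    Queue("cron"),
    Queue("default"),
)
celery_app.conf.task_default_queue = "default"

# Route each bound task to its queue.
celery_app.conf.task_routes = {
    "app.celery.tasks.job_execution.execute_high_priority_task": {"queue": "high_priority"},
    "app.celery.tasks.job_execution.execute_low_priority_task": {"queue": "low_priority"},
}
```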

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant Utils as app.celery.utils
  participant Broker as RabbitMQ
  participant Worker as Celery Worker
  participant Exec as job_execution.task
  participant Target as Target Function
  participant Backend as Redis

  Client->>Utils: start_high_priority_job(function_path, project_id, job_id, **kwargs)
  Utils->>Broker: enqueue execute_high_priority_task(payload)
  note right of Broker #E8F0FF: Routed to high_priority queue
  Worker-->>Broker: fetch task
  Worker->>Exec: execute_high_priority_task(...)
  Exec->>Exec: import function_path, set correlation_id, log
  Exec->>Target: call(project_id, job_id, task_id, task_instance, **kwargs)
  Target-->>Exec: result or exception
  Exec->>Backend: store result/state
  Exec-->>Worker: return result
  Utils-->>Client: return Celery task_id

  opt Status & Revoke
    Client->>Utils: get_task_status(task_id)
    Utils->>Backend: AsyncResult query
    Backend-->>Utils: status/result/info
    Client->>Utils: revoke_task(task_id, terminate?)
    Utils->>Worker: control.revoke(task_id, terminate)
  end
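Reading the diagram as client code gives a hedged usage sketch of the helper APIs; the helper names match the utils module described above, while the target function path and IDs are hypothetical.

```python
from app.celery.utils import start_high_priority_job, get_task_status, revoke_task

# Enqueue: returns the Celery task_id immediately; a worker runs the target later.
task_id = start_high_priority_job(
    function_path="app.services.reports.build_report",  # hypothetical target
    project_id=42,
    job_id="job-123",
)

# Poll the Redis result backend for state/result.
status = get_task_status(task_id)  # e.g. {"task_id": ..., "status": "PENDING", ...}

# Best-effort cancel via a broker control message.
revoke_task(task_id, terminate=False)
```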

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

ready-for-review

Suggested reviewers

  • AkhileshNegi
  • avirajsingh7
  • nishika26

Poem

I hop and bind tasks in a queue,
RabbitMQ hums — off they flew.
Redis keeps results safe and snug,
Workers munch carrots, give each job a hug.
Hooray — Celery bakes a task-filled mug! 🥕🐇

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The title succinctly states the primary change (integrating Celery with RabbitMQ and Redis for task management), which directly matches the PR contents: new env/config entries, Celery app, tasks, worker/beat, and compose/Docker updates. It is concise, specific, and informative enough for a reviewer scanning history to understand the PR's main purpose. |
| Docstring Coverage | ✅ Passed | No functions found in the changes. Docstring coverage check skipped. |
✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/celery

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Up to 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
.env.example (1)

14-16: ENVIRONMENT value invalid vs Settings literal — will fail validation.

.env.example uses ENVIRONMENT=local, but Settings.ENVIRONMENT only allows "development" | "testing" | "staging" | "production". This will raise a validation error on startup if copied verbatim. Change to development, or extend the literal to include local.

Apply one of:

Option A (update env):

-ENVIRONMENT=local
+ENVIRONMENT=development

Option B (extend allowed values in code; see config.py comment).

backend/app/core/config.py (2)

39-41: Align ENVIRONMENT choices with .env.example or support “local”.

Currently ENVIRONMENT excludes "local", but .env.example uses it. Either add "local" to the Literal and mapping, or update the example file.

-    ENVIRONMENT: Literal[
-        "development", "testing", "staging", "production"
-    ] = "development"
+    ENVIRONMENT: Literal[
+        "local", "development", "testing", "staging", "production"
+    ] = "development"
@@
-def get_settings() -> Settings:
+def get_settings() -> Settings:
@@
-    # Determine env file
-    env_files = {"testing": "../.env.test", "development": "../.env"}
-    env_file = env_files.get(environment, "../.env")
+    # Determine env file
+    env_files = {
+        "testing": "../.env.test",
+        "development": "../.env",
+        "local": "../.env",
+    }
+    env_file = env_files.get(environment, "../.env")

Also applies to: 142-151


1-4: URL-encode broker/backend credentials and vhost to avoid connection bugs.

Special chars in usernames/passwords (e.g., @, /, :) or vhost names require encoding; otherwise Celery/kombu will fail to parse the URL.

 import secrets
 import warnings
 import os
 from typing import Any, Literal
+from urllib.parse import quote
@@
     def RABBITMQ_URL(self) -> str:
-        return f"amqp://{self.RABBITMQ_USER}:{self.RABBITMQ_PASSWORD}@{self.RABBITMQ_HOST}:{self.RABBITMQ_PORT}/{self.RABBITMQ_VHOST}"
+        user = quote(self.RABBITMQ_USER, safe="")
+        pwd = quote(self.RABBITMQ_PASSWORD, safe="")
+        vhost = quote(self.RABBITMQ_VHOST, safe="")
+        return f"amqp://{user}:{pwd}@{self.RABBITMQ_HOST}:{self.RABBITMQ_PORT}/{vhost}"
@@
     def REDIS_URL(self) -> str:
-        if self.REDIS_PASSWORD:
-            return f"redis://:{self.REDIS_PASSWORD}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
-        return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
+        if self.REDIS_PASSWORD:
+            pwd = quote(self.REDIS_PASSWORD, safe="")
+            return f"redis://:{pwd}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
+        return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"

Optional: consider a future REDIS_SSL flag to switch to rediss://.

Also applies to: 74-85, 92-98

🧹 Nitpick comments (20)
backend/pyproject.toml (1)

32-34: Replace explicit redis dependency with Celery’s redis extra
Since no direct redis imports or instantiations were found, consolidate to:

-    "celery>=5.3.0,<6.0.0",
-    "redis>=5.0.0,<6.0.0",
+    "celery[redis]>=5.3.0,<6.0.0",
.env.example (4)

48-51: Fix dotenv lint nits: remove quotes and key order.

Remove quotes on non-secret public key and order keys to satisfy dotenv-linter.

-# OpenAI
-OPENAI_API_KEY=your_openai_api_key_here
-LANGFUSE_PUBLIC_KEY="this_is_not_a_secret"
+# OpenAI / Langfuse
+LANGFUSE_PUBLIC_KEY=this_is_not_a_secret
+OPENAI_API_KEY=your_openai_api_key_here

53-58: Order RabbitMQ keys for consistency/linting.

Place PASSWORD before PORT and keep fields alphabetically ordered within the section.

-# RabbitMQ Configuration (Celery Broker)
-RABBITMQ_HOST=localhost
-RABBITMQ_PORT=5672
-RABBITMQ_USER=guest
-RABBITMQ_PASSWORD=guest
-RABBITMQ_VHOST=/
+# RabbitMQ Configuration (Celery Broker)
+RABBITMQ_HOST=localhost
+RABBITMQ_PASSWORD=guest
+RABBITMQ_PORT=5672
+RABBITMQ_USER=guest
+RABBITMQ_VHOST=/

60-64: Order Redis keys for consistency/linting.

Align ordering to satisfy dotenv-linter (DB and PASSWORD before HOST/PORT).

-# Redis Configuration (Celery Result Backend)
-REDIS_HOST=localhost
-REDIS_PORT=6379
-REDIS_DB=0
-REDIS_PASSWORD=
+# Redis Configuration (Celery Result Backend)
+REDIS_DB=0
+REDIS_PASSWORD=
+REDIS_HOST=localhost
+REDIS_PORT=6379

66-77: Clarify units and defaults for Celery knobs.

Add comments for units (seconds / KB) and note that blank concurrency falls back to Celery default.

-# Celery Configuration
-CELERY_WORKER_CONCURRENCY=
-CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
-CELERY_WORKER_MAX_MEMORY_PER_CHILD=200000
-CELERY_TASK_SOFT_TIME_LIMIT=300
-CELERY_TASK_TIME_LIMIT=600
+# Celery Configuration
+# Leave blank to use Celery default (cores)
+CELERY_WORKER_CONCURRENCY=
+CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
+# KB (e.g., 200000 ~= 195 MB)
+CELERY_WORKER_MAX_MEMORY_PER_CHILD=200000
+# Seconds
+CELERY_TASK_SOFT_TIME_LIMIT=300
+CELERY_TASK_TIME_LIMIT=600
 CELERY_TASK_MAX_RETRIES=3
 CELERY_TASK_DEFAULT_RETRY_DELAY=60
 CELERY_RESULT_EXPIRES=3600
 CELERY_BROKER_POOL_LIMIT=10
 CELERY_WORKER_PREFETCH_MULTIPLIER=1
 CELERY_ENABLE_UTC=true
 CELERY_TIMEZONE=UTC
backend/app/core/config.py (1)

120-129: Warn on insecure RabbitMQ/Redis credentials outside dev/test.

We already guard SECRET_KEY, etc. Add a similar warning/guard for RABBITMQ_PASSWORD="guest" and empty REDIS_PASSWORD when not in development/testing.

     def _check_default_secret(self, var_name: str, value: str | None) -> None:
         if value == "changethis":
@@
     def _enforce_non_default_secrets(self) -> Self:
         self._check_default_secret("SECRET_KEY", self.SECRET_KEY)
         self._check_default_secret("POSTGRES_PASSWORD", self.POSTGRES_PASSWORD)
         self._check_default_secret(
             "FIRST_SUPERUSER_PASSWORD", self.FIRST_SUPERUSER_PASSWORD
         )
+        # Additional broker/backend guards
+        if self.ENVIRONMENT not in ["development", "testing"]:
+            if self.RABBITMQ_PASSWORD == "guest":
+                warnings.warn(
+                    "RABBITMQ_PASSWORD is 'guest' outside development/testing.",
+                    stacklevel=1,
+                )
+            if not self.REDIS_PASSWORD:
+                warnings.warn(
+                    "REDIS_PASSWORD is empty outside development/testing.",
+                    stacklevel=1,
+                )
 
         return self

Also applies to: 131-139

backend/app/celery/beat.py (3)

15-15: Drop unnecessary f-string.

No placeholders in the string.

-    print(f"Starting Celery beat scheduler")
+    print("Starting Celery beat scheduler")

5-6: Clean imports and prefer programmatic Beat runner (optional).

  • Celery import is unused.
  • Consider using the programmatic beat runner for better control/testability.
-from celery import Celery
-from app.celery.celery_app import celery_app
+from celery.bin import beat as celery_beat
+from app.celery.celery_app import celery_app
-    # Start the beat scheduler
-    celery_app.start(["celery", "beat", "-l", loglevel])
+    # Start the beat scheduler
+    beat_cmd = celery_beat.beat(app=celery_app)
+    beat_cmd.run(loglevel=loglevel)

26-30: Include 'critical' in loglevel choices (optional).

Celery supports 'critical'.

-        choices=["debug", "info", "warning", "error"],
+        choices=["debug", "info", "warning", "error", "critical"],
backend/app/celery/worker.py (4)

5-11: Remove unused imports.

os and sys aren’t used.

-import os
-import sys
 import multiprocessing
 from celery.bin import worker
 from app.celery.celery_app import celery_app
 from app.core.config import settings

28-31: Prefer logging over prints (optional).

Use the logging module for consistency with the rest of the app.

-    print(f"Starting Celery worker with {concurrency} processes")
-    print(f"Consuming queues: {queues}")
-    print(f"Log level: {loglevel}")
+    import logging
+    logger = logging.getLogger(__name__)
+    logger.info("Starting Celery worker with %s processes", concurrency)
+    logger.info("Consuming queues: %s", queues)
+    logger.info("Log level: %s", loglevel)

38-41: Operational trade-offs with disabled gossip/mingle/heartbeat.

Disabling these reduces startup latency but limits remote control, worker discovery, and health monitoring (e.g., Flower alerts).

If observability is desired in prod, consider enabling at least heartbeat.
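
A sketch of that middle ground, using Celery 5's app.worker_main entrypoint (the PR's worker.py goes through celery.bin.worker instead, so treat these flags as the CLI equivalents):

```python
from app.celery.celery_app import celery_app

# Skip gossip and mingle for faster startup, but keep broker heartbeats so
# monitors like Flower can still detect dead workers.
celery_app.worker_main([
    "worker",
    "--loglevel=info",
    "--queues=high_priority,low_priority,default",
    "--without-gossip",
    "--without-mingle",
    # intentionally NOT passing --without-heartbeat
])
```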


59-62: Add 'critical' to loglevel choices (optional).

-        choices=["debug", "info", "warning", "error"],
+        choices=["debug", "info", "warning", "error", "critical"],
backend/app/celery/utils.py (2)

6-7: Use built-in generics and drop unused imports.

Replace typing.Dict with dict[...]; remove unused Optional.

-from typing import Any, Dict, Optional
+from typing import Any

72-88: Return a safer, serializable task status payload (optional).

Avoid exposing raw result/info when not ready; ensure JSON-friendly values.

-def get_task_status(task_id: str) -> Dict[str, Any]:
+def get_task_status(task_id: str) -> dict[str, Any]:
@@
-    result = AsyncResult(task_id)
-    return {
-        "task_id": task_id,
-        "status": result.status,
-        "result": result.result,
-        "info": result.info
-    }
+    result = AsyncResult(task_id)
+    data: dict[str, Any] = {
+        "task_id": task_id,
+        "status": result.state,
+        "ready": result.ready(),
+    }
+    if result.ready():
+        data["result"] = result.result
+        data["info"] = result.info
+        data["successful"] = result.successful()
+    else:
+        data["result"] = None
+        data["info"] = None
+        data["successful"] = False
+    return data
backend/app/celery/tasks/job_execution.py (5)

1-3: Drop unused/legacy typing import; prefer Celery task logger (Ruff UP035).

  • Dict isn’t used and Ruff flags typing.Dict as deprecated.
  • Switching to Celery’s task logger integrates with worker log config.
 import logging
 import importlib
-from typing import Any, Dict
+from typing import Any
 from celery import current_task
+from celery.utils.log import get_task_logger
 
 from app.celery.celery_app import celery_app
 
-logger = logging.getLogger(__name__)
+logger = get_task_logger(__name__)

Also applies to: 8-9


26-38: Mirror reliability options on low-priority task.

Keep behavior consistent across priorities.

-@celery_app.task(bind=True, queue="low_priority")
+@celery_app.task(
+    bind=True,
+    queue="low_priority",
+    autoretry_for=(Exception,),
+    retry_backoff=True,
+    retry_backoff_max=600,
+    retry_jitter=True,
+    retry_kwargs={"max_retries": 3},
+    acks_late=True,
+    track_started=True,
+)
 def execute_low_priority_task(self, function_path: str, user_id: int, job_id: str, **kwargs):

41-54: Prefer self.request.id over current_task; annotate return type.

Using task_instance.request.id is more direct and resilient (e.g., nested tasks); fall back to current_task only if needed.

-def _execute_job_internal(task_instance, function_path: str, user_id: int, job_id: str, priority: str, **kwargs):
+def _execute_job_internal(task_instance, function_path: str, user_id: int, job_id: str, priority: str, **kwargs) -> Any:
@@
-    task_id = current_task.request.id
+    task_id = (
+        getattr(getattr(task_instance, "request", None), "id", None)
+        or getattr(getattr(current_task, "request", None), "id", None)
+    )
+    if not task_id:
+        logger.warning("No task_id available; defaulting to 'unknown'")
+        task_id = "unknown"

61-77: Emit structured logs and use logger.exception for failures.

Structured fields make filtering and tracing easier; logger.exception auto-includes traceback.

-        logger.info(f"Executing {priority} job {job_id} (task {task_id}) using function {function_path}")
+        logger.info(
+            "Executing job",
+            extra={"priority": priority, "job_id": job_id, "task_id": task_id, "user_id": user_id, "function_path": function_path},
+        )
@@
-        logger.info(f"{priority.capitalize()} job {job_id} (task {task_id}) completed successfully")
+        logger.info(
+            "Job completed",
+            extra={"priority": priority, "job_id": job_id, "task_id": task_id, "user_id": user_id, "function_path": function_path},
+        )
@@
-        logger.error(f"{priority.capitalize()} job {job_id} (task {task_id}) failed: {exc}", exc_info=True)
+        logger.exception(
+            "Job failed",
+            extra={"priority": priority, "job_id": job_id, "task_id": task_id, "user_id": user_id, "function_path": function_path},
+        )

11-23: Routing confirmed; optional resilience enhancements

Verified that high_priority queue and its routing are configured in backend/app/celery/celery_app.py (task_queues at lines 23–24; task_routes at lines 33–34). As an optional improvement, you can harden this task by adding autoretry_for=(Exception,), retry_backoff, retry_backoff_max, retry_jitter, retry_kwargs={'max_retries':…}, acks_late=True, and track_started=True to the decorator for conservative retry semantics and started-state tracking.
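
Spelled out, the hardened decorator described above would look roughly like this (a sketch using the suggested option values, not settings taken from the PR):

```python
from app.celery.celery_app import celery_app

@celery_app.task(
    bind=True,
    queue="high_priority",
    autoretry_for=(Exception,),       # retry on any exception...
    retry_backoff=True,               # ...with exponential backoff...
    retry_backoff_max=600,            # ...capped at 10 minutes...
    retry_jitter=True,                # ...plus jitter to avoid thundering herds
    retry_kwargs={"max_retries": 3},
    acks_late=True,                   # ack only after the task finishes
    track_started=True,               # report STARTED state to the result backend
)
def execute_high_priority_task(self, function_path: str, user_id: int, job_id: str, **kwargs):
    ...
```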

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 01f66ee and 74c2a42.

⛔ Files ignored due to path filters (1)
  • backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
  • .env.example (1 hunks)
  • backend/app/celery/__init__.py (1 hunks)
  • backend/app/celery/beat.py (1 hunks)
  • backend/app/celery/celery_app.py (1 hunks)
  • backend/app/celery/tasks/__init__.py (1 hunks)
  • backend/app/celery/tasks/job_execution.py (1 hunks)
  • backend/app/celery/utils.py (1 hunks)
  • backend/app/celery/worker.py (1 hunks)
  • backend/app/core/config.py (1 hunks)
  • backend/pyproject.toml (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/celery/utils.py (1)
backend/app/celery/tasks/job_execution.py (2)
  • execute_high_priority_task (12-23)
  • execute_low_priority_task (27-38)
backend/app/celery/celery_app.py (1)
backend/app/core/config.py (2)
  • RABBITMQ_URL (83-84)
  • REDIS_URL (94-97)
🪛 Ruff (0.12.2)
backend/app/celery/utils.py

6-6: typing.Dict is deprecated, use dict instead

(UP035)

backend/app/celery/beat.py

15-15: f-string without any placeholders

Remove extraneous f prefix

(F541)

backend/app/celery/tasks/job_execution.py

3-3: typing.Dict is deprecated, use dict instead

(UP035)

🪛 dotenv-linter (3.3.0)
.env.example

[warning] 50-50: [QuoteCharacter] The value has quote characters (', ")

(QuoteCharacter)


[warning] 50-50: [UnorderedKey] The LANGFUSE_PUBLIC_KEY key should go before the OPENAI_API_KEY key

(UnorderedKey)


[warning] 56-56: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)


[warning] 62-62: [UnorderedKey] The REDIS_DB key should go before the REDIS_HOST key

(UnorderedKey)


[warning] 63-63: [UnorderedKey] The REDIS_PASSWORD key should go before the REDIS_PORT key

(UnorderedKey)


[warning] 68-68: [UnorderedKey] The CELERY_WORKER_MAX_MEMORY_PER_CHILD key should go before the CELERY_WORKER_MAX_TASKS_PER_CHILD key

(UnorderedKey)


[warning] 69-69: [UnorderedKey] The CELERY_TASK_SOFT_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 70-70: [UnorderedKey] The CELERY_TASK_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 71-71: [UnorderedKey] The CELERY_TASK_MAX_RETRIES key should go before the CELERY_TASK_SOFT_TIME_LIMIT key

(UnorderedKey)


[warning] 72-72: [UnorderedKey] The CELERY_TASK_DEFAULT_RETRY_DELAY key should go before the CELERY_TASK_MAX_RETRIES key

(UnorderedKey)


[warning] 73-73: [UnorderedKey] The CELERY_RESULT_EXPIRES key should go before the CELERY_TASK_DEFAULT_RETRY_DELAY key

(UnorderedKey)


[warning] 74-74: [UnorderedKey] The CELERY_BROKER_POOL_LIMIT key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 76-76: [UnorderedKey] The CELERY_ENABLE_UTC key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 77-77: [UnorderedKey] The CELERY_TIMEZONE key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)

🔇 Additional comments (9)
backend/app/celery/__init__.py (1)

1-3: Re-export looks good.

Public celery_app export enables from app.celery import celery_app. No issues.

backend/app/celery/tasks/__init__.py (1)

1-3: Fix broken import in __init__.py

generic_task.py doesn’t exist under backend/app/celery/tasks and I couldn’t find def execute_job_task there. Confirm the correct source module (likely job_execution.py) and update:

-from .generic_task import execute_job_task
+from .job_execution import execute_job_task
backend/app/celery/worker.py (1)

35-41: Queues argument type may be incorrect for Worker.run.

queues=queues.split(",") might need to be a comma-separated string (CLI-style) rather than a list, depending on Celery version.

Would you confirm against your Celery version’s celery.bin.worker.WorkerCommand.run signature and expected types? If needed, change to:

-        queues=queues.split(","),
+        queues=queues,

If lists are accepted, disregard.

backend/app/celery/utils.py (2)

32-41: LGTM: high-priority enqueue wrapper.

Clean wrapper over the task; logging is appropriate.


61-69: LGTM: low-priority enqueue wrapper.

Consistent with the high-priority path.

backend/app/celery/celery_app.py (2)

22-37: Queue/routing config looks solid.

Priority caps align with route priorities; default/cron routes are sensible.


8-9: Ignore property accessor suggestion RABBITMQ_URL and REDIS_URL are defined as Pydantic BaseSettings fields (class attributes), not methods, so accessing them directly as attributes is correct.

Likely an incorrect or invalid review comment.

backend/app/celery/tasks/job_execution.py (2)

41-52: Nice centralization of execution flow.

Shared _execute_job_internal keeps duplication low and simplifies routing/observability.


56-60: Validate and restrict dynamic job execution paths: enforce that function_path is a dotted import path, check it against an ALLOWED_FUNCTION_PREFIXES whitelist, and verify the resolved attribute is callable; manually audit all call sites to ensure function_path cannot be driven by untrusted input.

coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
backend/app/celery/beat.py (1)

33-34: Remove duplicate start_beat invocation; first call blocks.

Second call is unreachable and misleading.

     args = parser.parse_args()
     start_beat(args.loglevel)
-    start_beat(args.loglevel)
🧹 Nitpick comments (12)
backend/app/celery/tasks/__init__.py (1)

1-1: Optional: quiet linters and aid type-checkers

Some linters flag “imported but unused” in re-export patterns. Two lightweight options:

Apply one of the diffs below.

Option A (alias re-export, helps type-checkers):

-from .generic_task import execute_job_task
+from .generic_task import execute_job_task as execute_job_task  # re-export

Option B (suppress flake8 F401):

-from .generic_task import execute_job_task
+from .generic_task import execute_job_task  # noqa: F401
.env.example (3)

48-73: Sort env keys to satisfy dotenv-linter and keep sections consistent

Reorder RabbitMQ/Redis/Celery blocks alphabetically by key to silence linter warnings and reduce churn.

Apply this diff:

 # RabbitMQ Configuration (Celery Broker)
-RABBITMQ_HOST=localhost
-RABBITMQ_PORT=5672
-RABBITMQ_USER=guest
-RABBITMQ_PASSWORD=guest
-RABBITMQ_VHOST=/
+RABBITMQ_HOST=localhost
+RABBITMQ_PASSWORD=guest
+RABBITMQ_PORT=5672
+RABBITMQ_USER=guest
+RABBITMQ_VHOST=/
 
 # Redis Configuration (Celery Result Backend)
-REDIS_HOST=localhost
-REDIS_PORT=6379
-REDIS_DB=0
-REDIS_PASSWORD=
+REDIS_DB=0
+REDIS_HOST=localhost
+REDIS_PASSWORD=
+REDIS_PORT=6379
 
 # Celery Configuration
-CELERY_WORKER_CONCURRENCY=
-CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
-CELERY_WORKER_MAX_MEMORY_PER_CHILD=200000
-CELERY_TASK_SOFT_TIME_LIMIT=300
-CELERY_TASK_TIME_LIMIT=600
-CELERY_TASK_MAX_RETRIES=3
-CELERY_TASK_DEFAULT_RETRY_DELAY=60
-CELERY_RESULT_EXPIRES=3600
-CELERY_BROKER_POOL_LIMIT=10
-CELERY_WORKER_PREFETCH_MULTIPLIER=1
-CELERY_ENABLE_UTC=true
-CELERY_TIMEZONE=UTC
+CELERY_BROKER_POOL_LIMIT=10
+CELERY_ENABLE_UTC=true
+CELERY_RESULT_EXPIRES=3600
+CELERY_TASK_DEFAULT_RETRY_DELAY=60
+CELERY_TASK_MAX_RETRIES=3
+CELERY_TASK_SOFT_TIME_LIMIT=300
+CELERY_TASK_TIME_LIMIT=600
+CELERY_TIMEZONE=UTC
+CELERY_WORKER_CONCURRENCY=
+CELERY_WORKER_MAX_MEMORY_PER_CHILD=200000
+CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
+CELERY_WORKER_PREFETCH_MULTIPLIER=1

49-55: Prevent accidental use of guest credentials outside local

Make sure CI/staging/prod override RabbitMQ defaults; the guest user is local-only by RabbitMQ policy and insecure elsewhere.

If helpful, I can add comments like “DO NOT USE IN PROD” above these lines.


62-73: Clarify worker concurrency default

CELERY_WORKER_CONCURRENCY is blank. Consider documenting “auto” or setting a sane default to avoid surprises in containers with limited CPUs.
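
For illustration, the fallback the walkthrough attributes to config.py's computed concurrency might look like the sketch below; the helper name and behavior are invented here, and the PR's exact logic may differ.

```python
import multiprocessing

def effective_concurrency(configured: int | None) -> int:
    # Blank/None in the env means "auto": fall back to the host CPU count.
    # In CPU-limited containers cpu_count() can overshoot the actual quota,
    # which is why documenting a sane default is worthwhile.
    return configured if configured else multiprocessing.cpu_count()
```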

backend/app/celery/utils.py (3)

6-6: Drop deprecated/unused typing imports

Replace typing.Dict with built-in dict and remove Optional (unused).

-from typing import Any, Dict, Optional
+from typing import Any

72-88: Bind AsyncResult to our app and modernize typing

Ensure status queries use the configured Celery app and switch to built-in dict typing.

-def get_task_status(task_id: str) -> Dict[str, Any]:
+def get_task_status(task_id: str) -> dict[str, Any]:
@@
-    result = AsyncResult(task_id)
+    from app.celery.celery_app import celery_app
+    result = AsyncResult(task_id, app=celery_app)

Optional: sanitize non-serializable exception objects with repr() if this dict is returned via JSON.


91-109: Return value from revoke is best-effort; consider verifying state

control.revoke doesn’t indicate success. Optionally, follow with AsyncResult(task_id).state to confirm not RUNNING/PENDING and log accordingly.
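
A hedged sketch of that follow-up check (revoke_and_confirm is a hypothetical helper; note a just-revoked task can legitimately still report PENDING until the worker processes the control message):

```python
import logging
from celery.result import AsyncResult
from app.celery.celery_app import celery_app

logger = logging.getLogger(__name__)

def revoke_and_confirm(task_id: str, terminate: bool = False) -> str:
    celery_app.control.revoke(task_id, terminate=terminate)
    state = AsyncResult(task_id, app=celery_app).state
    if state in ("PENDING", "STARTED"):
        # Revoke is asynchronous and best-effort; surface this for operators.
        logger.warning("Revoke sent for %s but state is still %s", task_id, state)
    return state
```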

backend/app/celery/tasks/job_execution.py (1)

1-5: Remove unused imports and simplify typing

current_task becomes unnecessary with self.request.id; Dict import is unused.

-import importlib
-from typing import Any, Dict
-from celery import current_task
+import importlib
+from typing import Any
backend/app/celery/beat.py (4)

15-15: Drop extraneous f-prefix (Ruff F541).

No placeholders in the string.

-    print(f"Starting Celery beat scheduler")
+    print("Starting Celery beat scheduler")

5-6: Remove unused import.

Celery isn’t referenced here.

-from celery import Celery
 from app.celery.celery_app import celery_app

25-30: Make log level case-insensitive and include 'critical'.

Small UX win; aligns with standard levels.

     parser.add_argument(
         "--loglevel",
         default="info",
-        choices=["debug", "info", "warning", "error"],
-        help="Logging level"
+        type=str.lower,
+        choices=["debug", "info", "warning", "error", "critical"],
+        help="Logging level (case-insensitive)"
     )

18-19: Add basic error handling around beat start.

Gracefully handle Ctrl-C and surface failures.

-    # Start the beat scheduler
-    celery_app.start(["celery", "beat", "-l", loglevel])
+    # Start the beat scheduler
+    try:
+        celery_app.start(["celery", "beat", "-l", loglevel])
+    except KeyboardInterrupt:
+        print("Celery beat stopped by user")
+    except Exception as exc:
+        print(f"Celery beat failed: {exc!r}")
+        raise
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 74c2a42 and 1b061de.

⛔ Files ignored due to path filters (1)
  • backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
  • .env.example (1 hunks)
  • backend/app/celery/__init__.py (1 hunks)
  • backend/app/celery/beat.py (1 hunks)
  • backend/app/celery/celery_app.py (1 hunks)
  • backend/app/celery/tasks/__init__.py (1 hunks)
  • backend/app/celery/tasks/job_execution.py (1 hunks)
  • backend/app/celery/utils.py (1 hunks)
  • backend/app/celery/worker.py (1 hunks)
  • backend/app/core/config.py (1 hunks)
  • backend/pyproject.toml (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • backend/app/celery/__init__.py
  • backend/pyproject.toml
  • backend/app/core/config.py
  • backend/app/celery/celery_app.py
  • backend/app/celery/worker.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/celery/utils.py (1)
backend/app/celery/tasks/job_execution.py (2)
  • execute_high_priority_task (12-23)
  • execute_low_priority_task (27-38)
🪛 dotenv-linter (3.3.0)
.env.example

[warning] 52-52: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)


[warning] 58-58: [UnorderedKey] The REDIS_DB key should go before the REDIS_HOST key

(UnorderedKey)


[warning] 59-59: [UnorderedKey] The REDIS_PASSWORD key should go before the REDIS_PORT key

(UnorderedKey)


[warning] 64-64: [UnorderedKey] The CELERY_WORKER_MAX_MEMORY_PER_CHILD key should go before the CELERY_WORKER_MAX_TASKS_PER_CHILD key

(UnorderedKey)


[warning] 65-65: [UnorderedKey] The CELERY_TASK_SOFT_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 66-66: [UnorderedKey] The CELERY_TASK_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 67-67: [UnorderedKey] The CELERY_TASK_MAX_RETRIES key should go before the CELERY_TASK_SOFT_TIME_LIMIT key

(UnorderedKey)


[warning] 68-68: [UnorderedKey] The CELERY_TASK_DEFAULT_RETRY_DELAY key should go before the CELERY_TASK_MAX_RETRIES key

(UnorderedKey)


[warning] 69-69: [UnorderedKey] The CELERY_RESULT_EXPIRES key should go before the CELERY_TASK_DEFAULT_RETRY_DELAY key

(UnorderedKey)


[warning] 70-70: [UnorderedKey] The CELERY_BROKER_POOL_LIMIT key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 72-72: [UnorderedKey] The CELERY_ENABLE_UTC key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 73-73: [UnorderedKey] The CELERY_TIMEZONE key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)

🪛 Ruff (0.12.2)
backend/app/celery/utils.py

6-6: typing.Dict is deprecated, use dict instead

(UP035)

backend/app/celery/beat.py

15-15: f-string without any placeholders

Remove extraneous f prefix

(F541)

backend/app/celery/tasks/job_execution.py

3-3: typing.Dict is deprecated, use dict instead

(UP035)

🪛 GitHub Actions: AI Platform CI
backend/app/celery/utils.py

[error] 1-1: End-of-file fixer: Added newline at end of file.


[error] 1-1: Trailing whitespace removed by pre-commit hook.


[error] 1-1: Black formatting applied. File reformatted by Black.

backend/app/celery/beat.py

[error] 1-1: Trailing whitespace removed by pre-commit hook.


[error] 1-1: Black formatting applied. File reformatted by Black.

backend/app/celery/tasks/job_execution.py

[error] 1-1: End-of-file fixer: Added newline at end of file.


[error] 1-1: Trailing whitespace removed by pre-commit hook.


[error] 1-1: Black formatting applied. File reformatted by Black.

🔇 Additional comments (3)
backend/app/celery/tasks/__init__.py (1)

1-1: LGTM: clean re-export for package-level import

Re-exporting execute_job_task here is fine and will trigger generic_task import on package load.

backend/app/celery/tasks/job_execution.py (1)

11-38: Celery queues and routes for high_priority and low_priority are correctly configured
Verified in celery_app.py that task_queues includes Queue('high_priority') and Queue('low_priority') and task_routes maps the corresponding tasks to those queues.

backend/app/celery/beat.py (1)

1-1: Use explicit module invocation for formatting checks
Ensure black and ruff are invoked via python -m black and python -m ruff (or that their scripts are on PATH), then run:

pre-commit run --all-files --show-diff-on-failure

Commit any resulting changes.

Comment on lines 11 to 12
@celery_app.task(bind=True, queue="high_priority")
def execute_high_priority_task(self, function_path: str, user_id: int, job_id: str, **kwargs):

🛠️ Refactor suggestion

Make tasks resilient to worker crashes

Enable acks_late and reject_on_worker_lost so tasks are re-queued on worker death after execution starts.

-@celery_app.task(bind=True, queue="high_priority")
+@celery_app.task(bind=True, queue="high_priority", acks_late=True, reject_on_worker_lost=True)
🤖 Prompt for AI Agents
In backend/app/celery/tasks/job_execution.py around lines 11-12 the task
decorator for execute_high_priority_task lacks settings to handle worker
crashes; modify the @celery_app.task decorator to include acks_late=True and
reject_on_worker_lost=True so the task message is acknowledged only after
successful execution and will be re-queued if the worker dies during execution;
keep bind=True and queue="high_priority" unchanged and ensure any existing task
options remain merged with these two flags.

Comment on lines 26 to 27
@celery_app.task(bind=True, queue="low_priority")
def execute_low_priority_task(self, function_path: str, user_id: int, job_id: str, **kwargs):

🛠️ Refactor suggestion

Mirror reliability flags for low-priority queue

-@celery_app.task(bind=True, queue="low_priority")
+@celery_app.task(bind=True, queue="low_priority", acks_late=True, reject_on_worker_lost=True)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@celery_app.task(bind=True, queue="low_priority")
def execute_low_priority_task(self, function_path: str, user_id: int, job_id: str, **kwargs):
@celery_app.task(
bind=True,
queue="low_priority",
acks_late=True,
reject_on_worker_lost=True,
)
def execute_low_priority_task(self, function_path: str, user_id: int, job_id: str, **kwargs):

Comment on lines 32 to 40
task = execute_high_priority_task.delay(
function_path=function_path,
user_id=user_id,
job_id=job_id,
**kwargs
)

logger.info(f"Started high priority job {job_id} with Celery task {task.id}")
return task.id

🛠️ Refactor suggestion

Handle broker outages when enqueueing high-priority jobs

Wrap delay() to surface broker connection errors instead of failing later on attribute access.

-    task = execute_high_priority_task.delay(
-        function_path=function_path,
-        user_id=user_id,
-        job_id=job_id,
-        **kwargs
-    )
+    try:
+        task = execute_high_priority_task.delay(
+            function_path=function_path,
+            user_id=user_id,
+            job_id=job_id,
+            **kwargs
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue high priority job {job_id}: {e}")
+        raise
📝 Committable suggestion


Suggested change
task = execute_high_priority_task.delay(
function_path=function_path,
user_id=user_id,
job_id=job_id,
**kwargs
)
logger.info(f"Started high priority job {job_id} with Celery task {task.id}")
return task.id
try:
task = execute_high_priority_task.delay(
function_path=function_path,
user_id=user_id,
job_id=job_id,
**kwargs
)
except Exception as e:
logger.error(f"Failed to enqueue high priority job {job_id}: {e}")
raise
logger.info(f"Started high priority job {job_id} with Celery task {task.id}")
return task.id
🤖 Prompt for AI Agents
In backend/app/celery/utils.py around lines 32-40, the call to
execute_high_priority_task.delay(...) is not protected against broker outages so
attribute access on the returned task later will raise obscure errors; wrap the
delay() call in a try/except that catches broker/connection-related exceptions
(e.g., kombu.exceptions.OperationalError,
celery.exceptions.BrokerConnectionError, and a general Exception fallback), log
the error with context including job_id and function_path, and re-raise a clear
RuntimeError (or a custom enqueue exception) that includes the original
exception as the cause so callers immediately see a surfaced broker connection
error instead of failing later when accessing task.id.

Comment on lines 61 to 69
task = execute_low_priority_task.delay(
function_path=function_path,
user_id=user_id,
job_id=job_id,
**kwargs
)

logger.info(f"Started low priority job {job_id} with Celery task {task.id}")
return task.id

🛠️ Refactor suggestion

Handle broker outages when enqueueing low-priority jobs

Mirror the protective try/except for low-priority.

-    task = execute_low_priority_task.delay(
-        function_path=function_path,
-        user_id=user_id,
-        job_id=job_id,
-        **kwargs
-    )
+    try:
+        task = execute_low_priority_task.delay(
+            function_path=function_path,
+            user_id=user_id,
+            job_id=job_id,
+            **kwargs
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue low priority job {job_id}: {e}")
+        raise
📝 Committable suggestion


Suggested change
task = execute_low_priority_task.delay(
function_path=function_path,
user_id=user_id,
job_id=job_id,
**kwargs
)
logger.info(f"Started low priority job {job_id} with Celery task {task.id}")
return task.id
try:
task = execute_low_priority_task.delay(
function_path=function_path,
user_id=user_id,
job_id=job_id,
**kwargs
)
except Exception as e:
logger.error(f"Failed to enqueue low priority job {job_id}: {e}")
raise
logger.info(f"Started low priority job {job_id} with Celery task {task.id}")
return task.id
🤖 Prompt for AI Agents
In backend/app/celery/utils.py around lines 61 to 69, the call to
execute_low_priority_task.delay is not protected against broker outages; wrap
the delay call in a try/except that mirrors the existing low-priority
protection: catch broker/connection exceptions (or broadly Exception) as e, log
an error including the exception details and job_id/context, and return a safe
value (e.g., None) when enqueueing fails instead of letting the exception
propagate.

coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (5)
backend/app/celery/celery_app.py (1)

6-13: Include list looks good; removed missing module.

The prior example_tasks entry is gone; this avoids import errors at startup.

backend/app/celery/utils.py (2)

32-40: Guard enqueue against broker outages (surface clear errors).

Wrap delay() with try/except; log and re-raise to avoid later attribute errors when task is None.

-    task = execute_high_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        **kwargs
-    )
-    
-    logger.info(f"Started high priority job {job_id} with Celery task {task.id}")
-    return task.id
+    try:
+        task = execute_high_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            **kwargs
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue high priority job {job_id} ({function_path}): {e}")
+        raise
+    logger.info(f"Started high priority job {job_id} with Celery task {task.id}")
+    return task.id

61-69: Mirror broker-outage handling for low-priority.

-    task = execute_low_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        **kwargs
-    )
-    
-    logger.info(f"Started low priority job {job_id} with Celery task {task.id}")
-    return task.id
+    try:
+        task = execute_low_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            **kwargs
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue low priority job {job_id} ({function_path}): {e}")
+        raise
+    logger.info(f"Started low priority job {job_id} with Celery task {task.id}")
+    return task.id
backend/app/celery/tasks/job_execution.py (2)

11-12: Per-task acks_late/reject_on_worker_lost not required now.

You set task_acks_late and task_reject_on_worker_lost globally in celery_app.conf; no need to duplicate on decorators.

Also applies to: 26-27


55-60: Validate function_path to prevent RCE; ensure target is callable.

Unvalidated dynamic imports are a code-execution risk if user-controlled. Enforce an allowlist and callability checks.

-        # Dynamically import and resolve the function
-        module_path, function_name = function_path.rsplit('.', 1)
+        # Validate and resolve the function safely
+        if not function_path or "." not in function_path:
+            raise ValueError("Invalid function_path")
+        ALLOWED_FUNCTION_PREFIXES = ("app.core.", "app.services.", "app.jobs.")
+        if not function_path.startswith(ALLOWED_FUNCTION_PREFIXES):
+            raise ValueError(f"Disallowed function_path: {function_path}")
+        module_path, function_name = function_path.rsplit('.', 1)
         module = importlib.import_module(module_path)
-        execute_function = getattr(module, function_name)
+        execute_function = getattr(module, function_name)
+        if not callable(execute_function):
+            raise TypeError(f"Target is not callable: {function_path}")
🧹 Nitpick comments (7)
backend/app/celery/tasks/__init__.py (1)

1-1: Empty public API is fine; consider removing the module if unused.

If the package won’t re-export anything, you can delete this file to reduce noise, or explicitly re-export tasks for DX.

backend/Dockerfile (1)

19-24: Align compose mounts with new venv path.

You’ve moved the venv to /opt/venv. The dev compose still mounts /app/.venv; drop that mount (or switch to /opt/venv) to avoid confusion.

.env.example (1)

61-76: Tidy variable order to satisfy dotenv linters (optional).

Reorder keys as suggested by dotenv-linter to keep CI/lint green.

backend/Dockerfile.celery (1)

11-12: Clean apt lists to shrink the image.

Add rm -rf /var/lib/apt/lists/* after apt-get install.

-RUN apt-get update && apt-get install -y curl poppler-utils
+RUN apt-get update && apt-get install -y curl poppler-utils && rm -rf /var/lib/apt/lists/*
backend/app/celery/celery_app.py (1)

95-96: Consider dropping autodiscover_tasks() for faster startup.

You already specify include=[...]; autodiscover may be redundant and slows boot.

-celery_app.autodiscover_tasks()
+# Autodiscovery not needed since include=... is explicit.
backend/app/celery/utils.py (1)

5-8: Modernize typing; drop unused Optional.

Use built-in generics and remove Optional import (unused).

-import logging
-from typing import Any, Dict, Optional
+import logging
+from typing import Any
 from celery.result import AsyncResult
@@
-def get_task_status(task_id: str) -> Dict[str, Any]:
+def get_task_status(task_id: str) -> dict[str, Any]:

Also applies to: 72-88

backend/app/celery/tasks/job_execution.py (1)

1-5: Use the bound task instance instead of current_task.

current_task is unnecessary and can be None in edge cases; use task_instance.request.id and drop the import.

-import logging
-import importlib
-from typing import Any, Dict
-from celery import current_task
+import logging
+import importlib
+from typing import Any, Dict
@@
-    task_id = current_task.request.id
+    task_id = task_instance.request.id

Also applies to: 53-55

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1b061de and 7685ee1.

📒 Files selected for processing (9)
  • .env.example (1 hunks)
  • backend/Dockerfile (1 hunks)
  • backend/Dockerfile.celery (1 hunks)
  • backend/app/celery/celery_app.py (1 hunks)
  • backend/app/celery/tasks/__init__.py (1 hunks)
  • backend/app/celery/tasks/job_execution.py (1 hunks)
  • backend/app/celery/utils.py (1 hunks)
  • backend/app/core/config.py (2 hunks)
  • docker-compose.dev.yml (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/core/config.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/celery/celery_app.py (1)
backend/app/core/config.py (3)
  • RABBITMQ_URL (84-85)
  • REDIS_URL (95-98)
  • COMPUTED_CELERY_WORKER_CONCURRENCY (123-128)
backend/app/celery/utils.py (1)
backend/app/celery/tasks/job_execution.py (2)
  • execute_high_priority_task (12-23)
  • execute_low_priority_task (27-38)
🪛 Ruff (0.12.2)
backend/app/celery/tasks/job_execution.py

3-3: typing.Dict is deprecated, use dict instead

(UP035)

backend/app/celery/utils.py

6-6: typing.Dict is deprecated, use dict instead

(UP035)

🪛 GitHub Actions: AI Platform CI
backend/app/celery/tasks/job_execution.py

[error] 1-1: End-of-file fixer reformatted file to ensure newline at end of file.


[error] 1-1: Trailing whitespace removed by pre-commit hook trailing-whitespace.

backend/app/celery/celery_app.py

[error] 1-1: Trailing whitespace removed by pre-commit hook trailing-whitespace.

backend/app/celery/utils.py

[error] 1-1: End-of-file fixer reformatted file to ensure newline at end of file.


[error] 1-1: Trailing whitespace removed by pre-commit hook trailing-whitespace.

🪛 dotenv-linter (3.3.0)
.env.example

[warning] 52-52: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)


[warning] 58-58: [UnorderedKey] The REDIS_DB key should go before the REDIS_HOST key

(UnorderedKey)


[warning] 59-59: [UnorderedKey] The REDIS_PASSWORD key should go before the REDIS_PORT key

(UnorderedKey)


[warning] 65-65: [UnorderedKey] The CELERY_WORKER_MAX_MEMORY_PER_CHILD key should go before the CELERY_WORKER_MAX_TASKS_PER_CHILD key

(UnorderedKey)


[warning] 66-66: [UnorderedKey] The CELERY_TASK_SOFT_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 67-67: [UnorderedKey] The CELERY_TASK_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 68-68: [UnorderedKey] The CELERY_TASK_MAX_RETRIES key should go before the CELERY_TASK_SOFT_TIME_LIMIT key

(UnorderedKey)


[warning] 69-69: [UnorderedKey] The CELERY_TASK_DEFAULT_RETRY_DELAY key should go before the CELERY_TASK_MAX_RETRIES key

(UnorderedKey)


[warning] 70-70: [UnorderedKey] The CELERY_RESULT_EXPIRES key should go before the CELERY_TASK_DEFAULT_RETRY_DELAY key

(UnorderedKey)


[warning] 71-71: [UnorderedKey] The CELERY_BROKER_POOL_LIMIT key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 73-73: [UnorderedKey] The CELERY_ENABLE_UTC key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 75-75: [UnorderedKey] The CELERY_TIMEZONE key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)

🔇 Additional comments (1)
backend/app/celery/celery_app.py (1)

81-84: Double-check broker URL construction for vhost “/”.

Ensure settings.RABBITMQ_URL correctly encodes the vhost (// or %2F). Mis-encoding will break broker connection.
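
A quick sanity-check sketch for the encoding concern, using urllib's quote as the earlier review suggestion does:

```python
from urllib.parse import quote

vhost = "/"
url = f"amqp://user:pass@localhost:5672/{quote(vhost, safe='')}"
print(url)  # amqp://user:pass@localhost:5672/%2F  <- spec-compliant default vhost
```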

Comment on lines +48 to +55
# RabbitMQ Configuration (Celery Broker)
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/

# Redis Configuration (Celery Result Backend)

⚠️ Potential issue

Avoid RabbitMQ “guest” user across containers.

RabbitMQ’s “guest” is restricted to localhost. Use a non-guest user and update docker-compose to create it; mirror the creds in RABBITMQ_USER/PASSWORD.

Apply in example env:

-RABBITMQ_USER=guest
-RABBITMQ_PASSWORD=guest
+RABBITMQ_USER=app
+RABBITMQ_PASSWORD=change_me
📝 Committable suggestion


Suggested change
# RabbitMQ Configuration (Celery Broker)
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
# Redis Configuration (Celery Result Backend)
# RabbitMQ Configuration (Celery Broker)
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=app
RABBITMQ_PASSWORD=change_me
RABBITMQ_VHOST=/
# Redis Configuration (Celery Result Backend)
🧰 Tools
🪛 dotenv-linter (3.3.0)

[warning] 52-52: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)

🤖 Prompt for AI Agents
In .env.example around lines 48 to 55, the file currently uses the RabbitMQ
default guest/localhost settings which won’t work across Docker containers;
replace RABBITMQ_USER and RABBITMQ_PASSWORD with placeholders for a non-guest
user (e.g. RABBITMQ_USER=your_rabbit_user,
RABBITMQ_PASSWORD=your_rabbit_password) and update RABBITMQ_HOST to the RabbitMQ
service name used in docker-compose (e.g. rabbitmq) and then update
docker-compose.yml to create that user and password (or use environment vars to
set RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS) so the example env and
compose file match.

Comment on lines +49 to +55
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/

# Redis Configuration (Celery Result Backend)

💡 Verification agent

🧩 Analysis chain

Confirm vhost URL handling (default “/”).

With RABBITMQ_VHOST=/ and the current URL builder, ensure the final AMQP URL encodes the vhost correctly (// or %2F) to avoid malformed broker URLs.


🌐 Web query:

For RabbitMQ AMQP URLs (Celery/kombu), how should the default vhost "/" be represented in the URL path: as a double slash ("//") or as "%2F"?

💡 Result:

Short answer: use %2F (percent-encoded). RabbitMQ's URI spec requires "/" in a vhost name to be percent-encoded as %2F; Celery/Kombu historically accept the shorthand // for the default vhost, but the correct, portable form is amqp://host/%2F. [1][2][3]

Sources:

  • RabbitMQ URI spec (vhost encoding). [1]
  • Celery docs example using 'amqp://...//'. [2]
  • Kombu docs/changelog explaining how the path maps to vhost and that '/' must be encoded. [3]

Use percent-encoded default vhost (%2F)
Change the example default from

RABBITMQ_VHOST=/

to

RABBITMQ_VHOST=%2F

so that the generated AMQP URL becomes amqp://…/%2F, as required by the RabbitMQ URI spec.

🧰 Tools
🪛 dotenv-linter (3.3.0)

[warning] 52-52: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)

🤖 Prompt for AI Agents
In .env.example around lines 49 to 55, the RABBITMQ_VHOST default uses an
unencoded slash; update the example value to the percent-encoded form (%2F) so
generated AMQP URLs include %2F instead of /, ensuring the URI follows the
RabbitMQ spec; simply replace the RABBITMQ_VHOST entry from "/" to "%2F" in the
file.

Comment on lines +12 to +23
rabbitmq:
  image: rabbitmq:3-management-alpine
  ports:
    - "5672:5672"   # AMQP port
    - "15672:15672" # Management UI
  environment:
    RABBITMQ_DEFAULT_USER: guest
    RABBITMQ_DEFAULT_PASS: guest
  volumes:
    - rabbitmq_data:/var/lib/rabbitmq
  networks:
    - app-network

🛠️ Refactor suggestion

⚠️ Potential issue

Use non-guest RabbitMQ creds and drop stale .venv mount.

  • Switch RabbitMQ to a non-guest user and mirror in backend/worker env.
  • Remove the unused /app/.venv volume (venv now lives in /opt/venv).
   rabbitmq:
     image: rabbitmq:3-management-alpine
@@
-    environment:
-      RABBITMQ_DEFAULT_USER: guest
-      RABBITMQ_DEFAULT_PASS: guest
+    environment:
+      RABBITMQ_DEFAULT_USER: app
+      RABBITMQ_DEFAULT_PASS: appsecret
@@
   backend:
@@
-      - RABBITMQ_USER=guest
-      - RABBITMQ_PASSWORD=guest
+      - RABBITMQ_USER=app
+      - RABBITMQ_PASSWORD=appsecret
@@
-    volumes:
-      - ./backend:/app  # Mount for live code changes
-      - /app/.venv  # Exclude .venv from volume mount
+    volumes:
+      - ./backend:/app  # Mount for live code changes
@@
   celery-worker:
@@
-      - RABBITMQ_USER=guest
-      - RABBITMQ_PASSWORD=guest
+      - RABBITMQ_USER=app
+      - RABBITMQ_PASSWORD=appsecret
@@
-    volumes:
-      - ./backend:/app  # Mount for live code changes
-      - /app/.venv  # Exclude .venv from volume mount
+    volumes:
+      - ./backend:/app  # Mount for live code changes

Also applies to: 38-66, 67-94

🤖 Prompt for AI Agents
In docker-compose.dev.yml around lines 12 to 23 (and also apply same changes to
ranges 38-66 and 67-94), the RabbitMQ service is using the default guest
credentials and there's a stale /app/.venv volume mount referenced elsewhere;
change RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS to a non-guest
username/password (and update the corresponding BACKEND/WORKER environment files
to match), and remove any volume mapping that mounts /app/.venv (since the venv
now lives at /opt/venv) — ensure docker-compose and backend/worker env files use
the new credentials and delete the obsolete volume entry from the compose file
and any service volume lists.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (5)
backend/README.md (5)

180-193: Fix list style to satisfy markdownlint (MD004) and keep style consistent across the doc.

Switch “-” to “*” for unordered lists in this section.

-- Create .venv (in backend/):
+* Create .venv (in backend/):
 ...
-- Activate it:
+* Activate it:
 ...
-- Reinstall dependencies:
+* Reinstall dependencies:

196-202: Clarify build guidance and fix “tag” → “flag”; show both commands.

Current text says “use --build” but the example always includes it.

-- In root folder:
-
-If source code in backend has changed since last run, use ```--build``` else skip tag
-```shell
-docker compose -f docker-compose.dev.yml up --build
-```
+* In repo root:
+```shell
+docker compose -f docker-compose.dev.yml up
+```
+If backend code changed since the last run, rebuild with the flag:
+```shell
+docker compose -f docker-compose.dev.yml up --build
+```

203-212: Avoid duplicating migration instructions; link to the “Migrations” section or scope to container usage.

This repeats content from “Migrations” above and can drift. Either reference the section or provide the container-scoped one-liner.

Option A (reference):

- - Run database migrations and upgrade head from backend/ (optional - only if any change in db models since last run)
-
-```shell
-alembic revision --autogenerate -m "your-revision-message"
-```
-
-```shell
-alembic upgrade head
-```
+* Migrations: see the “Migrations” section above.

Option B (container-scoped):

- - Run database migrations and upgrade head from backend/ (optional - only if any change in db models since last run)
-
-```shell
-alembic revision --autogenerate -m "your-revision-message"
-```
-
-```shell
-alembic upgrade head
-```
+* Run inside the backend container (only if models changed):
+```shell
+docker compose exec backend bash -lc 'alembic revision --autogenerate -m "your-revision-message" && alembic upgrade head'
+```

175-212: Deduplicate venv guidance with the earlier “General Workflow” section.

The new venv/uv steps largely repeat lines 16–29. Prefer a single source of truth to prevent drift.

-### Setup venv (if not already done) in backend/
-...
-uv sync
+See “General Workflow” above for venv/uv setup (create/activate .venv and run `uv sync`).

175-176: Remove date from the “Dev setup” section header to keep it evergreen.

-## Dev setup (September-2025)
+## Dev setup
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7685ee1 and 58865fb.

📒 Files selected for processing (1)
  • backend/README.md (1 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.17.2)
backend/README.md

180-180: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


185-185: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


189-189: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


196-196: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


203-203: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)

🪛 GitHub Actions: AI Platform CI
backend/README.md

[error] 1-1: End-of-file fixer changed file to ensure proper newline at end of file.

🔇 Additional comments (1)
backend/README.md (1)

1-1: FYI: CI fixed missing newline at EOF.

No action needed; just keep an EOF newline to avoid churn.

```shell
python3.12 -m venv .venv
```
Note: If venv is broken, remove it first before running above command: ```rm -rf .venv```

⚠️ Potential issue

Invalid fenced code inside a sentence; use inline code instead.

Triple backticks inside a paragraph break rendering.

-Note: If venv is broken, remove it first before running above command: ```rm -rf .venv```
+Note: If the venv is broken, remove it first before running the command: `rm -rf .venv`
🤖 Prompt for AI Agents
In backend/README.md around line 184, the sentence contains a fenced code block
marker (triple backticks) inside a paragraph which breaks rendering; replace the
fenced block with inline code formatting and update the sentence to: "Note: If
the venv is broken, remove it first before running the command: `rm -rf .venv`"
ensuring you remove the stray backticks and use single backticks around the
command.

Comment on lines 195 to 201

- In root folder:

If source code in backend has changed since last run, use ```--build``` else skip tag
```shell
docker compose -f docker-compose.dev.yml up --build
```

🛠️ Refactor suggestion

Add prerequisite to create/populate .env before starting the stack.

With new RabbitMQ/Redis/Celery settings, missing envs will cause boot failures.

 ### Run everything in docker: infrastructure + backend + celery
 
-- In root folder:
+* Ensure your environment is configured (e.g., copy and edit): `cp .env.example .env`
+* In repo root:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- In root folder:
If source code in backend has changed since last run, use ```--build``` else skip tag
```shell
docker compose -f docker-compose.dev.yml up --build
```
### Run everything in docker: infrastructure + backend + celery
* Ensure your environment is configured (e.g., copy and edit): `cp .env.example .env`
* In repo root:
If source code in backend has changed since last run, use `--build` else skip tag
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

196-196: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)

🤖 Prompt for AI Agents
In backend/README.md around lines 195-201, add a prerequisite step instructing
developers to create and populate the .env file before starting the Docker
stack; explicitly list the required environment variables for RabbitMQ, Redis,
and Celery (host/port/username/password/virtual host or whatever keys the app
reads), point to an example .env.template or create one if missing, and instruct
to export or place the .env in the backend root so docker-compose can load it
prior to running docker compose -f docker-compose.dev.yml up --build.

codecov bot commented Sep 18, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (7)
backend/README.md (2)

184-184: Fix fenced code inside sentence; use inline code.

Triple backticks break rendering here. Replace with inline code.

-Note: If venv is broken, remove it first before running above command: ```rm -rf .venv```
+Note: If the venv is broken, remove it first before running the command: `rm -rf .venv`

194-201: Add prerequisite to create/populate .env before starting the stack.

Without the new RabbitMQ/Redis/Celery envs, the stack may fail to boot.

 ### Run everything in docker: infrastructure + backend + celery
 
- * In repo root:
+ * Ensure your environment is configured (copy and edit):
+   ```shell
+   cp .env.example .env
+   ```
+   Populate RabbitMQ, Redis, and Celery variables as needed.
+ * In repo root:
backend/app/celery/beat.py (1)

35-36: Remove duplicate start, fix unnecessary f-string, drop unused import.

First call to start_beat blocks; the second is redundant. Also remove the stray f in a plain string and unused Celery import.

Apply:

@@
-from celery import Celery
 from app.celery.celery_app import celery_app
@@
-    print(f"Starting Celery beat scheduler")
+    print("Starting Celery beat scheduler")
@@
-    start_beat(args.loglevel)
-    start_beat(args.loglevel)
+    start_beat(args.loglevel)

Also applies to: 16-17, 5-5

backend/app/celery/tasks/job_execution.py (2)

83-84: Use the bound task instance for task_id (avoid global current_task).

Prefer task_instance.request.id to reduce reliance on global context.

-    task_id = current_task.request.id
+    task_id = task_instance.request.id

12-20: Per‑task reliability flags optional; global config already sets them.

No changes needed; decorators can stay lean since celery_app.conf sets ack‑late and reject‑on‑worker‑lost.

Also applies to: 37-45, 49-59
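
For context, the global flags referred to above would look roughly like this in celery_app.py (a sketch; the PR's actual conf block may set them differently):

```python
# acknowledge messages only after the task finishes, and re-queue
# them if the worker process dies mid-task
celery_app.conf.update(
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)
```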

backend/app/celery/utils.py (2)

33-41: Protect enqueue against broker outages and surface clear errors.

Wrap delay() to catch connection errors and fail fast with context.

-    task = execute_high_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        trace_id=trace_id,
-        **kwargs,
-    )
+    try:
+        task = execute_high_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            trace_id=trace_id,
+            **kwargs,
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue high priority job {job_id} ({function_path}): {e}")
+        raise

61-69: Mirror the same protection for low‑priority.

-    task = execute_low_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        trace_id=trace_id,
-        **kwargs,
-    )
+    try:
+        task = execute_low_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            trace_id=trace_id,
+            **kwargs,
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue low priority job {job_id} ({function_path}): {e}")
+        raise
🧹 Nitpick comments (8)
backend/README.md (4)

180-180: Align list marker style to satisfy markdownlint (MD004).

The file predominantly uses asterisks; switch these dashes to asterisks.

- - Create .venv (in backend/):
+ * Create .venv (in backend/):
- - Activate it:
+ * Activate it:
- - Reinstall dependencies:
+ * Reinstall dependencies:
- - In root folder:
+ * In repo root:
- - Run database migrations and upgrade head from backend/ (optional - only if any change in db models since last run)
+ * Run database migrations and upgrade head (optional — only if DB models changed since last run)

Also applies to: 185-185, 189-189, 196-196, 203-203


198-201: Clarify optional --build usage and avoid backticks in prose.

Show both commands; use inline code for flags.

-If source code in backend has changed since last run, use ```--build``` else skip tag
-```shell
-docker compose -f docker-compose.dev.yml up --build
-```
+If backend code changed since the last run, include the optional `--build` flag; otherwise omit it.
+```shell
+docker compose -f docker-compose.dev.yml up --build
+```
+or, without rebuilding:
+```shell
+docker compose -f docker-compose.dev.yml up
+```

175-193: Optional: let uv manage the virtualenv to simplify setup.

Since the project standardizes on uv, you can skip manual venv creation.

-### Setup venv (if not already done) in backend/
-
- * Create .venv (in backend/):
-```shell
-python3.12 -m venv .venv
-```
-Note: If venv is broken, remove it first before running the command: `rm -rf .venv`
- * Activate it:
-```shell
-source .venv/bin/activate
-```
- * Reinstall dependencies:
-```shell
-uv sync
-```
+### Environment setup (managed by uv)
+
+* From `backend/`, sync deps (uv will create/refresh `.venv` automatically):
+```shell
+uv sync
+```
+If you prefer a clean slate:
+```shell
+rm -rf .venv && uv sync
+```

203-211: Clarify where Alembic commands run; prefer running inside the container.

Avoid confusion between host and container contexts; run via compose exec.

-* Run database migrations and upgrade head (optional — only if DB models changed since last run)
+* Run database migrations and upgrade head inside the backend container (optional — only if DB models changed since last run)
 
-```shell
-alembic revision --autogenerate -m "your-revision-message"
-```
+```shell
+docker compose exec backend alembic revision --autogenerate -m "your-revision-message"
+```
 
-```shell
-alembic upgrade head
-```
+```shell
+docker compose exec backend alembic upgrade head
+```
backend/app/celery/worker.py (2)

5-7: Drop unused imports.

-import os
-import sys
 import multiprocessing

26-28: Use a single source of truth for concurrency.

Use settings.COMPUTED_CELERY_WORKER_CONCURRENCY to mirror config logic.

-    if concurrency is None:
-        concurrency = settings.CELERY_WORKER_CONCURRENCY or multiprocessing.cpu_count()
+    if concurrency is None:
+        concurrency = settings.COMPUTED_CELERY_WORKER_CONCURRENCY
backend/app/celery/utils.py (1)

6-6: Modernize typing and remove unused import.

Use built‑in dict for annotations and drop Optional (unused).

-from typing import Any, Dict, Optional
+from typing import Any
@@
-def get_task_status(task_id: str) -> Dict[str, Any]:
+def get_task_status(task_id: str) -> dict[str, Any]:

Also applies to: 73-89

backend/app/celery/tasks/job_execution.py (1)

3-3: Drop unused Dict import.

-from typing import Any, Dict
+from typing import Any
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 58865fb and 0757b88.

📒 Files selected for processing (7)
  • backend/README.md (1 hunks)
  • backend/app/celery/beat.py (1 hunks)
  • backend/app/celery/celery_app.py (1 hunks)
  • backend/app/celery/tasks/job_execution.py (1 hunks)
  • backend/app/celery/utils.py (1 hunks)
  • backend/app/celery/worker.py (1 hunks)
  • backend/app/core/config.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/core/config.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/celery/celery_app.py (1)
backend/app/core/config.py (3)
  • RABBITMQ_URL (84-85)
  • REDIS_URL (95-98)
  • COMPUTED_CELERY_WORKER_CONCURRENCY (123-128)
backend/app/celery/utils.py (1)
backend/app/celery/tasks/job_execution.py (2)
  • execute_high_priority_task (13-34)
  • execute_low_priority_task (38-59)
🪛 Ruff (0.12.2)
backend/app/celery/tasks/job_execution.py

3-3: typing.Dict is deprecated, use dict instead

(UP035)

backend/app/celery/beat.py

16-16: f-string without any placeholders

Remove extraneous f prefix

(F541)

backend/app/celery/utils.py

6-6: typing.Dict is deprecated, use dict instead

(UP035)

🪛 markdownlint-cli2 (0.17.2)
backend/README.md

180-180: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


185-185: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


189-189: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


196-196: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)


203-203: Unordered list style
Expected: asterisk; Actual: dash

(MD004, ul-style)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (2)
backend/app/celery/celery_app.py (1)

49-65: Global reliability flags are set correctly.

ack-late and reject-on-worker-lost are enabled globally; no need to duplicate them per-task.

backend/app/celery/worker.py (1)

36-37: Confirm worker.run expects a comma‑separated string for queues

backend/app/celery/worker.py:36 currently does queues=queues.split(","). The CLI uses -Q a,b,c; verify whether the programmatic API (celery worker.run for your Celery version) expects a comma-separated string or a list, since passing the wrong type will error or misroute queue consumption.

I couldn't import celery in the verification environment (ModuleNotFoundError). Run locally to inspect the signature:

python - <<'PY'
import inspect
from celery.bin import worker as w
print("worker.run:", getattr(w.worker, "run", None))
print("signature:", inspect.signature(w.worker.run))
PY

If worker.run accepts lists, make the code tolerant:

if isinstance(queues, str):
    queues = [q.strip() for q in queues.split(",") if q.strip()]

Comment on lines +62 to +70
def _execute_job_internal(
    task_instance,
    function_path: str,
    project_id: int,
    job_id: str,
    priority: str,
    trace_id: str,
    **kwargs,
):

⚠️ Potential issue

Harden dynamic import to prevent arbitrary code execution.

Validate function_path, restrict to allow‑listed prefixes, and ensure callability before invocation.

 def _execute_job_internal(
@@
-    try:
-        # Dynamically import and resolve the function
-        module_path, function_name = function_path.rsplit(".", 1)
+    try:
+        # Validate and resolve the function safely
+        if not function_path or "." not in function_path:
+            raise ValueError("Invalid function_path")
+        ALLOWED_FUNCTION_PREFIXES = ("app.core.", "app.services.")
+        if not function_path.startswith(ALLOWED_FUNCTION_PREFIXES):
+            raise ValueError(f"Disallowed function_path: {function_path}")
+
+        module_path, function_name = function_path.rsplit(".", 1)
         module = importlib.import_module(module_path)
-        execute_function = getattr(module, function_name)
+        execute_function = getattr(module, function_name, None)
+        if not callable(execute_function):
+            raise TypeError(f"Target is not callable: {function_path}")
@@
-        result = execute_function(
+        result = execute_function(
             project_id=project_id,
             job_id=job_id,
             task_id=task_id,
             task_instance=task_instance,  # For progress updates, retries if needed
             **kwargs,
         )

Also applies to: 88-106

Comment on lines 39 to 42
without_gossip=True,
without_mingle=True,
without_heartbeat=True,
)

⚠️ Potential issue

Avoid disabling heartbeat in production.

without_heartbeat=True weakens liveness and can mask dead workers. Keep heartbeats on or make it configurable.

Apply:

-        without_heartbeat=True,
+        # keep heartbeat enabled for liveness
+        without_heartbeat=False,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

         without_gossip=True,
         without_mingle=True,
-        without_heartbeat=True,
+        # keep heartbeat enabled for liveness
+        without_heartbeat=False,
     )
🤖 Prompt for AI Agents
In backend/app/celery/worker.py around lines 39 to 42, the worker is started
with without_heartbeat=True which disables heartbeats; revert this to keep
heartbeats enabled and make it configurable: replace the hardcoded flag with a
config-driven value (e.g., read from environment or app settings), default it to
False (heartbeats enabled), and ensure production config does not set it true
(optionally log or raise if an unsafe value is detected in production); update
any startup docs/tests to reflect the new config option.
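
A minimal sketch of that config-driven approach, assuming a hypothetical CELERY_WORKER_DISABLE_HEARTBEAT setting (not defined in this PR) and the run() call shown in this review:

```python
# hypothetical setting; defaults to False so heartbeats stay enabled
disable_heartbeat = getattr(settings, "CELERY_WORKER_DISABLE_HEARTBEAT", False)

worker_instance.run(
    queues=queues,
    concurrency=concurrency,
    loglevel=loglevel,
    without_gossip=True,
    without_mingle=True,
    without_heartbeat=disable_heartbeat,
)
```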

Once you have the MJML extension installed, you can create a new email template in the `src` directory. After creating the new email template and with the `.mjml` file open in your editor, open the command palette with `Ctrl+Shift+P` and search for `MJML: Export to HTML`. This will convert the `.mjml` file to a `.html` file and now you can save it in the build directory.


## Dev setup (September-2025)
Collaborator

do we need this heading? Is it going to change with time?

Collaborator

removed

Collaborator

does not seem like it is removed though, and the lines added in this readme are already there, curious as to why they are written again

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (4)
backend/app/celery/beat.py (1)

18-18: Drop unnecessary f-string.

No placeholders present.

-    logger.info(f"Starting Celery beat scheduler")
+    logger.info("Starting Celery beat scheduler")
backend/app/celery/worker.py (2)

28-31: Use logging instead of print for observability.

Route through a module logger.

+import logging
@@
-    print(f"Starting Celery worker with {concurrency} processes")
-    print(f"Consuming queues: {queues}")
-    print(f"Log level: {loglevel}")
+    logger = logging.getLogger(__name__)
+    logger.info("Starting Celery worker with %s processes", concurrency)
+    logger.info("Consuming queues: %s", queues)
+    logger.info("Log level: %s", loglevel)

25-27: Guard against empty/whitespace queues.

Fail fast to avoid starting an idle worker.

     if concurrency is None:
         concurrency = settings.CELERY_WORKER_CONCURRENCY or multiprocessing.cpu_count()
 
+    queues = (queues or "").strip()
+    if not queues:
+        raise ValueError("No queues specified; provide --queues or configure defaults.")
backend/app/celery/utils.py (1)

6-7: Use built-in collection types in annotations (ruff UP035).

-from typing import Any, Dict, Optional
+from typing import Any
@@
-def get_task_status(task_id: str) -> Dict[str, Any]:
+def get_task_status(task_id: str) -> dict[str, Any]:

Also applies to: 74-90

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0757b88 and 16ffb97.

📒 Files selected for processing (4)
  • backend/app/celery/beat.py (1 hunks)
  • backend/app/celery/tasks/job_execution.py (1 hunks)
  • backend/app/celery/utils.py (1 hunks)
  • backend/app/celery/worker.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/celery/tasks/job_execution.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/celery/utils.py (1)
backend/app/celery/tasks/job_execution.py (2)
  • execute_high_priority_task (12-33)
  • execute_low_priority_task (37-58)
🪛 Ruff (0.13.1)
backend/app/celery/beat.py

18-18: f-string without any placeholders

Remove extraneous f prefix

(F541)

backend/app/celery/utils.py

6-6: typing.Dict is deprecated, use dict instead

(UP035)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (6)
backend/app/celery/beat.py (1)

35-36: Remove duplicate start_beat call (first call blocks).

Second invocation is unreachable and misleading.

     args = parser.parse_args()
     start_beat(args.loglevel)
-    start_beat(args.loglevel)
backend/app/celery/worker.py (3)

12-16: Default queues omit high/low; “long_running” is undefined elsewhere.

Include high_priority and low_priority so routed tasks are consumed.

-def start_worker(
-    queues: str = "default,long_running,cron",
+def start_worker(
+    queues: str = "default,high_priority,low_priority,cron",
@@
     parser.add_argument(
         "--queues",
-        default="default,long_running,cron",
+        default="default,high_priority,low_priority,cron",
         help="Comma-separated list of queues to consume",
     )

Also applies to: 49-52


38-41: Avoid disabling heartbeat by default.

Disabling heartbeats masks dead workers; keep them enabled unless explicitly configured.

-        without_heartbeat=True,
+        # keep heartbeat enabled for liveness
+        without_heartbeat=False,

35-36: Bug: queues must be a comma-separated string, not a list.

celery.bin.worker expects the -Q/queues option as a string; passing a list can break parsing.

-    worker_instance.run(
-        queues=queues.split(","),
+    worker_instance.run(
+        queues=queues,
         concurrency=concurrency,
         loglevel=loglevel,
         without_gossip=True,
         without_mingle=True,
         without_heartbeat=True,
     )

Likely an incorrect or invalid review comment.

backend/app/celery/utils.py (2)

34-43: Surface broker enqueue failures for high-priority jobs.

Wrap delay() so callers don’t crash later on task.id access.

-    task = execute_high_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        trace_id=trace_id,
-        **kwargs,
-    )
-
-    logger.info(f"Started high priority job {job_id} with Celery task {task.id}")
-    return task.id
+    try:
+        task = execute_high_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            trace_id=trace_id,
+            **kwargs,
+        )
+    except Exception as e:
+        logger.error("Failed to enqueue high priority job %s: %s", job_id, e)
+        raise
+
+    logger.info("Started high priority job %s with Celery task %s", job_id, task.id)
+    return task.id

62-71: Mirror enqueue protection for low-priority jobs.

-    task = execute_low_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        trace_id=trace_id,
-        **kwargs,
-    )
-
-    logger.info(f"Started low priority job {job_id} with Celery task {task.id}")
-    return task.id
+    try:
+        task = execute_low_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            trace_id=trace_id,
+            **kwargs,
+        )
+    except Exception as e:
+        logger.error("Failed to enqueue low priority job %s: %s", job_id, e)
+        raise
+
+    logger.info("Started low priority job %s with Celery task %s", job_id, task.id)
+    return task.id

Comment on lines +84 to +90
    result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result,
        "info": result.info,
    }

⚠️ Potential issue

Bind AsyncResult to the configured app.

Ensures correct backend/broker when multiple Celery apps are present.

-    result = AsyncResult(task_id)
+    result = AsyncResult(task_id, app=celery_app)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

-    result = AsyncResult(task_id)
+    result = AsyncResult(task_id, app=celery_app)
     return {
         "task_id": task_id,
         "status": result.status,
         "result": result.result,
         "info": result.info,
     }
🤖 Prompt for AI Agents
In backend/app/celery/utils.py around lines 84 to 90, the AsyncResult is created
using the global constructor which may bind to the wrong Celery instance; import
or reference the configured Celery app and call its AsyncResult (e.g.,
configured_app.AsyncResult(task_id)) so the result is bound to the correct
backend/broker, then return the same dict structure; ensure the configured app
is the one exported by your Celery setup and remove the direct AsyncResult(...)
call.
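
For illustration, a hypothetical caller of these helpers; the function_path target and IDs below are made up, but the keyword arguments match the signatures shown in this review:

```python
from app.celery.utils import get_task_status, start_high_priority_job

# enqueue a job on the high_priority queue; returns the Celery task id
task_id = start_high_priority_job(
    function_path="app.services.reports.generate_report",  # assumed target
    project_id=42,
    job_id="job-123",
    trace_id="trace-abc",
)

# poll later, e.g. from an API endpoint
status = get_task_status(task_id)
# -> {"task_id": "...", "status": "PENDING", "result": None, "info": None}
```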

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
backend/app/celery/beat.py (2)

18-18: Remove unnecessary f-string prefix.

The f-string doesn't contain any placeholders, making the f prefix redundant.

-    logger.info(f"Starting Celery beat scheduler")
+    logger.info("Starting Celery beat scheduler")

5-5: Remove unused import.

The Celery import is not used in this file since celery_app is imported directly.

-from celery import Celery
backend/app/celery/utils.py (2)

6-6: Modernize type annotation.

Use dict instead of deprecated typing.Dict for Python 3.9+ compatibility.

-from typing import Any, Dict, Optional
+from typing import Any, Optional
-def get_task_status(task_id: str) -> Dict[str, Any]:
+def get_task_status(task_id: str) -> dict[str, Any]:

84-90: Consider handling AsyncResult exceptions.

While AsyncResult operations rarely fail, they can raise exceptions for malformed task IDs or connection issues. Consider adding basic error handling to return a more informative status dict.

 def get_task_status(task_id: str) -> dict[str, Any]:
     """
     Get the status of a Celery task.
 
     Args:
         task_id: Celery task ID
 
     Returns:
         Dictionary with task status information
     """
-    result = AsyncResult(task_id)
-    return {
-        "task_id": task_id,
-        "status": result.status,
-        "result": result.result,
-        "info": result.info,
-    }
+    try:
+        result = AsyncResult(task_id)
+        return {
+            "task_id": task_id,
+            "status": result.status,
+            "result": result.result,
+            "info": result.info,
+        }
+    except Exception as e:
+        logger.error(f"Failed to get status for task {task_id}: {e}")
+        return {
+            "task_id": task_id,
+            "status": "ERROR",
+            "result": None,
+            "info": f"Failed to retrieve task status: {e}",
+        }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0757b88 and a742721.

📒 Files selected for processing (4)
  • backend/app/celery/beat.py (1 hunks)
  • backend/app/celery/tasks/job_execution.py (1 hunks)
  • backend/app/celery/utils.py (1 hunks)
  • backend/app/celery/worker.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/celery/worker.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/celery/utils.py (1)
backend/app/celery/tasks/job_execution.py (2)
  • execute_high_priority_task (12-33)
  • execute_low_priority_task (37-58)
🪛 Ruff (0.13.1)
backend/app/celery/utils.py

6-6: typing.Dict is deprecated, use dict instead

(UP035)

backend/app/celery/beat.py

18-18: f-string without any placeholders

Remove extraneous f prefix

(F541)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (8)
backend/app/celery/beat.py (1)

35-36: Fix duplicate start_beat call.

The duplicate call to start_beat was already identified in previous reviews. Line 36 should be removed as the first call at line 35 blocks and the second is never reached.

backend/app/celery/utils.py (2)

34-43: Handle broker connection failures gracefully.

The lack of error handling for broker outages when enqueueing high-priority jobs was already identified. The delay() call should be wrapped in try/except to catch broker connection errors and provide clear feedback rather than failing with obscure attribute access errors later.


62-71: Handle broker connection failures gracefully.

Similar to the high-priority function, this low-priority enqueueing should also be protected against broker outages with appropriate error handling around the delay() call.

backend/app/celery/tasks/job_execution.py (5)

11-11: Add task reliability settings.

The task decorator should include acks_late=True and reject_on_worker_lost=True to ensure tasks are re-queued if workers crash during execution, improving reliability.


36-36: Add task reliability settings.

The low-priority task should mirror the same reliability flags as the high-priority task for consistent behavior across both queues.


88-91: Validate function_path to prevent code injection.

The dynamic import of function_path without validation creates a code execution vulnerability. An allowlist of permitted function prefixes should be enforced, and callability should be verified before execution.


98-104: LGTM: Well-structured task execution.

The standardized parameter passing to the dynamically loaded function is well-designed, providing consistent access to project_id, job_id, task_id, and task_instance for progress updates and retry logic. The **kwargs forwarding allows flexibility for function-specific parameters.


111-116: Excellent error handling and logging.

The exception handling with exc_info=True provides complete stack traces while maintaining context with job ID, task ID, and priority information. Re-raising the exception ensures proper Celery task failure handling.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (6)
backend/app/celery/beat.py (3)

18-18: Remove unnecessary f-string (Ruff F541).

No placeholders present; this will trip the linter.

-    logger.info(f"Starting Celery beat scheduler")
+    logger.info("Starting Celery beat scheduler")

11-11: Annotate return type for clarity.

Explicit None return improves readability and tooling.

-def start_beat(loglevel: str = "info"):
+def start_beat(loglevel: str = "info") -> None:

27-31: Include “critical” in --loglevel choices.

Celery/logging supports it; harmless addition for parity.

-        choices=["debug", "info", "warning", "error"],
+        choices=["debug", "info", "warning", "error", "critical"],
backend/app/celery/utils.py (3)

6-6: Use modern Python type annotation.

Replace deprecated typing.Dict with the built-in dict for forward compatibility.

-from typing import Any, Dict, Optional
+from typing import Any, Optional

74-90: Update type annotation to use modern syntax.

Replace Dict[str, Any] with dict[str, Any] to align with the modern Python type annotation style.

-def get_task_status(task_id: str) -> Dict[str, Any]:
+def get_task_status(task_id: str) -> dict[str, Any]:

84-90: Handle AsyncResult exceptions for missing or invalid tasks.

The AsyncResult constructor and property access can raise exceptions for invalid task IDs or when the result backend is unavailable. Add error handling to return meaningful status information.

-def get_task_status(task_id: str) -> dict[str, Any]:
-    """
-    Get the status of a Celery task.
-
-    Args:
-        task_id: Celery task ID
-
-    Returns:
-        Dictionary with task status information
-    """
-    result = AsyncResult(task_id)
-    return {
-        "task_id": task_id,
-        "status": result.status,
-        "result": result.result,
-        "info": result.info,
-    }
+def get_task_status(task_id: str) -> dict[str, Any]:
+    """
+    Get the status of a Celery task.
+
+    Args:
+        task_id: Celery task ID
+
+    Returns:
+        Dictionary with task status information
+    """
+    try:
+        result = AsyncResult(task_id)
+        return {
+            "task_id": task_id,
+            "status": result.status,
+            "result": result.result,
+            "info": result.info,
+        }
+    except Exception as e:
+        logger.error(f"Failed to get status for task {task_id}: {e}")
+        return {
+            "task_id": task_id,
+            "status": "UNKNOWN",
+            "result": None,
+            "info": f"Error retrieving status: {e}",
+        }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0757b88 and a742721.

📒 Files selected for processing (4)
  • backend/app/celery/beat.py (1 hunks)
  • backend/app/celery/tasks/job_execution.py (1 hunks)
  • backend/app/celery/utils.py (1 hunks)
  • backend/app/celery/worker.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/celery/worker.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/celery/utils.py (1)
backend/app/celery/tasks/job_execution.py (2)
  • execute_high_priority_task (12-33)
  • execute_low_priority_task (37-58)
🪛 Ruff (0.13.1)
backend/app/celery/beat.py

18-18: f-string without any placeholders

Remove extraneous f prefix

(F541)

backend/app/celery/utils.py

6-6: typing.Dict is deprecated, use dict instead

(UP035)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (6)
backend/app/celery/beat.py (1)

35-36: Remove duplicate start_beat call; first call blocks, second is unreachable.

Keeps CLI intent clear and avoids confusion.

-    start_beat(args.loglevel)
-    start_beat(args.loglevel)
+    start_beat(args.loglevel)
backend/app/celery/utils.py (2)

34-43: Handle broker connection failures in task enqueuing.

The call to execute_high_priority_task.delay() can fail if the message broker is unavailable, leading to obscure errors when accessing task.id. Wrap the enqueue operation with proper exception handling.

-    task = execute_high_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        trace_id=trace_id,
-        **kwargs,
-    )
+    try:
+        task = execute_high_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            trace_id=trace_id,
+            **kwargs,
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue high priority job {job_id}: {e}")
+        raise

62-71: Handle broker connection failures for low-priority jobs.

Apply the same protective exception handling for low-priority task enqueuing to ensure consistent error handling across both priority levels.

-    task = execute_low_priority_task.delay(
-        function_path=function_path,
-        project_id=project_id,
-        job_id=job_id,
-        trace_id=trace_id,
-        **kwargs,
-    )
+    try:
+        task = execute_low_priority_task.delay(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=job_id,
+            trace_id=trace_id,
+            **kwargs,
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue low priority job {job_id}: {e}")
+        raise
backend/app/celery/tasks/job_execution.py (3)

11-33: Add task reliability options for worker crash resilience.

Configure the task to handle worker crashes gracefully by enabling late acknowledgment and re-queuing on worker loss.

-@celery_app.task(bind=True, queue="high_priority")
+@celery_app.task(bind=True, queue="high_priority", acks_late=True, reject_on_worker_lost=True)

36-58: Apply consistent reliability configuration.

Mirror the same reliability flags for the low-priority task to ensure consistent behavior across both priority levels.

-@celery_app.task(bind=True, queue="low_priority")
+@celery_app.task(bind=True, queue="low_priority", acks_late=True, reject_on_worker_lost=True)

88-104: Strengthen dynamic import security to prevent code injection.

The current dynamic import mechanism can execute arbitrary code if function_path is user-controlled. Implement validation to restrict imports to approved module prefixes and verify callability.

     try:
-        # Dynamically import and resolve the function
-        module_path, function_name = function_path.rsplit(".", 1)
+        # Validate and resolve the function safely
+        if not function_path or "." not in function_path:
+            raise ValueError("Invalid function_path")
+        ALLOWED_FUNCTION_PREFIXES = ("app.core.", "app.services.")
+        if not function_path.startswith(ALLOWED_FUNCTION_PREFIXES):
+            raise ValueError(f"Disallowed function_path: {function_path}")
+        
+        module_path, function_name = function_path.rsplit(".", 1)
         module = importlib.import_module(module_path)
-        execute_function = getattr(module, function_name)
+        execute_function = getattr(module, function_name, None)
+        if not callable(execute_function):
+            raise TypeError(f"Target is not callable: {function_path}")

@AkhileshNegi AkhileshNegi changed the title feat: Integrate Celery with RabbitMQ and Redis for task management Queue: Integrate Celery with RabbitMQ and Redis for task management Sep 22, 2025
@AkhileshNegi AkhileshNegi added the enhancement New feature or request label Sep 22, 2025
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
backend/app/celery/beat.py (3)

18-18: Remove unnecessary f-string (Ruff F541).

-    logger.info(f"Starting Celery beat scheduler")
+    logger.info("Starting Celery beat scheduler")

27-32: Make loglevel CLI case-insensitive and include 'critical'.

-    parser.add_argument(
-        "--loglevel",
-        default="info",
-        choices=["debug", "info", "warning", "error"],
-        help="Logging level",
-    )
+    parser.add_argument(
+        "--loglevel",
+        default="info",
+        type=str.lower,
+        choices=["debug", "info", "warning", "error", "critical"],
+        help="Logging level",
+    )

11-17: Add explicit return type for clarity.

-def start_beat(loglevel: str = "info"):
+def start_beat(loglevel: str = "info") -> None:
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a742721 and a256c45.

📒 Files selected for processing (2)
  • backend/app/celery/beat.py (1 hunks)
  • backend/app/celery/worker.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/celery/worker.py
🧰 Additional context used
🪛 Ruff (0.13.1)
backend/app/celery/beat.py

18-18: f-string without any placeholders

Remove extraneous f prefix

(F541)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (2)
backend/app/celery/beat.py (2)

35-35: Previous duplicate invocation removed — resolved.


20-20: Keep the program name 'celery' in argv for app.start (matches Celery docs).

Celery's app.start expects argv to mimic sys.argv, so use ["celery", "beat", "-l", loglevel]; removing "celery" is incorrect.

Likely an incorrect or invalid review comment.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
.env.example (4)

56-60: Tighten Redis example defaults; prefer container host and a placeholder password.

This avoids leaking a working no‑auth default and aligns with compose service naming.

-REDIS_HOST=localhost
+REDIS_HOST=redis
 REDIS_PORT=6379
 REDIS_DB=0
-REDIS_PASSWORD=
+REDIS_PASSWORD=change_me

Confirm your compose service is named redis, or adjust accordingly.


65-68: Add unit hints for worker memory/time limits to prevent misconfiguration.

Celery’s max-memory-per-child is in kB; limits are in seconds. Add inline comments to make this explicit.

-CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
-CELERY_WORKER_MAX_MEMORY_PER_CHILD=200000
-CELERY_TASK_SOFT_TIME_LIMIT=300
-CELERY_TASK_TIME_LIMIT=600
+# max tasks before child recycle
+CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
+# in kilobytes (200000 ~= ~195 MiB)
+CELERY_WORKER_MAX_MEMORY_PER_CHILD=200000
+# seconds
+CELERY_TASK_SOFT_TIME_LIMIT=300
+# seconds
+CELERY_TASK_TIME_LIMIT=600

71-72: Prefetch of 1 trades throughput for fairness—confirm this is intentional.

Default is higher; 1 can significantly reduce worker throughput on I/O‑bound tasks.
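
For context, the trade-off maps to Celery's worker_prefetch_multiplier (default 4, i.e. each process reserves four unacked messages ahead of time); a sketch, assuming the env var feeds this setting:

```python
# 1 = fair, one-at-a-time dispatch per worker process (good when task
# durations vary widely); higher values favor throughput for short tasks
celery_app.conf.worker_prefetch_multiplier = 1
```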


49-75: Satisfy dotenv‑linter ordering warnings (alphabetize keys within blocks).

Non‑functional, but it keeps CI/lint clean. For example: place RABBITMQ_PASSWORD before RABBITMQ_PORT, REDIS_DB before REDIS_HOST, and sort Celery keys alphabetically.

If helpful, I can provide a fully reordered diff once you confirm desired grouping/comments.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a256c45 and fb3f5c8.

📒 Files selected for processing (2)
  • .env.example (1 hunks)
  • backend/README.md (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/README.md
🧰 Additional context used
🪛 dotenv-linter (3.3.0)
.env.example

[warning] 52-52: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)


[warning] 58-58: [UnorderedKey] The REDIS_DB key should go before the REDIS_HOST key

(UnorderedKey)


[warning] 59-59: [UnorderedKey] The REDIS_PASSWORD key should go before the REDIS_PORT key

(UnorderedKey)


[warning] 65-65: [UnorderedKey] The CELERY_WORKER_MAX_MEMORY_PER_CHILD key should go before the CELERY_WORKER_MAX_TASKS_PER_CHILD key

(UnorderedKey)


[warning] 66-66: [UnorderedKey] The CELERY_TASK_SOFT_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 67-67: [UnorderedKey] The CELERY_TASK_TIME_LIMIT key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)


[warning] 68-68: [UnorderedKey] The CELERY_TASK_MAX_RETRIES key should go before the CELERY_TASK_SOFT_TIME_LIMIT key

(UnorderedKey)


[warning] 69-69: [UnorderedKey] The CELERY_TASK_DEFAULT_RETRY_DELAY key should go before the CELERY_TASK_MAX_RETRIES key

(UnorderedKey)


[warning] 70-70: [UnorderedKey] The CELERY_RESULT_EXPIRES key should go before the CELERY_TASK_DEFAULT_RETRY_DELAY key

(UnorderedKey)


[warning] 71-71: [UnorderedKey] The CELERY_BROKER_POOL_LIMIT key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 73-73: [UnorderedKey] The CELERY_ENABLE_UTC key should go before the CELERY_RESULT_EXPIRES key

(UnorderedKey)


[warning] 75-75: [UnorderedKey] The CELERY_TIMEZONE key should go before the CELERY_WORKER_CONCURRENCY key

(UnorderedKey)

🔇 Additional comments (3)
.env.example (3)

62-63: Good clarification on concurrency.

The microcopy now clearly states leaving CELERY_WORKER_CONCURRENCY empty will auto‑detect cores.
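
For reference, a sketch of what that auto-detection presumably looks like in config.py (the property name matches the one referenced elsewhere in this review; the body is assumed):

```python
import multiprocessing

from pydantic import computed_field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    CELERY_WORKER_CONCURRENCY: int | None = None  # empty env var -> None

    @computed_field  # pydantic v2
    @property
    def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
        # fall back to the machine's core count when unset
        return self.CELERY_WORKER_CONCURRENCY or multiprocessing.cpu_count()
```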


49-52: Do not ship RabbitMQ “guest”/localhost defaults; use a non‑guest user and container host.

RabbitMQ’s guest account is restricted to localhost and will fail from other containers; also avoid committing working defaults that promote weak creds.

Apply:

-RABBITMQ_HOST=localhost
-RABBITMQ_PORT=5672
-RABBITMQ_USER=guest
-RABBITMQ_PASSWORD=guest
+RABBITMQ_HOST=rabbitmq
+RABBITMQ_PORT=5672
+RABBITMQ_USER=app
+RABBITMQ_PASSWORD=change_me

Verify docker-compose uses service name rabbitmq and sets RABBITMQ_DEFAULT_USER/RABBITMQ_DEFAULT_PASS to match.


53-53: Encode the default vhost as %2F to avoid malformed AMQP URLs.

/ must be percent‑encoded in the URI path; many clients break with a raw slash.

-RABBITMQ_VHOST=/
+RABBITMQ_VHOST=%2F

@kartpop
Copy link
Collaborator Author

kartpop commented Sep 23, 2025

@nishika26 @AkhileshNegi @avirajsingh7 merging this PR; have removed the "Dev setup (September-2025)" heading from README.md based on consensus. I feel the pyproject.toml and uv.lock files need fixing though: they pin Python 3.11 and should be bumped to 3.12, which our Dockerfile already uses. I wasn't able to get the project up and running by following the current README.

@kartpop kartpop merged commit 4724a2b into main Sep 23, 2025
2 of 3 checks passed
@kartpop kartpop deleted the feature/celery branch September 23, 2025 01:34