Merged

16 commits
b1610f2
feat: Integrate Celery with RabbitMQ and Redis for task management
kartpop Sep 5, 2025
1b061de
chore: Remove OpenAI API key and Langfuse public key from environment…
kartpop Sep 5, 2025
f911ccc
feat: Enhance Celery configuration with auto-detection of worker conc…
kartpop Sep 5, 2025
29a02cd
feat: Add Docker Compose configuration for Redis, RabbitMQ, PostgreSQ…
kartpop Sep 8, 2025
52b0849
feat: Update Docker configuration for backend and celery worker with …
kartpop Sep 8, 2025
7685ee1
refactor: Replace user_id with project_id in high and low priority ta…
kartpop Sep 8, 2025
3516f04
docs: Update README with development setup instructions and Docker usage
kartpop Sep 8, 2025
58865fb
docs: Update README with instructions for setting up virtual environment
kartpop Sep 8, 2025
626eefe
pre commit
avirajsingh7 Sep 18, 2025
0757b88
feat: Add trace_id parameter to high and low priority job functions f…
avirajsingh7 Sep 18, 2025
16ffb97
refactor: Improve logging in Celery beat scheduler and remove redunda…
avirajsingh7 Sep 22, 2025
5d768a4
refactor: Replace print statements with logging in Celery worker script
avirajsingh7 Sep 22, 2025
a742721
precommit
avirajsingh7 Sep 22, 2025
a256c45
refactor: Update queue names in Celery worker and remove unused param…
kartpop Sep 22, 2025
fb3f5c8
docs: Update README and .env.example for clarity on Celery worker con…
kartpop Sep 23, 2025
684e094
docs: Remove redundant line from README regarding MJML email template…
kartpop Sep 23, 2025
29 changes: 29 additions & 0 deletions .env.example
@@ -44,3 +44,32 @@ AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=ap-south-1
AWS_S3_BUCKET_PREFIX="bucket-prefix-name"

# RabbitMQ Configuration (Celery Broker)
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/

# Redis Configuration (Celery Result Backend)
Comment on lines +48 to +55

⚠️ Potential issue

Avoid RabbitMQ “guest” user across containers.

RabbitMQ’s “guest” is restricted to localhost. Use a non-guest user and update docker-compose to create it; mirror the creds in RABBITMQ_USER/PASSWORD.

Apply in example env:

-RABBITMQ_USER=guest
-RABBITMQ_PASSWORD=guest
+RABBITMQ_USER=app
+RABBITMQ_PASSWORD=change_me
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 # RabbitMQ Configuration (Celery Broker)
 RABBITMQ_HOST=localhost
 RABBITMQ_PORT=5672
-RABBITMQ_USER=guest
-RABBITMQ_PASSWORD=guest
+RABBITMQ_USER=app
+RABBITMQ_PASSWORD=change_me
 RABBITMQ_VHOST=/

 # Redis Configuration (Celery Result Backend)
🧰 Tools
🪛 dotenv-linter (3.3.0)

[warning] 52-52: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)

🤖 Prompt for AI Agents
In .env.example around lines 48 to 55, the file currently uses the RabbitMQ
default guest/localhost settings which won’t work across Docker containers;
replace RABBITMQ_USER and RABBITMQ_PASSWORD with placeholders for a non-guest
user (e.g. RABBITMQ_USER=your_rabbit_user,
RABBITMQ_PASSWORD=your_rabbit_password) and update RABBITMQ_HOST to the RabbitMQ
service name used in docker-compose (e.g. rabbitmq) and then update
docker-compose.yml to create that user and password (or use environment vars to
set RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS) so the example env and
compose file match.
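
If the compose file follows the official RabbitMQ image's conventions, the non-guest user can be created through RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS. A minimal sketch of such a service (service name, image tag and credential values are illustrative, not taken from this PR's compose file):

services:
  rabbitmq:
    image: rabbitmq:3-management
    environment:
      # These should mirror RABBITMQ_USER / RABBITMQ_PASSWORD in .env
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-app}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-change_me}
    ports:
      - "5672:5672"
      - "15672:15672"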

Comment on lines +49 to +55

💡 Verification agent

🧩 Analysis chain

Confirm vhost URL handling (default “/”).

With RABBITMQ_VHOST=/ and the current URL builder, ensure the final AMQP URL encodes the vhost correctly (// or %2F) to avoid malformed broker URLs.


🌐 Web query:

For RabbitMQ AMQP URLs (Celery/kombu), how should the default vhost "/" be represented in the URL path: as a double slash ("//") or as "%2F"?

💡 Result:

Short answer: use %2F (percent-encoded). RabbitMQ's URI spec requires "/" in a vhost name to be percent-encoded as %2F; Celery/Kombu historically accept the shorthand // for the default vhost, but the correct, portable form is amqp://host/%2F. [1][2][3]

Sources:

  • RabbitMQ URI spec (vhost encoding). [1]
  • Celery docs example using 'amqp://...//'. [2]
  • Kombu docs/changelog explaining how the path maps to vhost and that '/' must be encoded. [3]

Use percent-encoded default vhost (%2F)
Change the example default from

RABBITMQ_VHOST=/

to

RABBITMQ_VHOST=%2F

so that the generated AMQP URL becomes amqp://…/%2F, as required by the RabbitMQ URI spec.

🧰 Tools
🪛 dotenv-linter (3.3.0)

[warning] 52-52: [UnorderedKey] The RABBITMQ_PASSWORD key should go before the RABBITMQ_PORT key

(UnorderedKey)

🤖 Prompt for AI Agents
In .env.example around lines 49 to 55, the RABBITMQ_VHOST default uses an
unencoded slash; update the example value to the percent-encoded form (%2F) so
generated AMQP URLs include %2F instead of /, ensuring the URI follows the
RabbitMQ spec; simply replace the RABBITMQ_VHOST entry from "/" to "%2F" in the
file.
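
Alternatively, the example can keep RABBITMQ_VHOST=/ and have the URL builder percent-encode the vhost (and password) when assembling the broker URL. A minimal sketch, assuming a helper along these lines lives in the settings layer (the function name is illustrative):

from urllib.parse import quote

def build_rabbitmq_url(user: str, password: str, host: str, port: int, vhost: str) -> str:
    # quote(..., safe="") turns the default vhost "/" into %2F, as the RabbitMQ URI spec requires
    return f"amqp://{user}:{quote(password, safe='')}@{host}:{port}/{quote(vhost, safe='')}"

# build_rabbitmq_url("app", "change_me", "rabbitmq", 5672, "/")
# -> "amqp://app:change_me@rabbitmq:5672/%2F"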

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=

# Celery Configuration
# Leave CELERY_WORKER_CONCURRENCY empty to auto-detect CPU cores, or set to specific number (e.g., number_of_cores * 2)
CELERY_WORKER_CONCURRENCY=
CELERY_WORKER_MAX_TASKS_PER_CHILD=1000
CELERY_WORKER_MAX_MEMORY_PER_CHILD=200000
CELERY_TASK_SOFT_TIME_LIMIT=300
CELERY_TASK_TIME_LIMIT=600
CELERY_TASK_MAX_RETRIES=3
CELERY_TASK_DEFAULT_RETRY_DELAY=60
CELERY_RESULT_EXPIRES=3600
CELERY_BROKER_POOL_LIMIT=10
CELERY_WORKER_PREFETCH_MULTIPLIER=1
CELERY_ENABLE_UTC=true
# India Standard Time (UTC+05:30)
CELERY_TIMEZONE=Asia/Kolkata
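
The auto-detection mentioned in the comment above happens in the settings layer (COMPUTED_CELERY_WORKER_CONCURRENCY, referenced later in celery_app.py), which is not part of this diff. A minimal sketch of what it might look like, assuming a Settings class in app.core.config (the body is illustrative):

import os

class Settings:  # illustrative stand-in for app.core.config.Settings
    CELERY_WORKER_CONCURRENCY: int | None = None  # empty env value maps to None

    @property
    def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
        # Fall back to the number of CPU cores when no explicit value is configured
        if self.CELERY_WORKER_CONCURRENCY:
            return self.CELERY_WORKER_CONCURRENCY
        return os.cpu_count() or 1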
5 changes: 4 additions & 1 deletion backend/Dockerfile
@@ -16,8 +16,11 @@ RUN apt-get update && apt-get install -y \
# Install uv package manager
COPY --from=ghcr.io/astral-sh/uv:0.5.11 /uv /uvx /bin/

# Use a different venv path to avoid conflicts with volume mounts
ENV UV_PROJECT_ENVIRONMENT=/opt/venv

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"
ENV PATH="/opt/venv/bin:$PATH"

# Enable bytecode compilation and efficient dependency linking
ENV UV_COMPILE_BYTECODE=1
41 changes: 41 additions & 0 deletions backend/Dockerfile.celery
@@ -0,0 +1,41 @@
# Use Python 3.12 base image
FROM python:3.12

# Set environment variables
ENV PYTHONUNBUFFERED=1

# Set working directory
WORKDIR /app/

# Install system dependencies
RUN apt-get update && apt-get install -y curl poppler-utils

# Install uv package manager
COPY --from=ghcr.io/astral-sh/uv:0.5.11 /uv /uvx /bin/

# Use a different venv path to avoid conflicts with volume mounts
ENV UV_PROJECT_ENVIRONMENT=/opt/venv

# Place executables in the environment at the front of the path
ENV PATH="/opt/venv/bin:$PATH"

# Enable bytecode compilation and efficient dependency linking
ENV UV_COMPILE_BYTECODE=1
ENV UV_LINK_MODE=copy

# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-install-project

# Set Python path
ENV PYTHONPATH=/app

# Copy application files
COPY app /app/app
COPY alembic.ini /app/alembic.ini

# Default command for Celery worker
CMD ["uv", "run", "celery", "-A", "app.celery.celery_app", "worker", "--loglevel=info"]
3 changes: 3 additions & 0 deletions backend/app/celery/__init__.py
@@ -0,0 +1,3 @@
from .celery_app import celery_app

__all__ = ["celery_app"]
35 changes: 35 additions & 0 deletions backend/app/celery/beat.py
@@ -0,0 +1,35 @@
"""
Celery beat scheduler for cron jobs.
"""
import logging
from celery import Celery
from app.celery.celery_app import celery_app

logger = logging.getLogger(__name__)


def start_beat(loglevel: str = "info"):
"""
Start Celery beat scheduler.

Args:
loglevel: Logging level
"""
logger.info(f"Starting Celery beat scheduler")
# Start the beat scheduler
celery_app.start(["celery", "beat", "-l", loglevel])


if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(description="Start Celery beat scheduler")
parser.add_argument(
"--loglevel",
default="info",
choices=["debug", "info", "warning", "error"],
help="Logging level",
)

args = parser.parse_args()
start_beat(args.loglevel)
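
Once real cron jobs exist, they would be registered in beat_schedule in celery_app.py and picked up by this beat process; a hedged sketch of one entry (the task name and timing are placeholders, not part of this PR):

from celery.schedules import crontab

# Hypothetical beat_schedule entry; the referenced task does not exist yet.
example_entry = {
    "nightly-cleanup-cron": {
        "task": "app.celery.tasks.cleanup_cron_task",  # name matches the "*_cron_*" route -> "cron" queue
        "schedule": crontab(hour=2, minute=0),          # every day at 02:00 in CELERY_TIMEZONE
    },
}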
96 changes: 96 additions & 0 deletions backend/app/celery/celery_app.py
@@ -0,0 +1,96 @@
from celery import Celery
from kombu import Queue, Exchange
from app.core.config import settings

# Create Celery instance
celery_app = Celery(
"ai_platform",
broker=settings.RABBITMQ_URL,
backend=settings.REDIS_URL,
include=["app.celery.tasks.job_execution"],
)

# Define exchanges and queues with priority
default_exchange = Exchange("default", type="direct")

# Celery configuration using environment variables
celery_app.conf.update(
# Queue configuration with priority support
task_queues=(
Queue(
"high_priority",
exchange=default_exchange,
routing_key="high",
queue_arguments={"x-max-priority": 10},
),
Queue(
"low_priority",
exchange=default_exchange,
routing_key="low",
queue_arguments={"x-max-priority": 1},
),
Queue("cron", exchange=default_exchange, routing_key="cron"),
Queue("default", exchange=default_exchange, routing_key="default"),
),
# Task routing
task_routes={
"app.celery.tasks.job_execution.execute_high_priority_task": {
"queue": "high_priority",
"priority": 9,
},
"app.celery.tasks.job_execution.execute_low_priority_task": {
"queue": "low_priority",
"priority": 1,
},
"app.celery.tasks.*_cron_*": {"queue": "cron"},
"app.celery.tasks.*": {"queue": "default"},
},
task_default_queue="default",
# Enable priority support
task_inherit_parent_priority=True,
worker_prefetch_multiplier=settings.CELERY_WORKER_PREFETCH_MULTIPLIER,
# Worker configuration from environment
worker_concurrency=settings.COMPUTED_CELERY_WORKER_CONCURRENCY,
worker_max_tasks_per_child=settings.CELERY_WORKER_MAX_TASKS_PER_CHILD,
worker_max_memory_per_child=settings.CELERY_WORKER_MAX_MEMORY_PER_CHILD,
# Security
worker_hijack_root_logger=False,
worker_log_color=False,
# Task execution from environment
task_soft_time_limit=settings.CELERY_TASK_SOFT_TIME_LIMIT,
task_time_limit=settings.CELERY_TASK_TIME_LIMIT,
task_reject_on_worker_lost=True,
task_ignore_result=False,
task_acks_late=True,
# Retry configuration from environment
task_default_retry_delay=settings.CELERY_TASK_DEFAULT_RETRY_DELAY,
task_max_retries=settings.CELERY_TASK_MAX_RETRIES,
# Task configuration from environment
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone=settings.CELERY_TIMEZONE,
enable_utc=settings.CELERY_ENABLE_UTC,
task_track_started=True,
task_always_eager=False,
# Result backend settings from environment
result_expires=settings.CELERY_RESULT_EXPIRES,
result_compression="gzip",
# Monitoring
worker_send_task_events=True,
task_send_sent_event=True,
# Connection settings from environment
broker_connection_retry_on_startup=True,
broker_pool_limit=settings.CELERY_BROKER_POOL_LIMIT,
# Beat configuration (for future cron jobs)
beat_schedule={
# Example cron job (commented out)
# "example-cron": {
# "task": "app.celery.tasks.example_cron_task",
# "schedule": 60.0, # Every 60 seconds
# },
},
)

# Auto-discover tasks
celery_app.autodiscover_tasks()
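
Given this routing and priority configuration, a producer would normally enqueue work through the two job-execution tasks rather than naming queues directly; a minimal sketch of a call site (argument values are illustrative):

from app.celery.tasks.job_execution import execute_high_priority_task

# task_routes sends this task to the "high_priority" queue with priority 9;
# apply_async could still override queue or priority per call if needed.
result = execute_high_priority_task.apply_async(
    kwargs={
        "function_path": "app.core.doctransform.service.execute_job",
        "project_id": 42,
        "job_id": "job-123",
        "trace_id": "trace-abc",
    },
)
print(result.id)  # Celery task id; the same id reaches the job as task_id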
1 change: 1 addition & 0 deletions backend/app/celery/tasks/__init__.py
@@ -0,0 +1 @@
__all__ = []
116 changes: 116 additions & 0 deletions backend/app/celery/tasks/job_execution.py
@@ -0,0 +1,116 @@
import logging
import importlib
from celery import current_task
from asgi_correlation_id import correlation_id

from app.celery.celery_app import celery_app

logger = logging.getLogger(__name__)


@celery_app.task(bind=True, queue="high_priority")
def execute_high_priority_task(
self,
function_path: str,
project_id: int,
job_id: str,
trace_id: str,
**kwargs,
):
"""
High priority Celery task to execute any job function.
Use this for urgent operations that need immediate processing.

Args:
function_path: Import path to the execute_job function (e.g., "app.core.doctransform.service.execute_job")
project_id: ID of the project executing the job
job_id: ID of the job (should already exist in database)
trace_id: Trace/correlation ID to preserve context across Celery tasks
**kwargs: Additional arguments to pass to the execute_job function
"""
return _execute_job_internal(
self, function_path, project_id, job_id, "high_priority", trace_id, **kwargs
)


@celery_app.task(bind=True, queue="low_priority")
def execute_low_priority_task(
self,
function_path: str,
project_id: int,
job_id: str,
trace_id: str,
**kwargs,
):
"""
Low priority Celery task to execute any job function.
Use this for background operations that can wait.

Args:
function_path: Import path to the execute_job function (e.g., "app.core.doctransform.service.execute_job")
project_id: ID of the project executing the job
job_id: ID of the job (should already exist in database)
trace_id: Trace/correlation ID to preserve context across Celery tasks
**kwargs: Additional arguments to pass to the execute_job function
"""
return _execute_job_internal(
self, function_path, project_id, job_id, "low_priority", trace_id, **kwargs
)


def _execute_job_internal(
task_instance,
function_path: str,
project_id: int,
job_id: str,
priority: str,
trace_id: str,
**kwargs,
):
Comment on lines +61 to +69

⚠️ Potential issue

Harden dynamic import to prevent arbitrary code execution.

Validate function_path, restrict to allow‑listed prefixes, and ensure callability before invocation.

 def _execute_job_internal(
@@
-    try:
-        # Dynamically import and resolve the function
-        module_path, function_name = function_path.rsplit(".", 1)
+    try:
+        # Validate and resolve the function safely
+        if not function_path or "." not in function_path:
+            raise ValueError("Invalid function_path")
+        ALLOWED_FUNCTION_PREFIXES = ("app.core.", "app.services.")
+        if not function_path.startswith(ALLOWED_FUNCTION_PREFIXES):
+            raise ValueError(f"Disallowed function_path: {function_path}")
+
+        module_path, function_name = function_path.rsplit(".", 1)
         module = importlib.import_module(module_path)
-        execute_function = getattr(module, function_name)
+        execute_function = getattr(module, function_name, None)
+        if not callable(execute_function):
+            raise TypeError(f"Target is not callable: {function_path}")
@@
-        result = execute_function(
+        result = execute_function(
             project_id=project_id,
             job_id=job_id,
             task_id=task_id,
             task_instance=task_instance,  # For progress updates, retries if needed
             **kwargs,
         )

Also applies to: 88-106

"""
Internal function to execute job logic for both priority levels.

Args:
task_instance: Celery task instance (for progress updates, retries, etc.)
function_path: Import path to the execute_job function
project_id: ID of the project executing the job
job_id: ID of the job (should already exist in database)
priority: Priority level ("high_priority" or "low_priority")
trace_id: Trace/correlation ID to preserve context across Celery tasks
**kwargs: Additional arguments to pass to the execute_job function
"""
task_id = current_task.request.id

correlation_id.set(trace_id)
logger.info(f"Set correlation ID context: {trace_id} for job {job_id}")

try:
# Dynamically import and resolve the function
module_path, function_name = function_path.rsplit(".", 1)
module = importlib.import_module(module_path)
execute_function = getattr(module, function_name)

logger.info(
f"Executing {priority} job {job_id} (task {task_id}) using function {function_path}"
)

# Execute the business logic function with standardized parameters
result = execute_function(
project_id=project_id,
job_id=job_id,
task_id=task_id,
task_instance=task_instance, # For progress updates, retries if needed
**kwargs,
)

logger.info(
f"{priority.capitalize()} job {job_id} (task {task_id}) completed successfully"
)
return result

except Exception as exc:
logger.error(
f"{priority.capitalize()} job {job_id} (task {task_id}) failed: {exc}",
exc_info=True,
)
raise
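
The callable referenced by function_path has to accept the standardized keyword arguments that _execute_job_internal passes through; a hedged sketch of the expected shape (module and body are illustrative):

# Hypothetical target, e.g. app.core.doctransform.service.execute_job
def execute_job(project_id: int, job_id: str, task_id: str, task_instance, **kwargs):
    """Business-logic entry point invoked by the Celery job-execution tasks."""
    # task_instance can be used for progress updates, e.g.:
    # task_instance.update_state(state="PROGRESS", meta={"job_id": job_id, "step": "parsing"})
    ...
    return {"job_id": job_id, "status": "completed"}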