Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion backend/app/celery/celery_app.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
import logging

from celery import Celery
from celery.signals import worker_process_init
from celery.signals import task_prerun, worker_process_init
from kombu import Exchange, Queue

from app.core.config import settings

logger = logging.getLogger(__name__)


@task_prerun.connect
def log_pool_status(task: "celery.Task", **_: object) -> None: # type: ignore[name-defined]
"""Log DB connection pool state before each task to detect connection leaks.

If checked_out equals pool size right when a task starts, connections
are being held across tasks (likely across LLM API calls) and leaking.
"""
from app.core.db import engine
from sqlalchemy.pool import QueuePool

pool = engine.pool
if isinstance(pool, QueuePool):
logger.info(
f"[pool] task={task.name} checked_out={pool.checkedout()} "
f"size={pool.size()} overflow={pool.overflow()}"
)


@worker_process_init.connect
def warm_llm_modules(**_) -> None:
"""Import LLM service modules in each worker process right after fork.
Expand Down
Loading