From de1b099306c1195473f59a2f5ae87e5f7328b952 Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Mon, 20 Apr 2026 17:53:57 +0530 Subject: [PATCH 1/6] Enhance telemetry and logging for collection and LLM job services - Integrated OpenTelemetry tracing into collection creation and deletion processes to improve observability. - Added logging context for better traceability during job execution. - Refactored job execution methods to include detailed span attributes and error handling. - Updated callback mechanisms to ensure success and failure responses are properly logged and sent. - Improved error handling in LLM job execution, including telemetry for provider calls and response handling. - Updated the lock file to reflect changes in Python version requirements. --- .env.example | 3 + .gitignore | 2 + backend/app/api/deps.py | 26 +- backend/app/api/routes/collections.py | 149 ++-- backend/app/api/routes/llm.py | 148 ++-- backend/app/celery/celery_app.py | 57 +- backend/app/celery/tasks/job_execution.py | 193 +++-- backend/app/celery/utils.py | 127 ++- backend/app/core/config.py | 2 + backend/app/core/db.py | 2 + backend/app/core/langfuse/langfuse.py | 30 +- backend/app/core/logger.py | 60 +- backend/app/core/middleware.py | 67 +- backend/app/core/sentry_filters.py | 46 + backend/app/core/telemetry.py | 559 +++++++++++++ backend/app/main.py | 55 +- .../services/collections/create_collection.py | 276 +++--- .../services/collections/delete_collection.py | 173 ++-- backend/app/services/llm/jobs.py | 786 +++++++++++------- backend/uv.lock | 2 +- 20 files changed, 2008 insertions(+), 755 deletions(-) create mode 100644 backend/app/core/sentry_filters.py create mode 100644 backend/app/core/telemetry.py diff --git a/.env.example b/.env.example index 6d9b21b3f..c05e3e17a 100644 --- a/.env.example +++ b/.env.example @@ -85,6 +85,9 @@ OPENAI_API_KEY="" KAAPI_GUARDRAILS_AUTH="" KAAPI_GUARDRAILS_URL="" +OTEL_ENABLED=true +OTEL_SERVICE_NAME=kaapi-backend + SMTP_HOST= SMTP_PORT= SMTP_TLS=True diff --git a/.gitignore b/.gitignore index ad2127f45..d0aba9b71 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ ENV/ # /backend/app/logs + +/observability/ diff --git a/backend/app/api/deps.py b/backend/app/api/deps.py index a99e0d824..dd37a747a 100644 --- a/backend/app/api/deps.py +++ b/backend/app/api/deps.py @@ -5,6 +5,7 @@ from fastapi import Depends, HTTPException, Request, status from fastapi.security import APIKeyHeader, OAuth2PasswordBearer from jwt.exceptions import ExpiredSignatureError, InvalidTokenError +from opentelemetry import trace from pydantic import ValidationError from sqlmodel import Session @@ -42,6 +43,22 @@ def get_db() -> Generator[Session, None, None]: TokenDep = Annotated[str, Depends(reusable_oauth2)] +def _set_tenant_span_attributes(auth_context: AuthContext) -> None: + """Tag the active OTel span with tenant context so traces in Sentry can be + filtered by user / org / project.""" + span = trace.get_current_span() + if not span.is_recording(): + return + span.set_attribute("user.id", str(auth_context.user.id)) + span.set_attribute("user.email", auth_context.user.email) + if auth_context.organization: + span.set_attribute("tenant.org_id", auth_context.organization.id) + span.set_attribute("tenant.org_name", auth_context.organization.name) + if auth_context.project: + span.set_attribute("tenant.project_id", auth_context.project.id) + span.set_attribute("tenant.project_name", auth_context.project.name) + + def 
_authenticate_with_jwt(session: Session, token: str) -> AuthContext: """Validate a JWT token and return the authenticated user context.""" try: @@ -147,16 +164,21 @@ def get_auth_context( if not auth_context.project.is_active: raise HTTPException(status_code=403, detail="Inactive Project") + _set_tenant_span_attributes(auth_context) return auth_context # 2. Try Authorization: Bearer header if token: - return _authenticate_with_jwt(session, token) + auth_context = _authenticate_with_jwt(session, token) + _set_tenant_span_attributes(auth_context) + return auth_context # 3. Try access_token cookie cookie_token = request.cookies.get("access_token") if cookie_token: - return _authenticate_with_jwt(session, cookie_token) + auth_context = _authenticate_with_jwt(session, cookie_token) + _set_tenant_span_attributes(auth_context) + return auth_context raise HTTPException(status_code=401, detail="Invalid Authorization format") diff --git a/backend/app/api/routes/collections.py b/backend/app/api/routes/collections.py index 28155c9b5..30718f1f3 100644 --- a/backend/app/api/routes/collections.py +++ b/backend/app/api/routes/collections.py @@ -7,6 +7,7 @@ from app.api.deps import SessionDep, AuthContextDep from app.api.permissions import Permission, require_permission +from app.core.telemetry import log_context from app.crud import ( CollectionCrud, CollectionJobCrud, @@ -89,51 +90,60 @@ def create_collection( current_user: AuthContextDep, request: CreationRequest, ): - if request.callback_url: - validate_callback_url(str(request.callback_url)) - - if request.name: - ensure_unique_name(session, current_user.project_.id, request.name) + with log_context( + tag="collection", + system="collection", + lifecycle="api.collection.create", + action="create", + project_id=current_user.project_.id, + organization_id=current_user.organization_.id, + ): + if request.callback_url: + validate_callback_url(str(request.callback_url)) + + if request.name: + ensure_unique_name(session, current_user.project_.id, request.name) + + unique_documents = list(dict.fromkeys(request.documents)) + + collection_job_crud = CollectionJobCrud(session, current_user.project_.id) + collection_job = collection_job_crud.create( + CollectionJobCreate( + action_type=CollectionActionType.CREATE, + project_id=current_user.project_.id, + status=CollectionJobStatus.PENDING, + docs_num=len(unique_documents), + documents=[str(doc_id) for doc_id in unique_documents], + ) + ) - unique_documents = list(dict.fromkeys(request.documents)) + # True iff both model and instructions were provided in the request body + with_assistant = bool( + getattr(request, "model", None) and getattr(request, "instructions", None) + ) - collection_job_crud = CollectionJobCrud(session, current_user.project_.id) - collection_job = collection_job_crud.create( - CollectionJobCreate( - action_type=CollectionActionType.CREATE, + create_service.start_job( + db=session, + request=request, + collection_job_id=collection_job.id, project_id=current_user.project_.id, - status=CollectionJobStatus.PENDING, - docs_num=len(unique_documents), - documents=[str(doc_id) for doc_id in unique_documents], + organization_id=current_user.organization_.id, + with_assistant=with_assistant, ) - ) - - # True iff both model and instructions were provided in the request body - with_assistant = bool( - getattr(request, "model", None) and getattr(request, "instructions", None) - ) - create_service.start_job( - db=session, - request=request, - collection_job_id=collection_job.id, - 
project_id=current_user.project_.id, - organization_id=current_user.organization_.id, - with_assistant=with_assistant, - ) - - metadata = None - if not with_assistant: - metadata = { - "note": ( - "This job will create a vector store only (no Assistant). " - "Assistant creation happens when both 'model' and 'instructions' are included." - ) - } - - return APIResponse.success_response( - CollectionJobImmediatePublic.model_validate(collection_job), metadata=metadata - ) + metadata = None + if not with_assistant: + metadata = { + "note": ( + "This job will create a vector store only (no Assistant). " + "Assistant creation happens when both 'model' and 'instructions' are included." + ) + } + + return APIResponse.success_response( + CollectionJobImmediatePublic.model_validate(collection_job), + metadata=metadata, + ) @router.delete( @@ -149,37 +159,46 @@ def delete_collection( collection_id: UUID = FastPath(description="Collection to delete"), request: CallbackRequest | None = Body(default=None), ): - if request and request.callback_url: - validate_callback_url(str(request.callback_url)) - - _ = CollectionCrud(session, current_user.project_.id).read_one(collection_id) - - deletion_request = DeletionRequest( + with log_context( + tag="collection", + system="collection", + lifecycle="api.collection.delete", + action="delete", collection_id=collection_id, - callback_url=request.callback_url if request else None, - ) + project_id=current_user.project_.id, + organization_id=current_user.organization_.id, + ): + if request and request.callback_url: + validate_callback_url(str(request.callback_url)) - collection_job_crud = CollectionJobCrud(session, current_user.project_.id) - collection_job = collection_job_crud.create( - CollectionJobCreate( - action_type=CollectionActionType.DELETE, - project_id=current_user.project_.id, - status=CollectionJobStatus.PENDING, + _ = CollectionCrud(session, current_user.project_.id).read_one(collection_id) + + deletion_request = DeletionRequest( collection_id=collection_id, + callback_url=request.callback_url if request else None, ) - ) - delete_service.start_job( - db=session, - request=deletion_request, - collection_job_id=collection_job.id, - project_id=current_user.project_.id, - organization_id=current_user.organization_.id, - ) + collection_job_crud = CollectionJobCrud(session, current_user.project_.id) + collection_job = collection_job_crud.create( + CollectionJobCreate( + action_type=CollectionActionType.DELETE, + project_id=current_user.project_.id, + status=CollectionJobStatus.PENDING, + collection_id=collection_id, + ) + ) + + delete_service.start_job( + db=session, + request=deletion_request, + collection_job_id=collection_job.id, + project_id=current_user.project_.id, + organization_id=current_user.organization_.id, + ) - return APIResponse.success_response( - CollectionJobImmediatePublic.model_validate(collection_job) - ) + return APIResponse.success_response( + CollectionJobImmediatePublic.model_validate(collection_job) + ) @router.get( diff --git a/backend/app/api/routes/llm.py b/backend/app/api/routes/llm.py index dd091359c..73ba0e4bc 100644 --- a/backend/app/api/routes/llm.py +++ b/backend/app/api/routes/llm.py @@ -2,9 +2,11 @@ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException +from opentelemetry import trace from app.api.deps import AuthContextDep, SessionDep from app.api.permissions import Permission, require_permission +from app.core.telemetry import log_context from app.crud.jobs import JobCrud from app.crud.llm import 
get_llm_calls_by_job_id from app.models import ( @@ -58,36 +60,52 @@ def llm_call( project_id = _current_user.project_.id organization_id = _current_user.organization_.id - if request.callback_url: - validate_callback_url(str(request.callback_url)) - - job_id = start_job( - db=session, - request=request, + with log_context( + tag="llm-call", + system="llm-call", + lifecycle="api.llm.call", project_id=project_id, organization_id=organization_id, - ) + callback_enabled=request.callback_url is not None, + ): + span = trace.get_current_span() + if span.is_recording(): + span.set_attribute("kaapi.project_id", project_id) + span.set_attribute("kaapi.organization_id", organization_id) + span.set_attribute("llm.callback_enabled", request.callback_url is not None) + + if request.callback_url: + validate_callback_url(str(request.callback_url)) + + job_id = start_job( + db=session, + request=request, + project_id=project_id, + organization_id=organization_id, + ) - # Fetch job details to return immediate response - job_crud = JobCrud(session=session) - job = job_crud.get(job_id=job_id, project_id=project_id) + if span.is_recording(): + span.set_attribute("llm.job_id", str(job_id)) - if not job: - raise HTTPException(status_code=404, detail="Job not found") + job_crud = JobCrud(session=session) + job = job_crud.get(job_id=job_id, project_id=project_id) - message = "Your response is being generated and will be delivered via callback." - if not request.callback_url: - message = "Your response is being generated" + if not job: + raise HTTPException(status_code=404, detail="Job not found") - job_response = LLMJobImmediatePublic( - job_id=job.id, - status=job.status.value, - message=message, - job_inserted_at=job.created_at, - job_updated_at=job.updated_at, - ) + message = "Your response is being generated and will be delivered via callback." 
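The handler responds immediately while a Celery worker does the actual generation, so the trace has to survive the broker hop. Later in this patch, backend/app/celery/utils.py injects the active OTel context into the task headers and backend/app/celery/tasks/job_execution.py re-attaches it on the worker. A minimal sketch of that handoff, with `publish` and `run_in_worker` as hypothetical stand-ins for the Celery machinery:

    # Sketch only: requires opentelemetry-api; publish/run_in_worker are
    # illustrative stand-ins for the broker round-trip.
    from opentelemetry import context as otel_context
    from opentelemetry.propagate import extract, inject


    def enqueue(publish) -> None:
        # Producer: serialize the current trace context into plain headers,
        # as _enqueue_with_trace_context does before apply_async().
        headers: dict[str, str] = {}
        inject(headers)  # writes W3C `traceparent` (and `tracestate` if present)
        publish(headers=headers)


    def run_in_worker(headers: dict[str, str], fn) -> None:
        # Consumer: restore the producer's context so spans created inside
        # fn() become children of the originating API request's trace.
        token = otel_context.attach(extract(headers))
        try:
            fn()
        finally:
            otel_context.detach(token)

When Celery auto-instrumentation is active the worker already has a current `run/...` span, which is why the patch's _run_with_otel_parent only attaches the extracted context as a fallback.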
+ if not request.callback_url: + message = "Your response is being generated" + + job_response = LLMJobImmediatePublic( + job_id=job.id, + status=job.status.value, + message=message, + job_inserted_at=job.created_at, + job_updated_at=job.updated_at, + ) - return APIResponse.success_response(data=job_response) + return APIResponse.success_response(data=job_response) @router.get( @@ -108,47 +126,55 @@ def get_llm_call_status( project_id = _current_user.project_.id - job_crud = JobCrud(session=session) - job = job_crud.get(job_id=job_id, project_id=project_id) - - if not job: - raise HTTPException(status_code=404, detail="Job not found") - - llm_call_response = None - if job.status.value == JobStatus.SUCCESS: - llm_calls = get_llm_calls_by_job_id( - session=session, job_id=job_id, project_id=project_id - ) - - if llm_calls: - # Get the first LLM call from the list which will be the only call for the job id - # since we initially won't be using this endpoint for llm chains - llm_call = llm_calls[0] - - llm_response = LLMResponse( - provider_response_id=llm_call.provider_response_id or "", - conversation_id=llm_call.conversation_id, - provider=llm_call.provider, - model=llm_call.model, - output=llm_call.content, + with log_context( + tag="llm-call", + system="llm-call", + lifecycle="api.llm.call.status", + job_id=job_id, + project_id=project_id, + organization_id=_current_user.organization_.id, + ): + job_crud = JobCrud(session=session) + job = job_crud.get(job_id=job_id, project_id=project_id) + + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + llm_call_response = None + if job.status.value == JobStatus.SUCCESS: + llm_calls = get_llm_calls_by_job_id( + session=session, job_id=job_id, project_id=project_id ) - if not llm_call.usage: - logger.warning( - f"[get_llm_call] Missing usage data for llm_call job_id={job_id}, project_id={project_id}" + if llm_calls: + # Get the first LLM call from the list which will be the only call for the job id + # since we initially won't be using this endpoint for llm chains + llm_call = llm_calls[0] + + llm_response = LLMResponse( + provider_response_id=llm_call.provider_response_id or "", + conversation_id=llm_call.conversation_id, + provider=llm_call.provider, + model=llm_call.model, + output=llm_call.content, ) - llm_call_response = LLMCallResponse( - response=llm_response, - usage=Usage(**llm_call.usage), - provider_raw_response=None, - ) + if not llm_call.usage: + logger.warning( + f"[get_llm_call] Missing usage data for llm_call job_id={job_id}, project_id={project_id}" + ) - job_response = LLMJobPublic( - job_id=job.id, - status=job.status.value, - llm_response=llm_call_response, - error_message=job.error_message, - ) + llm_call_response = LLMCallResponse( + response=llm_response, + usage=Usage(**llm_call.usage), + provider_raw_response=None, + ) + + job_response = LLMJobPublic( + job_id=job.id, + status=job.status.value, + llm_response=llm_call_response, + error_message=job.error_message, + ) - return APIResponse.success_response(data=job_response) + return APIResponse.success_response(data=job_response) diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index ae5acb224..04c5c95dc 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -1,10 +1,19 @@ import logging from celery import Celery -from celery.signals import task_failure, task_postrun, task_prerun, worker_process_init +from celery.signals import ( + task_failure, + task_postrun, + task_prerun, + 
worker_process_init, +) from kombu import Exchange, Queue from app.core.config import settings +from app.core.logger import configure_logging +from app.core.sentry_filters import before_send_transaction_filter + +configure_logging(service_name="kaapi-celery") logger = logging.getLogger(__name__) @@ -69,15 +78,49 @@ def log_pool_status_failure( @worker_process_init.connect def warm_llm_modules(**_) -> None: - """Import LLM service modules in each worker process right after fork. + """Initialize each forked Celery worker process. - This runs once per worker before any task arrives, so LLM calls - (the most latency-sensitive path) never pay a cold-import penalty. - The main process is unaffected, keeping overall memory low. + - Initialize Sentry so task transactions, errors, and logs ship to Sentry. + - Set up OpenTelemetry so Celery tasks emit spans bridged into Sentry. + - Pre-import LLM modules so first-call latency doesn't pay a cold-import cost. """ - import app.services.llm.jobs # noqa: F401 + if settings.SENTRY_DSN: + import sentry_sdk + from sentry_sdk.integrations.celery import CeleryIntegration + from sentry_sdk.integrations.httpx import HttpxIntegration + from sentry_sdk.integrations.logging import LoggingIntegration + from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration + + sentry_sdk.init( + dsn=str(settings.SENTRY_DSN), + environment=settings.ENVIRONMENT, + release=settings.API_VERSION, + instrumenter="otel", + traces_sample_rate=1.0, + enable_logs=True, + before_send_transaction=before_send_transaction_filter, + integrations=[ + LoggingIntegration( + level=logging.INFO, + sentry_logs_level=logging.INFO, + ), + ], + disabled_integrations=[ + CeleryIntegration(), + SqlalchemyIntegration(), + HttpxIntegration(), + ], + ) - logger.info("[warm_llm_modules] LLM modules pre-loaded in worker process") + from app.core.telemetry import flush_telemetry, setup_telemetry + + setup_telemetry(service_name="kaapi-celery") + + @task_postrun.connect(weak=False) + def _flush_otel_after_task(**_: object) -> None: + flush_telemetry() + + import app.services.llm.jobs # noqa: F401 # Create Celery instance diff --git a/backend/app/celery/tasks/job_execution.py b/backend/app/celery/tasks/job_execution.py index a1663179d..8dd20091a 100644 --- a/backend/app/celery/tasks/job_execution.py +++ b/backend/app/celery/tasks/job_execution.py @@ -2,6 +2,9 @@ from asgi_correlation_id import correlation_id from celery import current_task +from opentelemetry import context as otel_context +from opentelemetry import trace +from opentelemetry.propagate import extract from app.celery.celery_app import celery_app @@ -13,17 +16,60 @@ def _set_trace(trace_id: str) -> None: logger.info(f"[_set_trace] Set correlation ID: {trace_id}") +def _extract_parent_context(task_instance) -> otel_context.Context: + """Extract OTel parent context from Celery headers if available.""" + headers = getattr(task_instance.request, "headers", None) or {} + carrier: dict[str, str] = {} + + if isinstance(headers, dict): + for key, value in headers.items(): + if isinstance(value, str): + carrier[str(key)] = value + + nested = headers.get("otel", {}) + if isinstance(nested, dict): + for key, value in nested.items(): + if isinstance(value, str): + carrier[str(key)] = value + + return extract(carrier) + + +def _run_with_otel_parent(task_instance, fn): + """Attach extracted parent context and execute function. + + When Celery auto-instrumentation is active, there is already a current + `run/...` span. 
Re-attaching extracted parent context here would make + service spans become siblings of `run/...` instead of children. + + We only attach extracted context as a fallback when no active span exists. + """ + current_ctx = trace.get_current_span().get_span_context() + if current_ctx and current_ctx.is_valid: + return fn() + + parent_ctx = _extract_parent_context(task_instance) + token = otel_context.attach(parent_ctx) + try: + return fn() + finally: + otel_context.detach(token) + + @celery_app.task(bind=True, queue="high_priority", priority=9) def run_llm_job(self, project_id: int, job_id: str, trace_id: str, **kwargs): from app.services.llm.jobs import execute_job _set_trace(trace_id) - return execute_job( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -32,12 +78,15 @@ def run_llm_chain_job(self, project_id: int, job_id: str, trace_id: str, **kwarg from app.services.llm.jobs import execute_chain_job _set_trace(trace_id) - return execute_chain_job( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_chain_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -46,12 +95,15 @@ def run_response_job(self, project_id: int, job_id: str, trace_id: str, **kwargs from app.services.response.jobs import execute_job _set_trace(trace_id) - return execute_job( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -60,12 +112,15 @@ def run_doctransform_job(self, project_id: int, job_id: str, trace_id: str, **kw from app.services.doctransform.job import execute_job _set_trace(trace_id) - return execute_job( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -76,12 +131,15 @@ def run_create_collection_job( from app.services.collections.create_collection import execute_job _set_trace(trace_id) - return execute_job( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -92,12 +150,15 @@ def run_delete_collection_job( from app.services.collections.delete_collection import execute_job _set_trace(trace_id) - return execute_job( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_job( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -108,12 +169,15 @@ def run_stt_batch_submission( from app.services.stt_evaluations.batch_job import execute_batch_submission _set_trace(trace_id) - return 
execute_batch_submission( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_batch_submission( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -124,12 +188,15 @@ def run_stt_metric_computation( from app.services.stt_evaluations.metric_job import execute_metric_computation _set_trace(trace_id) - return execute_metric_computation( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_metric_computation( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -140,12 +207,15 @@ def run_tts_batch_submission( from app.services.tts_evaluations.batch_job import execute_batch_submission _set_trace(trace_id) - return execute_batch_submission( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_batch_submission( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) @@ -158,10 +228,13 @@ def run_tts_result_processing( ) _set_trace(trace_id) - return execute_tts_result_processing( - project_id=project_id, - job_id=job_id, - task_id=current_task.request.id, - task_instance=self, - **kwargs, + return _run_with_otel_parent( + self, + lambda: execute_tts_result_processing( + project_id=project_id, + job_id=job_id, + task_id=current_task.request.id, + task_instance=self, + **kwargs, + ), ) diff --git a/backend/app/celery/utils.py b/backend/app/celery/utils.py index e4b2a2e3f..5ebbf624a 100644 --- a/backend/app/celery/utils.py +++ b/backend/app/celery/utils.py @@ -6,20 +6,31 @@ from typing import Any, Dict from celery.result import AsyncResult +from opentelemetry.propagate import inject from app.celery.celery_app import celery_app logger = logging.getLogger(__name__) +def _enqueue_with_trace_context(task, **kwargs) -> str: + """Publish Celery task with explicit trace context headers.""" + otel_headers: dict[str, str] = {} + inject(otel_headers) + celery_headers = dict(otel_headers) + celery_headers["otel"] = otel_headers + async_result = task.apply_async(kwargs=kwargs, headers=celery_headers) + return async_result.id + + def start_llm_job(project_id: int, job_id: str, trace_id: str = "N/A", **kwargs) -> str: from app.celery.tasks.job_execution import run_llm_job - task = run_llm_job.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_llm_job, project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs ) - logger.info(f"[start_llm_job] Started job {job_id} with Celery task {task.id}") - return task.id + logger.info(f"[start_llm_job] Started job {job_id} with Celery task {task_id}") + return task_id def start_llm_chain_job( @@ -27,13 +38,17 @@ def start_llm_chain_job( ) -> str: from app.celery.tasks.job_execution import run_llm_chain_job - task = run_llm_chain_job.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_llm_chain_job, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_llm_chain_job] Started job {job_id} with Celery task {task.id}" + f"[start_llm_chain_job] Started 
job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def start_response_job( @@ -41,11 +56,15 @@ def start_response_job( ) -> str: from app.celery.tasks.job_execution import run_response_job - task = run_response_job.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_response_job, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) - logger.info(f"[start_response_job] Started job {job_id} with Celery task {task.id}") - return task.id + logger.info(f"[start_response_job] Started job {job_id} with Celery task {task_id}") + return task_id def start_doctransform_job( @@ -53,13 +72,17 @@ def start_doctransform_job( ) -> str: from app.celery.tasks.job_execution import run_doctransform_job - task = run_doctransform_job.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_doctransform_job, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_doctransform_job] Started job {job_id} with Celery task {task.id}" + f"[start_doctransform_job] Started job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def start_create_collection_job( @@ -67,13 +90,17 @@ def start_create_collection_job( ) -> str: from app.celery.tasks.job_execution import run_create_collection_job - task = run_create_collection_job.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_create_collection_job, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_create_collection_job] Started job {job_id} with Celery task {task.id}" + f"[start_create_collection_job] Started job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def start_delete_collection_job( @@ -81,13 +108,17 @@ def start_delete_collection_job( ) -> str: from app.celery.tasks.job_execution import run_delete_collection_job - task = run_delete_collection_job.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_delete_collection_job, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_delete_collection_job] Started job {job_id} with Celery task {task.id}" + f"[start_delete_collection_job] Started job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def start_stt_batch_submission( @@ -95,13 +126,17 @@ def start_stt_batch_submission( ) -> str: from app.celery.tasks.job_execution import run_stt_batch_submission - task = run_stt_batch_submission.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_stt_batch_submission, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_stt_batch_submission] Started job {job_id} with Celery task {task.id}" + f"[start_stt_batch_submission] Started job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def start_stt_metric_computation( @@ -109,13 +144,17 @@ def start_stt_metric_computation( ) -> str: from app.celery.tasks.job_execution import run_stt_metric_computation - task = run_stt_metric_computation.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_stt_metric_computation, + project_id=project_id, + 
job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_stt_metric_computation] Started job {job_id} with Celery task {task.id}" + f"[start_stt_metric_computation] Started job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def start_tts_batch_submission( @@ -123,13 +162,17 @@ def start_tts_batch_submission( ) -> str: from app.celery.tasks.job_execution import run_tts_batch_submission - task = run_tts_batch_submission.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_tts_batch_submission, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_tts_batch_submission] Started job {job_id} with Celery task {task.id}" + f"[start_tts_batch_submission] Started job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def start_tts_result_processing( @@ -137,13 +180,17 @@ def start_tts_result_processing( ) -> str: from app.celery.tasks.job_execution import run_tts_result_processing - task = run_tts_result_processing.delay( - project_id=project_id, job_id=job_id, trace_id=trace_id, **kwargs + task_id = _enqueue_with_trace_context( + run_tts_result_processing, + project_id=project_id, + job_id=job_id, + trace_id=trace_id, + **kwargs, ) logger.info( - f"[start_tts_result_processing] Started job {job_id} with Celery task {task.id}" + f"[start_tts_result_processing] Started job {job_id} with Celery task {task_id}" ) - return task.id + return task_id def get_task_status(task_id: str) -> Dict[str, Any]: diff --git a/backend/app/core/config.py b/backend/app/core/config.py index ec0692997..ee72442f8 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -135,6 +135,8 @@ def AWS_S3_BUCKET(self) -> str: return f"{self.AWS_S3_BUCKET_PREFIX}-{self.ENVIRONMENT}" LOG_DIR: str = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logs") + OTEL_ENABLED: bool = False + OTEL_SERVICE_NAME: str = "kaapi-backend" # Celery Configuration CELERY_WORKER_CONCURRENCY: int | None = None diff --git a/backend/app/core/db.py b/backend/app/core/db.py index 91b0966ac..9f81df5a4 100644 --- a/backend/app/core/db.py +++ b/backend/app/core/db.py @@ -1,6 +1,7 @@ from sqlmodel import Session, create_engine, select from app import crud +from app.core.telemetry import instrument_db_engine from app.models import User, UserCreate @@ -32,6 +33,7 @@ def get_engine(): # Create a default engine for backward compatibility engine = get_engine() +instrument_db_engine(engine) # make sure all SQLModel models are imported (app.models) before initializing DB diff --git a/backend/app/core/langfuse/langfuse.py b/backend/app/core/langfuse/langfuse.py index a8a45f65b..49963a61a 100644 --- a/backend/app/core/langfuse/langfuse.py +++ b/backend/app/core/langfuse/langfuse.py @@ -1,15 +1,15 @@ import uuid import logging -from typing import Any, Callable, Dict, Optional from functools import wraps +from typing import Any, Callable, Optional from asgi_correlation_id import correlation_id from langfuse import Langfuse from langfuse.client import StatefulGenerationClient, StatefulTraceClient from app.models.llm import ( + LLMCallResponse, NativeCompletionConfig, QueryParams, - LLMCallResponse, TextOutput, AudioOutput, ) @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -def extract_output_value( +def extract_response_output( llm_output: TextOutput | AudioOutput | None, ) -> str | dict[str, Any]: """Extract output value from LLM output for logging/tracing. 
@@ -41,8 +41,7 @@ def extract_output_value( "mime_type": llm_output.content.mime_type, "length": len(llm_output.content.value), } - else: - return str(llm_output) + return str(llm_output) class LangfuseTracer: @@ -107,8 +106,8 @@ def _langfuse_call(self, fn: Callable, *args: Any, **kwargs: Any) -> Any: def start_trace( self, name: str, - input: Dict[str, Any], - metadata: Optional[Dict[str, Any]] = None, + input: Any, + metadata: Optional[dict[str, Any]] = None, tags: list[str] | None = None, ) -> None: if self._failed or not self.langfuse: @@ -127,8 +126,8 @@ def start_trace( def start_generation( self, name: str, - input: Dict[str, Any], - metadata: Optional[Dict[str, Any]] = None, + input: Any, + metadata: Optional[dict[str, Any]] = None, ) -> None: if self._failed or not self.trace: return @@ -142,8 +141,8 @@ def start_generation( def end_generation( self, - output: Dict[str, Any], - usage: Optional[Dict[str, Any]] = None, + output: dict[str, Any], + usage: Optional[dict[str, Any]] = None, model: Optional[str] = None, ) -> None: if self._failed or not self.generation: @@ -152,7 +151,7 @@ def end_generation( self.generation.end, output=output, usage=usage, model=model ) - def update_trace(self, tags: list[str], output: Dict[str, Any]) -> None: + def update_trace(self, tags: list[str], output: dict[str, Any]) -> None: if self._failed or not self.trace: return self._langfuse_call(self.trace.update, tags=tags, output=output) @@ -197,7 +196,6 @@ def decorator(func: Callable) -> Callable: def wrapper( completion_config: NativeCompletionConfig, query: QueryParams, **kwargs ): - # Skip observability if no credentials provided if not credentials: logger.info( "[observe_llm_execution] No credentials - skipping observability" @@ -222,7 +220,6 @@ def wrapper( failed = False def langfuse_call(fn, *args, **kwargs): - """Execute Langfuse operation safely. 
First failure disables further calls.""" nonlocal failed if failed: return None @@ -251,7 +248,6 @@ def langfuse_call(fn, *args, **kwargs): model=completion_config.params.get("model"), ) - # Execute the actual LLM call response: LLMCallResponse | None error: str | None response, error = func(completion_config, query, **kwargs) @@ -262,7 +258,7 @@ def langfuse_call(fn, *args, **kwargs): generation.end, output={ "status": "success", - "output": extract_output_value(response.response.output), + "output": extract_response_output(response.response.output), }, usage_details={ "input": response.usage.input_tokens, @@ -275,7 +271,7 @@ def langfuse_call(fn, *args, **kwargs): trace.update, output={ "status": "success", - "output": extract_output_value(response.response.output), + "output": extract_response_output(response.response.output), }, session_id=session_id or response.response.conversation_id, ) diff --git a/backend/app/core/logger.py b/backend/app/core/logger.py index ff1470354..fd2abb3dd 100644 --- a/backend/app/core/logger.py +++ b/backend/app/core/logger.py @@ -11,7 +11,8 @@ LOGGING_LEVEL = logging.INFO LOGGING_FORMAT = ( - "%(asctime)s - [%(correlation_id)s] - %(levelname)s - %(name)s - %(message)s" + "%(asctime)s - [%(service_name)s] - [%(correlation_id)s] - " + "%(levelname)s - %(name)s - %(message)s" ) @@ -21,26 +22,47 @@ def filter(self, record: logging.LogRecord) -> bool: return True -# Suppress info logs from LiteLLM +class ServiceNameFilter(logging.Filter): + def __init__(self, service_name: str) -> None: + super().__init__() + self._service_name = service_name + + def filter(self, record: logging.LogRecord) -> bool: + record.service_name = self._service_name + return True + + logging.getLogger("LiteLLM").setLevel(logging.WARNING) +logging.getLogger("opentelemetry").setLevel(logging.WARNING) +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("celery.worker.consumer.connection").setLevel(logging.WARNING) +logging.getLogger("celery.worker.consumer.mingle").setLevel(logging.WARNING) +logging.getLogger("celery.apps.worker").setLevel(logging.WARNING) -# Create root logger -logger = logging.getLogger() -logger.setLevel(LOGGING_LEVEL) -# Formatter -formatter = logging.Formatter(LOGGING_FORMAT) +def configure_logging(service_name: str | None = None) -> None: + root_logger = logging.getLogger() + if getattr(root_logger, "_kaapi_logging_configured", False): + return -# Stream handler (console) -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(formatter) -stream_handler.addFilter(CorrelationIdFilter()) -logger.addHandler(stream_handler) + root_logger.setLevel(LOGGING_LEVEL) -# Rotating file handler -file_handler = RotatingFileHandler( - LOG_FILE_PATH, maxBytes=10 * 1024 * 1024, backupCount=5 -) -file_handler.setFormatter(formatter) -file_handler.addFilter(CorrelationIdFilter()) -logger.addHandler(file_handler) + formatter = logging.Formatter(LOGGING_FORMAT) + resolved_service_name = service_name or settings.OTEL_SERVICE_NAME + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + stream_handler.addFilter(CorrelationIdFilter()) + stream_handler.addFilter(ServiceNameFilter(resolved_service_name)) + + file_handler = RotatingFileHandler( + LOG_FILE_PATH, maxBytes=10 * 1024 * 1024, backupCount=5 + ) + file_handler.setFormatter(formatter) + file_handler.addFilter(CorrelationIdFilter()) + file_handler.addFilter(ServiceNameFilter(resolved_service_name)) + + root_logger.handlers.clear() + 
root_logger.addHandler(stream_handler) + root_logger.addHandler(file_handler) + root_logger._kaapi_logging_configured = True diff --git a/backend/app/core/middleware.py b/backend/app/core/middleware.py index 4ad3d12e3..4a985e267 100644 --- a/backend/app/core/middleware.py +++ b/backend/app/core/middleware.py @@ -1,22 +1,77 @@ import logging import time + +import sentry_sdk from fastapi import Request, Response +from opentelemetry import trace logger = logging.getLogger("http_request_logger") async def http_request_logger(request: Request, call_next) -> Response: start_time = time.time() + method = request.method + route = request.url.path + + # Set request-level dimensions as early as possible. + span = trace.get_current_span() + if span.is_recording(): + span.set_attribute("http.request.method", method) + span.set_attribute("http.request_method", method) + span.set_attribute("http.method", method) + span.set_attribute("http.route", route) + + if sentry_sdk.get_client().is_active(): + sentry_sdk.set_tag("http.method", method) + sentry_sdk.set_tag("http.request.method", method) + sentry_sdk.set_tag("http.route", route) + try: response = await call_next(request) - except Exception as e: + except Exception: + # Capture status for failed requests too so dashboards stay consistent. + status = 500 + if span.is_recording(): + span.set_attribute("http.status_code", status) + span.set_attribute("http.response.status_code", status) + if sentry_sdk.get_client().is_active(): + sentry_sdk.set_tag("http.status_code", str(status)) + sentry_sdk.set_tag("http.response.status_code", str(status)) logger.exception("Unhandled exception during request") raise - process_time = (time.time() - start_time) * 1000 # ms - client_ip = request.client.host if request.client else "unknown" + duration_ms = (time.time() - start_time) * 1000 + status = response.status_code + + if span.is_recording(): + span.set_attribute("http.status_code", status) + span.set_attribute("http.response.status_code", status) + span.set_attribute("http.request.duration_ms", round(duration_ms, 2)) + + logger.info(f"{method} {route} - {status} [{duration_ms:.2f}ms]") + + try: + if sentry_sdk.get_client().is_active(): + sentry_sdk.set_tag("http.status_code", str(status)) + sentry_sdk.set_tag("http.response.status_code", str(status)) + + attrs = { + "http.method": method, + "http.route": route, + "http.status_code": str(status), + } + sentry_sdk.metrics.count("http.server.request.count", 1, attributes=attrs) + sentry_sdk.metrics.distribution( + "http.server.request.duration", + duration_ms, + unit="millisecond", + attributes=attrs, + ) + if status >= 400: + sentry_sdk.metrics.count( + "http.server.request.error", 1, attributes=attrs + ) + except Exception: + logger.debug("[http_request_logger] Sentry metric emit failed") - logger.info( - f"{request.method} {request.url.path} - {response.status_code} [{process_time:.2f}ms]" - ) return response diff --git a/backend/app/core/sentry_filters.py b/backend/app/core/sentry_filters.py new file mode 100644 index 000000000..07c6b9ef3 --- /dev/null +++ b/backend/app/core/sentry_filters.py @@ -0,0 +1,46 @@ +import re +from typing import Any + + +_SQL_OR_CONNECT = re.compile(r"^(select|insert|update|delete|connect)\b", re.IGNORECASE) +_HTTP_SEND_RECEIVE = re.compile(r"http (send|receive)$", re.IGNORECASE) +_DB_QUERY_SPAN = re.compile(r"^db\.query$", re.IGNORECASE) + + +def before_send_transaction_filter( + event: dict[str, Any], hint: dict[str, Any] +) -> dict[str, Any] | None: + """Drop low-signal spans before 
they ship to Sentry. + + Filters out: + - ASGI lifecycle spans ending with `http send` / `http receive` + - DB spans carrying `db.system` + - SQL / `connect` spans matched by description prefix + - Custom DB query spans (`db.query`) + """ + spans = event.get("spans") + if not isinstance(spans, list): + return event + + filtered: list[dict[str, Any]] = [] + for span in spans: + if not isinstance(span, dict): + continue + + data = span.get("data") if isinstance(span.get("data"), dict) else {} + desc = str(span.get("description") or span.get("name") or "").strip() + op = str(span.get("op") or "").strip() + + if _HTTP_SEND_RECEIVE.search(desc): + continue + if _DB_QUERY_SPAN.search(desc) or _DB_QUERY_SPAN.search(op): + continue + if data.get("db.system") is not None: + continue + if _SQL_OR_CONNECT.match(desc): + continue + + filtered.append(span) + + event["spans"] = filtered + return event diff --git a/backend/app/core/telemetry.py b/backend/app/core/telemetry.py new file mode 100644 index 000000000..26246c52d --- /dev/null +++ b/backend/app/core/telemetry.py @@ -0,0 +1,559 @@ +import json +import logging +import os +import time +from contextlib import contextmanager +from contextvars import ContextVar +from threading import Lock +from typing import TYPE_CHECKING, Any, Iterator + +import sentry_sdk +from opentelemetry import context as otel_context +from opentelemetry import trace +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from opentelemetry.instrumentation.requests import RequestsInstrumentor +from opentelemetry.instrumentation.utils import _SUPPRESS_HTTP_INSTRUMENTATION_KEY +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider + +from app.core.config import settings + +if TYPE_CHECKING: + from app.models.llm.response import LLMCallResponse + +logger = logging.getLogger(__name__) + +_log_context_var: ContextVar[dict[str, str] | None] = ContextVar( + "kaapi_log_context", default=None +) + +_celery_metrics_lock = Lock() +_celery_active_tasks = 0 + + +def _emit_sentry_metric( + metric_type: str, + name: str, + value: float, + *, + unit: str | None = None, + attributes: dict[str, str | int | float] | None = None, +) -> None: + """Best-effort Sentry metric emission. No-op if SDK is not active.""" + try: + if not sentry_sdk.get_client().is_active(): + return + if metric_type == "count": + sentry_sdk.metrics.count( + name=name, value=value, unit=unit, attributes=attributes + ) + elif metric_type == "gauge": + sentry_sdk.metrics.gauge( + name=name, value=value, unit=unit, attributes=attributes + ) + elif metric_type == "distribution": + sentry_sdk.metrics.distribution( + name=name, value=value, unit=unit, attributes=attributes + ) + except Exception: + logger.debug("[_emit_sentry_metric] Failed to emit %s (%s)", name, metric_type) + + +@contextmanager +def log_context( + *, tag: str | None = None, **fields: str | int | float | bool | None +) -> Iterator[None]: + """Attach structured log context for the current execution scope. 
+ + Example: + with log_context(tag="llm-call", job_id=job_id): + logger.info("...") + """ + current = _log_context_var.get() or {} + payload = dict(current) + if tag: + payload["tag"] = tag + payload.setdefault("system", tag) + for key, value in fields.items(): + if value is None: + continue + payload[key] = str(value) + token = _log_context_var.set(payload) + try: + yield + finally: + _log_context_var.reset(token) + + +class LogContextFilter(logging.Filter): + """Attach structured context fields from `log_context(...)` to LogRecords.""" + + _LLM_CALL_PREFIXES = ( + "app.services.llm", + "app.api.routes.llm", + "app.crud.llm", + ) + _COLLECTION_PREFIXES = ( + "app.services.collections", + "app.api.routes.collections", + "app.crud.collection", + ) + + def filter(self, record: logging.LogRecord) -> bool: + context_payload = _log_context_var.get() + has_explicit_tag = False + if context_payload: + for key, value in context_payload.items(): + setattr(record, key, value) + has_explicit_tag = bool(context_payload.get("tag")) + + if not has_explicit_tag: + logger_name = record.name or "" + if logger_name.startswith(self._LLM_CALL_PREFIXES): + record.tag = "llm-call" + if not hasattr(record, "system"): + record.system = "llm-call" + elif logger_name.startswith(self._COLLECTION_PREFIXES): + record.tag = "collection" + if not hasattr(record, "system"): + record.system = "collection" + + if not hasattr(record, "lifecycle"): + span = trace.get_current_span() + if span is not None and span.is_recording(): + span_name = getattr(span, "name", None) + if isinstance(span_name, str) and span_name: + record.lifecycle = span_name + return True + + +def _build_resource(service_name: str | None = None) -> Resource: + return Resource.create( + { + SERVICE_NAME: service_name or settings.OTEL_SERVICE_NAME, + "deployment.environment": settings.ENVIRONMENT, + "service.version": settings.API_VERSION, + } + ) + + +def setup_telemetry(service_name: str | None = None) -> None: + """Initialize OpenTelemetry tracing and bridge spans into Sentry. + + Sentry is the single sink: + - Traces: OTel TracerProvider -> SentrySpanProcessor -> Sentry + - Logs: stdlib logging -> Sentry LoggingIntegration (configured at + sentry_sdk.init time; see app/main.py and celery_app.py) + - Metrics: code calls _emit_sentry_metric / sentry_sdk.metrics.* directly + + Args: + service_name: Override OTEL_SERVICE_NAME (e.g. "kaapi-celery" in workers). + """ + root_logger = logging.getLogger() + log_context_filter = LogContextFilter() + if not any(isinstance(f, LogContextFilter) for f in root_logger.filters): + root_logger.addFilter(log_context_filter) + for handler in root_logger.handlers: + if not any(isinstance(f, LogContextFilter) for f in handler.filters): + handler.addFilter(log_context_filter) + + if not settings.OTEL_ENABLED: + logger.info("[setup_telemetry] OTEL_ENABLED is False, skipping") + return + + resource = _build_resource(service_name) + tracer_provider = TracerProvider(resource=resource) + + # Bridge OTel spans into Sentry as Sentry transactions and spans, with full attribute and error capture. 
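In isolation, that bridge is just Sentry's span processor registered on the OTel tracer provider; nothing else consumes the spans, which is why no OTLP exporter is configured. A minimal sketch, assuming sentry-sdk with its OpenTelemetry integration and a placeholder DSN:

    import sentry_sdk
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from sentry_sdk.integrations.opentelemetry import SentrySpanProcessor

    # Placeholder DSN; instrumenter="otel" defers span creation to OTel,
    # mirroring the sentry_sdk.init(...) calls elsewhere in this patch.
    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
        instrumenter="otel",
        traces_sample_rate=1.0,
    )

    provider = TracerProvider()
    provider.add_span_processor(SentrySpanProcessor())  # finished spans -> Sentry
    trace.set_tracer_provider(provider)

    with trace.get_tracer(__name__).start_as_current_span("demo"):
        pass  # exported to Sentry as a transaction when the span ends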
+ if settings.SENTRY_DSN: + from sentry_sdk.integrations.opentelemetry import SentrySpanProcessor + + tracer_provider.add_span_processor(SentrySpanProcessor()) + + trace.set_tracer_provider(tracer_provider) + + # Auto-instrumentation — generates OTel spans the SentrySpanProcessor forwards + LoggingInstrumentor().instrument(set_logging_format=False) + CeleryInstrumentor().instrument() + HTTPXClientInstrumentor().instrument() + RequestsInstrumentor().instrument() + + logger.debug( + "[setup_telemetry] OpenTelemetry initialized (service=%s, sink=Sentry)", + service_name or settings.OTEL_SERVICE_NAME, + ) + + +def _extract_task_meta(task: object | None) -> tuple[str, str]: + task_name = getattr(task, "name", "unknown") if task is not None else "unknown" + task_queue = "unknown" + request = getattr(task, "request", None) if task is not None else None + delivery_info = getattr(request, "delivery_info", None) + if isinstance(delivery_info, dict) and delivery_info.get("routing_key"): + task_queue = str(delivery_info.get("routing_key")) + return str(task_name), task_queue + + +def _emit_celery_worker_gauges(active: int, pid: str) -> None: + pid_attrs = {"worker.pid": pid} + _emit_sentry_metric( + "gauge", "celery.worker.active", 1 if active > 0 else 0, attributes=pid_attrs + ) + _emit_sentry_metric( + "gauge", "celery.worker.idle", 0 if active > 0 else 1, attributes=pid_attrs + ) + + +def record_celery_task_started(task: object | None) -> None: + """Emit Celery task-start metrics to Sentry.""" + global _celery_active_tasks + if not settings.OTEL_ENABLED: + return + + task_name, task_queue = _extract_task_meta(task) + pid = str(os.getpid()) + attrs = {"task.name": task_name, "task.queue": task_queue, "worker.pid": pid} + + with _celery_metrics_lock: + _celery_active_tasks += 1 + active = _celery_active_tasks + + _emit_sentry_metric("count", "celery.task.total", 1, attributes=attrs) + _emit_sentry_metric("gauge", "celery.task.active", active, attributes=attrs) + _emit_celery_worker_gauges(active, pid) + + +def record_celery_task_finished(task: object | None, state: str | None) -> None: + """Emit Celery task-completion metrics to Sentry.""" + global _celery_active_tasks + if not settings.OTEL_ENABLED: + return + + task_name, task_queue = _extract_task_meta(task) + task_state = (state or "UNKNOWN").upper() + pid = str(os.getpid()) + attrs = { + "task.name": task_name, + "task.queue": task_queue, + "task.state": task_state, + "worker.pid": pid, + } + + with _celery_metrics_lock: + _celery_active_tasks = max(0, _celery_active_tasks - 1) + active = _celery_active_tasks + + if task_state == "SUCCESS": + _emit_sentry_metric("count", "celery.task.completed", 1, attributes=attrs) + elif task_state in {"FAILURE", "REVOKED"}: + _emit_sentry_metric("count", "celery.task.failed", 1, attributes=attrs) + + _emit_sentry_metric("gauge", "celery.task.active", active, attributes=attrs) + _emit_celery_worker_gauges(active, pid) + + +def _llm_call_attrs( + provider: str, + model: str, + operation: str, + organization_id: int | None, + project_id: int | None, +) -> dict[str, str]: + attrs: dict[str, str] = { + "gen_ai.system": provider, + "gen_ai.request.model": model, + "gen_ai.operation.name": operation, + } + if organization_id is not None: + attrs["kaapi.organization_id"] = str(organization_id) + if project_id is not None: + attrs["kaapi.project_id"] = str(project_id) + return attrs + + +def record_llm_call_started( + provider: str, + model: str, + operation: str, + organization_id: int | None = None, + project_id: 
int | None = None, +) -> None: + """Emit LLM call-start metric to Sentry.""" + if not settings.OTEL_ENABLED: + return + attrs = _llm_call_attrs(provider, model, operation, organization_id, project_id) + _emit_sentry_metric("count", "llm.call.total", 1, attributes=attrs) + + +def record_llm_call_finished( + provider: str, + model: str, + operation: str, + duration_ms: float, + input_tokens: int | None = None, + output_tokens: int | None = None, + total_tokens: int | None = None, + error: bool = False, + organization_id: int | None = None, + project_id: int | None = None, +) -> None: + """Emit LLM call-completion metrics (latency, tokens, errors) to Sentry.""" + if not settings.OTEL_ENABLED: + return + attrs = _llm_call_attrs(provider, model, operation, organization_id, project_id) + + _emit_sentry_metric( + "distribution", + "llm.call.duration", + duration_ms, + unit="millisecond", + attributes=attrs, + ) + if error: + _emit_sentry_metric("count", "llm.call.errors", 1, attributes=attrs) + if input_tokens is not None: + _emit_sentry_metric("count", "llm.tokens.input", input_tokens, attributes=attrs) + if output_tokens is not None: + _emit_sentry_metric( + "count", "llm.tokens.output", output_tokens, attributes=attrs + ) + if total_tokens is not None: + _emit_sentry_metric("count", "llm.tokens.total", total_tokens, attributes=attrs) + + +def set_gen_ai_request_attributes( + span: trace.Span, + *, + provider: str, + model: str, + operation: str, + organization_id: int | None, + project_id: int | None, + params: dict[str, Any] | None = None, +) -> None: + """Set OTel GenAI request attributes on `span` (semantic-convention keys + kaapi ids).""" + span.set_attribute("gen_ai.system", provider) + span.set_attribute("gen_ai.provider.name", provider) + span.set_attribute("gen_ai.operation.name", operation) + if model: + span.set_attribute("gen_ai.request.model", model) + if organization_id is not None: + span.set_attribute("kaapi.organization_id", organization_id) + span.set_attribute("gen_ai.request.organization_id", organization_id) + if project_id is not None: + span.set_attribute("kaapi.project_id", project_id) + span.set_attribute("gen_ai.request.project_id", project_id) + + params = params or {} + for attr_key, param_key in ( + ("gen_ai.request.temperature", "temperature"), + ("gen_ai.request.max_tokens", "max_tokens"), + ("gen_ai.request.top_p", "top_p"), + ("gen_ai.request.presence_penalty", "presence_penalty"), + ("gen_ai.request.frequency_penalty", "frequency_penalty"), + ): + if param_key in params: + span.set_attribute(attr_key, params.get(param_key)) + + tools = params.get("tools") + if tools is not None: + span.set_attribute("gen_ai.request.available_tools", json.dumps(tools)) + + +def set_gen_ai_response_attributes( + span: trace.Span, *, response: "LLMCallResponse" +) -> None: + """Set OTel GenAI response attributes (usage, model) on `span`.""" + usage = response.usage + if usage: + span.set_attribute("gen_ai.usage.input_tokens", usage.input_tokens) + span.set_attribute("gen_ai.usage.output_tokens", usage.output_tokens) + span.set_attribute("gen_ai.usage.total_tokens", usage.total_tokens) + if getattr(usage, "reasoning_tokens", None) is not None: + span.set_attribute( + "gen_ai.usage.output_tokens.reasoning", usage.reasoning_tokens + ) + + if response.response and response.response.model: + span.set_attribute("gen_ai.response.model", response.response.model) + + +@contextmanager +def suppress_http_instrumentation() -> Iterator[None]: + """Suppress OTel HTTP client auto-instrumentation 
for the wrapped block. + + Used around LLM provider calls so the outbound HTTP call does not emit a + redundant child span — the LLM-level span already carries the trace. + """ + token = otel_context.attach( + otel_context.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True) + ) + try: + yield + finally: + otel_context.detach(token) + + +def record_db_query_finished( + *, + duration_ms: float, + operation: str | None = None, + error: bool = False, +) -> None: + """Emit DB query metrics to Sentry.""" + if not settings.OTEL_ENABLED: + return + + attrs: dict[str, str] = {} + if operation: + attrs["db.operation"] = operation + + _emit_sentry_metric("count", "db.query.total", 1, attributes=attrs) + _emit_sentry_metric( + "distribution", + "db.query.duration", + duration_ms, + unit="millisecond", + attributes=attrs, + ) + if error: + _emit_sentry_metric("count", "db.query.failed", 1, attributes=attrs) + + +def record_db_pool_stats( + *, + active: int, + idle: int, + total: int, + overflow: int, +) -> None: + """Emit SQLAlchemy pool stats as Sentry gauges.""" + if not settings.OTEL_ENABLED: + return + + _emit_sentry_metric("gauge", "db.pool.active", active) + _emit_sentry_metric("gauge", "db.pool.idle", idle) + _emit_sentry_metric("gauge", "db.pool.total", total) + _emit_sentry_metric("gauge", "db.pool.overflow", overflow) + + +def flush_telemetry(timeout_millis: int = 10000) -> None: + """Force-flush OTel spans into Sentry, then flush Sentry's transport. + + Called from Celery task_postrun: workers can recycle via max_tasks_per_child + before the SentrySpanProcessor's internal queue drains, otherwise dropping + closing spans and ERROR breadcrumbs from the just-finished task. + """ + if not settings.OTEL_ENABLED: + return + try: + tp = trace.get_tracer_provider() + if hasattr(tp, "force_flush"): + tp.force_flush(timeout_millis=timeout_millis) + except Exception: + logger.exception("[flush_telemetry] Failed to flush tracer provider") + + try: + if sentry_sdk.get_client().is_active(): + sentry_sdk.flush(timeout=timeout_millis / 1000) + except Exception: + logger.exception("[flush_telemetry] Failed to flush Sentry") + + +def instrument_app(app: object) -> None: + """Instrument the FastAPI app. Call after the app is created.""" + if not settings.OTEL_ENABLED: + return + # Health checks are high-volume/noise and should not generate traces. 
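The exclusion string below is parsed into an ExcludeList that the ASGI layer matches with re.search against the full request URL, not just the path. A quick sketch of how the pattern behaves (assumes opentelemetry-util-http; the `^` anchor is worth verifying against the exact URL the deployed health checks hit):

    from opentelemetry.util.http import parse_excluded_urls

    excluded = parse_excluded_urls(r"^/health/?$")
    print(excluded.url_disabled("/health"))                  # True
    print(excluded.url_disabled("http://api.local/health"))  # False: '^' anchors the whole URL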
+ FastAPIInstrumentor.instrument_app( # type: ignore[arg-type] + app, + excluded_urls=r"^/health/?$", + ) + logger.debug("[instrument_app] FastAPI instrumented with OpenTelemetry") + + +def instrument_db_engine(engine: object) -> None: + """Instrument SQLAlchemy engine with DB query spans + pool gauges.""" + if not settings.OTEL_ENABLED: + return + if getattr(engine, "_kaapi_db_telemetry_instrumented", False): + return + + try: + from sqlalchemy import event + except Exception: + logger.exception("[instrument_db_engine] Failed importing SQLAlchemy events") + return + + def _pool_snapshot(pool: Any) -> tuple[int, int, int, int] | None: + if not all(hasattr(pool, attr) for attr in ("checkedout", "size", "overflow")): + return None + active = int(pool.checkedout()) + idle = int(pool.checkedin()) if hasattr(pool, "checkedin") else 0 + configured_size = int(pool.size()) + overflow = int(pool.overflow()) + total = max(configured_size + overflow, active + idle) + return active, idle, total, overflow + + def _emit_pool_metrics(pool: Any) -> None: + snapshot = _pool_snapshot(pool) + if not snapshot: + return + active, idle, total, overflow = snapshot + record_db_pool_stats(active=active, idle=idle, total=total, overflow=overflow) + + @event.listens_for(engine, "before_cursor_execute") + def _before_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ) -> None: + del cursor, parameters, executemany + context._kaapi_db_started_at = time.perf_counter() + context._kaapi_db_operation = ( + str(statement).split(None, 1)[0].upper() if statement else "UNKNOWN" + ) + _emit_pool_metrics(conn.engine.pool) + + @event.listens_for(engine, "after_cursor_execute") + def _after_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ) -> None: + del cursor, statement, parameters, executemany + started_at = getattr(context, "_kaapi_db_started_at", None) + duration_ms = ( + (time.perf_counter() - started_at) * 1000 if started_at is not None else 0.0 + ) + operation = getattr(context, "_kaapi_db_operation", None) + record_db_query_finished( + duration_ms=duration_ms, operation=operation, error=False + ) + _emit_pool_metrics(conn.engine.pool) + + @event.listens_for(engine, "handle_error") + def _handle_error(exception_context) -> None: + context = exception_context.execution_context + if context is None: + return + started_at = getattr(context, "_kaapi_db_started_at", None) + duration_ms = ( + (time.perf_counter() - started_at) * 1000 if started_at is not None else 0.0 + ) + operation = getattr(context, "_kaapi_db_operation", None) + record_db_query_finished( + duration_ms=duration_ms, operation=operation, error=True + ) + + @event.listens_for(engine.pool, "checkout") + def _on_checkout(dbapi_connection, connection_record, connection_proxy) -> None: + del dbapi_connection, connection_record, connection_proxy + _emit_pool_metrics(engine.pool) + + @event.listens_for(engine.pool, "checkin") + def _on_checkin(dbapi_connection, connection_record) -> None: + del dbapi_connection, connection_record + _emit_pool_metrics(engine.pool) + + setattr(engine, "_kaapi_db_telemetry_instrumented", True) + logger.debug("[instrument_db_engine] SQLAlchemy DB telemetry enabled") diff --git a/backend/app/main.py b/backend/app/main.py index 47a27f371..cb965fe4d 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,4 +1,12 @@ +import logging + import sentry_sdk +from sentry_sdk.integrations.celery import CeleryIntegration +from sentry_sdk.integrations.fastapi import FastApiIntegration 
+from sentry_sdk.integrations.httpx import HttpxIntegration +from sentry_sdk.integrations.logging import LoggingIntegration +from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration +from sentry_sdk.integrations.starlette import StarletteIntegration from fastapi import FastAPI from fastapi.routing import APIRoute @@ -8,20 +16,49 @@ from app.api.docs.openapi_config import tags_metadata, customize_openapi_schema from app.core.config import settings from app.core.exception_handlers import register_exception_handlers +from app.core.logger import configure_logging from app.core.middleware import http_request_logger +from app.core.sentry_filters import before_send_transaction_filter +from app.core.telemetry import instrument_app, setup_telemetry from app.load_env import load_environment # Load environment variables load_environment() +configure_logging() + + +if settings.SENTRY_DSN: + sentry_sdk.init( + dsn=str(settings.SENTRY_DSN), + environment=settings.ENVIRONMENT, + release=settings.API_VERSION, + instrumenter="otel", + traces_sample_rate=1.0, + enable_logs=True, + before_send_transaction=before_send_transaction_filter, + integrations=[ + LoggingIntegration( + level=logging.INFO, + sentry_logs_level=logging.INFO, + ), + ], + disabled_integrations=[ + FastApiIntegration(), + StarletteIntegration(), + SqlalchemyIntegration(), + CeleryIntegration(), + HttpxIntegration(), + ], + ) +setup_telemetry() -def custom_generate_unique_id(route: APIRoute) -> str: - return f"{route.tags[0]}-{route.name}" +def custom_generate_unique_id(route: APIRoute) -> str: + tag = route.tags[0] if route.tags else "default" + return f"{tag}-{route.name}" -if settings.SENTRY_DSN and settings.ENVIRONMENT != "development": - sentry_sdk.init(dsn=str(settings.SENTRY_DSN), enable_tracing=True) app = FastAPI( title=settings.PROJECT_NAME, @@ -58,3 +95,13 @@ def custom_openapi(): app.include_router(api_router, prefix=settings.API_V1_STR) register_exception_handlers(app) + +instrument_app(app) + + +# health check endpoint for uptime monitoring +@app.get("/health", include_in_schema=False) +async def health() -> dict[str, str | float]: + return { + "status": "ok", + } diff --git a/backend/app/services/collections/create_collection.py b/backend/app/services/collections/create_collection.py index eb37fd039..06cc05802 100644 --- a/backend/app/services/collections/create_collection.py +++ b/backend/app/services/collections/create_collection.py @@ -2,11 +2,13 @@ import time from uuid import UUID, uuid4 +from opentelemetry import trace from sqlmodel import Session from asgi_correlation_id import correlation_id from app.core.cloud import get_cloud_storage from app.core.db import engine +from app.core.telemetry import log_context from app.crud import ( CollectionCrud, DocumentCrud, @@ -31,6 +33,7 @@ logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) def start_job( @@ -41,28 +44,36 @@ def start_job( with_assistant: bool, organization_id: int, ) -> str: - trace_id = correlation_id.get() or "N/A" - - job_crud = CollectionJobCrud(db, project_id) - collection_job = job_crud.update( - collection_job_id, CollectionJobUpdate(trace_id=trace_id) - ) - - task_id = start_create_collection_job( + with log_context( + tag="collection", + lifecycle="collection.create.start_job", + action="create", + collection_job_id=collection_job_id, project_id=project_id, - job_id=str(collection_job_id), - trace_id=trace_id, - request=request.model_dump(mode="json"), - with_assistant=with_assistant, organization_id=organization_id, - 
) + ): + trace_id = correlation_id.get() or "N/A" - logger.info( - "[create_collection.start_job] Job scheduled to create collection | " - f"collection_job_id={collection_job_id}, project_id={project_id}, task_id={task_id}" - ) + job_crud = CollectionJobCrud(db, project_id) + collection_job = job_crud.update( + collection_job_id, CollectionJobUpdate(trace_id=trace_id) + ) + + task_id = start_create_collection_job( + project_id=project_id, + job_id=str(collection_job_id), + trace_id=trace_id, + request=request.model_dump(mode="json"), + with_assistant=with_assistant, + organization_id=organization_id, + ) - return collection_job_id + logger.info( + "[create_collection.start_job] Job scheduled to create collection | " + f"collection_job_id={collection_job_id}, project_id={project_id}, task_id={task_id}" + ) + + return collection_job_id def build_success_payload( @@ -151,126 +162,145 @@ def execute_job( """ start_time = time.time() - # Keeping the references for potential backout/cleanup on failure collection_job = None result = None creation_request = None provider = None - try: - creation_request = CreationRequest(**request) - if ( - with_assistant - ): # this will be removed once dalgo switches to vector store creation only - creation_request.provider = "openai" + with log_context( + tag="collection", + lifecycle="collection.create.execute_job", + action="create", + collection_job_id=job_id, + task_id=task_id, + project_id=project_id, + organization_id=organization_id, + ), tracer.start_as_current_span("collections.create.execute_job") as span: + span.set_attribute("collection.job_id", str(job_id)) + span.set_attribute("kaapi.project_id", project_id) + span.set_attribute("kaapi.organization_id", organization_id) + + try: + creation_request = CreationRequest(**request) + if with_assistant: + creation_request.provider = "openai" + + span.set_attribute("collection.provider", str(creation_request.provider)) + + job_uuid = UUID(job_id) + + with Session(engine) as session: + document_crud = DocumentCrud(session, project_id) + flat_docs = document_crud.read_each(creation_request.documents) + + file_exts = { + doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname + } + total_size_kb = sum(doc.file_size_kb or 0 for doc in flat_docs) + total_size_mb = round(total_size_kb / 1024, 2) + span.set_attribute("collection.documents.count", len(flat_docs)) + span.set_attribute("collection.documents.total_size_mb", total_size_mb) + + with Session(engine) as session: + collection_job_crud = CollectionJobCrud(session, project_id) + collection_job = collection_job_crud.read_one(job_uuid) + collection_job = collection_job_crud.update( + job_uuid, + CollectionJobUpdate( + task_id=task_id, + status=CollectionJobStatus.PROCESSING, + total_size_mb=total_size_mb, + ), + ) - job_uuid = UUID(job_id) + storage = get_cloud_storage(session=session, project_id=project_id) + provider = get_llm_provider( + session=session, + provider=creation_request.provider, + project_id=project_id, + organization_id=organization_id, + ) - with Session(engine) as session: - document_crud = DocumentCrud(session, project_id) - flat_docs = document_crud.read_each(creation_request.documents) + with tracer.start_as_current_span("collections.create.provider"): + result = provider.create( + collection_request=creation_request, + storage=storage, + documents=flat_docs, + ) - file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." 
in doc.fname} - total_size_kb = sum(doc.file_size_kb or 0 for doc in flat_docs) - total_size_mb = round(total_size_kb / 1024, 2) + llm_service_id = result.llm_service_id + llm_service_name = result.llm_service_name + + with Session(engine) as session: + collection_crud = CollectionCrud(session, project_id) + collection_id = uuid4() + + collection = Collection( + id=collection_id, + project_id=project_id, + llm_service_id=llm_service_id, + llm_service_name=llm_service_name, + provider=creation_request.provider, + name=creation_request.name, + description=creation_request.description, + ) + collection_crud.create(collection) + collection = collection_crud.read_one(collection.id) + + if flat_docs: + DocumentCollectionCrud(session).create(collection, flat_docs) + + collection_job_crud = CollectionJobCrud(session, project_id) + collection_job = collection_job_crud.update( + collection_job.id, + CollectionJobUpdate( + status=CollectionJobStatus.SUCCESSFUL, + collection_id=collection.id, + ), + ) - with Session(engine) as session: - collection_job_crud = CollectionJobCrud(session, project_id) - collection_job = collection_job_crud.read_one(job_uuid) - collection_job = collection_job_crud.update( - job_uuid, - CollectionJobUpdate( - task_id=task_id, - status=CollectionJobStatus.PROCESSING, - total_size_mb=total_size_mb, - ), - ) + success_payload = build_success_payload(collection_job, collection) - storage = get_cloud_storage(session=session, project_id=project_id) + span.set_attribute("collection.id", str(collection_id)) - provider = get_llm_provider( - session=session, - provider=creation_request.provider, - project_id=project_id, - organization_id=organization_id, + elapsed = time.time() - start_time + logger.info( + "[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Total Size: %s MB | Types: %s", + collection_id, + elapsed, + len(flat_docs), + collection_job.total_size_mb, + list(file_exts), ) - result = provider.create( - collection_request=creation_request, - storage=storage, - documents=flat_docs, - ) - - llm_service_id = result.llm_service_id - llm_service_name = result.llm_service_name - - with Session(engine) as session: - collection_crud = CollectionCrud(session, project_id) - - collection_id = uuid4() - - collection = Collection( - id=collection_id, - project_id=project_id, - llm_service_id=llm_service_id, - llm_service_name=llm_service_name, - provider=creation_request.provider, - name=creation_request.name, - description=creation_request.description, + if creation_request.callback_url: + send_callback(creation_request.callback_url, success_payload) + + except Exception as err: + span.record_exception(err) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(err))) + logger.error( + "[create_collection.execute_job] Collection Creation Failed | {'collection_job_id': '%s', 'error': '%s'}", + job_id, + str(err), + exc_info=True, ) - collection_crud.create(collection) - collection = collection_crud.read_one(collection.id) - if flat_docs: - DocumentCollectionCrud(session).create(collection, flat_docs) + if provider is not None and result is not None: + try: + provider.delete(result) + except Exception: + logger.warning( + "[create_collection.execute_job] Provider cleanup failed" + ) - collection_job_crud = CollectionJobCrud(session, project_id) - collection_job = collection_job_crud.update( - collection_job.id, - CollectionJobUpdate( - status=CollectionJobStatus.SUCCESSFUL, - collection_id=collection.id, - ), + collection_job = _mark_job_failed( + 
project_id=project_id, + job_id=job_id, + err=err, + collection_job=collection_job, ) - success_payload = build_success_payload(collection_job, collection) - - elapsed = time.time() - start_time - logger.info( - "[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Total Size: %s MB | Types: %s", - collection_id, - elapsed, - len(flat_docs), - collection_job.total_size_mb, - list(file_exts), - ) - - if creation_request.callback_url: - send_callback(creation_request.callback_url, success_payload) - - except Exception as err: - logger.error( - "[create_collection.execute_job] Collection Creation Failed | {'collection_job_id': '%s', 'error': '%s'}", - job_id, - str(err), - exc_info=True, - ) - - if provider is not None and result is not None: - try: - provider.delete(result) - except Exception: - logger.warning( - "[create_collection.execute_job] Provider cleanup failed" - ) - - collection_job = _mark_job_failed( - project_id=project_id, - job_id=job_id, - err=err, - collection_job=collection_job, - ) - - if creation_request and creation_request.callback_url and collection_job: - failure_payload = build_failure_payload(collection_job, str(err)) - send_callback(creation_request.callback_url, failure_payload) + if creation_request and creation_request.callback_url and collection_job: + failure_payload = build_failure_payload(collection_job, str(err)) + send_callback(creation_request.callback_url, failure_payload) diff --git a/backend/app/services/collections/delete_collection.py b/backend/app/services/collections/delete_collection.py index 8d10cc119..88c85694d 100644 --- a/backend/app/services/collections/delete_collection.py +++ b/backend/app/services/collections/delete_collection.py @@ -1,6 +1,7 @@ import logging from uuid import UUID +from opentelemetry import trace from sqlmodel import Session from asgi_correlation_id import correlation_id @@ -17,10 +18,12 @@ from app.services.collections.helpers import extract_error_message from app.services.collections.providers.registry import get_llm_provider from app.celery.utils import start_delete_collection_job +from app.core.telemetry import log_context from app.utils import send_callback, APIResponse logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) def start_job( @@ -30,27 +33,36 @@ def start_job( collection_job_id: UUID, organization_id: int, ) -> str: - trace_id = correlation_id.get() or "N/A" - - job_crud = CollectionJobCrud(db, project_id) - collection_job = job_crud.update( - collection_job_id, CollectionJobUpdate(trace_id=trace_id) - ) - - task_id = start_delete_collection_job( + with log_context( + tag="collection", + lifecycle="collection.delete.start_job", + action="delete", + collection_job_id=collection_job_id, + collection_id=request.collection_id, project_id=project_id, - job_id=str(collection_job_id), - trace_id=trace_id, - collection_id=str(request.collection_id), - request=request.model_dump(mode="json"), organization_id=organization_id, - ) + ): + trace_id = correlation_id.get() or "N/A" - logger.info( - "[delete_collection.start_job] Job scheduled to delete collection | " - f"Job_id={collection_job_id}, project_id={project_id}, task_id={task_id}, collection_id={request.collection_id}" - ) - return collection_job_id + job_crud = CollectionJobCrud(db, project_id) + collection_job = job_crud.update( + collection_job_id, CollectionJobUpdate(trace_id=trace_id) + ) + + task_id = start_delete_collection_job( + project_id=project_id, + job_id=str(collection_job_id), + 
trace_id=trace_id, + collection_id=str(request.collection_id), + request=request.model_dump(mode="json"), + organization_id=organization_id, + ) + + logger.info( + "[delete_collection.start_job] Job scheduled to delete collection | " + f"Job_id={collection_job_id}, project_id={project_id}, task_id={task_id}, collection_id={request.collection_id}" + ) + return collection_job_id def build_success_payload(collection_job: CollectionJob, collection_id: UUID) -> dict: @@ -155,60 +167,79 @@ def execute_job( collection_job = None - try: - with Session(engine) as session: - collection_job_crud = CollectionJobCrud(session, project_id) - collection_job = collection_job_crud.read_one(job_uuid) - collection_job = collection_job_crud.update( - job_uuid, - CollectionJobUpdate( - task_id=task_id, - status=CollectionJobStatus.PROCESSING, - ), + with log_context( + tag="collection", + lifecycle="collection.delete.execute_job", + action="delete", + collection_job_id=job_id, + collection_id=collection_id, + task_id=task_id, + project_id=project_id, + organization_id=organization_id, + ), tracer.start_as_current_span("collections.delete.execute_job") as span: + span.set_attribute("collection.id", str(collection_id)) + span.set_attribute("collection.job_id", str(job_uuid)) + span.set_attribute("kaapi.project_id", project_id) + span.set_attribute("kaapi.organization_id", organization_id) + + try: + with Session(engine) as session: + collection_job_crud = CollectionJobCrud(session, project_id) + collection_job = collection_job_crud.read_one(job_uuid) + collection_job = collection_job_crud.update( + job_uuid, + CollectionJobUpdate( + task_id=task_id, + status=CollectionJobStatus.PROCESSING, + ), + ) + + collection = CollectionCrud(session, project_id).read_one(collection_id) + span.set_attribute("collection.provider", str(collection.provider)) + + provider = get_llm_provider( + session=session, + provider=collection.provider, + project_id=project_id, + organization_id=organization_id, + ) + + with tracer.start_as_current_span("collections.delete.provider"): + provider.delete(collection) + + with Session(engine) as session: + CollectionCrud(session, project_id).delete_by_id(collection_id) + + collection_job_crud = CollectionJobCrud(session, project_id) + collection_job_crud.update( + collection_job.id, + CollectionJobUpdate( + status=CollectionJobStatus.SUCCESSFUL, + error_message=None, + ), + ) + collection_job = collection_job_crud.read_one(collection_job.id) + + logger.info( + "[delete_collection.execute_job] Collection deleted successfully | " + "{'collection_id': '%s', 'job_id': '%s'}", + str(collection_id), + str(job_uuid), ) - - collection = CollectionCrud(session, project_id).read_one(collection_id) - - provider = get_llm_provider( - session=session, - provider=collection.provider, + if deletion_request.callback_url and collection_job: + success_payload = build_success_payload( + collection_job=collection_job, + collection_id=collection_id, + ) + send_callback(deletion_request.callback_url, success_payload) + + except Exception as err: + span.record_exception(err) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(err))) + _mark_job_failed_and_callback( project_id=project_id, - organization_id=organization_id, - ) - - provider.delete(collection) - - with Session(engine) as session: - CollectionCrud(session, project_id).delete_by_id(collection_id) - - collection_job_crud = CollectionJobCrud(session, project_id) - collection_job_crud.update( - collection_job.id, - CollectionJobUpdate( - 
status=CollectionJobStatus.SUCCESSFUL, - error_message=None, - ), - ) - collection_job = collection_job_crud.read_one(collection_job.id) - - logger.info( - "[delete_collection.execute_job] Collection deleted successfully | " - "{'collection_id': '%s', 'job_id': '%s'}", - str(collection_id), - str(job_uuid), - ) - if deletion_request.callback_url and collection_job: - success_payload = build_success_payload( - collection_job=collection_job, collection_id=collection_id, + job_id=job_uuid, + err=err, + callback_url=deletion_request.callback_url, ) - send_callback(deletion_request.callback_url, success_payload) - - except Exception as err: - _mark_job_failed_and_callback( - project_id=project_id, - collection_id=collection_id, - job_id=job_uuid, - err=err, - callback_url=deletion_request.callback_url, - ) diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py index 484141376..a54c22352 100644 --- a/backend/app/services/llm/jobs.py +++ b/backend/app/services/llm/jobs.py @@ -1,14 +1,26 @@ import logging +import time from contextlib import contextmanager +from typing import Any from uuid import UUID from asgi_correlation_id import correlation_id from fastapi import HTTPException +from opentelemetry import trace from sqlmodel import Session from app.celery.utils import start_llm_chain_job, start_llm_job from app.core.db import engine from app.core.langfuse.langfuse import observe_llm_execution +from app.core.telemetry import ( + set_gen_ai_request_attributes, + set_gen_ai_response_attributes, + flush_telemetry, + log_context, + record_llm_call_finished, + record_llm_call_started, + suppress_http_instrumentation, +) from app.crud.config import ConfigVersionCrud from app.crud.credentials import get_provider_credential from app.crud.jobs import JobCrud @@ -37,45 +49,81 @@ from app.utils import APIResponse, cleanup_temp_file, resolve_input, send_callback logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + + +def _execute_provider_call( + *, + func, + completion_config: Any, + query: QueryParams, + credentials: dict | None, + session_id: str | None, + **kwargs: Any, +) -> tuple[Any, Any]: + kwargs.pop("organization_id", None) + kwargs.pop("project_id", None) + kwargs.pop("telemetry_span", None) + + decorated = observe_llm_execution( + session_id=session_id, + credentials=credentials, + )(func) + + with suppress_http_instrumentation(): + return decorated(completion_config, query, **kwargs) def start_job( db: Session, request: LLMCallRequest, project_id: int, organization_id: int ) -> UUID: """Create an LLM job and schedule Celery task.""" - trace_id = correlation_id.get() or "N/A" - job_crud = JobCrud(session=db) - job = job_crud.create( - job_type=JobType.LLM_API, trace_id=trace_id, project_id=project_id - ) - - logger.info( - f"[start_job] Created job | job_id={job.id}, status={job.status}, project_id={project_id}" - ) - - try: - task_id = start_llm_job( - project_id=project_id, - job_id=str(job.id), - trace_id=trace_id, - request_data=request.model_dump(mode="json"), - organization_id=organization_id, - ) - except Exception as e: - logger.error( - f"[start_job] Error starting Celery task: {str(e)} | job_id={job.id}, project_id={project_id}", - exc_info=True, + with log_context( + tag="llm-call", + lifecycle="llm.call.start_job", + project_id=project_id, + organization_id=organization_id, + ), tracer.start_as_current_span("llm.start_job") as span: + span.set_attribute("kaapi.project_id", project_id) + span.set_attribute("kaapi.organization_id", 
organization_id) + + trace_id = correlation_id.get() or "N/A" + job_crud = JobCrud(session=db) + job = job_crud.create( + job_type=JobType.LLM_API, trace_id=trace_id, project_id=project_id ) - job_update = JobUpdate(status=JobStatus.FAILED, error_message=str(e)) - job_crud.update(job_id=job.id, job_update=job_update) - raise HTTPException( - status_code=500, detail="Internal server error while executing LLM call" + span.set_attribute("llm.job_id", str(job.id)) + + logger.info( + f"[start_job] Created job | job_id={job.id}, status={job.status}, project_id={project_id}" ) - logger.info( - f"[start_job] Job scheduled for LLM call | job_id={job.id}, project_id={project_id}, task_id={task_id}" - ) - return job.id + try: + task_id = start_llm_job( + project_id=project_id, + job_id=str(job.id), + trace_id=trace_id, + request_data=request.model_dump(mode="json"), + organization_id=organization_id, + ) + except Exception as e: + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + logger.error( + f"[start_job] Error starting Celery task: {str(e)} | job_id={job.id}, project_id={project_id}", + exc_info=True, + ) + job_update = JobUpdate(status=JobStatus.FAILED, error_message=str(e)) + job_crud.update(job_id=job.id, job_update=job_update) + raise HTTPException( + status_code=500, detail="Internal server error while executing LLM call" + ) + + span.set_attribute("celery.task_id", str(task_id)) + logger.info( + f"[start_job] Job scheduled for LLM call | job_id={job.id}, project_id={project_id}, task_id={task_id}" + ) + return job.id def start_chain_job( @@ -88,34 +136,41 @@ def start_chain_job( job_type=JobType.LLM_CHAIN, trace_id=trace_id, project_id=project_id ) - logger.info( - f"[start_chain_job] Created job | job_id={job.id}, status={job.status}, project_id={project_id}" - ) - - try: - task_id = start_llm_chain_job( - project_id=project_id, - job_id=str(job.id), - trace_id=trace_id, - request_data=request.model_dump(mode="json"), - organization_id=organization_id, - ) - except Exception as e: - logger.error( - f"[start_chain_job] Error starting Celery task: {str(e)} | job_id={job.id}, project_id={project_id}", - exc_info=True, - ) - job_update = JobUpdate(status=JobStatus.FAILED, error_message=str(e)) - job_crud.update(job_id=job.id, job_update=job_update) - raise HTTPException( - status_code=500, - detail="Internal server error while executing LLM chain job", + with log_context( + tag="llm-chain", + lifecycle="llm.chain.start_job", + job_id=job.id, + project_id=project_id, + organization_id=organization_id, + ): + logger.info( + f"[start_chain_job] Created job | job_id={job.id}, status={job.status}, project_id={project_id}" ) - logger.info( - f"[start_chain_job] Job scheduled for LLM chain job | job_id={job.id}, project_id={project_id}, task_id={task_id}" - ) - return job.id + try: + task_id = start_llm_chain_job( + project_id=project_id, + job_id=str(job.id), + trace_id=trace_id, + request_data=request.model_dump(mode="json"), + organization_id=organization_id, + ) + except Exception as e: + logger.error( + f"[start_chain_job] Error starting Celery task: {str(e)} | job_id={job.id}, project_id={project_id}", + exc_info=True, + ) + job_update = JobUpdate(status=JobStatus.FAILED, error_message=str(e)) + job_crud.update(job_id=job.id, job_update=job_update) + raise HTTPException( + status_code=500, + detail="Internal server error while executing LLM chain job", + ) + + logger.info( + f"[start_chain_job] Job scheduled for LLM chain job | job_id={job.id}, 
project_id={project_id}, task_id={task_id}" + ) + return job.id def handle_job_error( @@ -125,10 +180,13 @@ def handle_job_error( ) -> dict: """Handle job failure uniformly — send callback and update DB.""" if callback_url: - send_callback( - callback_url=callback_url, - data=callback_response.model_dump(), - ) + with tracer.start_as_current_span("llm.send_callback") as cb_span: + cb_span.set_attribute("callback.url", callback_url) + cb_span.set_attribute("callback.status", "failure") + send_callback( + callback_url=callback_url, + data=callback_response.model_dump(), + ) with Session(engine) as session: JobCrud(session=session).update( @@ -337,31 +395,41 @@ def execute_llm_call( try: with Session(engine) as session: - if config.is_stored_config: - config_crud = ConfigVersionCrud( - session=session, project_id=project_id, config_id=config.id - ) - config_blob, error = resolve_config_blob(config_crud, config) - logger.info(f"----the resolved config blob is {config_blob}") - if error: - return BlockResult(error=error) - else: - config_blob = config.blob + with tracer.start_as_current_span("llm.resolve_config") as cfg_span: + cfg_span.set_attribute("llm.job_id", str(job_id)) + cfg_span.set_attribute("llm.config.is_stored", config.is_stored_config) + if config.is_stored_config: + cfg_span.set_attribute("llm.config.id", str(config.id)) + cfg_span.set_attribute("llm.config.version", str(config.version)) + config_crud = ConfigVersionCrud( + session=session, project_id=project_id, config_id=config.id + ) + config_blob, error = resolve_config_blob(config_crud, config) + if error: + cfg_span.set_status(trace.Status(trace.StatusCode.ERROR, error)) + return BlockResult(error=error) + else: + config_blob = config.blob if config_blob.prompt_template and isinstance(query.input, TextInput): template = config_blob.prompt_template.template interpolated = template.replace("{{input}}", query.input.content.value) query.input.content.value = interpolated - query, input_error = apply_input_guardrails( - config_blob=config_blob, - query=query, - job_id=job_id, - project_id=project_id, - organization_id=organization_id, - ) - if input_error: - return BlockResult(error=input_error) + with tracer.start_as_current_span("llm.guardrails.input") as guard_span: + guard_span.set_attribute("llm.job_id", str(job_id)) + query, input_error = apply_input_guardrails( + config_blob=config_blob, + query=query, + job_id=job_id, + project_id=project_id, + organization_id=organization_id, + ) + if input_error: + guard_span.set_status( + trace.Status(trace.StatusCode.ERROR, input_error) + ) + return BlockResult(error=input_error) completion_config = config_blob.completion original_provider = completion_config.provider @@ -374,6 +442,8 @@ def execute_llm_call( request_metadata = {} request_metadata.setdefault("warnings", []).extend(warnings) + model_name = str(completion_config.params.get("model") or "") + resolved_config_blob = ConfigBlob( completion=completion_config, prompt_template=config_blob.prompt_template, @@ -381,33 +451,45 @@ def execute_llm_call( output_guardrails=config_blob.output_guardrails, ) - try: - llm_call_request = LLMCallRequest( - query=query, - config=config, - request_metadata=request_metadata, - ) - llm_call = create_llm_call( - session, - request=llm_call_request, - job_id=job_id, - project_id=project_id, - organization_id=organization_id, - resolved_config=resolved_config_blob, - original_provider=original_provider, - chain_id=chain_id, + with tracer.start_as_current_span("llm.create_call_record") as 
create_span: + create_span.set_attribute("llm.job_id", str(job_id)) + create_span.set_attribute( + "llm.provider", str(completion_config.provider) ) - llm_call_id = llm_call.id - logger.info( - f"[execute_llm_call] Created LLM call record | " - f"llm_call_id={llm_call_id}, job_id={job_id}" - ) - except Exception as e: - logger.error( - f"[execute_llm_call] Failed to create LLM call record: {e} | job_id={job_id}", - exc_info=True, - ) - return BlockResult(error=f"Failed to create LLM call record: {str(e)}") + if model_name: + create_span.set_attribute("llm.request.model", model_name) + try: + llm_call_request = LLMCallRequest( + query=query, + config=config, + request_metadata=request_metadata, + ) + llm_call = create_llm_call( + session, + request=llm_call_request, + job_id=job_id, + project_id=project_id, + organization_id=organization_id, + resolved_config=resolved_config_blob, + original_provider=original_provider, + chain_id=chain_id, + ) + llm_call_id = llm_call.id + create_span.set_attribute("llm.call_id", str(llm_call_id)) + logger.info( + f"[execute_llm_call] Created LLM call record | " + f"llm_call_id={llm_call_id}, job_id={job_id}" + ) + except Exception as e: + create_span.record_exception(e) + create_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + logger.error( + f"[execute_llm_call] Failed to create LLM call record: {e} | job_id={job_id}", + exc_info=True, + ) + return BlockResult( + error=f"Failed to create LLM call record: {str(e)}" + ) try: provider_instance = get_llm_provider( @@ -423,42 +505,143 @@ def execute_llm_call( if query.conversation and query.conversation.id: conversation_id = query.conversation.id - # Apply Langfuse observability decorator to provider execute method - decorated_execute = observe_llm_execution( - credentials=langfuse_credentials, - session_id=conversation_id, - )(provider_instance.execute) + operation = "chat" + provider_name = str(completion_config.provider) + model_name = str(completion_config.params.get("model") or "") + completion_type = str(completion_config.type or "") + record_llm_call_started( + provider=provider_name, + model=model_name, + operation=operation, + organization_id=organization_id, + project_id=project_id, + ) + provider_started_at = time.perf_counter() + response = None + error = None + + # Wrap the provider call in a `chat ` span so Sentry's AI Insights + # module recognises it (op=gen_ai.chat) and surfaces tokens / model / + # messages on the trace. This is the span AI Insights keys off — keep it + # as the parent of `llm.provider.execute`. 
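+            # Illustrative shape of the resulting span subtree for one call
+            # (the model name is an assumed example; the actual parent is
+            # whatever span is current, e.g. the Celery task span):
+            #
+            #   <current span>
+            #   └── chat gpt-4o                 op=gen_ai.chat, gen_ai.* attributes
+            #       └── llm.provider.execute    timing of the raw provider call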
+ ai_span_name = f"chat {model_name}" if model_name else f"chat {provider_name}" + with tracer.start_as_current_span(ai_span_name) as ai_span: + ai_span.set_attribute("sentry.op", "gen_ai.chat") + ai_span.set_attribute("gen_ai.request.organization_id", organization_id) + ai_span.set_attribute("gen_ai.request.project_id", project_id) + if completion_type: + ai_span.set_attribute("completion_type", completion_type) + set_gen_ai_request_attributes( + ai_span, + provider=provider_name, + model=model_name, + operation=operation, + organization_id=organization_id, + project_id=project_id, + params=completion_config.params or {}, + ) + + try: + with resolved_input_context(query.input) as resolved_input: + with tracer.start_as_current_span( + "llm.provider.execute" + ) as provider_span: + provider_span.set_attribute("llm.provider", provider_name) + provider_span.set_attribute( + "llm.operation.name", "provider.execute" + ) + if completion_type: + provider_span.set_attribute( + "completion_type", completion_type + ) + if model_name: + provider_span.set_attribute("llm.request.model", model_name) + + response, error = _execute_provider_call( + func=provider_instance.execute, + completion_config=completion_config, + query=query, + credentials=langfuse_credentials, + session_id=conversation_id, + organization_id=organization_id, + project_id=project_id, + telemetry_span=provider_span, + resolved_input=resolved_input, + include_provider_raw_response=include_provider_raw_response, + ) + except ValueError as ve: + ai_span.set_status(trace.Status(trace.StatusCode.ERROR, str(ve))) + record_llm_call_finished( + provider=provider_name, + model=model_name, + operation=operation, + duration_ms=(time.perf_counter() - provider_started_at) * 1000, + error=True, + organization_id=organization_id, + project_id=project_id, + ) + return BlockResult(error=str(ve), llm_call_id=llm_call_id) - # Resolve input and execute LLM (context manager handles cleanup) - try: - with resolved_input_context(query.input) as resolved_input: - response, error = decorated_execute( - completion_config=completion_config, - query=query, - resolved_input=resolved_input, - include_provider_raw_response=include_provider_raw_response, + if response: + set_gen_ai_response_attributes(ai_span, response=response) + usage = response.usage + if usage: + ai_span.set_attribute("llm.usage.total_tokens", usage.total_tokens) + ai_span.set_attribute("kaapi_llm_input_tokens", usage.input_tokens) + ai_span.set_attribute( + "kaapi_llm_output_tokens", usage.output_tokens + ) + ai_span.set_attribute("kaapi_llm_total_tokens", usage.total_tokens) + else: + ai_span.set_status( + trace.Status(trace.StatusCode.ERROR, error or "Unknown error") ) - except ValueError as ve: - return BlockResult(error=str(ve), llm_call_id=llm_call_id) + + # Push the just-finished LLM span promptly instead of waiting for task teardown. 
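+            # flush_telemetry() here bounds the wait at ~10s; under the hood it
+            # is roughly equivalent to (a sketch of its two steps, not the
+            # exact implementation):
+            #
+            #   trace.get_tracer_provider().force_flush(timeout_millis=10000)
+            #   sentry_sdk.flush(timeout=10)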
+ flush_telemetry(timeout_millis=10000) if response: with Session(engine) as session: if llm_call_id: - try: - update_llm_call_response( - session, - llm_call_id=llm_call_id, - provider_response_id=response.response.provider_response_id, - content=response.response.output.model_dump(), - usage=response.usage.model_dump(), - conversation_id=response.response.conversation_id, - ) - except Exception as e: - logger.error( - f"[execute_llm_call] Failed to update LLM call record: {e} | " - f"llm_call_id={llm_call_id}", - exc_info=True, - ) + with tracer.start_as_current_span( + "llm.update_call_record" + ) as update_span: + update_span.set_attribute("llm.call_id", str(llm_call_id)) + update_span.set_attribute("llm.job_id", str(job_id)) + try: + update_llm_call_response( + session, + llm_call_id=llm_call_id, + provider_response_id=response.response.provider_response_id, + content=response.response.output.model_dump(), + usage=response.usage.model_dump(), + conversation_id=response.response.conversation_id, + ) + except Exception as e: + update_span.record_exception(e) + update_span.set_status( + trace.Status(trace.StatusCode.ERROR, str(e)) + ) + logger.error( + f"[execute_llm_call] Failed to update LLM call record: {e} | " + f"llm_call_id={llm_call_id}", + exc_info=True, + ) + + duration_ms = (time.perf_counter() - provider_started_at) * 1000 + record_llm_call_finished( + provider=provider_name, + model=model_name, + operation=operation, + duration_ms=duration_ms, + input_tokens=response.usage.input_tokens, + output_tokens=response.usage.output_tokens, + total_tokens=response.usage.total_tokens, + error=False, + organization_id=organization_id, + project_id=project_id, + ) + result = BlockResult( response=response, llm_call_id=llm_call_id, @@ -466,22 +649,37 @@ def execute_llm_call( metadata=request_metadata, ) - result, output_error = apply_output_guardrails( - config_blob=config_blob, - result=result, - job_id=job_id, - project_id=project_id, - organization_id=organization_id, - ) - if output_error: - return BlockResult(error=output_error, llm_call_id=llm_call_id) + with tracer.start_as_current_span( + "llm.guardrails.output" + ) as out_guard_span: + out_guard_span.set_attribute("llm.job_id", str(job_id)) + result, output_error = apply_output_guardrails( + config_blob=config_blob, + result=result, + job_id=job_id, + project_id=project_id, + organization_id=organization_id, + ) + if output_error: + out_guard_span.set_status( + trace.Status(trace.StatusCode.ERROR, output_error) + ) + return BlockResult(error=output_error, llm_call_id=llm_call_id) return result - return BlockResult( - error=error or "Unknown error occurred", - llm_call_id=llm_call_id, + duration_ms = (time.perf_counter() - provider_started_at) * 1000 + record_llm_call_finished( + provider=provider_name, + model=model_name, + operation=operation, + duration_ms=duration_ms, + error=True, + organization_id=organization_id, + project_id=project_id, ) + error_message = error or "Unknown error occurred" + return BlockResult(error=error_message, llm_call_id=llm_call_id) except Exception as e: logger.error( @@ -508,78 +706,94 @@ def execute_job( dict: Serialized APIResponse[LLMCallResponse] on success, APIResponse[None] on failure """ request = LLMCallRequest(**request_data) - job_uuid = UUID(job_id) # Renamed to avoid shadowing parameter + job_uuid = UUID(job_id) callback_url_str = str(request.callback_url) if request.callback_url else None - logger.info( - f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, 
callback_url {callback_url_str}"
-    )
+    with log_context(
+        tag="llm-call",
+        lifecycle="llm.call.execute_job",
+        job_id=job_uuid,
+        task_id=task_id,
+        project_id=project_id,
+        organization_id=organization_id,
+    ):
+        logger.info(
+            f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url={callback_url_str}"
+        )
 
-    try:
-        with Session(engine) as session:
-            job_crud = JobCrud(session=session)
-            job_crud.update(
-                job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
-            )
+        try:
+            with Session(engine) as session:
+                job_crud = JobCrud(session=session)
+                job_crud.update(
+                    job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
+                )
+
+                langfuse_credentials = get_provider_credential(
+                    session=session,
+                    org_id=organization_id,
+                    project_id=project_id,
+                    provider="langfuse",
+                )
 
-            langfuse_credentials = get_provider_credential(
-                session=session,
-                org_id=organization_id,
+            result = execute_llm_call(
+                config=request.config,
+                query=request.query,
+                job_id=job_uuid,
                 project_id=project_id,
-                provider="langfuse",
+                organization_id=organization_id,
+                request_metadata=request.request_metadata,
+                langfuse_credentials=langfuse_credentials,
+                include_provider_raw_response=request.include_provider_raw_response,
             )
 
-        result = execute_llm_call(
-            config=request.config,
-            query=request.query,
-            job_id=job_uuid,
-            project_id=project_id,
-            organization_id=organization_id,
-            request_metadata=request.request_metadata,
-            langfuse_credentials=langfuse_credentials,
-            include_provider_raw_response=request.include_provider_raw_response,
-        )
-
-        logger.info(
-            f"[execute_job] Error if any during execution of job: {result.error}"
-        )
-
-        if result.success:
-            callback_response = APIResponse.success_response(
-                data=result.response, metadata=result.metadata
+            logger.info(
+                f"[execute_job] LLM call execution finished | job_id={job_id}, error={result.error}"
             )
-            if callback_url_str:
-                send_callback(
-                    callback_url=callback_url_str,
-                    data=callback_response.model_dump(),
-                )
-            with Session(engine) as session:
-                JobCrud(session=session).update(
-                    job_id=job_uuid, job_update=JobUpdate(status=JobStatus.SUCCESS)
+            if result.success:
+                callback_response = APIResponse.success_response(
+                    data=result.response, metadata=result.metadata
                 )
-            logger.info(
-                f"[execute_job] Successfully completed LLM job | job_id={job_id}, "
-                f"tokens={result.usage.total_tokens}"
-            )
-            return callback_response.model_dump()
+                if callback_url_str:
+                    with tracer.start_as_current_span("llm.send_callback") as cb_span:
+                        cb_span.set_attribute("callback.url", callback_url_str)
+                        cb_span.set_attribute("callback.status", "success")
+                        cb_span.set_attribute("llm.job_id", str(job_uuid))
+                        send_callback(
+                            callback_url=callback_url_str,
+                            data=callback_response.model_dump(),
+                        )
 
-        callback_response = APIResponse.failure_response(
-            error=result.error or "Unknown error occurred",
-            metadata=request.request_metadata,
-        )
-        return handle_job_error(job_uuid, callback_url_str, callback_response)
+                with Session(engine) as session:
+                    JobCrud(session=session).update(
+                        job_id=job_uuid, job_update=JobUpdate(status=JobStatus.SUCCESS)
+                    )
+                logger.info(
+                    f"[execute_job] Successfully completed LLM job | job_id={job_id}, "
+                    f"tokens={result.usage.total_tokens}"
+                )
+                return callback_response.model_dump()
 
-    except Exception as e:
-        callback_response = APIResponse.failure_response(
-            error="Unexpected error occurred",
-            metadata=request.request_metadata,
-        )
-        logger.error(
-            f"[execute_job] Unexpected error: {str(e)} | 
job_id={job_uuid}, task_id={task_id}", - exc_info=True, - ) - return handle_job_error(job_uuid, callback_url_str, callback_response) + error_message = result.error or "Unknown error occurred" + callback_response = APIResponse.failure_response( + error=error_message, + metadata=request.request_metadata, + ) + return handle_job_error(job_uuid, callback_url_str, callback_response) + + except Exception as e: + callback_response = APIResponse.failure_response( + error="Unexpected error occurred", + metadata=request.request_metadata, + ) + logger.error( + f"[execute_job] Unexpected error: {str(e)} | job_id={job_uuid}, task_id={task_id}", + exc_info=True, + ) + return handle_job_error(job_uuid, callback_url_str, callback_response) + finally: + # Ensure task spans are pushed promptly so Sentry dashboards update faster. + flush_telemetry() def execute_chain_job( @@ -604,89 +818,103 @@ def execute_chain_job( callback_url_str = str(request.callback_url) if request.callback_url else None chain_uuid = None - logger.info( - f"[execute_chain_job] Starting chain execution | " - f"job_id={job_uuid}, total_blocks={len(request.blocks)}" - ) + with log_context( + tag="llm-chain", + lifecycle="llm.chain.execute_job", + job_id=job_uuid, + task_id=task_id, + project_id=project_id, + organization_id=organization_id, + total_blocks=len(request.blocks), + ): + logger.info( + f"[execute_chain_job] Starting chain execution | " + f"job_id={job_uuid}, total_blocks={len(request.blocks)}" + ) - try: - with Session(engine) as session: - chain_record = create_llm_chain( - session, + try: + with Session(engine) as session: + chain_record = create_llm_chain( + session, + job_id=job_uuid, + project_id=project_id, + organization_id=organization_id, + total_blocks=len(request.blocks), + input=serialize_input(request.query.input), + configs=[block.model_dump(mode="json") for block in request.blocks], + ) + chain_uuid = chain_record.id + + logger.info( + f"[execute_chain_job] Created chain record | " + f"chain_id={chain_uuid}, job_id={job_uuid}" + ) + + langfuse_credentials = get_provider_credential( + session=session, + org_id=organization_id, + project_id=project_id, + provider="langfuse", + ) + + context = ChainContext( job_id=job_uuid, + chain_id=chain_uuid, project_id=project_id, organization_id=organization_id, + langfuse_credentials=langfuse_credentials, + request_metadata=request.request_metadata, total_blocks=len(request.blocks), - input=serialize_input(request.query.input), - configs=[block.model_dump(mode="json") for block in request.blocks], + callback_url=str(request.callback_url) + if request.callback_url + else None, + intermediate_callback_flags=[ + block.intermediate_callback for block in request.blocks + ], ) - chain_uuid = chain_record.id - logger.info( - f"[execute_chain_job] Created chain record | " - f"chain_id={chain_uuid}, job_id={job_uuid}" - ) + blocks = [ + ChainBlock( + config=block.config, + index=i, + context=context, + include_provider_raw_response=block.include_provider_raw_response, + ) + for i, block in enumerate(request.blocks) + ] - langfuse_credentials = get_provider_credential( - session=session, - org_id=organization_id, - project_id=project_id, - provider="langfuse", - ) + chain = LLMChain(blocks, context) - context = ChainContext( - job_id=job_uuid, - chain_id=chain_uuid, - project_id=project_id, - organization_id=organization_id, - langfuse_credentials=langfuse_credentials, - request_metadata=request.request_metadata, - total_blocks=len(request.blocks), - 
callback_url=str(request.callback_url) if request.callback_url else None, - intermediate_callback_flags=[ - block.intermediate_callback for block in request.blocks - ], - ) + executor = ChainExecutor(chain=chain, context=context, request=request) + return executor.run() - blocks = [ - ChainBlock( - config=block.config, - index=i, - context=context, - include_provider_raw_response=block.include_provider_raw_response, + except Exception as e: + logger.error( + f"[execute_chain_job] Failed: {e} | job_id={job_uuid}", + exc_info=True, ) - for i, block in enumerate(request.blocks) - ] - - chain = LLMChain(blocks, context) - - executor = ChainExecutor(chain=chain, context=context, request=request) - return executor.run() - except Exception as e: - logger.error( - f"[execute_chain_job] Failed: {e} | job_id={job_uuid}", - exc_info=True, - ) - - if chain_uuid: - try: - with Session(engine) as session: - update_llm_chain_status( - session, - chain_id=chain_uuid, - status=ChainStatus.FAILED, - error=str(e), + if chain_uuid: + try: + with Session(engine) as session: + update_llm_chain_status( + session, + chain_id=chain_uuid, + status=ChainStatus.FAILED, + error=str(e), + ) + except Exception: + logger.error( + f"[execute_chain_job] Failed to update chain status: {e} | " + f"chain_id={chain_uuid}", + exc_info=True, ) - except Exception: - logger.error( - f"[execute_chain_job] Failed to update chain status: {e} | " - f"chain_id={chain_uuid}", - exc_info=True, - ) - callback_response = APIResponse.failure_response( - error="Unexpected error occurred", - metadata=request.request_metadata, - ) - return handle_job_error(job_uuid, callback_url_str, callback_response) + callback_response = APIResponse.failure_response( + error="Unexpected error occurred", + metadata=request.request_metadata, + ) + return handle_job_error(job_uuid, callback_url_str, callback_response) + finally: + # Ensure task spans are pushed promptly so Sentry dashboards update faster. 
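+            # flush_telemetry() is also meant to run from Celery's task_postrun
+            # signal (per its docstring); a minimal sketch of that wiring, with
+            # the handler name assumed for illustration:
+            #
+            #   from celery.signals import task_postrun
+            #
+            #   @task_postrun.connect
+            #   def _flush_telemetry_on_postrun(**kwargs):
+            #       flush_telemetry()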
+ flush_telemetry() diff --git a/backend/uv.lock b/backend/uv.lock index 81cb37173..347957ddc 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12, <4.0" resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", From 997107285c1e61e07af6da96fb726f3dbf384557 Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Mon, 20 Apr 2026 18:03:52 +0530 Subject: [PATCH 2/6] Add OpenTelemetry dependencies for enhanced observability --- backend/pyproject.toml | 8 ++ backend/uv.lock | 179 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+) diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 781f26c29..b96cd0050 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -20,6 +20,14 @@ dependencies = [ "bcrypt==4.0.1", "pydantic-settings<3.0.0,>=2.2.1", "sentry-sdk[fastapi]>=2.20.0", + "opentelemetry-api>=1.30.0", + "opentelemetry-sdk>=1.30.0", + "opentelemetry-instrumentation>=0.51b0", + "opentelemetry-instrumentation-fastapi>=0.51b0", + "opentelemetry-instrumentation-celery>=0.51b0", + "opentelemetry-instrumentation-httpx>=0.51b0", + "opentelemetry-instrumentation-requests>=0.51b0", + "opentelemetry-instrumentation-logging>=0.51b0", "pyjwt<3.0.0,>=2.8.0", "boto3>=1.37.20", "moto[s3]>=5.1.1", diff --git a/backend/uv.lock b/backend/uv.lock index 347957ddc..ef8cf856c 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -240,6 +240,14 @@ dependencies = [ { name = "numpy" }, { name = "openai" }, { name = "openai-responses" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-instrumentation-celery" }, + { name = "opentelemetry-instrumentation-fastapi" }, + { name = "opentelemetry-instrumentation-httpx" }, + { name = "opentelemetry-instrumentation-logging" }, + { name = "opentelemetry-instrumentation-requests" }, + { name = "opentelemetry-sdk" }, { name = "pandas" }, { name = "passlib", extra = ["bcrypt"] }, { name = "pre-commit" }, @@ -294,6 +302,14 @@ requires-dist = [ { name = "numpy", specifier = ">=1.24.0" }, { name = "openai", specifier = ">=1.100.1" }, { name = "openai-responses" }, + { name = "opentelemetry-api", specifier = ">=1.30.0" }, + { name = "opentelemetry-instrumentation", specifier = ">=0.51b0" }, + { name = "opentelemetry-instrumentation-celery", specifier = ">=0.51b0" }, + { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.51b0" }, + { name = "opentelemetry-instrumentation-httpx", specifier = ">=0.51b0" }, + { name = "opentelemetry-instrumentation-logging", specifier = ">=0.51b0" }, + { name = "opentelemetry-instrumentation-requests", specifier = ">=0.51b0" }, + { name = "opentelemetry-sdk", specifier = ">=1.30.0" }, { name = "pandas", specifier = ">=2.3.2" }, { name = "passlib", extras = ["bcrypt"], specifier = ">=1.7.4,<2.0.0" }, { name = "pre-commit", specifier = ">=3.8.0" }, @@ -337,6 +353,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d9/ab/6936e2663c47a926e0659437b9333ad87d1ff49b1375d239026e0a268eba/asgi_correlation_id-4.3.4-py3-none-any.whl", hash = "sha256:36ce69b06c7d96b4acb89c7556a4c4f01a972463d3d49c675026cbbd08e9a0a2", size = 15262, upload-time = "2024-10-17T11:44:28.739Z" }, ] +[[package]] +name = "asgiref" +version = "3.11.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/63/40/f03da1264ae8f7cfdbf9146542e5e7e8100a4c66ab48e791df9a03d3f6c0/asgiref-3.11.1.tar.gz", hash = "sha256:5f184dc43b7e763efe848065441eac62229c9f7b0475f41f80e207a114eda4ce", size = 38550, upload-time = "2026-02-03T13:30:14.33Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/0a/a72d10ed65068e115044937873362e6e32fab1b7dce0046aeb224682c989/asgiref-3.11.1-py3-none-any.whl", hash = "sha256:e8667a091e69529631969fd45dc268fa79b99c92c5fcdda727757e52146ec133", size = 24345, upload-time = "2026-02-03T13:30:13.039Z" }, +] + [[package]] name = "attrs" version = "25.4.0" @@ -2242,6 +2267,160 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/98/50c755503a55550f170d0211297bc8791b8bf10bf04cb16b4b95ca71d1e3/openai_responses-0.13.1-py3-none-any.whl", hash = "sha256:b5fc7fb15f546b551757864c1dfaeb01b8a4fc0c353961bd6d0d45ff26389721", size = 51887, upload-time = "2025-12-02T21:37:40.15Z" }, ] +[[package]] +name = "opentelemetry-api" +version = "1.41.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "importlib-metadata" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/47/8e/3778a7e87801d994869a9396b9fc2a289e5f9be91ff54a27d41eace494b0/opentelemetry_api-1.41.0.tar.gz", hash = "sha256:9421d911326ec12dee8bc933f7839090cad7a3f13fcfb0f9e82f8174dc003c09", size = 71416, upload-time = "2026-04-09T14:38:34.544Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/58/ee/99ab786653b3bda9c37ade7e24a7b607a1b1f696063172768417539d876d/opentelemetry_api-1.41.0-py3-none-any.whl", hash = "sha256:0e77c806e6a89c9e4f8d372034622f3e1418a11bdbe1c80a50b3d3397ad0fa4f", size = 69007, upload-time = "2026-04-09T14:38:11.833Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "packaging" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f9/fd/b8e90bb340957f059084376f94cff336b0e871a42feba7d3f7342365e987/opentelemetry_instrumentation-0.62b0.tar.gz", hash = "sha256:aa1b0b9ab2e1722c2a8a5384fb016fc28d30bba51826676c8036074790d2861e", size = 34042, upload-time = "2026-04-09T14:40:22.843Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/00/b6/3356d2e335e3c449c5183e9b023f30f04f1b7073a6583c68745ea2e704b1/opentelemetry_instrumentation-0.62b0-py3-none-any.whl", hash = "sha256:30d4e76486eae64fb095264a70c2c809c4bed17b73373e53091470661f7d477c", size = 34158, upload-time = "2026-04-09T14:39:21.428Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-asgi" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "asgiref" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f1/38/999bf777774878971c2716de4b7a03cd57a7decb4af25090e703b79fa0e5/opentelemetry_instrumentation_asgi-0.62b0.tar.gz", hash = "sha256:93cde8c62e5918a3c1ff9ba020518127300e5e0816b7e8b14baf46a26ba619fc", size = 26779, upload-time = "2026-04-09T14:40:26.566Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/cf/29df82f5870178143bdb5c9a7be044b9f78c71e1c5dcf995242e86d80158/opentelemetry_instrumentation_asgi-0.62b0-py3-none-any.whl", hash 
= "sha256:89b62a6f996b260b162f515c25e6d78e39286e4cbe2f935899e51b32f31027e2", size = 17011, upload-time = "2026-04-09T14:39:27.305Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-celery" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/01/b4/20a3c8c669dc45aa3703c0370041d67e8be613f1829523cdaf634a5f9626/opentelemetry_instrumentation_celery-0.62b0.tar.gz", hash = "sha256:55e8fa48e5b886bcca448fa32e28a6cc2165157745e8328de479a826d3903095", size = 14808, upload-time = "2026-04-09T14:40:31.603Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f6/60/cf951e6bd6ec62ec55bd2384e0ba9841ea38f2d128c773d85dc60da97172/opentelemetry_instrumentation_celery-0.62b0-py3-none-any.whl", hash = "sha256:cadfd3e65287a36099dce5ba7e05d98e4c5f9479a455241e01d140ecc5c10935", size = 13864, upload-time = "2026-04-09T14:39:35.009Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-fastapi" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-instrumentation-asgi" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/37/09/92740c6d114d1bef392557a03ae6de64065c83c1b331dae9b57fe718497c/opentelemetry_instrumentation_fastapi-0.62b0.tar.gz", hash = "sha256:e4748e4e575077e08beaf2c5d2f369da63dd90882d89d73c4192a97356637dec", size = 25056, upload-time = "2026-04-09T14:40:36.438Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/64/bb/186ffe0fde0ad33ceb50e1d3596cc849b732d3b825592a6a507a40c8c49b/opentelemetry_instrumentation_fastapi-0.62b0-py3-none-any.whl", hash = "sha256:06d3272ad15f9daea5a0a27c32831aff376110a4b0394197120256ef6d610e6e", size = 13482, upload-time = "2026-04-09T14:39:43.446Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-httpx" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/77/a7/63e2c6325c8e99cd9b8e0229a8b61c37520ee537214a2c8d514e84486a94/opentelemetry_instrumentation_httpx-0.62b0.tar.gz", hash = "sha256:d865398db3f3c289ba226e355bf4d94460a4301c0c8916e3136caea55ae18000", size = 24182, upload-time = "2026-04-09T14:40:38.719Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c0/5e/7d5fc28487637871b015128cd5dbb3c36f6d343a9098b893bd803d5a9cca/opentelemetry_instrumentation_httpx-0.62b0-py3-none-any.whl", hash = "sha256:c7660b939c12608fec67743126e9b4dc23dceef0ed631c415924966b0d1579e3", size = 17200, upload-time = "2026-04-09T14:39:46.618Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-logging" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/47/8b81b7f007addf4dfd2b2f0753326e62822e1fec674605d0ec7c39d479b0/opentelemetry_instrumentation_logging-0.62b0.tar.gz", hash = 
"sha256:61f23be960e047054b3aa38998e7d1eb1fd9bef6f52097e28bc113af8b6f3bd8", size = 18968, upload-time = "2026-04-09T14:40:40.733Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c1/36/135fd0f4a154ea225de0a418d36f8cf9cf273ad31d41a3a2aaa91862b817/opentelemetry_instrumentation_logging-0.62b0-py3-none-any.whl", hash = "sha256:9a27b6c3d419170d96dedcea7d38cc0418f0dd1054365f52499a0a1eb70b8faf", size = 17489, upload-time = "2026-04-09T14:39:49.384Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-requests" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/20/77/bbc89f2264e03b53fc3e9462a69415f30c23e88c7f0f5e6ebf594471355e/opentelemetry_instrumentation_requests-0.62b0.tar.gz", hash = "sha256:4534f961729393e8070cd5b779fa42937f5b7380ef481107ffd4042b31816ce2", size = 18398, upload-time = "2026-04-09T14:40:49.615Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/97/54/a73d688d969283f344de96c43366912addb94bdbef413bcebac22979545e/opentelemetry_instrumentation_requests-0.62b0-py3-none-any.whl", hash = "sha256:edf61785ecb3ec6923e33c24074c82067f286a418f817b2b82546956d120e6d6", size = 14209, upload-time = "2026-04-09T14:40:02.987Z" }, +] + +[[package]] +name = "opentelemetry-sdk" +version = "1.41.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f8/0e/a586df1186f9f56b5a0879d52653effc40357b8e88fc50fe300038c3c08b/opentelemetry_sdk-1.41.0.tar.gz", hash = "sha256:7bddf3961131b318fc2d158947971a8e37e38b1cd23470cfb72b624e7cc108bd", size = 230181, upload-time = "2026-04-09T14:38:47.225Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2c/13/a7825118208cb32e6a4edcd0a99f925cbef81e77b3b0aedfd9125583c543/opentelemetry_sdk-1.41.0-py3-none-any.whl", hash = "sha256:a596f5687964a3e0d7f8edfdcf5b79cbca9c93c7025ebf5fb00f398a9443b0bd", size = 180214, upload-time = "2026-04-09T14:38:30.657Z" }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a3/b0/c14f723e86c049b7bf8ff431160d982519b97a7be2857ed2247377397a24/opentelemetry_semantic_conventions-0.62b0.tar.gz", hash = "sha256:cbfb3c8fc259575cf68a6e1b94083cc35adc4a6b06e8cf431efa0d62606c0097", size = 145753, upload-time = "2026-04-09T14:38:48.274Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/58/6c/5e86fa1759a525ef91c2d8b79d668574760ff3f900d114297765eb8786cb/opentelemetry_semantic_conventions-0.62b0-py3-none-any.whl", hash = "sha256:0ddac1ce59eaf1a827d9987ab60d9315fb27aea23304144242d1fcad9e16b489", size = 231619, upload-time = "2026-04-09T14:38:32.394Z" }, +] + +[[package]] +name = "opentelemetry-util-http" +version = "0.62b0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/9b/e7/830f7c57135158eb8a8efd3f94ab191a89e3b8a49bed314a35ee501da3f2/opentelemetry_util_http-0.62b0.tar.gz", hash = 
"sha256:a62e4b19b8a432c0de657f167dee3455516136bb9c6ed463ca8063019970d835", size = 11393, upload-time = "2026-04-09T14:40:59.442Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/7f/5c1b7d4385852b9e5eacd4e7f9d8b565d3d351d17463b24916ad098adf1a/opentelemetry_util_http-0.62b0-py3-none-any.whl", hash = "sha256:c20462808d8cc95b69b0dc4a3e02a9d36beb663347e96c931f51ffd78bd318ad", size = 9294, upload-time = "2026-04-09T14:40:19.014Z" }, +] + [[package]] name = "packaging" version = "24.2" From 8311d1902d34cd7509bf8dffc659a3b952dc21e5 Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Mon, 20 Apr 2026 18:07:32 +0530 Subject: [PATCH 3/6] Fix circular import issue for Celery instrumentation in telemetry setup --- backend/app/core/telemetry.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/app/core/telemetry.py b/backend/app/core/telemetry.py index 26246c52d..3e7897a46 100644 --- a/backend/app/core/telemetry.py +++ b/backend/app/core/telemetry.py @@ -10,7 +10,6 @@ import sentry_sdk from opentelemetry import context as otel_context from opentelemetry import trace -from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from opentelemetry.instrumentation.logging import LoggingInstrumentor @@ -177,9 +176,15 @@ def setup_telemetry(service_name: str | None = None) -> None: # Auto-instrumentation — generates OTel spans the SentrySpanProcessor forwards LoggingInstrumentor().instrument(set_logging_format=False) - CeleryInstrumentor().instrument() HTTPXClientInstrumentor().instrument() RequestsInstrumentor().instrument() + try: + # Circular import fix + from opentelemetry.instrumentation.celery import CeleryInstrumentor + + CeleryInstrumentor().instrument() + except Exception: + logger.exception("[setup_telemetry] Failed to instrument Celery") logger.debug( "[setup_telemetry] OpenTelemetry initialized (service=%s, sink=Sentry)", From dea7c4d8759e06a0742a137ae3e5a864220f1ffb Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Mon, 20 Apr 2026 20:04:40 +0530 Subject: [PATCH 4/6] Refactor telemetry attributes and error handling in LLM and collection services --- backend/app/api/deps.py | 5 +---- backend/app/api/routes/llm.py | 10 ++++++++-- backend/app/services/collections/create_collection.py | 1 + backend/app/services/collections/delete_collection.py | 1 + backend/app/services/llm/jobs.py | 7 ++----- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/backend/app/api/deps.py b/backend/app/api/deps.py index dd37a747a..526c5877b 100644 --- a/backend/app/api/deps.py +++ b/backend/app/api/deps.py @@ -45,18 +45,15 @@ def get_db() -> Generator[Session, None, None]: def _set_tenant_span_attributes(auth_context: AuthContext) -> None: """Tag the active OTel span with tenant context so traces in Sentry can be - filtered by user / org / project.""" + filtered by user / org / project IDs.""" span = trace.get_current_span() if not span.is_recording(): return span.set_attribute("user.id", str(auth_context.user.id)) - span.set_attribute("user.email", auth_context.user.email) if auth_context.organization: span.set_attribute("tenant.org_id", auth_context.organization.id) - span.set_attribute("tenant.org_name", auth_context.organization.name) if auth_context.project: span.set_attribute("tenant.project_id", 
auth_context.project.id) - span.set_attribute("tenant.project_name", auth_context.project.name) def _authenticate_with_jwt(session: Session, token: str) -> AuthContext: diff --git a/backend/app/api/routes/llm.py b/backend/app/api/routes/llm.py index 73ba0e4bc..62d4dbb04 100644 --- a/backend/app/api/routes/llm.py +++ b/backend/app/api/routes/llm.py @@ -159,14 +159,20 @@ def get_llm_call_status( output=llm_call.content, ) - if not llm_call.usage: + usage_payload = llm_call.usage + if not usage_payload: logger.warning( f"[get_llm_call] Missing usage data for llm_call job_id={job_id}, project_id={project_id}" ) + usage_payload = { + "input_tokens": 0, + "output_tokens": 0, + "total_tokens": 0, + } llm_call_response = LLMCallResponse( response=llm_response, - usage=Usage(**llm_call.usage), + usage=Usage(**usage_payload), provider_raw_response=None, ) diff --git a/backend/app/services/collections/create_collection.py b/backend/app/services/collections/create_collection.py index 06cc05802..0ffedf96b 100644 --- a/backend/app/services/collections/create_collection.py +++ b/backend/app/services/collections/create_collection.py @@ -304,3 +304,4 @@ def execute_job( if creation_request and creation_request.callback_url and collection_job: failure_payload = build_failure_payload(collection_job, str(err)) send_callback(creation_request.callback_url, failure_payload) + raise diff --git a/backend/app/services/collections/delete_collection.py b/backend/app/services/collections/delete_collection.py index 88c85694d..99cfaa8bf 100644 --- a/backend/app/services/collections/delete_collection.py +++ b/backend/app/services/collections/delete_collection.py @@ -243,3 +243,4 @@ def execute_job( err=err, callback_url=deletion_request.callback_url, ) + raise diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py index a54c22352..a37249d6c 100644 --- a/backend/app/services/llm/jobs.py +++ b/backend/app/services/llm/jobs.py @@ -597,9 +597,6 @@ def execute_llm_call( trace.Status(trace.StatusCode.ERROR, error or "Unknown error") ) - # Push the just-finished LLM span promptly instead of waiting for task teardown. 
- flush_telemetry(timeout_millis=10000) - if response: with Session(engine) as session: if llm_call_id: @@ -903,9 +900,9 @@ def execute_chain_job( status=ChainStatus.FAILED, error=str(e), ) - except Exception: + except Exception as update_err: logger.error( - f"[execute_chain_job] Failed to update chain status: {e} | " + f"[execute_chain_job] Failed to update chain status: {update_err} | " f"chain_id={chain_uuid}", exc_info=True, ) From 0f1b045e000490d17376d59e8ce83e9f2d9a27e3 Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Tue, 21 Apr 2026 10:42:37 +0530 Subject: [PATCH 5/6] Refactor observability-related code and improve error handling in job execution --- .gitignore | 2 - backend/app/api/routes/collections.py | 2 +- backend/app/celery/celery_app.py | 2 +- backend/app/core/middleware.py | 26 ++- backend/app/services/llm/jobs.py | 10 -- .../collections/test_create_collection.py | 40 +++-- .../collections/test_delete_collection.py | 170 ++++++++++++++++-- 7 files changed, 195 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index d0aba9b71..ad2127f45 100644 --- a/.gitignore +++ b/.gitignore @@ -21,5 +21,3 @@ ENV/ # /backend/app/logs - -/observability/ diff --git a/backend/app/api/routes/collections.py b/backend/app/api/routes/collections.py index 30718f1f3..0efbdb576 100644 --- a/backend/app/api/routes/collections.py +++ b/backend/app/api/routes/collections.py @@ -117,7 +117,7 @@ def create_collection( ) ) - # True iff both model and instructions were provided in the request body + # True if both model and instructions were provided in the request body with_assistant = bool( getattr(request, "model", None) and getattr(request, "instructions", None) ) diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index 04c5c95dc..2500947c8 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -77,7 +77,7 @@ def log_pool_status_failure( @worker_process_init.connect -def warm_llm_modules(**_) -> None: +def initialize_worker_process(**_) -> None: """Initialize each forked Celery worker process. - Initialize Sentry so task transactions, errors, and logs ship to Sentry. diff --git a/backend/app/core/middleware.py b/backend/app/core/middleware.py index 4a985e267..153a41947 100644 --- a/backend/app/core/middleware.py +++ b/backend/app/core/middleware.py @@ -8,33 +8,42 @@ logger = logging.getLogger("http_request_logger") +def _resolve_http_route(request: Request) -> str: + """ + Resolve the HTTP route for telemetry and logging. + Uses the route's path template if available, otherwise falls back to the raw path. + """ + route = request.scope.get("route") + templated = getattr(route, "path", None) + return templated or "unmatched" + + async def http_request_logger(request: Request, call_next) -> Response: start_time = time.time() method = request.method - route = request.url.path + raw_path = request.url.path - # Set request-level dimensions as early as possible. 
span = trace.get_current_span() if span.is_recording(): span.set_attribute("http.request.method", method) span.set_attribute("http.request_method", method) span.set_attribute("http.method", method) - span.set_attribute("http.route", route) if sentry_sdk.get_client().is_active(): sentry_sdk.set_tag("http.method", method) sentry_sdk.set_tag("http.request.method", method) - sentry_sdk.set_tag("http.route", route) try: response = await call_next(request) except Exception: - # Capture status for failed requests too so dashboards stay consistent. status = 500 + http_route = _resolve_http_route(request) if span.is_recording(): + span.set_attribute("http.route", http_route) span.set_attribute("http.status_code", status) span.set_attribute("http.response.status_code", status) if sentry_sdk.get_client().is_active(): + sentry_sdk.set_tag("http.route", http_route) sentry_sdk.set_tag("http.status_code", str(status)) sentry_sdk.set_tag("http.response.status_code", str(status)) logger.exception("Unhandled exception during request") @@ -42,22 +51,25 @@ async def http_request_logger(request: Request, call_next) -> Response: duration_ms = (time.time() - start_time) * 1000 status = response.status_code + http_route = _resolve_http_route(request) if span.is_recording(): + span.set_attribute("http.route", http_route) span.set_attribute("http.status_code", status) span.set_attribute("http.response.status_code", status) span.set_attribute("http.request.duration_ms", round(duration_ms, 2)) - logger.info(f"{method} {route} - {status} [{duration_ms:.2f}ms]") + logger.info(f"{method} {raw_path} - {status} [{duration_ms:.2f}ms]") try: if sentry_sdk.get_client().is_active(): + sentry_sdk.set_tag("http.route", http_route) sentry_sdk.set_tag("http.status_code", str(status)) sentry_sdk.set_tag("http.response.status_code", str(status)) attrs = { "http.method": method, - "http.route": route, + "http.route": http_route, "http.status_code": str(status), } sentry_sdk.metrics.count("http.server.request.count", 1, attributes=attrs) diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py index a37249d6c..436b26a58 100644 --- a/backend/app/services/llm/jobs.py +++ b/backend/app/services/llm/jobs.py @@ -527,8 +527,6 @@ def execute_llm_call( ai_span_name = f"chat {model_name}" if model_name else f"chat {provider_name}" with tracer.start_as_current_span(ai_span_name) as ai_span: ai_span.set_attribute("sentry.op", "gen_ai.chat") - ai_span.set_attribute("gen_ai.request.organization_id", organization_id) - ai_span.set_attribute("gen_ai.request.project_id", project_id) if completion_type: ai_span.set_attribute("completion_type", completion_type) set_gen_ai_request_attributes( @@ -584,14 +582,6 @@ def execute_llm_call( if response: set_gen_ai_response_attributes(ai_span, response=response) - usage = response.usage - if usage: - ai_span.set_attribute("llm.usage.total_tokens", usage.total_tokens) - ai_span.set_attribute("kaapi_llm_input_tokens", usage.input_tokens) - ai_span.set_attribute( - "kaapi_llm_output_tokens", usage.output_tokens - ) - ai_span.set_attribute("kaapi_llm_total_tokens", usage.total_tokens) else: ai_span.set_status( trace.Status(trace.StatusCode.ERROR, error or "Unknown error") diff --git a/backend/app/tests/services/collections/test_create_collection.py b/backend/app/tests/services/collections/test_create_collection.py index 4d393ea1b..a2ffdfa5d 100644 --- a/backend/app/tests/services/collections/test_create_collection.py +++ b/backend/app/tests/services/collections/test_create_collection.py @@ 
-217,15 +217,16 @@ def test_execute_job_assistant_create_failure_marks_failed_and_deletes_collectio MockCrud.return_value.create.side_effect = Exception("DB constraint violation") task_id = str(uuid4()) - execute_job( - request=req.model_dump(), - project_id=project.id, - organization_id=project.organization_id, - task_id=task_id, - with_assistant=True, - job_id=str(job.id), - task_instance=None, - ) + with pytest.raises(Exception, match="DB constraint violation"): + execute_job( + request=req.model_dump(), + project_id=project.id, + organization_id=project.organization_id, + task_id=task_id, + with_assistant=True, + job_id=str(job.id), + task_instance=None, + ) mock_provider.delete.assert_called_once() @@ -435,15 +436,18 @@ def test_execute_job_failure_flow_callback_job_and_marks_failed( SessionCtor.return_value.__enter__.return_value = db SessionCtor.return_value.__exit__.return_value = False - execute_job( - request=sample_request.model_dump(), - project_id=project.id, - organization_id=project.organization_id, - task_id=str(task_id), - with_assistant=True, - job_id=str(job.id), - task_instance=None, - ) + with pytest.raises( + ValueError, match="Requested atleast 1 document retrieved 0" + ): + execute_job( + request=sample_request.model_dump(), + project_id=project.id, + organization_id=project.organization_id, + task_id=str(task_id), + with_assistant=True, + job_id=str(job.id), + task_instance=None, + ) updated_job = CollectionJobCrud(db, project.id).read_one(job.id) diff --git a/backend/app/tests/services/collections/test_delete_collection.py b/backend/app/tests/services/collections/test_delete_collection.py index d51545dc9..6c7bcd91b 100644 --- a/backend/app/tests/services/collections/test_delete_collection.py +++ b/backend/app/tests/services/collections/test_delete_collection.py @@ -1,6 +1,7 @@ from unittest.mock import patch, MagicMock from uuid import uuid4, UUID +import pytest from sqlmodel import Session from app.models.collection import DeletionRequest @@ -174,15 +175,16 @@ def test_execute_job_delete_failure_marks_job_failed( task_id = uuid4() req = DeletionRequest(collection_id=collection.id) - execute_job( - request=req.model_dump(mode="json"), - project_id=project.id, - organization_id=project.organization_id, - task_id=str(task_id), - job_id=str(job.id), - collection_id=str(collection.id), - task_instance=None, - ) + with pytest.raises(Exception, match="Remote deletion failed"): + execute_job( + request=req.model_dump(mode="json"), + project_id=project.id, + organization_id=project.organization_id, + task_id=str(task_id), + job_id=str(job.id), + collection_id=str(collection.id), + task_instance=None, + ) failed_job = CollectionJobCrud(db, project.id).read_one(job.id) assert failed_job.task_id == str(task_id) @@ -331,15 +333,16 @@ def test_execute_job_delete_remote_failure_with_callback_sends_failure_payload( task_id = uuid4() req = DeletionRequest(collection_id=collection.id, callback_url=callback_url) - execute_job( - request=req.model_dump(mode="json"), - project_id=project.id, - organization_id=project.organization_id, - task_id=str(task_id), - job_id=str(job.id), - collection_id=str(collection.id), - task_instance=None, - ) + with pytest.raises(Exception, match="Remote deletion failed"): + execute_job( + request=req.model_dump(mode="json"), + project_id=project.id, + organization_id=project.organization_id, + task_id=str(task_id), + job_id=str(job.id), + collection_id=str(collection.id), + task_instance=None, + ) failed_job = CollectionJobCrud(db, 
project.id).read_one(job.id) assert failed_job.task_id == str(task_id) @@ -365,3 +368,134 @@ def test_execute_job_delete_remote_failure_with_callback_sends_failure_payload( assert payload_arg["data"]["status"] == CollectionJobStatus.FAILED assert payload_arg["data"]["collection"]["id"] == str(collection.id) assert UUID(payload_arg["data"]["job_id"]) == job.id + + +@patch("app.services.collections.delete_collection.get_llm_provider") +def test_execute_job_local_delete_failure_after_remote_success_marks_failed( + mock_get_llm_provider: MagicMock, db +) -> None: + """ + When provider.delete() succeeds but the local CollectionCrud.delete_by_id fails: + - job should be marked FAILED with error_message set + - exception is re-raised + - provider.delete was already called (remote resource is gone) + """ + project = get_project(db) + + collection = get_vector_store_collection( + db, project, vector_store_id="vs_local_fail" + ) + + job = get_collection_job( + db, + project, + action_type=CollectionActionType.DELETE, + status=CollectionJobStatus.PENDING, + collection_id=collection.id, + ) + + mock_provider = MagicMock() + mock_provider.delete = MagicMock() + mock_get_llm_provider.return_value = mock_provider + + with patch( + "app.services.collections.delete_collection.Session" + ) as SessionCtor, patch( + "app.services.collections.delete_collection.CollectionCrud" + ) as MockCollectionCrud: + SessionCtor.return_value.__enter__.return_value = db + SessionCtor.return_value.__exit__.return_value = False + + collection_crud_instance = MockCollectionCrud.return_value + collection_crud_instance.read_one.return_value = collection + collection_crud_instance.delete_by_id.side_effect = Exception( + "Local DB delete failed" + ) + + task_id = uuid4() + req = DeletionRequest(collection_id=collection.id) + + with pytest.raises(Exception, match="Local DB delete failed"): + execute_job( + request=req.model_dump(mode="json"), + project_id=project.id, + organization_id=project.organization_id, + task_id=str(task_id), + job_id=str(job.id), + collection_id=str(collection.id), + task_instance=None, + ) + + failed_job = CollectionJobCrud(db, project.id).read_one(job.id) + assert failed_job.task_id == str(task_id) + assert failed_job.status == CollectionJobStatus.FAILED + assert ( + failed_job.error_message + and "Local DB delete failed" in failed_job.error_message + ) + + mock_provider.delete.assert_called_once_with(collection) + collection_crud_instance.delete_by_id.assert_called_once_with(collection.id) + + +@patch("app.services.collections.delete_collection.get_llm_provider") +def test_execute_job_provider_factory_failure_marks_job_failed( + mock_get_llm_provider: MagicMock, db +) -> None: + """ + When get_llm_provider itself raises (e.g. 
missing credentials): + - job should be marked FAILED with error_message set + - provider.delete is never called + - local collection is not deleted + """ + project = get_project(db) + + collection = get_vector_store_collection( + db, project, vector_store_id="vs_provider_fail" + ) + + job = get_collection_job( + db, + project, + action_type=CollectionActionType.DELETE, + status=CollectionJobStatus.PENDING, + collection_id=collection.id, + ) + + mock_get_llm_provider.side_effect = Exception("Provider credentials missing") + + with patch( + "app.services.collections.delete_collection.Session" + ) as SessionCtor, patch( + "app.services.collections.delete_collection.CollectionCrud" + ) as MockCollectionCrud: + SessionCtor.return_value.__enter__.return_value = db + SessionCtor.return_value.__exit__.return_value = False + + collection_crud_instance = MockCollectionCrud.return_value + collection_crud_instance.read_one.return_value = collection + + task_id = uuid4() + req = DeletionRequest(collection_id=collection.id) + + with pytest.raises(Exception, match="Provider credentials missing"): + execute_job( + request=req.model_dump(mode="json"), + project_id=project.id, + organization_id=project.organization_id, + task_id=str(task_id), + job_id=str(job.id), + collection_id=str(collection.id), + task_instance=None, + ) + + failed_job = CollectionJobCrud(db, project.id).read_one(job.id) + assert failed_job.task_id == str(task_id) + assert failed_job.status == CollectionJobStatus.FAILED + assert ( + failed_job.error_message + and "Provider credentials missing" in failed_job.error_message + ) + + collection_crud_instance.delete_by_id.assert_not_called() + mock_get_llm_provider.assert_called_once() From 43e4feb2077b82cde42d8a3f86712f18351895fc Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Tue, 21 Apr 2026 10:48:23 +0530 Subject: [PATCH 6/6] Refactor telemetry code by removing unused Celery metrics and related functions --- backend/app/core/telemetry.py | 73 ----------------------------------- 1 file changed, 73 deletions(-) diff --git a/backend/app/core/telemetry.py b/backend/app/core/telemetry.py index 3e7897a46..f267afb55 100644 --- a/backend/app/core/telemetry.py +++ b/backend/app/core/telemetry.py @@ -1,10 +1,8 @@ import json import logging -import os import time from contextlib import contextmanager from contextvars import ContextVar -from threading import Lock from typing import TYPE_CHECKING, Any, Iterator import sentry_sdk @@ -29,9 +27,6 @@ "kaapi_log_context", default=None ) -_celery_metrics_lock = Lock() -_celery_active_tasks = 0 - def _emit_sentry_metric( metric_type: str, @@ -192,74 +187,6 @@ def setup_telemetry(service_name: str | None = None) -> None: ) -def _extract_task_meta(task: object | None) -> tuple[str, str]: - task_name = getattr(task, "name", "unknown") if task is not None else "unknown" - task_queue = "unknown" - request = getattr(task, "request", None) if task is not None else None - delivery_info = getattr(request, "delivery_info", None) - if isinstance(delivery_info, dict) and delivery_info.get("routing_key"): - task_queue = str(delivery_info.get("routing_key")) - return str(task_name), task_queue - - -def _emit_celery_worker_gauges(active: int, pid: str) -> None: - pid_attrs = {"worker.pid": pid} - _emit_sentry_metric( - "gauge", "celery.worker.active", 1 if active > 0 else 0, attributes=pid_attrs - ) - _emit_sentry_metric( - "gauge", "celery.worker.idle", 0 if active > 0 else 1, attributes=pid_attrs - ) - - -def 
record_celery_task_started(task: object | None) -> None: - """Emit Celery task-start metrics to Sentry.""" - global _celery_active_tasks - if not settings.OTEL_ENABLED: - return - - task_name, task_queue = _extract_task_meta(task) - pid = str(os.getpid()) - attrs = {"task.name": task_name, "task.queue": task_queue, "worker.pid": pid} - - with _celery_metrics_lock: - _celery_active_tasks += 1 - active = _celery_active_tasks - - _emit_sentry_metric("count", "celery.task.total", 1, attributes=attrs) - _emit_sentry_metric("gauge", "celery.task.active", active, attributes=attrs) - _emit_celery_worker_gauges(active, pid) - - -def record_celery_task_finished(task: object | None, state: str | None) -> None: - """Emit Celery task-completion metrics to Sentry.""" - global _celery_active_tasks - if not settings.OTEL_ENABLED: - return - - task_name, task_queue = _extract_task_meta(task) - task_state = (state or "UNKNOWN").upper() - pid = str(os.getpid()) - attrs = { - "task.name": task_name, - "task.queue": task_queue, - "task.state": task_state, - "worker.pid": pid, - } - - with _celery_metrics_lock: - _celery_active_tasks = max(0, _celery_active_tasks - 1) - active = _celery_active_tasks - - if task_state == "SUCCESS": - _emit_sentry_metric("count", "celery.task.completed", 1, attributes=attrs) - elif task_state in {"FAILURE", "REVOKED"}: - _emit_sentry_metric("count", "celery.task.failed", 1, attributes=attrs) - - _emit_sentry_metric("gauge", "celery.task.active", active, attributes=attrs) - _emit_celery_worker_gauges(active, pid) - - def _llm_call_attrs( provider: str, model: str,