Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ OPENAI_API_KEY=""
KAAPI_GUARDRAILS_AUTH=""
KAAPI_GUARDRAILS_URL=""

OTEL_ENABLED=true
OTEL_SERVICE_NAME=kaapi-backend

SMTP_HOST=
SMTP_PORT=
SMTP_TLS=True
Expand Down
23 changes: 21 additions & 2 deletions backend/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
from opentelemetry import trace
from pydantic import ValidationError
from sqlmodel import Session

Expand Down Expand Up @@ -42,6 +43,19 @@ def get_db() -> Generator[Session, None, None]:
TokenDep = Annotated[str, Depends(reusable_oauth2)]


def _set_tenant_span_attributes(auth_context: AuthContext) -> None:
"""Tag the active OTel span with tenant context so traces in Sentry can be
filtered by user / org / project IDs."""
span = trace.get_current_span()
if not span.is_recording():
return
span.set_attribute("user.id", str(auth_context.user.id))
if auth_context.organization:
span.set_attribute("tenant.org_id", auth_context.organization.id)
if auth_context.project:
span.set_attribute("tenant.project_id", auth_context.project.id)


def _authenticate_with_jwt(session: Session, token: str) -> AuthContext:
"""Validate a JWT token and return the authenticated user context."""
try:
Expand Down Expand Up @@ -147,16 +161,21 @@ def get_auth_context(
if not auth_context.project.is_active:
raise HTTPException(status_code=403, detail="Inactive Project")

_set_tenant_span_attributes(auth_context)
return auth_context

# 2. Try Authorization: Bearer <token> header
if token:
return _authenticate_with_jwt(session, token)
auth_context = _authenticate_with_jwt(session, token)
_set_tenant_span_attributes(auth_context)
return auth_context

# 3. Try access_token cookie
cookie_token = request.cookies.get("access_token")
if cookie_token:
return _authenticate_with_jwt(session, cookie_token)
auth_context = _authenticate_with_jwt(session, cookie_token)
_set_tenant_span_attributes(auth_context)
return auth_context

raise HTTPException(status_code=401, detail="Invalid Authorization format")

Expand Down
149 changes: 84 additions & 65 deletions backend/app/api/routes/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from app.api.deps import SessionDep, AuthContextDep
from app.api.permissions import Permission, require_permission
from app.core.telemetry import log_context
from app.crud import (
CollectionCrud,
CollectionJobCrud,
Expand Down Expand Up @@ -89,51 +90,60 @@ def create_collection(
current_user: AuthContextDep,
request: CreationRequest,
):
if request.callback_url:
validate_callback_url(str(request.callback_url))

if request.name:
ensure_unique_name(session, current_user.project_.id, request.name)
with log_context(
tag="collection",
system="collection",
lifecycle="api.collection.create",
action="create",
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
):
if request.callback_url:
validate_callback_url(str(request.callback_url))

if request.name:
ensure_unique_name(session, current_user.project_.id, request.name)

unique_documents = list(dict.fromkeys(request.documents))

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(unique_documents),
documents=[str(doc_id) for doc_id in unique_documents],
)
)

unique_documents = list(dict.fromkeys(request.documents))
# True if both model and instructions were provided in the request body
with_assistant = bool(
getattr(request, "model", None) and getattr(request, "instructions", None)
)

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
create_service.start_job(
db=session,
request=request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(unique_documents),
documents=[str(doc_id) for doc_id in unique_documents],
organization_id=current_user.organization_.id,
with_assistant=with_assistant,
)
Comment on lines +107 to 132
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Pass the deduplicated request to the worker.

Line 107 deduplicates documents for CollectionJobCreate, but Line 127 still dispatches the original request; duplicate document IDs can make the job record report one set while the Celery worker processes another.

🐛 Proposed fix
         unique_documents = list(dict.fromkeys(request.documents))
+        normalized_request = request.model_copy(
+            update={"documents": unique_documents}
+        )
 
         collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
         collection_job = collection_job_crud.create(
@@
         create_service.start_job(
             db=session,
-            request=request,
+            request=normalized_request,
             collection_job_id=collection_job.id,
             project_id=current_user.project_.id,
             organization_id=current_user.organization_.id,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
unique_documents = list(dict.fromkeys(request.documents))
collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(unique_documents),
documents=[str(doc_id) for doc_id in unique_documents],
)
)
unique_documents = list(dict.fromkeys(request.documents))
# True if both model and instructions were provided in the request body
with_assistant = bool(
getattr(request, "model", None) and getattr(request, "instructions", None)
)
collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
create_service.start_job(
db=session,
request=request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(unique_documents),
documents=[str(doc_id) for doc_id in unique_documents],
organization_id=current_user.organization_.id,
with_assistant=with_assistant,
)
unique_documents = list(dict.fromkeys(request.documents))
normalized_request = request.model_copy(
update={"documents": unique_documents}
)
collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(unique_documents),
documents=[str(doc_id) for doc_id in unique_documents],
)
)
# True if both model and instructions were provided in the request body
with_assistant = bool(
getattr(request, "model", None) and getattr(request, "instructions", None)
)
create_service.start_job(
db=session,
request=normalized_request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
with_assistant=with_assistant,
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/collections.py` around lines 107 - 132, The job record
uses deduplicated IDs (unique_documents) but create_service.start_job is still
passed the original request, so the worker may process duplicates; update the
call to create_service.start_job to pass a request object whose documents field
is the deduplicated list (use unique_documents or set request.documents =
unique_documents before the call, or create a shallow copy of request with
documents=[str(doc_id) for doc_id in unique_documents]) so the worker receives
the same deduplicated document list used to create the CollectionJob
(references: unique_documents, CollectionJobCreate, create_service.start_job).

)

# True iff both model and instructions were provided in the request body
with_assistant = bool(
getattr(request, "model", None) and getattr(request, "instructions", None)
)

create_service.start_job(
db=session,
request=request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
with_assistant=with_assistant,
)

metadata = None
if not with_assistant:
metadata = {
"note": (
"This job will create a vector store only (no Assistant). "
"Assistant creation happens when both 'model' and 'instructions' are included."
)
}

return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job), metadata=metadata
)
metadata = None
if not with_assistant:
metadata = {
"note": (
"This job will create a vector store only (no Assistant). "
"Assistant creation happens when both 'model' and 'instructions' are included."
)
}

return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job),
metadata=metadata,
)


@router.delete(
Expand All @@ -149,37 +159,46 @@ def delete_collection(
collection_id: UUID = FastPath(description="Collection to delete"),
request: CallbackRequest | None = Body(default=None),
):
if request and request.callback_url:
validate_callback_url(str(request.callback_url))

_ = CollectionCrud(session, current_user.project_.id).read_one(collection_id)

deletion_request = DeletionRequest(
with log_context(
tag="collection",
system="collection",
lifecycle="api.collection.delete",
action="delete",
collection_id=collection_id,
callback_url=request.callback_url if request else None,
)
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
):
if request and request.callback_url:
validate_callback_url(str(request.callback_url))

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.DELETE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
_ = CollectionCrud(session, current_user.project_.id).read_one(collection_id)

deletion_request = DeletionRequest(
collection_id=collection_id,
callback_url=request.callback_url if request else None,
)
)

delete_service.start_job(
db=session,
request=deletion_request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
)
collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.DELETE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
collection_id=collection_id,
)
)

delete_service.start_job(
db=session,
request=deletion_request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
)

return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job)
)
return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job)
)


@router.get(
Expand Down
Loading
Loading