Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""add/alter columns in collections table

Revision ID: 3389c67fdcb4
Revises: 8757b005d681
Create Date: 2025-06-20 18:08:16.585843

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes
from sqlalchemy.dialects import postgresql


# revision identifiers, used by Alembic.
revision = "3389c67fdcb4"
down_revision = "8757b005d681"
branch_labels = None
depends_on = None

collection_status_enum = postgresql.ENUM(
"processing",
"successful",
"failed",
name="collectionstatus",
create_type=False, # we create manually to avoid duplicate issues
)


def upgrade():
collection_status_enum.create(op.get_bind(), checkfirst=True)
op.add_column(
"collection", sa.Column("organization_id", sa.Integer(), nullable=False)
)
op.add_column("collection", sa.Column("project_id", sa.Integer(), nullable=True))
op.add_column(
"collection",
sa.Column(
"status",
collection_status_enum,
nullable=False,
server_default="processing",
),
)
op.add_column("collection", sa.Column("updated_at", sa.DateTime(), nullable=False))
op.alter_column(
"collection", "llm_service_id", existing_type=sa.VARCHAR(), nullable=True
)
op.alter_column(
"collection", "llm_service_name", existing_type=sa.VARCHAR(), nullable=True
)
op.create_foreign_key(
None,
"collection",
"organization",
["organization_id"],
["id"],
ondelete="CASCADE",
)
op.create_foreign_key(
None, "collection", "project", ["project_id"], ["id"], ondelete="CASCADE"
)


def downgrade():
op.drop_constraint(None, "collection", type_="foreignkey")
op.drop_constraint(None, "collection", type_="foreignkey")
op.alter_column(
"collection", "llm_service_name", existing_type=sa.VARCHAR(), nullable=False
)
op.alter_column(
"collection", "llm_service_id", existing_type=sa.VARCHAR(), nullable=False
)
op.drop_column("collection", "updated_at")
op.drop_column("collection", "status")
op.drop_column("collection", "project_id")
op.drop_column("collection", "organization_id")
24 changes: 24 additions & 0 deletions backend/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,30 @@ def get_current_user_org(
CurrentUserOrg = Annotated[UserOrganization, Depends(get_current_user_org)]


def get_current_user_org_project(
current_user: CurrentUser, session: SessionDep, request: Request
) -> UserProjectOrg:
api_key = request.headers.get("X-API-KEY")
organization_id = None
project_id = None

if api_key:
api_key_record = get_api_key_by_value(session, api_key)
if api_key_record:
validate_organization(session, api_key_record.organization_id)
organization_id = api_key_record.organization_id
project_id = api_key_record.project_id

return UserProjectOrg(
**current_user.model_dump(),
organization_id=organization_id,
project_id=project_id,
)


CurrentUserOrgProject = Annotated[UserProjectOrg, Depends(get_current_user_org_project)]


def get_current_active_superuser(current_user: CurrentUser) -> User:
if not current_user.is_superuser:
raise HTTPException(
Expand Down
122 changes: 76 additions & 46 deletions backend/app/api/routes/collections.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import logging
import time
import warnings
from uuid import UUID, uuid4
from typing import Any, List, Optional
Expand All @@ -11,13 +12,14 @@
from pydantic import BaseModel, Field, HttpUrl
from sqlalchemy.exc import NoResultFound, MultipleResultsFound, SQLAlchemyError

from app.api.deps import CurrentUser, SessionDep
from app.api.deps import CurrentUser, SessionDep, CurrentUserOrgProject
from app.core.cloud import AmazonCloudStorage
from app.core.config import settings
from app.core.util import now, raise_from_unknown, post_callback
from app.crud import DocumentCrud, CollectionCrud, DocumentCollectionCrud
from app.crud.rag import OpenAIVectorStoreCrud, OpenAIAssistantCrud
from app.models import Collection, Document
from app.models.collection import CollectionStatus
from app.utils import APIResponse, load_description

router = APIRouter(prefix="/collections", tags=["collections"])
Expand Down Expand Up @@ -173,61 +175,77 @@ def do_create_collection(
request: CreationRequest,
payload: ResponsePayload,
):
start_time = time.time()
client = OpenAI(api_key=settings.OPENAI_API_KEY)
if request.callback_url is None:
callback = SilentCallback(payload)
else:
callback = WebHookCallback(request.callback_url, payload)

#
# Create the assistant and vector store
#

vector_store_crud = OpenAIVectorStoreCrud(client)
try:
vector_store = vector_store_crud.create()
except OpenAIError as err:
callback.fail(str(err))
return
callback = (
SilentCallback(payload)
if request.callback_url is None
else WebHookCallback(request.callback_url, payload)
)

storage = AmazonCloudStorage(current_user)
document_crud = DocumentCrud(session, current_user.id)
assistant_crud = OpenAIAssistantCrud(client)
vector_store_crud = OpenAIVectorStoreCrud(client)
collection_crud = CollectionCrud(session, current_user.id)

docs = request(document_crud)
kwargs = dict(request.extract_super_type(AssistantOptions))
try:
updates = vector_store_crud.update(vector_store.id, storage, docs)
documents = list(updates)
assistant = assistant_crud.create(vector_store.id, **kwargs)
except Exception as err: # blanket to handle SQL and OpenAI errors
logging.error(f"File Search setup error: {err} ({type(err).__name__})")
vector_store_crud.delete(vector_store.id)
callback.fail(str(err))
return
vector_store = vector_store_crud.create()

#
# Store the results
#
docs = list(request(document_crud))
flat_docs = [doc for sublist in docs for doc in sublist]

collection_crud = CollectionCrud(session, current_user.id)
collection = Collection(
id=UUID(payload.key),
llm_service_id=assistant.id,
llm_service_name=request.model,
)
try:
collection_crud.create(collection, documents)
except SQLAlchemyError as err:
_backout(assistant_crud, assistant.id)
callback.fail(str(err))
return
file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname}
file_sizes_kb = [
storage.get_file_size_kb(doc.object_store_url) for doc in flat_docs
]

logging.info(
f"[VectorStore Update] Uploading {len(flat_docs)} documents to vector store {vector_store.id}"
)
list(vector_store_crud.update(vector_store.id, storage, docs))
logging.info(f"[VectorStore Upload] Upload completed")

assistant_options = dict(request.extract_super_type(AssistantOptions))
logging.info(
f"[Assistant Create] Creating assistant with options: {assistant_options}"
)
assistant = assistant_crud.create(vector_store.id, **assistant_options)
logging.info(f"[Assistant Create] Assistant created: {assistant.id}")

collection = collection_crud.read_one(UUID(payload.key))
collection.llm_service_id = assistant.id
collection.llm_service_name = request.model
collection.status = CollectionStatus.successful
collection.updated_at = now()

if flat_docs:
logging.info(
f"[DocumentCollection] Linking {len(flat_docs)} documents to collection {collection.id}"
)
DocumentCollectionCrud(session).create(collection, flat_docs)

collection_crud._update(collection)

#
# Send back successful response
#
elapsed = time.time() - start_time
logging.info(
f"Collection created: {collection.id} | Time: {elapsed:.2f}s | "
f"Files: {len(flat_docs)} | Sizes: {file_sizes_kb} KB | Types: {list(file_exts)}"
)
callback.success(collection.model_dump(mode="json"))

callback.success(collection.model_dump(mode="json"))
except Exception as err:
logging.error(f"[Collection Creation Failed] {err} ({type(err).__name__})")
if "assistant" in locals():
_backout(assistant_crud, assistant.id)
try:
collection = collection_crud.read_one(UUID(payload.key))
collection.status = CollectionStatus.failed
collection.updated_at = now()
collection_crud._update(collection)
except Exception as suberr:
logging.warning(f"[Collection Status Update Failed] {suberr}")
callback.fail(str(err))


@router.post(
Expand All @@ -236,14 +254,26 @@ def do_create_collection(
)
def create_collection(
session: SessionDep,
current_user: CurrentUser,
current_user: CurrentUserOrgProject,
request: CreationRequest,
background_tasks: BackgroundTasks,
):
this = inspect.currentframe()
route = router.url_path_for(this.f_code.co_name)
payload = ResponsePayload("processing", route)

collection = Collection(
id=UUID(payload.key),
owner_id=current_user.id,
organization_id=current_user.organization_id,
project_id=current_user.project_id,
status=CollectionStatus.processing,
)

collection_crud = CollectionCrud(session, current_user.id)
collection_crud.create(collection)

# 2. Launch background task
background_tasks.add_task(
do_create_collection,
session,
Expand Down
8 changes: 7 additions & 1 deletion backend/app/core/cloud/storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os

# import logging
import functools as ft
from pathlib import Path
from dataclasses import dataclass, asdict
Expand Down Expand Up @@ -125,6 +124,13 @@ def stream(self, url: str) -> StreamingBody:
except ClientError as err:
raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err

def get_file_size_kb(self, url: str) -> float:
name = SimpleStorageName.from_url(url)
kwargs = asdict(name)
response = self.aws.client.head_object(**kwargs)
size_bytes = response["ContentLength"]
return round(size_bytes / 1024, 2)

def delete(self, url: str) -> None:
name = SimpleStorageName.from_url(url)
kwargs = asdict(name)
Expand Down
28 changes: 20 additions & 8 deletions backend/app/crud/collection.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import functools as ft
from uuid import UUID
from typing import Optional

import logging
from sqlmodel import Session, func, select, and_

from app.models import Document, Collection, DocumentCollection
from app.core.util import now
from app.models.collection import CollectionStatus

from .document_collection import DocumentCollectionCrud

Expand Down Expand Up @@ -43,13 +44,24 @@ def _exists(self, collection: Collection):

return bool(present)

def create(self, collection: Collection, documents: list[Document]):
if self._exists(collection):
raise FileExistsError("Collection already present")

collection = self._update(collection)
dc_crud = DocumentCollectionCrud(self.session)
dc_crud.create(collection, documents)
def create(
self,
collection: Collection,
documents: Optional[list[Document]] = None,
):
try:
existing = self.read_one(collection.id)
if existing.status == CollectionStatus.failed:
self._update(collection)
else:
raise FileExistsError("Collection already present")
except:
self.session.add(collection)
self.session.commit()

if documents:
dc_crud = DocumentCollectionCrud(self.session)
dc_crud.create(collection, documents)

return collection

Expand Down
Loading