diff --git a/backend/app/api/routes/api_keys.py b/backend/app/api/routes/api_keys.py
index 1afecfc7..125df075 100644
--- a/backend/app/api/routes/api_keys.py
+++ b/backend/app/api/routes/api_keys.py
@@ -1,4 +1,4 @@
-import uuid
+import logging
 from fastapi import APIRouter, Depends, HTTPException
 from sqlmodel import Session
 from app.api.deps import get_db, get_current_active_superuser
@@ -14,6 +14,7 @@
 from app.utils import APIResponse
 from app.core.exception_handlers import HTTPException
 
+logger = logging.getLogger(__name__)
 router = APIRouter(prefix="/apikeys", tags=["API Keys"])
 
@@ -27,17 +28,18 @@ def create_key(
     """
    Generate a new API key for the user's organization.
    """
-    # Validate organization
     project = validate_project(session, project_id)
 
     existing_api_key = get_api_key_by_project_user(session, project_id, user_id)
     if existing_api_key:
+        logger.warning(
+            f"[create_key] API key already exists | project_id={project_id}, user_id={user_id}"
+        )
         raise HTTPException(
             status_code=400,
             detail="API Key already exists for this user and project.",
         )
 
-    # Create and return API key
     api_key = create_api_key(
         session,
         organization_id=project.organization_id,
@@ -57,21 +59,18 @@ def list_keys(
    Retrieve all API keys for the given project.
    Superusers get all keys; regular users get only their own.
    """
-    # Validate project
     project = validate_project(session=session, project_id=project_id)
 
     if current_user.is_superuser:
-        # Superuser: fetch all API keys for the project
         api_keys = get_api_keys_by_project(session=session, project_id=project_id)
     else:
-        # Regular user: fetch only their own API key
         user_api_key = get_api_key_by_project_user(
             session=session, project_id=project_id, user_id=current_user.id
         )
         api_keys = [user_api_key] if user_api_key else []
 
-    # Raise an exception if no API keys are found for the project
     if not api_keys:
+        logger.warning(f"[list_keys] No API keys found | project_id={project_id}")
         raise HTTPException(
             status_code=404,
             detail="No API keys found for this project.",
@@ -91,6 +90,7 @@ def get_key(
     """
     api_key = get_api_key(session, api_key_id)
     if not api_key:
+        logger.warning(f"[get_key] API key not found | api_key_id={api_key_id}")
         raise HTTPException(404, "API Key does not exist")
 
     return APIResponse.success_response(api_key)
@@ -106,10 +106,12 @@ def revoke_key(
     """
    Soft delete an API key (revoke access).
    """
     api_key = get_api_key(session, api_key_id)
     if not api_key:
+        logger.warning(
+            f"[revoke_key] API key not found or already deleted | api_key_id={api_key_id}"
+        )
         raise HTTPException(404, "API key not found or already deleted")
     delete_api_key(session, api_key_id)
-
     return APIResponse.success_response({"message": "API key revoked successfully"})
diff --git a/backend/app/api/routes/collections.py b/backend/app/api/routes/collections.py
index 7c903930..68e4b6a8 100644
--- a/backend/app/api/routes/collections.py
+++ b/backend/app/api/routes/collections.py
@@ -22,6 +22,7 @@
 from app.models.collection import CollectionStatus
 from app.utils import APIResponse, load_description
 
+logger = logging.getLogger(__name__)
 router = APIRouter(prefix="/collections", tags=["collections"])
 
@@ -59,6 +60,9 @@ def model_post_init(self, __context: Any):
         self.documents = list(set(self.documents))
 
     def __call__(self, crud: DocumentCrud):
+        logger.info(
+            f"[DocumentOptions.call] Starting batch iteration for documents | {{'batch_size': {self.batch_size}, 'total_documents': {len(self.documents)}}}"
+        )
         (start, stop) = (0, self.batch_size)
         while True:
             view = self.documents[start:stop]
@@ -130,9 +134,11 @@ def success(self, body):
 
 class SilentCallback(CallbackHandler):
     def fail(self, body):
+        logger.info("[SilentCallback.fail] Silent callback failure")
         return
 
     def success(self, body):
+        logger.info("[SilentCallback.success] Silent callback success")
         return
 
@@ -140,32 +146,35 @@ class WebHookCallback(CallbackHandler):
     def __init__(self, url: HttpUrl, payload: ResponsePayload):
         super().__init__(payload)
         self.url = url
+        logger.info(
+            f"[WebHookCallback.init] Initialized webhook callback | {{'url': '{url}'}}"
+        )
 
     def __call__(self, response: APIResponse, status: str):
         time = ResponsePayload.now()
         payload = replace(self.payload, status=status, time=time)
         response.metadata = asdict(payload)
-
+        logger.info(
+            f"[WebHookCallback.call] Posting callback | {{'url': '{self.url}', 'status': '{status}'}}"
+        )
         post_callback(self.url, response)
 
     def fail(self, body):
+        logger.warning(f"[WebHookCallback.fail] Callback failed | {{'body': '{body}'}}")
         self(APIResponse.failure_response(body), "incomplete")
 
     def success(self, body):
+        logger.info("[WebHookCallback.success] Callback succeeded")
         self(APIResponse.success_response(body), "complete")
 
 
 def _backout(crud: OpenAIAssistantCrud, assistant_id: str):
     try:
         crud.delete(assistant_id)
-    except OpenAIError:
-        warnings.warn(
-            ": ".join(
-                [
-                    f"Unable to remove assistant {assistant_id}",
-                    "platform DB may be out of sync with OpenAI",
-                ]
-            )
+    except OpenAIError as err:
+        logger.error(
+            f"[backout] Failed to delete assistant | {{'assistant_id': '{assistant_id}', 'error': '{str(err)}'}}",
+            exc_info=True,
        )
 
@@ -200,18 +209,10 @@ def do_create_collection(
             storage.get_file_size_kb(doc.object_store_url) for doc in flat_docs
         ]
 
-        logging.info(
-            f"[VectorStore Update] Uploading {len(flat_docs)} documents to vector store {vector_store.id}"
-        )
         list(vector_store_crud.update(vector_store.id, storage, docs))
-        logging.info(f"[VectorStore Upload] Upload completed")
 
         assistant_options = dict(request.extract_super_type(AssistantOptions))
-        logging.info(
-            f"[Assistant Create] Creating assistant with options: {assistant_options}"
-        )
         assistant = assistant_crud.create(vector_store.id, **assistant_options)
-        logging.info(f"[Assistant Create] Assistant created: {assistant.id}")
 
         collection = collection_crud.read_one(UUID(payload.key))
         collection.llm_service_id = assistant.id
@@ -220,22 +221,22 @@ def do_create_collection(
         collection.updated_at = now()
 
         if flat_docs:
-            logging.info(
-                f"[DocumentCollection] Linking {len(flat_docs)} documents to collection {collection.id}"
-            )
             DocumentCollectionCrud(session).create(collection, flat_docs)
 
         collection_crud._update(collection)
 
         elapsed = time.time() - start_time
-        logging.info(
-            f"Collection created: {collection.id} | Time: {elapsed:.2f}s | "
+        logger.info(
+            f"[do_create_collection] Collection created: {collection.id} | Time: {elapsed:.2f}s | "
             f"Files: {len(flat_docs)} | Sizes: {file_sizes_kb} KB | Types: {list(file_exts)}"
         )
 
         callback.success(collection.model_dump(mode="json"))
     except Exception as err:
-        logging.error(f"[Collection Creation Failed] {err} ({type(err).__name__})")
+        logger.error(
+            f"[do_create_collection] Collection creation failed | {{'collection_id': '{payload.key}', 'error': '{str(err)}'}}",
+            exc_info=True,
+        )
         if "assistant" in locals():
             _backout(assistant_crud, assistant.id)
         try:
@@ -244,7 +245,9 @@ def do_create_collection(
             collection.updated_at = now()
             collection_crud._update(collection)
         except Exception as suberr:
-            logging.warning(f"[Collection Status Update Failed] {suberr}")
+            logger.warning(
+                f"[do_create_collection] Failed to update collection status | {{'collection_id': '{payload.key}', 'reason': '{str(suberr)}'}}"
+            )
         callback.fail(str(err))
 
@@ -282,6 +285,10 @@ def create_collection(
         payload,
     )
 
+    logger.info(
+        f"[create_collection] Background task for collection creation scheduled | "
+        f"{{'collection_id': '{collection.id}'}}"
+    )
     return APIResponse.success_response(data=None, metadata=asdict(payload))
 
@@ -301,12 +308,20 @@ def do_delete_collection(
         collection = collection_crud.read_one(request.collection_id)
         assistant = OpenAIAssistantCrud()
         data = collection_crud.delete(collection, assistant)
+        logger.info(
+            f"[do_delete_collection] Collection deleted successfully | {{'collection_id': '{collection.id}'}}"
+        )
         callback.success(data.model_dump(mode="json"))
     except (ValueError, PermissionError, SQLAlchemyError) as err:
+        logger.error(
+            f"[do_delete_collection] Failed to delete collection | {{'collection_id': '{request.collection_id}', 'error': '{str(err)}'}}",
+            exc_info=True,
+        )
         callback.fail(str(err))
     except Exception as err:
-        warnings.warn(
-            'Unexpected exception "{}": {}'.format(type(err).__name__, err),
+        logger.error(
+            f"[do_delete_collection] Unexpected error during deletion | {{'collection_id': '{request.collection_id}', 'error': '{str(err)}', 'error_type': '{type(err).__name__}'}}",
+            exc_info=True,
         )
         callback.fail(str(err))
 
@@ -333,6 +348,10 @@ def delete_collection(
         payload,
     )
 
+    logger.info(
+        f"[delete_collection] Background task for deletion scheduled | "
+        f"{{'collection_id': '{request.collection_id}'}}"
+    )
     return APIResponse.success_response(data=None, metadata=asdict(payload))
 
diff --git a/backend/app/api/routes/documents.py b/backend/app/api/routes/documents.py
index 9e6d1c2e..03d2cd98 100644
--- a/backend/app/api/routes/documents.py
+++ b/backend/app/api/routes/documents.py
@@ -1,3 +1,4 @@
+import logging
 from uuid import UUID, uuid4
 from typing import List
 from pathlib import Path
@@ -12,6 +13,7 @@
 from app.core.cloud import AmazonCloudStorage
 from app.crud.rag import OpenAIAssistantCrud
 
+logger = logging.getLogger(__name__)
 router = APIRouter(prefix="/documents", tags=["documents"])
 
@@ -43,6 +45,7 @@ def upload_doc(
 ):
     storage = AmazonCloudStorage(current_user)
     document_id = uuid4()
+
     object_store_url = storage.put(src, Path(str(document_id)))
 
     crud = DocumentCrud(session, current_user.id)
@@ -92,6 +95,7 @@ def permanent_delete_doc(
     document = d_crud.read_one(doc_id)
 
     c_crud.delete(document, a_crud)
+    storage.delete(document.object_store_url)
     d_crud.delete(doc_id)
 
diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py
index ed098af6..e7832cd5 100644
--- a/backend/app/api/routes/responses.py
+++ b/backend/app/api/routes/responses.py
@@ -163,7 +163,8 @@ def process_response(
     except openai.OpenAIError as e:
         error_message = handle_openai_error(e)
         logger.error(
-            f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}, organization_id={organization_id}"
+            f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}, organization_id={organization_id}",
+            exc_info=True,
         )
         callback_response = ResponsesAPIResponse.failure_response(error=error_message)
@@ -196,8 +197,8 @@ async def responses(
         _session, request.assistant_id, _current_user.organization_id
     )
     if not assistant:
-        logger.error(
-            f"Assistant not found: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
+        logger.warning(
+            f"Assistant not found: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
         )
         raise HTTPException(
             status_code=404,
@@ -211,7 +212,7 @@ async def responses(
         project_id=request.project_id,
     )
     if not credentials or "api_key" not in credentials:
-        logger.error(
-            f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
+        logger.warning(
+            f"OpenAI API key not configured for organization_id={_current_user.organization_id}, project_id={request.project_id}"
         )
         return {
diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py
index 2019d0e8..6ef88d4d 100644
--- a/backend/app/api/routes/threads.py
+++ b/backend/app/api/routes/threads.py
@@ -43,7 +43,7 @@ def send_callback(callback_url: str, data: dict):
         response.raise_for_status()
         return True
     except requests.RequestException as e:
-        logger.error(f"Callback failed: {str(e)}")
+        logger.error(f"Callback failed: {str(e)}", exc_info=True)
         return False
 
diff --git a/backend/app/core/cloud/storage.py b/backend/app/core/cloud/storage.py
index 0e8eaba4..0c225d91 100644
--- a/backend/app/core/cloud/storage.py
+++ b/backend/app/core/cloud/storage.py
@@ -1,5 +1,5 @@
 import os
-
+import logging
 import functools as ft
 from pathlib import Path
 from dataclasses import dataclass, asdict
@@ -12,6 +12,9 @@
 from app.api.deps import CurrentUser
 from app.core.config import settings
+from app.utils import mask_string
+
+logger = logging.getLogger(__name__)
 
 
 class CloudStorageError(Exception):
@@ -31,25 +34,50 @@ def client(self):
         for i, j in cred_params:
             kwargs[i] = os.environ.get(j, getattr(settings, j))
 
-        return boto3.client("s3", **kwargs)
+        client = boto3.client("s3", **kwargs)
+        return client
 
     def create(self):
         try:
-            # does the bucket exist...
             self.client.head_bucket(Bucket=settings.AWS_S3_BUCKET)
         except ValueError as err:
+            logger.error(
+                f"[AmazonCloudStorageClient.create] Invalid bucket configuration | "
+                f"{{'bucket': '{mask_string(settings.AWS_S3_BUCKET)}', 'error': '{str(err)}'}}",
+                exc_info=True,
+            )
             raise CloudStorageError(err) from err
         except ClientError as err:
             response = int(err.response["Error"]["Code"])
             if response != 404:
+                logger.error(
+                    f"[AmazonCloudStorageClient.create] Unexpected AWS error | "
+                    f"{{'bucket': '{mask_string(settings.AWS_S3_BUCKET)}', 'error': '{str(err)}', 'code': {response}}}",
+                    exc_info=True,
+                )
                 raise CloudStorageError(err) from err
-            # ... if not create it
-            self.client.create_bucket(
-                Bucket=settings.AWS_S3_BUCKET,
-                CreateBucketConfiguration={
-                    "LocationConstraint": settings.AWS_DEFAULT_REGION,
-                },
+            logger.warning(
+                f"[AmazonCloudStorageClient.create] Bucket not found, creating | "
+                f"{{'bucket': '{mask_string(settings.AWS_S3_BUCKET)}'}}"
             )
+            try:
+                self.client.create_bucket(
+                    Bucket=settings.AWS_S3_BUCKET,
+                    CreateBucketConfiguration={
+                        "LocationConstraint": settings.AWS_DEFAULT_REGION,
+                    },
+                )
+                logger.info(
+                    f"[AmazonCloudStorageClient.create] Bucket created successfully | "
+                    f"{{'bucket': '{mask_string(settings.AWS_S3_BUCKET)}'}}"
+                )
+            except ClientError as create_err:
+                logger.error(
+                    f"[AmazonCloudStorageClient.create] Failed to create bucket | "
+                    f"{{'bucket': '{mask_string(settings.AWS_S3_BUCKET)}', 'error': '{str(create_err)}'}}",
+                    exc_info=True,
+                )
+                raise CloudStorageError(create_err) from create_err
 
 
 @dataclass(frozen=True)
@@ -68,7 +96,6 @@ def to_url(self):
         }
         for k in ParseResult._fields:
             kwargs.setdefault(k)
-
         return ParseResult(**kwargs)
 
     @classmethod
@@ -77,7 +104,6 @@ def from_url(cls, url: str):
         path = Path(url.path)
         if path.is_absolute():
             path = path.relative_to(path.root)
-
         return cls(Bucket=url.netloc, Key=str(path))
 
 
@@ -100,18 +126,27 @@ def __init__(self, user: CurrentUser):
 
     def put(self, source: UploadFile, basename: Path) -> SimpleStorageName:
         key = Path(str(self.user.id), basename)
         destination = SimpleStorageName(str(key))
         kwargs = asdict(destination)
+
         try:
             self.aws.client.upload_fileobj(
                 source.file,
                 ExtraArgs={
-                    # 'Metadata': self.user.model_dump(),
                     "ContentType": source.content_type,
                 },
                 **kwargs,
             )
+            logger.info(
+                f"[AmazonCloudStorage.put] File uploaded successfully | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(destination.Bucket)}', 'key': '{mask_string(destination.Key)}'}}"
+            )
         except ClientError as err:
+            logger.error(
+                f"[AmazonCloudStorage.put] AWS upload error | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(destination.Bucket)}', 'key': '{mask_string(destination.Key)}', 'error': '{str(err)}'}}",
+                exc_info=True,
+            )
             raise CloudStorageError(f'AWS Error: "{err}"') from err
 
         return destination
 
@@ -120,21 +154,53 @@ def stream(self, url: str) -> StreamingBody:
         name = SimpleStorageName.from_url(url)
         kwargs = asdict(name)
         try:
-            return self.aws.client.get_object(**kwargs).get("Body")
+            body = self.aws.client.get_object(**kwargs).get("Body")
+            logger.info(
+                f"[AmazonCloudStorage.stream] File streamed successfully | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}'}}"
+            )
+            return body
         except ClientError as err:
+            logger.error(
+                f"[AmazonCloudStorage.stream] AWS stream error | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'error': '{str(err)}'}}",
+                exc_info=True,
+            )
             raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err
 
     def get_file_size_kb(self, url: str) -> float:
         name = SimpleStorageName.from_url(url)
         kwargs = asdict(name)
-        response = self.aws.client.head_object(**kwargs)
-        size_bytes = response["ContentLength"]
-        return round(size_bytes / 1024, 2)
+        try:
+            response = self.aws.client.head_object(**kwargs)
+            size_bytes = response["ContentLength"]
+            size_kb = round(size_bytes / 1024, 2)
+            logger.info(
+                f"[AmazonCloudStorage.get_file_size_kb] File size retrieved successfully | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'size_kb': {size_kb}}}"
+            )
+            return size_kb
+        except ClientError as err:
+            logger.error(
+                f"[AmazonCloudStorage.get_file_size_kb] AWS head object error | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'error': '{str(err)}'}}",
+                exc_info=True,
+            )
+            raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err
 
     def delete(self, url: str) -> None:
         name = SimpleStorageName.from_url(url)
         kwargs = asdict(name)
         try:
             self.aws.client.delete_object(**kwargs)
+            logger.info(
+                f"[AmazonCloudStorage.delete] File deleted successfully | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}'}}"
+            )
         except ClientError as err:
+            logger.error(
+                f"[AmazonCloudStorage.delete] AWS delete error | "
+                f"{{'user_id': '{self.user.id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'error': '{str(err)}'}}",
+                exc_info=True,
+            )
             raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err
diff --git a/backend/app/core/logger.py b/backend/app/core/logger.py
index b51d207f..1ee17613 100644
--- a/backend/app/core/logger.py
+++ b/backend/app/core/logger.py
@@ -4,18 +4,29 @@
 from app.core.config import settings
 
 LOG_DIR = settings.LOG_DIR
-if not os.path.exists(LOG_DIR):
-    os.makedirs(LOG_DIR)
+os.makedirs(LOG_DIR, exist_ok=True)
 
 LOG_FILE_PATH = os.path.join(LOG_DIR, "app.log")
 LOGGING_LEVEL = logging.INFO
-LOGGING_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+LOGGING_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
 
-logging.basicConfig(level=LOGGING_LEVEL, format=LOGGING_FORMAT)
+# Configure the root logger
+logger = logging.getLogger()
+logger.setLevel(LOGGING_LEVEL)
 
-file_handler = RotatingFileHandler(LOG_FILE_PATH, maxBytes=10485760, backupCount=5)
-file_handler.setLevel(LOGGING_LEVEL)
-file_handler.setFormatter(logging.Formatter(LOGGING_FORMAT))
+# Formatter; example record: "2025-01-01 12:00:00,000 - INFO - app.main - message"
+formatter = logging.Formatter(LOGGING_FORMAT)
 
-logging.getLogger("").addHandler(file_handler)
+# Stream handler (console)
+stream_handler = logging.StreamHandler()
+stream_handler.setFormatter(formatter)
+logger.addHandler(stream_handler)
+
+# Rotating file handler
+file_handler = RotatingFileHandler(
+    LOG_FILE_PATH, maxBytes=10 * 1024 * 1024, backupCount=5
+)
+file_handler.setFormatter(formatter)
+logger.addHandler(file_handler)
diff --git a/backend/app/core/middleware.py b/backend/app/core/middleware.py
new file mode 100644
index 00000000..4ad3d12e
--- /dev/null
+++ b/backend/app/core/middleware.py
@@ -0,0 +1,22 @@
+import logging
+import time
+from fastapi import Request, Response
+
+logger = logging.getLogger("http_request_logger")
+
+
+async def http_request_logger(request: Request, call_next) -> Response:
+    start_time = time.time()
+    try:
+        response = await call_next(request)
+    except Exception:
+        logger.exception("Unhandled exception during request")
+        raise
+
+    process_time = (time.time() - start_time) * 1000  # ms
+    client_ip = request.client.host if request.client else "unknown"
+
+    logger.info(
+        f"{request.method} {request.url.path} - {response.status_code} [{process_time:.2f}ms] from {client_ip}"
+    )
+    return response
diff --git a/backend/app/crud/api_key.py b/backend/app/crud/api_key.py
index e787514f..56fa3045 100644
--- a/backend/app/crud/api_key.py
+++ b/backend/app/crud/api_key.py
@@ -1,5 +1,6 @@
 import uuid
 import secrets
+import logging
 from sqlmodel import Session, select
 from app.core.security import (
     get_password_hash,
@@ -11,6 +12,8 @@
 from app.core.exception_handlers import HTTPException
 from app.models.api_key import APIKey, APIKeyPublic
 
+logger = logging.getLogger(__name__)
+
 
 def generate_api_key() -> tuple[str, str]:
     """Generate a new API key and its hash."""
@@ -48,6 +51,9 @@ def create_api_key(
     api_key_dict = api_key.model_dump()
     api_key_dict["key"] = raw_key  # Return the raw key to the user
 
+    logger.info(
+        f"[create_api_key] API key creation completed | {{'api_key_id': {api_key.id}, 'user_id': {user_id}, 'project_id': {project_id}}}"
+    )
     return APIKeyPublic.model_validate(api_key_dict)
 
@@ -66,8 +72,10 @@ def get_api_key(session: Session, api_key_id: int) -> APIKeyPublic | None:
         # Decrypt the key
         decrypted_key = decrypt_api_key(api_key.key)
         api_key_dict["key"] = decrypted_key
         return APIKeyPublic.model_validate(api_key_dict)
+
+    logger.warning(f"[get_api_key] API key not found | {{'api_key_id': {api_key_id}}}")
     return None
 
@@ -77,12 +84,21 @@ def delete_api_key(session: Session, api_key_id: int) -> None:
     """
     api_key = session.get(APIKey, api_key_id)
 
+    if not api_key:
+        logger.warning(
+            f"[delete_api_key] API key not found | {{'api_key_id': {api_key_id}}}"
+        )
+        return
+
     api_key.is_deleted = True
     api_key.deleted_at = now()
     api_key.updated_at = now()
     session.add(api_key)
     session.commit()
+    logger.info(
+        f"[delete_api_key] API key soft deleted successfully | {{'api_key_id': {api_key_id}}}"
+    )
 
 
 def get_api_key_by_value(session: Session, api_key_value: str) -> APIKeyPublic | None:
@@ -97,10 +113,14 @@ def get_api_key_by_value(session: Session, api_key_value: str) -> APIKeyPublic | None:
         decrypted_key = decrypt_api_key(api_key.key)
         if api_key_value == decrypted_key:
             api_key_dict = api_key.model_dump()
             api_key_dict["key"] = decrypted_key
             return APIKeyPublic.model_validate(api_key_dict)
+
+    logger.warning("[get_api_key_by_value] No matching API key found")
     return None
 
@@ -122,6 +140,9 @@ def get_api_key_by_project_user(
         api_key_dict["key"] = decrypt_api_key(api_key.key)
         return APIKeyPublic.model_validate(api_key_dict)
 
+    logger.warning(
+        f"[get_api_key_by_project_user] API key not found | {{'project_id': {project_id}, 'user_id': '{user_id}'}}"
+    )
     return None
diff --git a/backend/app/crud/collection.py b/backend/app/crud/collection.py
index fe3e1b03..b08ddcf5 100644
--- a/backend/app/crud/collection.py
+++ b/backend/app/crud/collection.py
@@ -1,3 +1,4 @@
+import logging
 import functools as ft
 from uuid import UUID
 from typing import Optional
@@ -10,6 +11,8 @@
 from .document_collection import DocumentCollectionCrud
 
+logger = logging.getLogger(__name__)
+
 
 class CollectionCrud:
     def __init__(self, session: Session, owner_id: int):
@@ -24,11 +27,17 @@ def _update(self, collection: Collection):
                 self.owner_id,
                 collection.owner_id,
             )
-            raise PermissionError(err)
+            logger.error(
+                f"[CollectionCrud._update] Permission denied | {{'collection_id': '{collection.id}', 'error': '{err}'}}"
+            )
+            raise PermissionError(err)
 
         self.session.add(collection)
         self.session.commit()
         self.session.refresh(collection)
+        logger.info(
+            f"[CollectionCrud._update] Collection updated successfully | {{'collection_id': '{collection.id}'}}"
+        )
 
         return collection
 
@@ -41,6 +54,9 @@ def _exists(self, collection: Collection):
             )
             .scalar()
         )
+        logger.info(
+            f"[CollectionCrud._exists] Existence check completed | {{'llm_service_id': '{collection.llm_service_id}', 'exists': {bool(present)}}}"
+        )
 
         return bool(present)
 
@@ -73,7 +89,8 @@ def read_one(self, collection_id: UUID):
             )
         )
 
-        return self.session.exec(statement).one()
+        collection = self.session.exec(statement).one()
+        return collection
 
     def read_all(self):
         statement = select(Collection).where(
@@ -83,17 +100,25 @@ def read_all(self):
             )
         )
 
-        return self.session.exec(statement).all()
+        collections = self.session.exec(statement).all()
+        return collections
 
     @ft.singledispatchmethod
     def delete(self, model, remote):
         # remote should be an OpenAICrud
-        raise TypeError(type(model))
+        logger.error(
+            f"[CollectionCrud.delete] Invalid model type | {{'model_type': '{type(model).__name__}'}}"
+        )
+        raise TypeError(type(model))
 
     @delete.register
     def _(self, model: Collection, remote):
         remote.delete(model.llm_service_id)
         model.deleted_at = now()
-        return self._update(model)
+        collection = self._update(model)
+        logger.info(
+            f"[CollectionCrud.delete] Collection deleted successfully | {{'collection_id': '{model.id}'}}"
+        )
+        return collection
 
     @delete.register
     def _(self, model: Document, remote):
@@ -110,5 +139,8 @@ def _(self, model: Document, remote):
         for c in self.session.execute(statement):
             self.delete(c.Collection, remote)
         self.session.refresh(model)
+        logger.info(
+            f"[CollectionCrud.delete] Document deletion from collections completed | {{'document_id': '{model.id}'}}"
+        )
 
         return model
diff --git a/backend/app/crud/document.py b/backend/app/crud/document.py
index ecb81890..f2f2e3a0 100644
--- a/backend/app/crud/document.py
+++ b/backend/app/crud/document.py
@@ -1,3 +1,4 @@
+import logging
 from uuid import UUID
 from typing import Optional, List
 
@@ -7,6 +8,8 @@
 from app.core.util import now
 from app.core.exception_handlers import HTTPException
 
+logger = logging.getLogger(__name__)
+
 
 class DocumentCrud:
     def __init__(self, session: Session, owner_id: int):
@@ -23,6 +26,9 @@ def read_one(self, doc_id: UUID):
         result = self.session.exec(statement).one_or_none()
 
         if result is None:
+            logger.warning(
+                f"[DocumentCrud.read_one] Document not found | {{'doc_id': '{doc_id}', 'owner_id': {self.owner_id}}}"
+            )
             raise HTTPException(status_code=404, detail="Document not found")
 
         return result
@@ -38,16 +44,25 @@ def read_many(
                 Document.deleted_at.is_(None),
             )
         )
+
         if skip is not None:
             if skip < 0:
-                raise ValueError(f"Negative skip: {skip}")
+                logger.error(
+                    f"[DocumentCrud.read_many] Invalid skip value | {{'owner_id': {self.owner_id}, 'skip': {skip}}}"
+                )
+                raise ValueError(f"Negative skip: {skip}")
             statement = statement.offset(skip)
+
         if limit is not None:
             if limit < 0:
-                raise ValueError(f"Negative limit: {limit}")
+                logger.error(
+                    f"[DocumentCrud.read_many] Invalid limit value | {{'owner_id': {self.owner_id}, 'limit': {limit}}}"
+                )
+                raise ValueError(f"Negative limit: {limit}")
             statement = statement.limit(limit)
 
-        return self.session.exec(statement).all()
+        documents = self.session.exec(statement).all()
+        return documents
 
     def read_each(self, doc_ids: List[UUID]):
         statement = select(Document).where(
@@ -60,7 +83,10 @@ def read_each(self, doc_ids: List[UUID]):
 
         (m, n) = map(len, (results, doc_ids))
         if m != n:
-            raise ValueError(f"Requested {n} retrieved {m}")
+            logger.error(
+                f"[DocumentCrud.read_each] Mismatch in retrieved documents | {{'owner_id': {self.owner_id}, 'requested_count': {n}, 'retrieved_count': {m}}}"
+            )
+            raise ValueError(f"Requested {n} retrieved {m}")
 
         return results
 
@@ -72,13 +102,18 @@ def update(self, document: Document):
                 self.owner_id,
                 document.owner_id,
             )
-            raise PermissionError(error)
-
+            logger.error(
+                f"[DocumentCrud.update] Permission denied | {{'doc_id': '{document.id}', 'error': '{error}'}}"
+            )
+            raise PermissionError(error)
         document.updated_at = now()
         self.session.add(document)
         self.session.commit()
         self.session.refresh(document)
+        logger.info(
+            f"[DocumentCrud.update] Document updated successfully | {{'doc_id': '{document.id}', 'owner_id': {self.owner_id}}}"
+        )
 
         return document
 
@@ -87,4 +126,8 @@ def delete(self, doc_id: UUID):
         document.deleted_at = now()
         document.updated_at = now()
 
-        return self.update(document)
+        updated_document = self.update(document)
+        logger.info(
+            f"[DocumentCrud.delete] Document deleted successfully | {{'doc_id': '{doc_id}', 'owner_id': {self.owner_id}}}"
+        )
+        return updated_document
diff --git a/backend/app/crud/rag/open_ai.py b/backend/app/crud/rag/open_ai.py
index ae2f58c7..69479888 100644
--- a/backend/app/crud/rag/open_ai.py
+++ b/backend/app/crud/rag/open_ai.py
@@ -1,6 +1,5 @@
 import json
 import logging
-import warnings
 import functools as ft
 from typing import Iterable
 
@@ -11,6 +10,8 @@
 from app.core.config import settings
 from app.models import Document
 
+logger = logging.getLogger(__name__)
+
 
 def vs_ls(client: OpenAI, vector_store_id: str):
     kwargs = {}
@@ -43,14 +44,25 @@ def __str__(self):
         return type(self).__name__
 
     def __call__(self, resource, retries=1):
+        logger.info(
+            f"[ResourceCleaner.call] Starting resource cleanup | {{'cleaner_type': '{self}', 'resource': '{resource}', 'retries': {retries}}}"
+        )
         for i in range(retries):
             try:
                 self.clean(resource)
+                logger.info(
+                    f"[ResourceCleaner.call] Resource cleaned successfully | {{'cleaner_type': '{self}', 'resource': '{resource}'}}"
+                )
                 return
             except OpenAIError as err:
-                logging.error(err)
+                logger.error(
+                    f"[ResourceCleaner.call] OpenAI error during cleanup | {{'cleaner_type': '{self}', 'resource': '{resource}', 'error': '{str(err)}'}}",
+                    exc_info=True,
+                )
 
-        warnings.warn(f"[{self} {resource}] Cleanup failure")
+        logger.warning(
+            f"[ResourceCleaner.call] Cleanup failure after retries | {{'cleaner_type': '{self}', 'resource': '{resource}'}}"
+        )
 
     def clean(self, resource):
         raise NotImplementedError()
@@ -58,13 +70,22 @@ def clean(self, resource):
 
 class AssistantCleaner(ResourceCleaner):
     def clean(self, resource):
+        logger.info(
+            f"[AssistantCleaner.clean] Deleting assistant | {{'assistant_id': '{resource}'}}"
+        )
         self.client.beta.assistants.delete(resource)
 
 
 class VectorStoreCleaner(ResourceCleaner):
     def clean(self, resource):
+        logger.info(
+            f"[VectorStoreCleaner.clean] Starting vector store cleanup | {{'vector_store_id': '{resource}'}}"
+        )
         for i in vs_ls(self.client, resource):
             self.client.files.delete(i.id)
+        logger.info(
f"[VectorStoreCleaner.clean] Deleting vector store | {{'vector_store_id': '{resource}'}}" + ) self.client.vector_stores.delete(resource) @@ -75,9 +96,19 @@ def __init__(self, client=None): class OpenAIVectorStoreCrud(OpenAICrud): def create(self): - return self.client.vector_stores.create() + logger.info( + f"[OpenAIVectorStoreCrud.create] Creating vector store | {{'action': 'create'}}" + ) + vector_store = self.client.vector_stores.create() + logger.info( + f"[OpenAIVectorStoreCrud.create] Vector store created | {{'vector_store_id': '{vector_store.id}'}}" + ) + return vector_store def read(self, vector_store_id: str): + logger.info( + f"[OpenAIVectorStoreCrud.read] Reading files from vector store | {{'vector_store_id': '{vector_store_id}'}}" + ) yield from vs_ls(self.client, vector_store_id) def update( @@ -97,10 +128,16 @@ def update( files.append(f_obj) + logger.info( + f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}" + ) req = self.client.vector_stores.file_batches.upload_and_poll( vector_store_id=vector_store_id, files=files, ) + logger.info( + f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" + ) if req.file_counts.completed != req.file_counts.total: view = {x.fname: x for x in docs} for i in self.read(vector_store_id): @@ -112,25 +149,48 @@ def update( "error": "OpenAI document processing error", "documents": list(view.values()), } - raise InterruptedError(json.dumps(error, cls=BaseModelEncoder)) + try: + raise InterruptedError(json.dumps(error, cls=BaseModelEncoder)) + except InterruptedError as err: + logger.error( + f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'error': '{error['error']}', 'failed_documents': {len(error['documents'])}}}", + exc_info=True, + ) + raise while files: f_obj = files.pop() f_obj.close() + logger.info( + f"[OpenAIVectorStoreCrud.update] Closed file stream | {{'vector_store_id': '{vector_store_id}', 'filename': '{f_obj.name}'}}" + ) yield from docs def delete(self, vector_store_id: str, retries: int = 3): if retries < 1: - raise ValueError("Retries must be greater-than 1") + try: + raise ValueError("Retries must be greater-than 1") + except ValueError as err: + logger.error( + f"[OpenAIVectorStoreCrud.delete] Invalid retries value | {{'vector_store_id': '{vector_store_id}', 'retries': {retries}}}", + exc_info=True, + ) + raise cleaner = VectorStoreCleaner(self.client) cleaner(vector_store_id) + logger.info( + f"[OpenAIVectorStoreCrud.delete] Vector store deleted | {{'vector_store_id': '{vector_store_id}'}}" + ) class OpenAIAssistantCrud(OpenAICrud): def create(self, vector_store_id: str, **kwargs): - return self.client.beta.assistants.create( + logger.info( + f"[OpenAIAssistantCrud.create] Creating assistant | {{'vector_store_id': '{vector_store_id}'}}" + ) + assistant = self.client.beta.assistants.create( tools=[ { "type": "file_search", @@ -145,22 +205,38 @@ def create(self, vector_store_id: str, **kwargs): }, **kwargs, ) + logger.info( + f"[OpenAIAssistantCrud.create] Assistant created | {{'assistant_id': '{assistant.id}', 'vector_store_id': '{vector_store_id}'}}" + ) + return assistant def delete(self, assistant_id: str): + logger.info( + f"[OpenAIAssistantCrud.delete] Starting assistant deletion | {{'assistant_id': '{assistant_id}'}}" + ) assistant = 
self.client.beta.assistants.retrieve(assistant_id) vector_stores = assistant.tool_resources.file_search.vector_store_ids + try: (vector_store_id,) = vector_stores - except ValueError as err: + except ValueError: if vector_stores: names = ", ".join(vector_stores) - msg = f"Too many attached vector stores: {names}" + err = ValueError(f"Too many attached vector stores: {names}") else: - msg = "No vector stores found" - raise ValueError(msg) + err = ValueError("No vector stores found") + + logger.error( + f"[OpenAIAssistantCrud.delete] Invalid vector store state | {{'assistant_id': '{assistant_id}', 'vector_stores': '{vector_stores}'}}", + exc_info=True, + ) + raise err v_crud = OpenAIVectorStoreCrud(self.client) v_crud.delete(vector_store_id) cleaner = AssistantCleaner(self.client) cleaner(assistant_id) + logger.info( + f"[OpenAIAssistantCrud.delete] Assistant deleted | {{'assistant_id': '{assistant_id}'}}" + ) diff --git a/backend/app/main.py b/backend/app/main.py index 6ae1d33c..09c6f0e9 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,11 +1,13 @@ import sentry_sdk -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI from fastapi.routing import APIRoute from app.api.main import api_router from app.core.config import settings +import app.core.logger from app.core.exception_handlers import register_exception_handlers +from app.core.middleware import http_request_logger def custom_generate_unique_id(route: APIRoute) -> str: @@ -21,6 +23,8 @@ def custom_generate_unique_id(route: APIRoute) -> str: generate_unique_id_function=custom_generate_unique_id, ) +app.middleware("http")(http_request_logger) + app.include_router(api_router, prefix=settings.API_V1_STR) register_exception_handlers(app) diff --git a/backend/app/utils.py b/backend/app/utils.py index 63712ebd..b7f7f793 100644 --- a/backend/app/utils.py +++ b/backend/app/utils.py @@ -151,6 +151,18 @@ def verify_password_reset_token(token: str) -> str | None: return None +def mask_string(value: str, mask_char: str = "*") -> str: + if not value: + return "" + + length = len(value) + num_mask = length // 2 + start = (length - num_mask) // 2 + end = start + num_mask + + return value[:start] + (mask_char * num_mask) + value[end:] + + @ft.singledispatch def load_description(filename: Path) -> str: if not filename.exists():