
Collections: one batch processing per task #786

Open
nishika26 wants to merge 12 commits into main from enhancement/collection_batching

Conversation

Collaborator

@nishika26 nishika26 commented Apr 24, 2026

Summary

Target issue is #PLEASE_TYPE_ISSUE_NUMBER
Explain the motivation for making this change. What existing problem does the pull request solve?

Checklist

Before submitting a pull request, please ensure that you mark these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested.
  • If you've fixed a bug or added code, ensure it is tested and has test cases.

Notes

Please add any other information the reviewer may need here.

Summary by CodeRabbit

  • Documentation

    • File upload size limit documented as 25 MB
  • New Features

    • Batch-based document processing for collection creation
  • Refactor

    • Optimized collection creation workflow for improved efficiency
    • Enhanced timeout management for background operations
    • Streamlined document upload handling with improved integration support


coderabbitai Bot commented Apr 24, 2026

📝 Walkthrough

This PR implements batch-based document processing for collections. It adds database columns for batch tracking, refactors the collection creation workflow into a two-phase batch orchestration system, updates the provider interface to support batch uploads, replaces OpenTelemetry tracing with correlation IDs, and adds gevent-based timeout support to Celery tasks.

Changes

Batch-Based Collection Processing

  • Database Schema: backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py, backend/app/models/collection_job.py, backend/app/models/document.py
    Migration adds total_batches, current_batch_number, and documents_uploaded columns to the collection_jobs table; openai_file_id added to the document table. Models expose these fields as optional JSON/string fields.
  • Core Provider Interface: backend/app/services/collections/providers/base.py
    BaseProvider.create() signature updated to accept a pre-fetched docs list and an optional vector_store_id; new abstract methods upload_files() and get_existing_file_id() added to support batch operations.
  • OpenAI Provider Implementation: backend/app/services/collections/providers/openai.py, backend/app/crud/rag/open_ai.py
    OpenAIProvider.create() refactored to match the new interface; new upload_files() and get_existing_file_id() helpers added. OpenAIVectorStoreCrud.update_batch() replaces the per-document update() method to batch-upload documents with duration logging.
  • Collection Creation Orchestration: backend/app/services/collections/create_collection.py
    Complete rewrite implements two-phase batching: Phase 1 fetches documents, uploads files, creates the vector store, splits docs into batches, and enqueues the first batch; Phase 2 processes the remaining batches and finalizes the collection. Replaces OpenTelemetry tracing with correlation_id-based tracking; adds helpers for persisting succeeded docs and retrying failed uploads.
  • Batch Helpers: backend/app/services/collections/helpers.py
    batch_documents() now requires a non-None file_size_kb (raises TypeError if missing) instead of defaulting to zero; see the sketch after this list.
  • Celery Task Execution & Utilities: backend/app/celery/tasks/job_execution.py, backend/app/celery/utils.py
    Removes OpenTelemetry context extraction/injection; all start_* task functions now enqueue via .delay() directly. gevent_timeout() decorator added to enforce soft time limits with Timeout handling. run_create_collection_job decorated with the timeout; all tasks use correlation_id.set(trace_id) for tracing.
  • Tests & Documentation: backend/app/tests/services/collections/test_helpers.py, backend/app/api/docs/documents/upload.md
    Test for None file_size_kb updated to expect TypeError instead of the zero-byte fallback. Upload documentation updated to note the 25 MB maximum file size.
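
A minimal sketch of the size-bounded batching behavior described for batch_documents() above; the max_batch_size_kb parameter and the Document stand-in are assumptions, not the actual helper:

from dataclasses import dataclass


@dataclass
class Document:
    id: str
    file_size_kb: float | None


def batch_documents(
    documents: list[Document], max_batch_size_kb: float
) -> list[list[Document]]:
    batches: list[list[Document]] = []
    current_batch: list[Document] = []
    current_batch_size_kb = 0.0
    for doc in documents:
        if doc.file_size_kb is None:
            # The real helper hits this implicitly via int + None; raised explicitly here.
            raise TypeError(f"Document {doc.id} has no file_size_kb")
        # Close the current batch once adding this doc would exceed the size cap.
        if current_batch and current_batch_size_kb + doc.file_size_kb > max_batch_size_kb:
            batches.append(current_batch)
            current_batch, current_batch_size_kb = [], 0.0
        current_batch.append(doc)
        current_batch_size_kb += doc.file_size_kb
    if current_batch:
        batches.append(current_batch)
    return batches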

Sequence Diagram

sequenceDiagram
    participant API as API Endpoint
    participant CeleryWorker as Celery Worker
    participant CollectionSvc as Collection Service
    participant Provider as OpenAI Provider
    participant VectorStore as Vector Store API
    participant Storage as Cloud Storage
    participant Database as Database

    API->>CeleryWorker: start_create_collection_job(...)
    
    CeleryWorker->>CollectionSvc: execute_setup_job() [Phase 1]
    CollectionSvc->>Database: fetch documents
    CollectionSvc->>Storage: upload_files(docs)
    Storage-->>CollectionSvc: files ready
    
    CollectionSvc->>Provider: create(docs, vector_store_id=None) [Phase 1]
    Provider->>VectorStore: create vector store
    VectorStore-->>Provider: vector_store_id
    Provider->>VectorStore: update_batch(docs)
    VectorStore-->>Provider: succeeded[], failed[]
    
    CollectionSvc->>Database: create collection_job (PROCESSING)
    CollectionSvc->>CollectionSvc: split docs into batches
    CollectionSvc->>CeleryWorker: start_collection_batch_job(batch_1)
    
    CeleryWorker->>CollectionSvc: execute_batch_job(batch_n) [Phase 2]
    loop for each remaining batch
        CollectionSvc->>Provider: create(docs, vector_store_id) [Phase 2, is_final=False]
        CollectionSvc->>CeleryWorker: start_collection_batch_job(batch_n+1)
    end
    
    CollectionSvc->>Provider: create(final_docs, vector_store_id, is_final=True) [Phase 2]
    CollectionSvc->>Database: finalize collection, link documents
    CollectionSvc->>API: send_callback(success)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

enhancement, ready-for-review

Suggested reviewers

  • vprashrex
  • Prajna1999
  • AkhileshNegi

Poem

🐰 Batches bundled, docs now flow,
Vector stores with vigor grow,
Two-phase dance through Celery's beat,
Timeouts gevent-sweet, complete!
No more OTEL traces cast,
Batch by batch, we process fast! 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 23.64%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check: ✅ Passed. The title 'Collections: one batch processing per task' clearly reflects the main focus of the changeset, which introduces batch-based orchestration for collection creation with phase-based processing.
  • Linked Issues Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



Comment @coderabbitai help to get the list of available commands and usage tips.

@nishika26 nishika26 changed the title from "Collections: batch per task and gevent timeout" to "Collections: one batch processing per task" on Apr 29, 2026

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
backend/app/services/collections/helpers.py (1)

84-99: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Replace the implicit TypeError with an explicit validation error.

Removing the or 0 fallback means a document with file_size_kb=None now crashes inside the batching loop with an opaque unsupported operand type(s) for +: 'int' and 'NoneType' mid-iteration. Callers cannot tell which document is invalid and any batches accumulated up to that point are discarded. A pre-loop validation (or explicit per-doc check) yields a clear message and a deterministic failure point.

🛡️ Proposed fix
     for doc in documents:
-        doc_size_kb = doc.file_size_kb
+        if doc.file_size_kb is None:
+            raise ValueError(
+                f"[batch_documents] Document {doc.id} has no file_size_kb; "
+                "sizes must be backfilled before batching."
+            )
+        doc_size_kb = doc.file_size_kb
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/helpers.py` around lines 84 - 99, The
batching loop in batch_documents (the for doc in documents loop using
current_batch and current_batch_size_kb) can raise an opaque TypeError when
doc.file_size_kb is None; add explicit validation for each doc before using it
(either a pre-loop scan or a per-doc check) that verifies file_size_kb is not
None and is a numeric type, and if invalid raise a clear ValueError that
includes an identifier (e.g., doc.id or doc.name) so callers know which document
failed; perform this validation before updating current_batch_size_kb so
existing batches are preserved and add a short logger.warning or logger.error
with the same diagnostic information when raising.
backend/app/crud/rag/open_ai.py (1)

119-151: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove the unused update method from OpenAIVectorStoreCrud.

This method is not called anywhere in the codebase and has been replaced by update_batch. Additionally, it's missing a return type hint, which violates the coding guideline requiring type hints on all function return values. Removing it eliminates redundant code and the maintenance burden of two divergent upload flows.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/rag/open_ai.py` around lines 119 - 151, Delete the unused
OpenAIVectorStoreCrud.update method (the entire function) since upload logic is
now handled by update_batch; after removal, run a quick search for any remaining
references to OpenAIVectorStoreCrud.update and remove them, and clean up any
now-unused imports or symbols used only by that method (e.g., BytesIO, Document,
CloudStorage) to avoid lints and type-hint violations.
backend/app/services/collections/create_collection.py (2)

174-303: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add type hints for task_instance (and tighten helper hints).

Per the coding guidelines, all function parameters and return values must have type hints. The following are missing/loose:

  • execute_setup_job(... task_instance, ...) -> None (task_instance lacks a type)
  • execute_batch_job(... task_instance, ...) -> None — same
  • _persist_succeeded_docs(succeeded: list, ...) — should be list[Document]
  • _retry_failed_uploads(vector_store_crud, ..., failed_docs: list, ...) (vector_store_crud lacks a type; failed_docs should be list[Document])

task_instance can be typed as celery.Task (or kept as Any from typing if you want to avoid the dependency leak).

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
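
A sketch of the tightened signatures; import paths and the abridged parameter lists are assumptions based on the names cited above:

from celery import Task

from app.crud.rag.open_ai import OpenAIVectorStoreCrud
from app.models.document import Document


def execute_setup_job(
    project_id: int, job_id: str, task_id: str, task_instance: Task, **kwargs
) -> None: ...


def execute_batch_job(
    project_id: int, job_id: str, task_id: str, task_instance: Task, **kwargs
) -> None: ...


def _persist_succeeded_docs(succeeded: list[Document], project_id: int) -> list[str]: ...


def _retry_failed_uploads(
    vector_store_crud: OpenAIVectorStoreCrud,
    vector_store_id: str,
    failed_docs: list[Document],
    project_id: int,
    max_retries: int = 3,
) -> list[str]: ...

Typing task_instance as typing.Any remains the alternative if leaking the Celery dependency into this module is a concern.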

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/create_collection.py` around lines 174 -
303, The functions are missing/loose type hints: add an explicit type for
task_instance in both execute_setup_job and execute_batch_job (use celery.Task
or typing.Any if you want to avoid importing Celery), and tighten helper
signatures so _persist_succeeded_docs uses succeeded: list[Document] and
_retry_failed_uploads uses failed_docs: list[Document] and type-hint
vector_store_crud to the actual CRUD class (e.g., VectorStoreCrud) or typing.Any
if that class isn't accessible; also import any needed names (Document, Any,
celery.Task) and update return annotations if necessary.

39-66: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Return type mismatch: declared -> str but returns a UUID.

collection_job_id is a UUID (per the parameter annotation on line 43); returning it directly violates the declared -> str. Cast or change the annotation.

🐛 Proposed fix
-    return collection_job_id
+    return str(collection_job_id)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/create_collection.py` around lines 39 - 66,
The function start_job currently declares a return type of -> str but returns
collection_job_id which is a UUID; fix by either changing the function signature
to return -> UUID or converting the returned value to a string with return
str(collection_job_id). Update any imports/annotations if you choose UUID (e.g.,
ensure UUID is imported) and keep the rest of the logic (calls to
CollectionJobCrud.update and start_create_collection_job) unchanged.
backend/app/services/collections/providers/openai.py (1)

23-28: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Update test calls to match new create signature.

The test suite in backend/app/tests/services/collections/providers/test_openai_provider.py has three test functions that call provider.create() with the old three-argument signature:

  • test_create_openai_vector_store_only() (line 40)
  • test_create_openai_with_assistant() (line 79)
  • test_create_propagates_exception() (line 143)

All three pass storage as the second argument and a documents list as the third, but the updated signature is create(collection_request, docs, vector_store_id=None, is_final=False). The tests need to pass the documents list as the second argument, not storage:

  • Change from: provider.create(collection_request, storage, documents)
  • Change to: provider.create(collection_request, documents) (with vector_store_id as named argument if needed)
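
For example, the vector-store-only test's call would change roughly as follows (fixture names assumed):

# Before: storage passed positionally as the second argument
provider.create(collection_request, storage, documents)

# After: documents second; vector_store_id / is_final only when a test needs them
provider.create(collection_request, documents, vector_store_id=None)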
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/providers/openai.py` around lines 23 - 28,
Update the three failing tests so they call the new create signature: replace
calls to provider.create(collection_request, storage, documents) with
provider.create(collection_request, documents) and, if a vector_store_id or
is_final was intended, pass those as named args (e.g.
provider.create(collection_request, documents, vector_store_id=...,
is_final=...)); modify the three test functions in
backend/app/tests/services/collections/providers/test_openai_provider.py
(test_create_openai_vector_store_only, test_create_openai_with_assistant,
test_create_propagates_exception) to pass the documents list as the second
parameter and remove the positional storage argument.
🧹 Nitpick comments (4)
backend/app/models/document.py (1)

49-53: ⚡ Quick win

Align column comment between model and migration.

Migration 055 sets the column comment to "File ID assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading", but the model declares it as "File ID assigned by OpenAI (avoid re-uploading)". Future alembic revision --autogenerate runs may flag this drift as an unintended schema change. Pick one wording and keep both in sync.
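
A sketch of the aligned field, assuming a SQLModel-style declaration (field options other than the comment are assumptions):

openai_file_id: str | None = Field(
    default=None,
    sa_column_kwargs={
        "comment": (
            "File ID assigned by the LLM provider (e.g. OpenAI file ID) "
            "to avoid re-uploading"
        )
    },
)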

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/models/document.py` around lines 49 - 53, The model field
openai_file_id's sa_column_kwargs comment string mismatches the migration;
update the Field definition for openai_file_id in the Document model to use the
exact comment used in migration 055 ("File ID assigned by the LLM provider (e.g.
OpenAI file ID) to avoid re-uploading") so the sa_column_kwargs comment and the
migration stay in sync and prevent autogenerate diffs.
backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py (1)

47-55: 💤 Low value

Migration name only mentions collection_jobs, but it also alters document.

The filename and revision message refer to collection_jobs only, while the upgrade also adds document.openai_file_id. Consider splitting into two migrations or renaming/updating the message so the change scope is discoverable from the migration filename and history.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py`
around lines 47 - 55, The migration
'055_add_batch_tracking_to_collections_jobs.py' declares changes for
collection_jobs but also adds a column to document (op.add_column adding
document.openai_file_id); either split the document change into a separate
migration or rename/update this migration's filename and revision message to
reflect both changes (and update the upgrade/revision docstring) so the history
accurately describes the addition of document.openai_file_id alongside the
collection_jobs alterations.
backend/app/services/collections/providers/openai.py (1)

47-52: ⚡ Quick win

Open one DB session for the whole batch, not one per document.

The current code opens a fresh Session(engine) and constructs a DocumentCrud for every successful upload. For a collection with hundreds/thousands of docs this multiplies connection overhead unnecessarily. A single session outside the loop with per-doc commits (or a single commit at the end if you don't need partial-progress durability) is cleaner.

♻️ Proposed refactor
-    def upload_files(
+    def upload_files(
         self,
         storage: CloudStorage,
         docs: list[Document],
         project_id: int,
     ) -> None:
-        for doc in docs:
-            if self.get_existing_file_id(doc):
-                continue
-            try:
-                content = storage.get(doc.object_store_url)
-                if doc.file_size_kb is None:
-                    doc.file_size_kb = round(len(content) / 1024, 2)
-                f_obj = BytesIO(content)
-                f_obj.name = doc.fname
-                uploaded = self.client.files.create(file=f_obj, purpose="assistants")
-                doc.openai_file_id = uploaded.id
-                with Session(engine) as session:
-                    document_crud = DocumentCrud(session, project_id)
-                    db_doc = document_crud.read_one(doc.id)
-                    db_doc.openai_file_id = uploaded.id
-                    db_doc.file_size_kb = doc.file_size_kb
-                    document_crud.update(db_doc)
-            except Exception as err:
-                ...
+        with Session(engine) as session:
+            document_crud = DocumentCrud(session, project_id)
+            for doc in docs:
+                if self.get_existing_file_id(doc):
+                    continue
+                content = storage.get(doc.object_store_url)
+                if doc.file_size_kb is None:
+                    doc.file_size_kb = round(len(content) / 1024, 2)
+                f_obj = BytesIO(content)
+                f_obj.name = doc.fname
+                uploaded = self.client.files.create(file=f_obj, purpose="assistants")
+                doc.openai_file_id = uploaded.id
+                db_doc = document_crud.read_one(doc.id)
+                db_doc.openai_file_id = uploaded.id
+                db_doc.file_size_kb = doc.file_size_kb
+                document_crud.update(db_doc)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/providers/openai.py` around lines 47 - 52,
The code currently creates a new Session(engine) and DocumentCrud for every
uploaded document; instead open a single Session(engine) outside the upload loop
and reuse it (and a DocumentCrud instance per project_id) for each doc, calling
document_crud.read_one(doc.id), updating db_doc.openai_file_id and
db_doc.file_size_kb, and then document_crud.update(db_doc) inside the loop;
perform either a session.commit() per document for partial durability or one
commit after the loop, and ensure the session is closed once after processing
the entire batch.
backend/app/services/collections/create_collection.py (1)

475-491: ⚡ Quick win

Change except BaseException to except Exception.

BaseException catches KeyboardInterrupt, SystemExit, and GeneratorExit, which should normally be allowed to propagate. Additionally, gevent's Timeout deliberately inherits from BaseException (not Exception), so this generic handler will swallow timeouts that escape the dedicated except Timeout handler above and incorrectly mark the job as failed. Use except Exception instead.

♻️ Proposed change
-    except BaseException as err:
+    except Exception as err:
         logger.error(
             "[create_collection.execute_batch_job] Batch %d failed | job_id=%s, error=%s",
             ...
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/create_collection.py` around lines 475 -
491, The catch-all in create_collection.execute_batch_job currently uses "except
BaseException as err" which improperly catches KeyboardInterrupt/SystemExit and
gevent Timeouts; change that handler to "except Exception as err" so only
regular exceptions are caught (leaving the earlier "except Timeout" and
system-exiting signals to propagate), and keep the existing logging,
_mark_job_failed, and callback logic unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 74-105: The gevent_timeout decorator currently raises TimeoutError
unconditionally in its finally block causing tasks like
run_create_collection_job and run_collection_batch_job to always fail; modify
gevent_timeout (the decorator implementation) so that the Timeout exception is
raised only inside the except Timeout: handler and the finally: block only calls
timeout.cancel() (no raise), ensuring timeout.cancel() is reachable and
successful task completions do not raise TimeoutError.

In `@backend/app/celery/utils.py`:
- Around line 185-208: gevent_timeout currently always raises TimeoutError and
never cancels the gevent Timeout; fix wrapper in gevent_timeout by tracking
whether the gevent Timeout fired (e.g., timed_out flag and optionally store
result/exception), don't unconditionally raise in finally, always call
timeout.cancel() in the finally block, and only raise TimeoutError (or re-raise
the stored Timeout) after timeout.cancel() if timed_out is true; reference
wrapper, Timeout, timeout.cancel(), task_name and func.__name__ to locate where
to apply the change.

In `@backend/app/crud/rag/open_ai.py`:
- Around line 158-163: The docstring for the batch upload method incorrectly
refers to provider_file_id; update it to reference the actual Document attribute
used in the code (doc.openai_file_id) so the docstring matches the
implementation (see the method that calls upload_and_poll / the loop that reads
doc.openai_file_id). Ensure the sentence now states that all docs must have
openai_file_id set before calling this method and return description remains
unchanged.
- Around line 182-190: In OpenAIVectorStoreCrud.update_batch, when
batch.file_counts.failed > 0, don't mark all docs for retry; call the OpenAI
helper client.beta.vector_stores.file_batches.list_files(batch_id=batch.id,
vector_store_id=vector_store_id, filter="failed") to get only failed file
entries, map those failed file identifiers back to the input docs list (using
the same file id/key used when building docs), and extend the failed list with
only those docs so upload_and_poll() is retried only for genuinely failed files
instead of the entire batch.

In `@backend/app/services/collections/create_collection.py`:
- Around line 122-172: The two helper functions _persist_succeeded_docs and
_retry_failed_uploads (and the stale docstring reference to
_upload_batch_with_retry) are dead code and OpenAIVectorStoreCrud is unused;
either wire them into the batch path (execute_setup_job / execute_batch_job) or
remove them. Fix by removing the unused helpers _persist_succeeded_docs and
_retry_failed_uploads and the OpenAIVectorStoreCrud import, and update the
execute_batch_job docstring to not reference _upload_batch_with_retry;
alternatively, if you intend to keep retry logic, add calls from
execute_batch_job/execute_setup_job to _retry_failed_uploads (and ensure
vector_store_crud is passed) and implement or rename _upload_batch_with_retry
accordingly so the docstring matches the implemented function.
- Around line 304-311: Update the Phase 2 docstring to remove the reference to
the non-existent _upload_batch_with_retry and instead describe the actual
behavior: that the code calls provider.create(...) which delegates to
vector_store_crud.update_batch, and that inline retries are handled by
_retry_failed_uploads (if used) or by the underlying vector_store_crud; ensure
the docstring accurately states that failed items are retried via
_retry_failed_uploads or the vector_store_crud retry semantics, and that the
function still checkpoints progress, queues next batch, and finalizes the
collection on the last batch.
- Around line 215-220: The log call in create_collection.execute_setup_job uses
four format specifiers but only passes job_id and len(flat_docs), causing a
runtime TypeError; update the logger.info call to either (A) reduce the format
string to match the two provided args (e.g., remove failed and duration_s
placeholders) or (B) compute and supply the missing values by timing the
upload_files call and getting a failed count (modify upload_files to return a
result struct with failed_count and have execute_setup_job measure duration_s
and pass job_id, len(flat_docs), failed_count, duration_s into logger.info).
Ensure the change references logger.info and the upload_files/flat_docs
variables so the log formatting and values are consistent.
- Around line 243-253: The first batch enqueue call to
start_collection_batch_job is missing the required vector_store_id expected by
execute_batch_job, causing a TypeError; fix it by passing vector_store_id=None
in the start_collection_batch_job invocation (where project_id/job_id/trace_id
are passed) so execute_batch_job receives the argument, or alternatively add a
default vector_store_id: Optional[...] = None to execute_batch_job's signature;
reference start_collection_batch_job and execute_batch_job when making the
change.

In `@backend/app/services/collections/providers/openai.py`:
- Around line 30-59: The upload_files loop in OpenAIProvider.upload_files
currently logs per-document exceptions and continues, leaving docs with None
file_size_kb/openai_file_id and causing downstream TypeError or silent failures;
modify upload_files to either (A) fail-fast by re-raising the caught exception
after logging so callers (e.g., create_collection.execute_setup_job) can stop
and surface the real error, or (B) accumulate per-doc failures into a structured
result (e.g., list of successes and failures) and return that to callers so they
can decide (and avoid passing docs without openai_file_id to
vector_store_crud.update_batch); update the function signature and callers
accordingly (refer to upload_files, create_collection.execute_setup_job, and
vector_store_crud.update_batch) so callers handle the returned error info or the
propagated exception.

---

Outside diff comments:
In `@backend/app/crud/rag/open_ai.py`:
- Around line 119-151: Delete the unused OpenAIVectorStoreCrud.update method
(the entire function) since upload logic is now handled by update_batch; after
removal, run a quick search for any remaining references to
OpenAIVectorStoreCrud.update and remove them, and clean up any now-unused
imports or symbols used only by that method (e.g., BytesIO, Document,
CloudStorage) to avoid lints and type-hint violations.

In `@backend/app/services/collections/create_collection.py`:
- Around line 174-303: The functions are missing/loose type hints: add an
explicit type for task_instance in both execute_setup_job and execute_batch_job
(use celery.Task or typing.Any if you want to avoid importing Celery), and
tighten helper signatures so _persist_succeeded_docs uses succeeded:
list[Document] and _retry_failed_uploads uses failed_docs: list[Document] and
type-hint vector_store_crud to the actual CRUD class (e.g., VectorStoreCrud) or
typing.Any if that class isn't accessible; also import any needed names
(Document, Any, celery.Task) and update return annotations if necessary.
- Around line 39-66: The function start_job currently declares a return type of
-> str but returns collection_job_id which is a UUID; fix by either changing the
function signature to return -> UUID or converting the returned value to a
string with return str(collection_job_id). Update any imports/annotations if you
choose UUID (e.g., ensure UUID is imported) and keep the rest of the logic
(calls to CollectionJobCrud.update and start_create_collection_job) unchanged.

In `@backend/app/services/collections/helpers.py`:
- Around line 84-99: The batching loop in batch_documents (the for doc in
documents loop using current_batch and current_batch_size_kb) can raise an
opaque TypeError when doc.file_size_kb is None; add explicit validation for each
doc before using it (either a pre-loop scan or a per-doc check) that verifies
file_size_kb is not None and is a numeric type, and if invalid raise a clear
ValueError that includes an identifier (e.g., doc.id or doc.name) so callers
know which document failed; perform this validation before updating
current_batch_size_kb so existing batches are preserved and add a short
logger.warning or logger.error with the same diagnostic information when
raising.

In `@backend/app/services/collections/providers/openai.py`:
- Around line 23-28: Update the three failing tests so they call the new create
signature: replace calls to provider.create(collection_request, storage,
documents) with provider.create(collection_request, documents) and, if a
vector_store_id or is_final was intended, pass those as named args (e.g.
provider.create(collection_request, documents, vector_store_id=...,
is_final=...)); modify the three test functions in
backend/app/tests/services/collections/providers/test_openai_provider.py
(test_create_openai_vector_store_only, test_create_openai_with_assistant,
test_create_propagates_exception) to pass the documents list as the second
parameter and remove the positional storage argument.

---

Nitpick comments:
In `@backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py`:
- Around line 47-55: The migration
'055_add_batch_tracking_to_collections_jobs.py' declares changes for
collection_jobs but also adds a column to document (op.add_column adding
document.openai_file_id); either split the document change into a separate
migration or rename/update this migration's filename and revision message to
reflect both changes (and update the upgrade/revision docstring) so the history
accurately describes the addition of document.openai_file_id alongside the
collection_jobs alterations.

In `@backend/app/models/document.py`:
- Around line 49-53: The model field openai_file_id's sa_column_kwargs comment
string mismatches the migration; update the Field definition for openai_file_id
in the Document model to use the exact comment used in migration 055 ("File ID
assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading") so
the sa_column_kwargs comment and the migration stay in sync and prevent
autogenerate diffs.

In `@backend/app/services/collections/create_collection.py`:
- Around line 475-491: The catch-all in create_collection.execute_batch_job
currently uses "except BaseException as err" which improperly catches
KeyboardInterrupt/SystemExit and gevent Timeouts; change that handler to "except
Exception as err" so only regular exceptions are caught (leaving the earlier
"except Timeout" and system-exiting signals to propagate), and keep the existing
logging, _mark_job_failed, and callback logic unchanged.

In `@backend/app/services/collections/providers/openai.py`:
- Around line 47-52: The code currently creates a new Session(engine) and
DocumentCrud for every uploaded document; instead open a single Session(engine)
outside the upload loop and reuse it (and a DocumentCrud instance per
project_id) for each doc, calling document_crud.read_one(doc.id), updating
db_doc.openai_file_id and db_doc.file_size_kb, and then
document_crud.update(db_doc) inside the loop; perform either a session.commit()
per document for partial durability or one commit after the loop, and ensure the
session is closed once after processing the entire batch.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 14146497-9eeb-46d5-94d9-fe7751afed6a

📥 Commits

Reviewing files that changed from the base of the PR and between a9e2ac5 and 2a2e268.

📒 Files selected for processing (12)
  • backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py
  • backend/app/api/docs/documents/upload.md
  • backend/app/celery/tasks/job_execution.py
  • backend/app/celery/utils.py
  • backend/app/crud/rag/open_ai.py
  • backend/app/models/collection_job.py
  • backend/app/models/document.py
  • backend/app/services/collections/create_collection.py
  • backend/app/services/collections/helpers.py
  • backend/app/services/collections/providers/base.py
  • backend/app/services/collections/providers/openai.py
  • backend/app/tests/services/collections/test_helpers.py

Comment on lines 74 to 105
@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_create_collection_job")
def run_create_collection_job(
    self, project_id: int, job_id: str, trace_id: str, **kwargs
):
    from app.services.collections.create_collection import execute_job
    from app.services.collections.create_collection import execute_setup_job

    _set_trace(trace_id)
    return execute_setup_job(
        project_id=project_id,
        job_id=job_id,
        task_id=current_task.request.id,
        task_instance=self,
        **kwargs,
    )


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_collection_batch_job")
def run_collection_batch_job(
    self, project_id: int, job_id: str, trace_id: str, **kwargs
):
    from app.services.collections.create_collection import execute_batch_job

    _set_trace(trace_id)
    return _run_with_otel_parent(
        self,
        lambda: execute_job(
            project_id=project_id,
            job_id=job_id,
            task_id=current_task.request.id,
            task_instance=self,
            **kwargs,
        ),
    return execute_batch_job(
        project_id=project_id,
        job_id=job_id,
        task_id=current_task.request.id,
        task_instance=self,
        **kwargs,
    )

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Show gevent_timeout implementation and how it interacts with gevent.Timeout
fd -t f 'utils.py' --search-path backend/app/celery -x cat {}
echo '---'
rg -nP -C5 'def\s+gevent_timeout\b' --type=py
echo '---'
rg -nP -C3 '\bTimeout\b' backend/app/celery/utils.py 2>/dev/null

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 7831


Fix critical bug in gevent_timeout decorator — it unconditionally raises TimeoutError on every task completion.

The finally block in gevent_timeout (backend/app/celery/utils.py, lines 202–204) unconditionally raises TimeoutError regardless of whether the task succeeded or actually timed out. The timeout.cancel() call is unreachable (placed after the raise statement). This means every execution of run_create_collection_job and run_collection_batch_job will fail with TimeoutError, even when completing successfully. The finally block must be refactored to cancel the timeout cleanly on success:

finally:
    timeout.cancel()

Only the except Timeout block should raise the TimeoutError when a genuine timeout occurs.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/celery/tasks/job_execution.py` around lines 74 - 105, The
gevent_timeout decorator currently raises TimeoutError unconditionally in its
finally block causing tasks like run_create_collection_job and
run_collection_batch_job to always fail; modify gevent_timeout (the decorator
implementation) so that the Timeout exception is raised only inside the except
Timeout: handler and the finally: block only calls timeout.cancel() (no raise),
ensuring timeout.cancel() is reachable and successful task completions do not
raise TimeoutError.

Comment on lines +185 to +208
def gevent_timeout(seconds, task_name=None):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            name = task_name or func.__name__
            timeout = Timeout(seconds)
            timeout.start()
            try:
                return func(*args, **kwargs)
            except Timeout:
                logger.error(
                    f"[{name}] Timed out after {seconds}s — args={args}, kwargs={kwargs}"
                )
                raise
            # raise TimeoutError(f"[{name}] Task exceeded soft time limit of {seconds}s")
            finally:
                raise TimeoutError(
                    f"[{name}] Task exceeded soft time limit of {seconds}s"
                )
                timeout.cancel()

        return wrapper

    return decorator

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: gevent_timeout always raises TimeoutError and never cancels the timeout.

The finally block unconditionally raises TimeoutError, so:

  • Successful returns from func are discarded and replaced with a TimeoutError.
  • The except Timeout: ... raise re-raise is overridden by the finally's raise, so callers never see a Timeout.
  • timeout.cancel() on Line 204 is unreachable (statement after an unconditional raise), meaning the gevent timer is never cancelled on the success path and continues to fire later.

The decorator as written is unusable on any path. The TimeoutError should only be raised when the Timeout actually fired, and timeout.cancel() must run on every exit.

🛡️ Proposed fix
-def gevent_timeout(seconds, task_name=None):
-    def decorator(func):
-        @functools.wraps(func)
-        def wrapper(*args, **kwargs):
-            name = task_name or func.__name__
-            timeout = Timeout(seconds)
-            timeout.start()
-            try:
-                return func(*args, **kwargs)
-            except Timeout:
-                logger.error(
-                    f"[{name}] Timed out after {seconds}s — args={args}, kwargs={kwargs}"
-                )
-                raise
-            # raise TimeoutError(f"[{name}] Task exceeded soft time limit of {seconds}s")
-            finally:
-                raise TimeoutError(
-                    f"[{name}] Task exceeded soft time limit of {seconds}s"
-                )
-                timeout.cancel()
-
-        return wrapper
-
-    return decorator
+def gevent_timeout(seconds: float, task_name: str | None = None):
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            name = task_name or func.__name__
+            timeout = Timeout(seconds)
+            timeout.start()
+            try:
+                return func(*args, **kwargs)
+            except Timeout:
+                logger.error(
+                    f"[{name}] Timed out after {seconds}s — args={args}, kwargs={kwargs}"
+                )
+                raise TimeoutError(
+                    f"[{name}] Task exceeded soft time limit of {seconds}s"
+                )
+            finally:
+                timeout.cancel()
+
+        return wrapper
+
+    return decorator
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/celery/utils.py` around lines 185 - 208, gevent_timeout currently
always raises TimeoutError and never cancels the gevent Timeout; fix wrapper in
gevent_timeout by tracking whether the gevent Timeout fired (e.g., timed_out
flag and optionally store result/exception), don't unconditionally raise in
finally, always call timeout.cancel() in the finally block, and only raise
TimeoutError (or re-raise the stored Timeout) after timeout.cancel() if
timed_out is true; reference wrapper, Timeout, timeout.cancel(), task_name and
func.__name__ to locate where to apply the change.

Comment on lines +158 to +163
"""
Attach a batch of documents to the vector store via a single upload_and_poll call.

All docs must have provider_file_id set before calling this method.
Returns (succeeded, failed) — failed docs should be retried in the next batch.
"""

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Docstring references a field name that doesn't exist.

The docstring says docs must have provider_file_id set, but the code reads doc.openai_file_id (Line 175) and that's the field defined on the Document model. Update the docstring to use the actual attribute name.

📝 Proposed fix
-        All docs must have provider_file_id set before calling this method.
+        All docs must have openai_file_id set before calling this method.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

"""
Attach a batch of documents to the vector store via a single upload_and_poll call.

All docs must have provider_file_id set before calling this method.
Returns (succeeded, failed) — failed docs should be retried in the next batch.
"""

"""
Attach a batch of documents to the vector store via a single upload_and_poll call.

All docs must have openai_file_id set before calling this method.
Returns (succeeded, failed) — failed docs should be retried in the next batch.
"""
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/rag/open_ai.py` around lines 158 - 163, The docstring for
the batch upload method incorrectly refers to provider_file_id; update it to
reference the actual Document attribute used in the code (doc.openai_file_id) so
the docstring matches the implementation (see the method that calls
upload_and_poll / the loop that reads doc.openai_file_id). Ensure the sentence
now states that all docs must have openai_file_id set before calling this method
and return description remains unchanged.

Comment on lines +182 to +190
if batch.file_counts.failed == 0:
    succeeded.extend(docs)
else:
    # Can't identify which specific files failed — retry all of them
    logger.warning(
        f"[OpenAIVectorStoreCrud.update_batch] Batch had failures, marking all for retry | "
        f"{{'vector_store_id': '{vector_store_id}', 'failed_count': {batch.file_counts.failed}}}"
    )
    failed.extend(docs)

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🌐 Web query:

OpenAI Python SDK vector_stores.file_batches.upload_and_poll return value per-file status retrieve files failed

💡 Result:

The client.beta.vector_stores.file_batches.upload_and_poll(vector_store_id, files=...) method returns a VectorStoreFileBatch object [1][2]. This object includes:

  • status: overall batch status ("in_progress", "completed", "cancelled", "failed") [1].
  • file_counts: counts of files by status (total, completed, failed, in_progress, cancelled) [1][3].

It provides aggregate per-status counts but not individual file details or IDs [1][2]. To retrieve per-file status, especially failed files:

  1. Use the returned batch's id (starts with "vsfb_") [4][5].
  2. Call client.beta.vector_stores.file_batches.list_files(batch_id=batch.id, vector_store_id=vector_store_id) [6][7].
     • Returns a paginated list of VectorStoreFile objects for files in the batch [6].
     • Each VectorStoreFile has status ("in_progress", "completed", "cancelled", "failed") and last_error (with code and message for failures, e.g., "server_error", "unsupported_file", "invalid_file") [8][6][9].
     • Filter by status: filter="failed" to get only failed files [1][6][7].

Example:

file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
    vector_store_id="vs_...", files=file_streams
)
print(file_batch.status)              # e.g., "completed"
print(file_batch.file_counts.failed)  # e.g., 1

# List failed files
failed_files = client.beta.vector_stores.file_batches.list_files(
    batch_id=file_batch.id, vector_store_id="vs_...", filter="failed"
)
for f in failed_files.data:
    print(f.id, f.status, f.last_error)  # Details on failed file

Note: batch status can be "completed" even if some files failed (file_counts.failed > 0) [10]. Individual VectorStoreFile objects provide detailed status and errors [8][6].

Citations:


🏁 Script executed:

# First, locate and examine the file mentioned in the review
find . -path "*/backend/app/crud/rag/open_ai.py" -o -path "*/open_ai.py" | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 109


🏁 Script executed:

# Also search for the file more broadly
fd open_ai.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 107


🏁 Script executed:

# Read the file with line numbers to see lines around 182-190
cat -n backend/app/crud/rag/open_ai.py | sed -n '160,210p'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2632


🏁 Script executed:

# Get the full function context - let's find the function name first
cat -n backend/app/crud/rag/open_ai.py | sed -n '150,200p'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2513


🏁 Script executed:

# Search for usage of list_files in the codebase
rg "list_files" backend/app/crud/rag/

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Find where update_batch is called
rg "update_batch" backend/app/crud/rag/

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 601


🏁 Script executed:

# Get more context on the class structure
cat -n backend/app/crud/rag/open_ai.py | head -80

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2923


🏁 Script executed:

# Search for update_batch usage across the entire backend
rg "update_batch" backend/

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1983


🏁 Script executed:

# Get full class definition to understand the context
cat -n backend/app/crud/rag/open_ai.py | sed -n '80,160p'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 3898


🏁 Script executed:

# Examine the two caller locations
cat -n backend/app/services/collections/create_collection.py | grep -A 10 -B 10 "update_batch"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 848


🏁 Script executed:

# Check the openai provider file
cat -n backend/app/services/collections/providers/openai.py | grep -A 10 -B 10 "update_batch"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1030


🏁 Script executed:

# Verify type hints compliance in the function signature
cat -n backend/app/crud/rag/open_ai.py | sed -n '153,157p'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 254


🏁 Script executed:

# Check if there are any type hint issues elsewhere in the function
cat -n backend/app/crud/rag/open_ai.py | sed -n '153,203p'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2640


Use per-file status from list_files() to identify only failed files instead of retrying all docs on batch failure.

When batch.file_counts.failed > 0, the current code marks all docs for retry, including those successfully completed. On retry, these already-attached files are sent again to upload_and_poll(), which either causes idempotency issues or wastes API calls. The OpenAI SDK provides client.beta.vector_stores.file_batches.list_files(batch_id=batch.id, vector_store_id=vector_store_id, filter="failed") to retrieve only the files that actually failed, allowing you to map them back to the input docs and retry only the genuinely failed ones.
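
A sketch of that narrowing inside update_batch, assuming each input doc carries the openai_file_id it was uploaded under and that the CRUD exposes the OpenAI client as self.client:

if batch.file_counts.failed == 0:
    succeeded.extend(docs)
else:
    failed_page = self.client.beta.vector_stores.file_batches.list_files(
        batch_id=batch.id, vector_store_id=vector_store_id, filter="failed"
    )
    # First page only; paginate if a single batch can exceed the page size.
    failed_file_ids = {f.id for f in failed_page.data}
    for doc in docs:
        (failed if doc.openai_file_id in failed_file_ids else succeeded).append(doc)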

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/rag/open_ai.py` around lines 182 - 190, In
OpenAIVectorStoreCrud.update_batch, when batch.file_counts.failed > 0, don't
mark all docs for retry; call the OpenAI helper
client.beta.vector_stores.file_batches.list_files(batch_id=batch.id,
vector_store_id=vector_store_id, filter="failed") to get only failed file
entries, map those failed file identifiers back to the input docs list (using
the same file id/key used when building docs), and extend the failed list with
only those docs so upload_and_poll() is retried only for genuinely failed files
instead of the entire batch.

Comment on lines +122 to +172
def _persist_succeeded_docs(succeeded: list, project_id: int) -> list[str]:
    with Session(engine) as session:
        document_crud = DocumentCrud(session, project_id)
        for doc in succeeded:
            if doc.openai_file_id:
                db_doc = document_crud.read_one(doc.id)
                if db_doc.openai_file_id != doc.openai_file_id:
                    db_doc.openai_file_id = doc.openai_file_id
                    document_crud.update(db_doc)
    return [str(doc.id) for doc in succeeded]


def _retry_failed_uploads(
    vector_store_crud,
    vector_store_id: str,
    failed_docs: list,
    project_id: int,
    max_retries: int = 3,
) -> list[str]:
    """
    Retry attaching docs that failed the initial batch upload_and_poll.
    All docs must already have provider_file_id set.
    Returns the list of successfully retried doc IDs.
    Raises RuntimeError if any docs still fail after all retries.
    """
    pending = failed_docs
    all_succeeded_ids: list[str] = []

    for attempt in range(1, max_retries + 1):
        logger.warning(
            "[_retry_failed_uploads] Retry attempt %d/%d: %d doc(s) | vector_store_id=%s",
            attempt,
            max_retries,
            len(pending),
            vector_store_id,
        )
        succeeded, failed = vector_store_crud.update_batch(vector_store_id, pending)

        if succeeded:
            all_succeeded_ids += _persist_succeeded_docs(succeeded, project_id)

        if not failed:
            return all_succeeded_ids

        pending = failed

    ids = [str(d.id) for d in pending]
    raise RuntimeError(
        f"Failed to upload {len(pending)} document(s) after {max_retries} retries: {ids}"
    )


⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Dead code: _persist_succeeded_docs and _retry_failed_uploads are never invoked.

Neither helper is called from execute_setup_job nor execute_batch_job. The retry path described in execute_batch_job's docstring (line 306–307: "any failures within the batch are retried inline by _upload_batch_with_retry") refers to a function that doesn't exist in this file at all. Either wire these helpers into the batch path or remove them along with the now-unused OpenAIVectorStoreCrud import on line 24. As-is, they will rot and mislead future readers.

#!/bin/bash
# Confirm these helpers are dead and OpenAIVectorStoreCrud is unused in this file
rg -nP '_persist_succeeded_docs|_retry_failed_uploads|_upload_batch_with_retry|OpenAIVectorStoreCrud' backend/app/services/collections/create_collection.py
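
If the helpers are kept instead of removed, a rough sketch of wiring them into the batch path; the batch_docs name and the exact call site inside execute_batch_job are assumptions:

succeeded, failed = vector_store_crud.update_batch(vector_store_id, batch_docs)
uploaded_ids = _persist_succeeded_docs(succeeded, project_id)
if failed:
    # Retries up to max_retries times; raises RuntimeError if docs still fail.
    uploaded_ids += _retry_failed_uploads(
        vector_store_crud, vector_store_id, failed, project_id
    )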
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/create_collection.py` around lines 122 -
172, The two helper functions _persist_succeeded_docs and _retry_failed_uploads
(and the stale docstring reference to _upload_batch_with_retry) are dead code
and OpenAIVectorStoreCrud is unused; either wire them into the batch path
(execute_setup_job / execute_batch_job) or remove them. Fix by removing the
unused helpers _persist_succeeded_docs and _retry_failed_uploads and the
OpenAIVectorStoreCrud import, and update the execute_batch_job docstring to not
reference _upload_batch_with_retry; alternatively, if you intend to keep retry
logic, add calls from execute_batch_job/execute_setup_job to
_retry_failed_uploads (and ensure vector_store_crud is passed) and implement or
rename _upload_batch_with_retry accordingly so the docstring matches the
implemented function.

Comment on lines +215 to +220
        logger.info(
            "[create_collection.execute_setup_job] All file uploads complete | "
            "job_id=%s, total=%d, failed=%d, duration_s=%.2f",
            job_id,
            len(flat_docs),
        )

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Logger format string has 4 placeholders but only 2 arguments — runtime TypeError.

The format string contains %s, total=%d, failed=%d, duration_s=%.2f (four placeholders), but you only pass job_id and len(flat_docs). At runtime Python's logging will raise TypeError: not enough arguments for format string while formatting; the log will not be emitted as intended (only an internal logging error is written to stderr). You also reference a failed count and duration_s that are never computed in this function.

🐛 Proposed fix
-        logger.info(
-            "[create_collection.execute_setup_job] All file uploads complete | "
-            "job_id=%s, total=%d, failed=%d, duration_s=%.2f",
-            job_id,
-            len(flat_docs),
-        )
+        logger.info(
+            "[create_collection.execute_setup_job] All file uploads complete | "
+            "job_id=%s, total=%d",
+            job_id,
+            len(flat_docs),
+        )

If you actually want failed/duration_s, change upload_files to return a result struct and time the call so those values exist.
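
A sketch of that option, assuming upload_files is changed to return a result object exposing failed_count (both the call shape and the result struct are hypothetical):

import time

start = time.monotonic()
upload_result = provider.upload_files(storage, flat_docs, project_id)
duration_s = time.monotonic() - start
logger.info(
    "[create_collection.execute_setup_job] All file uploads complete | "
    "job_id=%s, total=%d, failed=%d, duration_s=%.2f",
    job_id,
    len(flat_docs),
    upload_result.failed_count,
    duration_s,
)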

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

        logger.info(
            "[create_collection.execute_setup_job] All file uploads complete | "
            "job_id=%s, total=%d, failed=%d, duration_s=%.2f",
            job_id,
            len(flat_docs),
        )

        logger.info(
            "[create_collection.execute_setup_job] All file uploads complete | "
            "job_id=%s, total=%d",
            job_id,
            len(flat_docs),
        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/create_collection.py` around lines 215 -
220, The log call in create_collection.execute_setup_job uses four format
specifiers but only passes job_id and len(flat_docs), causing a runtime
TypeError; update the logger.info call to either (A) reduce the format string to
match the two provided args (e.g., remove failed and duration_s placeholders) or
(B) compute and supply the missing values by timing the upload_files call and
getting a failed count (modify upload_files to return a result struct with
failed_count and have execute_setup_job measure duration_s and pass job_id,
len(flat_docs), failed_count, duration_s into logger.info). Ensure the change
references logger.info and the upload_files/flat_docs variables so the log
formatting and values are consistent.

Comment on lines +243 to +253
start_collection_batch_job(
    project_id=project_id,
    job_id=job_id,
    trace_id=trace_id,
    batch_number=1,
    batch_doc_ids=batch_doc_ids[0],
    remaining_batches=batch_doc_ids[1:],
    request=request,
    with_assistant=with_assistant,
    organization_id=organization_id,
)

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect start_collection_batch_job and start_create_collection_job signatures and how they forward kwargs
rg -nP -C8 'def\s+start_collection_batch_job\b|def\s+start_create_collection_job\b' --type=py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2005


🏁 Script executed:

# Find execute_setup_job and check what it passes to start_collection_batch_job
rg -nP -A 30 'def execute_setup_job' backend/app/services/collections/ --type=py | head -100

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2824


🏁 Script executed:

# Find execute_batch_job signature to check vector_store_id parameter
rg -nP -A 5 'def execute_batch_job' backend/app/services/collections/ --type=py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 556


🏁 Script executed:

# Find run_collection_batch_job Celery task definition
rg -nP -A 10 'def run_collection_batch_job' backend/app/celery/tasks/ --type=py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 927


🏁 Script executed:

# Get full execute_batch_job signature and first few lines
rg -nP -A 20 'def execute_batch_job' backend/app/services/collections/create_collection.py --type=py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 906


🏁 Script executed:

# Read the specific lines 243-253 in create_collection.py to see the actual call
sed -n '240,260p' backend/app/services/collections/create_collection.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 698


🏁 Script executed:

# Also check run_collection_batch_job to see if it processes kwargs
rg -nP -A 20 'def run_collection_batch_job' backend/app/celery/tasks/job_execution.py --type=py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 763


🏁 Script executed:

# Check if vector_store_id is set anywhere in execute_setup_job before the call
rg -nP -B 50 'start_collection_batch_job' backend/app/services/collections/create_collection.py | grep -E '(vector_store_id|def execute_setup_job)' | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 238


🏁 Script executed:

# Check the full execute_setup_job to see if vector_store_id is created
sed -n '174,250p' backend/app/services/collections/create_collection.py | grep -n 'vector_store_id'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Get the full execute_setup_job function to see vector_store_id handling
sed -n '174,290p' backend/app/services/collections/create_collection.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 3915


Pass vector_store_id in the first batch enqueue call.

The call to start_collection_batch_job at lines 243–253 does not pass vector_store_id, but execute_batch_job requires it (line 299, no default). The first batch will fail with TypeError: execute_batch_job() missing 1 required positional argument: 'vector_store_id'.

Either provide a default of None to execute_batch_job's vector_store_id parameter, or pass vector_store_id=None explicitly in the start_collection_batch_job call.
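A sketch of the explicit-keyword variant, reusing the keywords from the call quoted above; whether None is the right value here, or whether the id of the vector store created during setup should be forwarded instead, depends on how execute_batch_job uses the argument.

# Sketch: same enqueue call as above with the missing keyword supplied explicitly.
start_collection_batch_job(
    project_id=project_id,
    job_id=job_id,
    trace_id=trace_id,
    batch_number=1,
    batch_doc_ids=batch_doc_ids[0],
    remaining_batches=batch_doc_ids[1:],
    request=request,
    with_assistant=with_assistant,
    organization_id=organization_id,
    vector_store_id=None,  # or the id created during setup, if available at this point
)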

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/create_collection.py` around lines 243 -
253, The first batch enqueue call to start_collection_batch_job is missing the
required vector_store_id expected by execute_batch_job, causing a TypeError; fix
it by passing vector_store_id=None in the start_collection_batch_job invocation
(where project_id/job_id/trace_id are passed) so execute_batch_job receives the
argument, or alternatively add a default vector_store_id: Optional[...] = None
to execute_batch_job's signature; reference start_collection_batch_job and
execute_batch_job when making the change.

Comment on lines +304 to +311
"""
Phase 2: Upload one batch of documents to the vector store.
- Uploads the batch; any failures within the batch are retried inline by _upload_batch_with_retry
- Raises immediately if all retries for the batch are exhausted
- Checkpoints progress to the DB
- If more batches remain, queues the next batch task
- If this is the last batch, finalizes: creates Collection, links docs, marks job SUCCESSFUL
"""

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Docstring references a function that doesn't exist.

The docstring claims "any failures within the batch are retried inline by _upload_batch_with_retry", but that function isn't defined in this file. The actual code calls provider.create(...) (line 354) which delegates to vector_store_crud.update_batch; whatever retry semantics live there are not visible here. Either implement inline retry (you have _retry_failed_uploads already, see prior comment) or update the docstring to reflect what really happens.
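An updated docstring that only claims what this review can confirm (the provider.create call delegating to vector_store_crud.update_batch) might read roughly as follows; the retry wording is intentionally left open, since those semantics live in the CRUD layer rather than in this function.

"""
Phase 2: Upload one batch of documents to the vector store.
- Uploads the batch via provider.create(), which delegates to vector_store_crud.update_batch;
  any retry behaviour lives in that layer, not here
- Raises if the batch upload ultimately fails
- Checkpoints progress to the DB
- If more batches remain, queues the next batch task
- If this is the last batch, finalizes: creates Collection, links docs, marks job SUCCESSFUL
"""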

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/create_collection.py` around lines 304 -
311, Update the Phase 2 docstring to remove the reference to the non-existent
_upload_batch_with_retry and instead describe the actual behavior: that the code
calls provider.create(...) which delegates to vector_store_crud.update_batch,
and that inline retries are handled by _retry_failed_uploads (if used) or by the
underlying vector_store_crud; ensure the docstring accurately states that failed
items are retried via _retry_failed_uploads or the vector_store_crud retry
semantics, and that the function still checkpoints progress, queues next batch,
and finalizes the collection on the last batch.

Comment on lines +30 to +59
def upload_files(
    self,
    storage: CloudStorage,
    docs: list[Document],
    project_id: int,
) -> None:
    for doc in docs:
        if self.get_existing_file_id(doc):
            continue
        try:
            content = storage.get(doc.object_store_url)
            if doc.file_size_kb is None:
                doc.file_size_kb = round(len(content) / 1024, 2)
            f_obj = BytesIO(content)
            f_obj.name = doc.fname
            uploaded = self.client.files.create(file=f_obj, purpose="assistants")
            doc.openai_file_id = uploaded.id
            with Session(engine) as session:
                document_crud = DocumentCrud(session, project_id)
                db_doc = document_crud.read_one(doc.id)
                db_doc.openai_file_id = uploaded.id
                db_doc.file_size_kb = doc.file_size_kb
                document_crud.update(db_doc)
        except Exception as err:
            logger.error(
                "[OpenAIProvider.upload_files] Failed to upload file | doc_id=%s, error=%s",
                doc.id,
                str(err),
                exc_info=True,
            )

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

upload_files silently swallows per-document failures, leaving inconsistent state.

When storage.get or client.files.create raises, the exception is logged but not re-raised. The loop proceeds to the next doc, leaving doc.openai_file_id (and possibly doc.file_size_kb) unset on the failed doc. Two concrete downstream consequences:

  1. In create_collection.execute_setup_job (line 222), total_size_kb = sum(doc.file_size_kb for doc in flat_docs) will raise TypeError: unsupported operand type(s) for +: 'float' and 'NoneType' for any doc that failed upload, masking the real OpenAI/storage error with a confusing arithmetic error.
  2. The doc without an openai_file_id still gets passed into vector_store_crud.update_batch, leading to silent data loss or a separate failure inside the vector-store upload path.

Either fail fast on the first error, or collect failures and surface a structured result so callers can decide. Continuing past failures while logging is the worst of both worlds.

🛡️ Suggested approach
     def upload_files(
         self,
         storage: CloudStorage,
         docs: list[Document],
         project_id: int,
     ) -> None:
+        failed: list[tuple[Document, Exception]] = []
         for doc in docs:
             if self.get_existing_file_id(doc):
                 continue
             try:
                 content = storage.get(doc.object_store_url)
                 ...
             except Exception as err:
                 logger.error(
                     "[OpenAIProvider.upload_files] Failed to upload file | doc_id=%s, error=%s",
                     doc.id,
                     str(err),
                     exc_info=True,
                 )
+                failed.append((doc, err))
+        if failed:
+            ids = [str(d.id) for d, _ in failed]
+            raise RuntimeError(f"Failed to upload {len(failed)} document(s): {ids}")
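If the fail-fast shape above is adopted, the caller mostly just lets the exception propagate so the job can be marked failed. A rough sketch of the guard in execute_setup_job, where collection_job_crud.mark_failed stands in for whatever job-status update the service actually performs:

def finalize_setup_uploads(provider, storage, flat_docs, project_id, job_id, collection_job_crud) -> None:
    # Hypothetical guard once upload_files raises on any per-document failure.
    try:
        provider.upload_files(storage, flat_docs, project_id)
    except RuntimeError as err:
        # collection_job_crud.mark_failed is a placeholder for the real job-status update.
        collection_job_crud.mark_failed(job_id, error=str(err))
        raise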
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/collections/providers/openai.py` around lines 30 - 59,
The upload_files loop in OpenAIProvider.upload_files currently logs per-document
exceptions and continues, leaving docs with None file_size_kb/openai_file_id and
causing downstream TypeError or silent failures; modify upload_files to either
(A) fail-fast by re-raising the caught exception after logging so callers (e.g.,
create_collection.execute_setup_job) can stop and surface the real error, or (B)
accumulate per-doc failures into a structured result (e.g., list of successes
and failures) and return that to callers so they can decide (and avoid passing
docs without openai_file_id to vector_store_crud.update_batch); update the
function signature and callers accordingly (refer to upload_files,
create_collection.execute_setup_job, and vector_store_crud.update_batch) so
callers handle the returned error info or the propagated exception.
