From 23884f22c920007609b4253b6bd3fcab2692b6aa Mon Sep 17 00:00:00 2001
From: Anirudha Acharya <127017012+anirudhaacharyap@users.noreply.github.com>
Date: Sat, 2 May 2026 09:20:36 +0530
Subject: [PATCH] feat(api): implement sequential batch ingestion endpoint
 (Phase 1)

---
 src/api/routes/memory.py | 45 ++++++++++++++++++++++++++++++++++++++++
 src/api/schemas.py       | 13 ++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/src/api/routes/memory.py b/src/api/routes/memory.py
index 427c94a..3397a69 100644
--- a/src/api/routes/memory.py
+++ b/src/api/routes/memory.py
@@ -23,6 +23,8 @@
 )
 from src.api.schemas import (
     APIResponse,
+    BatchIngestRequest,
+    BatchIngestResponse,
     DomainResult,
     IngestRequest,
     IngestResponse,
@@ -573,6 +575,49 @@ def _safe_classifications(result: Dict[str, Any]) -> list:
     return []
 
 
+# POST /v1/memory/batch-ingest
+@router.post(
+    "/batch-ingest",
+    response_model=APIResponse,
+    summary="Ingest multiple conversation turns into long-term memory sequentially",
+)
+async def batch_ingest_memory(req: BatchIngestRequest, request: Request, user: dict = Depends(require_api_key)):
+    start = time.perf_counter()
+    pipeline = get_ingest_pipeline()
+    user_id = user.get("username") or user.get("name") or user["id"]
+
+    results = []
+
+    for item in req.items:
+        result = await asyncio.wait_for(
+            pipeline.run(
+                user_query=item.user_query,
+                agent_response=item.agent_response or "Acknowledged.",
+                user_id=user_id,
+                session_datetime=item.session_datetime,
+                image_url=item.image_url,
+                effort_level=item.effort_level,
+            ),
+            timeout=120.0
+        )
+
+        data = IngestResponse(
+            model=_model_name(pipeline.model),
+            classification=_safe_classifications(result),
+            profile=_build_domain_result(result.get("profile_judge"), result.get("profile_weaver")),
+            temporal=_build_domain_result(result.get("temporal_judge"), result.get("temporal_weaver")),
+            summary=_build_domain_result(result.get("summary_judge"), result.get("summary_weaver")),
+            image=_build_domain_result(result.get("image_judge"), result.get("image_weaver")),
+        )
+        results.append(data)
+
+    response_data = BatchIngestResponse(results=results)
+
+    elapsed = round((time.perf_counter() - start) * 1000, 2)
+    return _wrap(request, response_data, elapsed)
+
+
+
 # POST /v1/memory/retrieve
 @router.post(
     "/retrieve",
diff --git a/src/api/schemas.py b/src/api/schemas.py
index 26b1901..b7ee122 100644
--- a/src/api/schemas.py
+++ b/src/api/schemas.py
@@ -102,6 +102,19 @@ class IngestResponse(BaseModel):
     image: Optional[DomainResult] = None
 
 
+class BatchIngestRequest(BaseModel):
+    """Store multiple new memories in a single batch."""
+    items: List[IngestRequest] = Field(
+        ..., min_length=1, max_length=100,
+        description="List of conversation turns to ingest"
+    )
+
+class BatchIngestResponse(BaseModel):
+    """Response for a batch ingest operation."""
+    results: List[IngestResponse] = Field(default_factory=list)
+
+
+
 # ── Retrieve (answer a question from memory) ──────────────────────────────
 
 class RetrieveRequest(BaseModel):
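
Reviewer aid, not part of the patch: a minimal Python sketch of how the new endpoint could be exercised. The base URL (http://localhost:8000) and the X-API-Key header name are assumptions about the local deployment and about how require_api_key authenticates; neither is defined in this diff. Per-item fields follow IngestRequest as used by the handler above (user_query, agent_response, and optionally session_datetime, image_url, effort_level).

# Illustrative smoke test for POST /v1/memory/batch-ingest.
# Assumed: API served at http://localhost:8000 and require_api_key reading an
# X-API-Key header; adjust both to match the real deployment.
import requests

payload = {
    "items": [
        {
            "user_query": "I moved to Berlin last month.",
            "agent_response": "Noted, you now live in Berlin.",
        },
        {
            "user_query": "My sister's birthday is on June 12th.",
            "agent_response": "Got it, I'll remember that.",
        },
    ]
}

resp = requests.post(
    "http://localhost:8000/v1/memory/batch-ingest",
    json=payload,
    headers={"X-API-Key": "dev-key"},
    # Items are processed strictly sequentially with a 120 s ceiling each,
    # so the client timeout should scale with the number of items.
    timeout=len(payload["items"]) * 120,
)
resp.raise_for_status()
print(resp.json())

Two behaviours from the diff are worth noting when calling this: if an item's agent_response is empty (and IngestRequest permits that), the handler substitutes "Acknowledged." before running the pipeline; and because items run one after another, a full 100-item batch can take up to 100 x 120 s in the worst case, which matters for client-side timeouts in this Phase 1 sequential design.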