Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 134 additions & 50 deletions api/endpoints/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@
import os
import secrets

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from typing import Optional
from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, Any
import base64
import binascii
import time
import threading
import uuid

try:
from prometheus_client import Counter, Histogram
except Exception:
Counter = None
Histogram = None
import tempfile
import logging
from services.parsing_service import extract_text_from_pdf, extract_text_from_docx
from services.chunking_service import chunk_text
from services.embedding_service import embed_texts
from services.vector_db_service import VectorDBClient
# VectorDBClient intentionally not imported by default here; integrate in production


OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") # used by embedding service
Expand All @@ -27,13 +38,53 @@

router = APIRouter(prefix="/internal", tags=["internal"])

# Simple in-memory job store for background processing (for demo/testing only)
# NOTE(review): records are never evicted and are lost on process restart —
# replace with a persistent/shared store before production use.
_jobs: Dict[str, Dict[str, Any]] = {}
# Guards all reads/writes of _jobs, since request handlers and the background
# task may touch the store from different threads.
_jobs_lock = threading.Lock()

# Metrics (optional)
# Counter/Histogram are None when prometheus_client failed to import (see the
# guarded import at the top of the file), so both metric names are set to None
# and call sites must truth-test them before use.
if Counter:
    INTERNAL_REQUESTS = Counter("internal_requests_total", "Internal API requests", ["status"])
    EMBEDDING_DURATION = Histogram("embedding_duration_seconds", "Embedding generation duration")
else:
    INTERNAL_REQUESTS = EMBEDDING_DURATION = None



class SourceMaterialRequest(BaseModel):
    """Inbound payload for POST /internal/process-material.

    For ``file_type`` 'pdf'/'docx' the ``source_material`` field carries a
    base64-encoded file; otherwise it is treated as plain text.
    """

    # Declared BEFORE source_material on purpose: pydantic v1 runs field
    # validators in declaration order, and ``values`` inside a validator only
    # contains previously-validated fields. With file_type declared after
    # source_material, values.get("file_type") was always None and the
    # pdf/docx base64 validation below never executed.
    file_type: Optional[str] = Field(None, description="pdf|docx|text")
    source_material: str = Field(..., description="Base64-encoded file content for PDF/DOCX, or plain text")
    prompt: str = Field(..., max_length=2000)
    metadata: Optional[dict] = None

    @validator("file_type")
    def validate_file_type(cls, v):
        """Restrict file_type to the formats the parsing services support."""
        if v is None:
            return v
        if v not in {"pdf", "docx", "text"}:
            raise ValueError("file_type must be one of: pdf, docx, text")
        return v

    @validator("source_material")
    def base64_or_text_size(cls, v, values):
        """Reject oversized payloads and malformed base64 for pdf/docx uploads."""
        file_type = values.get("file_type")
        max_bytes = int(os.getenv("INTERNAL_MAX_BYTES", str(5 * 1024 * 1024)))
        if file_type in ("pdf", "docx"):
            try:
                # base64 text is roughly 4/3 the size of the decoded binary;
                # check the cheap estimate before decoding the whole payload.
                approx = (len(v) * 3) // 4
                if approx > max_bytes:
                    raise ValueError("source_material too large")
                base64.b64decode(v, validate=True)
            except (binascii.Error, ValueError) as e:
                raise ValueError("Invalid or too-large base64 source_material") from e
        else:
            # Plain text ('text' or unset file_type): cap the UTF-8 byte size.
            if len(v.encode("utf-8", errors="ignore")) > max_bytes:
                raise ValueError("source_material too large")
        return v

# Dependency for internal authentication
def verify_internal_api_key(api_key: str = Depends(api_key_header)):
Expand All @@ -46,39 +97,42 @@ def verify_internal_api_key(api_key: str = Depends(api_key_header)):

def _create_job_record(status: str, info: Dict[str, Any]) -> str:
    """Create a new in-memory job record and return its generated job id.

    Private helper — deliberately NOT decorated with @router.post: a stray
    duplicate route decorator here would register this helper as the
    /process-material endpoint and conflict with the real process_material
    handler below.
    """
    job_id = str(uuid.uuid4())
    with _jobs_lock:
        _jobs[job_id] = {"status": status, "info": info, "created_at": time.time()}
    return job_id

Comment on lines 98 to +105
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Remove stray/duplicate route decorator for /process-material

@router.post("/process-material") above _create_job_record registers the wrong function as the endpoint and creates a duplicate route with the real process_material below. This will either conflict or bind the path to a private helper.

Apply this diff to remove the accidental decorator:

-@router.post("/process-material", response_model=dict)
 
 def _create_job_record(status: str, info: Dict[str, Any]) -> str:
     job_id = str(uuid.uuid4())
     with _jobs_lock:
         _jobs[job_id] = {"status": status, "info": info, "created_at": time.time()}
     return job_id

Also applies to: 181-187

🤖 Prompt for AI Agents
In api/endpoints/internal.py around lines 98-105 (and similarly at 181-187), a
stray @router.post("/process-material") decorator is applied to the private
helper _create_job_record, which unintentionally registers the helper as the
endpoint and creates a duplicate/incorrect route; remove the
@router.post("/process-material") decorator from _create_job_record (and the
duplicate occurrence at lines 181-187) so that only the actual process_material
function is registered for that path, leaving the helper as a plain function
used internally.


def _update_job(job_id: str, **kwargs: Any) -> None:
    """Merge *kwargs* into an existing job record; no-op for unknown ids."""
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record.update(kwargs)


def _background_process(job_id: str, request: SourceMaterialRequest) -> None:
logger = logging.getLogger(__name__)
try:
# Step 1: Parse document text
text = None
_update_job(job_id, status="processing")

import base64
import binascii
# Decode/parse
tmp_file = None
text = None
try:
if request.file_type == "pdf":
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf", mode="wb") as tmp:
try:
data = base64.b64decode(request.source_material, validate=True)
except (binascii.Error, ValueError) as err:
raise HTTPException(status_code=400, detail="Invalid base64-encoded source_material for PDF upload.") from err
data = base64.b64decode(request.source_material)
tmp.write(data)
tmp.flush()
tmp_file = tmp.name
text = extract_text_from_pdf(tmp.name)
text = extract_text_from_pdf(tmp_file)
elif request.file_type == "docx":
with tempfile.NamedTemporaryFile(delete=False, suffix=".docx", mode="wb") as tmp:
try:
data = base64.b64decode(request.source_material, validate=True)
except (binascii.Error, ValueError) as err:
raise HTTPException(status_code=400, detail="Invalid base64-encoded source_material for DOCX upload.") from err
data = base64.b64decode(request.source_material)
tmp.write(data)
tmp.flush()
tmp_file = tmp.name
text = extract_text_from_docx(tmp.name)
text = extract_text_from_docx(tmp_file)
else:
text = request.source_material
finally:
Expand All @@ -89,40 +143,70 @@ def process_material(
pass

if not text:
raise HTTPException(status_code=400, detail="No text extracted from source material.")
_update_job(job_id, status="failed", error="No text extracted")
return

# Step 2: Chunk text
# Chunk
chunks = chunk_text(text)
if not chunks:
raise HTTPException(status_code=400, detail="No chunks generated from text.")
_update_job(job_id, status="failed", error="No chunks generated")
return

# Step 3: Generate embeddings
# Embeddings (with optional histogram)
try:
embeddings = embed_texts(chunks)
if EMBEDDING_DURATION:
with EMBEDDING_DURATION.time():
embeddings = embed_texts(chunks)
else:
embeddings = embed_texts(chunks)
embedding_count = len(embeddings) if embeddings else 0
except Exception as e:
logger.exception("Embedding generation failed")
raise HTTPException(status_code=500, detail=f"Embedding generation failed: {e}")
_update_job(job_id, status="failed", error=str(e))
return

# Step 4: Store embeddings in vector DB (stub)
# NOTE: Replace with real API key/environment for Pinecone
# vector_db = VectorDBClient(api_key="YOUR_PINECONE_API_KEY", environment="YOUR_PINECONE_ENV")
# vector_db.create_index("authormaton-index", dimension=len(embeddings[0]))
# vector_db.upsert_vectors(embeddings, [str(i) for i in range(len(embeddings))])

# Step 5: Synthesize draft (stub)
# TODO: upsert embeddings into vector DB
# For now, we store counts and a draft
draft = f"Draft generated for prompt: {request.prompt}\n\n" + "\n---\n".join(chunks[:3])

return {
"status": "success",
"received_material_length": len(request.source_material),
"prompt": request.prompt,
"metadata": request.metadata or {},
_update_job(job_id, status="completed", result={
"num_chunks": len(chunks),
"embedding_count": embedding_count,
"draft_preview": draft,
}
except HTTPException as e:
# Preserve intended status codes/messages
})
except Exception:
logger.exception("Unexpected error in background job")
_update_job(job_id, status="failed", error="Unexpected error")


@router.post("/process-material", response_model=dict)
def process_material(
    request: SourceMaterialRequest,
    background_tasks: BackgroundTasks,
    _: str = Depends(verify_internal_api_key),
):
    """Validate input and schedule background processing; return a job id.

    The heavy parse/chunk/embed pipeline runs in a background task; callers
    poll GET /internal/job/{job_id} for the outcome.
    """
    logger = logging.getLogger(__name__)
    try:
        # Create the record before scheduling so the id is immediately pollable.
        job_id = _create_job_record("pending", {"prompt_len": len(request.prompt)})
        background_tasks.add_task(_background_process, job_id, request)

        if INTERNAL_REQUESTS:
            INTERNAL_REQUESTS.labels(status="accepted").inc()

        return {"status": "accepted", "job_id": job_id}
    except HTTPException:
        # Preserve intended status codes/messages from dependencies/validators.
        raise
    except Exception:
        # Single catch-all handler (the merged diff residue left a second,
        # unreachable `except Exception` block here — removed).
        logger.exception("Error scheduling background job")
        if INTERNAL_REQUESTS:
            INTERNAL_REQUESTS.labels(status="error").inc()
        raise HTTPException(status_code=500, detail="Internal server error")


@router.get("/job/{job_id}")
def job_status(job_id: str, _: str = Depends(verify_internal_api_key)):
    """Return the current state of a background job, or 404 if unknown.

    Returns a shallow copy of the record so the background worker can keep
    mutating the live dict while the response is being serialized.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is None:
            raise HTTPException(status_code=404, detail="Job not found")
        return dict(job)