
Conversation


@alvabba alvabba commented Sep 25, 2025

This PR hardens the internal processing pipeline with strict request validation, moves embedding generation to background jobs with a job-status API, and adds optional Prometheus metrics and structured logging for observability. Together, these changes improve safety, throughput, and operability without altering core product semantics.


New Features

  • Strict Pydantic validation for pipeline inputs (a model sketch follows this feature list)

    • Enforce model_config = {"extra": "forbid", "str_strip_whitespace": True} to reject unknown fields and trim unsafe whitespace.

    • Custom validators for:

      • base64 payloads: verify decodability, padding, and alphabet; reject non-UTF8 where applicable.
      • size limits: check decoded length against MAX_BYTES and per-item caps; produce clear 4xx error codes.
      • content schema: require at least one of {text, base64_blob} with mutual exclusivity rules as needed.
  • Background embedding jobs

    • Submit work to an async queue (embeddings.enqueue(...)) to decouple request latency from model runtime.
    • Idempotency via (content_hash, model, tenant) key to avoid duplicated compute.
    • Retry policy (exponential backoff, max attempts configurable) for transient errors.
  • Job-status API

    • POST /v1/embeddings/jobs → returns {job_id}.
    • GET /v1/embeddings/jobs/{job_id} → returns {state: queued|running|succeeded|failed, created_at, updated_at, error?, result?}.
    • DELETE /v1/embeddings/jobs/{job_id} → best-effort cancel (if still queued/running).
  • Optional Prometheus metrics

    • Counters: pipeline_requests_total{route,code}, embeddings_jobs_total{state}, pipeline_errors_total{type}.
    • Histograms: pipeline_request_duration_seconds{route}, embeddings_job_duration_seconds{model}.
    • Gauges: embeddings_jobs_inflight.
    • Disabled by default; enable with PIPELINE_METRICS_ENABLED=true.
  • Structured logging

    • JSON logs with request_id, job_id, tenant, route, state, and timing fields.
    • Validation failures log sanitized context (never raw payloads).
    • Hooks to include correlation/tracing IDs (e.g., trace_id, span_id) if upstream provides them.
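
A minimal sketch of the strict request model described above, assuming Pydantic v2 (which the repository pins); the field names, env-var fallback, and exactly-one-of rule are illustrative, not the exact code in api/endpoints/internal.py:

import base64
import os
from typing import Optional

from pydantic import BaseModel, field_validator, model_validator

MAX_BYTES = int(os.environ.get("MAX_BYTES", 10 * 1024 * 1024))  # cap on decoded size

class PipelineItem(BaseModel):
    # Reject unknown fields and trim whitespace on all string fields.
    model_config = {"extra": "forbid", "str_strip_whitespace": True}

    text: Optional[str] = None
    base64_blob: Optional[str] = None

    @field_validator("base64_blob")
    @classmethod
    def check_base64(cls, v: Optional[str]) -> Optional[str]:
        if v is None:
            return v
        try:
            decoded = base64.b64decode(v, validate=True)  # checks alphabet and padding
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            raise ValueError("INVALID_BASE64") from exc
        if len(decoded) > MAX_BYTES:
            raise ValueError("PAYLOAD_TOO_LARGE")
        return v

    @model_validator(mode="after")
    def check_content(self) -> "PipelineItem":
        # Require exactly one of {text, base64_blob}.
        if bool(self.text) == bool(self.base64_blob):
            raise ValueError("SCHEMA_VIOLATION: provide exactly one of text or base64_blob")
        return self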

Improvements

  • Input safety & clarity

    • Uniform, machine-readable error schema: { "error": { "code", "message", "details" } }.
    • Distinct codes: INVALID_BASE64, PAYLOAD_TOO_LARGE, UNSUPPORTED_FIELD, SCHEMA_VIOLATION.
  • Throughput & latency

    • Offload embedding compute from request path → improved P95 latency for producers.
    • Pre-checks short-circuit obvious failures before queueing.
  • Configurability

    • MAX_BYTES, MAX_ITEMS_PER_REQUEST, EMBEDDINGS_RETRY_MAX_ATTEMPTS, EMBEDDINGS_BACKOFF_BASE_MS via env (a retry/backoff sketch follows this list).
    • PIPELINE_LOG_LEVEL and PIPELINE_LOG_FIELDS allow minimal/extended fields.
  • Operational readiness

    • Health endpoint enrichments include queue depth and worker liveness (when enabled).
    • Rate-limit friendly: idempotent re-submission returns previous job_id for identical content.
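
A sketch of how the retry knobs above could be applied in the worker; the helper name and jitter factor are assumptions, not the actual job-runner code:

import os
import random
import time

RETRY_MAX_ATTEMPTS = int(os.environ.get("EMBEDDINGS_RETRY_MAX_ATTEMPTS", "5"))
BACKOFF_BASE_MS = int(os.environ.get("EMBEDDINGS_BACKOFF_BASE_MS", "250"))

def run_with_retries(task, *args, **kwargs):
    """Retry a transient-failure-prone callable with exponential backoff and jitter."""
    for attempt in range(1, RETRY_MAX_ATTEMPTS + 1):
        try:
            return task(*args, **kwargs)
        except Exception:
            if attempt == RETRY_MAX_ATTEMPTS:
                raise  # give up after the configured number of attempts
            delay_ms = BACKOFF_BASE_MS * (2 ** (attempt - 1))             # exponential backoff
            time.sleep(delay_ms * (1 + random.random() * 0.2) / 1000.0)   # plus up to 20% jitter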

Bug Fixes

  • Fixed an off-by-one in size enforcement that allowed payloads of exactly MAX_BYTES + 1 bytes.

  • Resolved a race condition in which a job could be marked failed after succeeding when a worker restarted.

  • Hardened the base64 decoder to reject non-canonical encodings and silent truncation (see the decoding sketch after this list).

  • Normalized HTTP statuses:

    • 400 for schema/validation errors
    • 413 for payloads that are too large
    • 409 for idempotency conflicts (a different payload submitted for the same key)
    • 422 for semantically invalid content
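
A hedged sketch of the canonical-form check described in the base64 bug fix; the helper name is hypothetical:

import base64
import binascii

def decode_base64_strict(value: str) -> bytes:
    """Decode base64, rejecting bad alphabet/padding and non-canonical encodings."""
    try:
        decoded = base64.b64decode(value, validate=True)  # rejects non-alphabet characters
    except binascii.Error as exc:  # also raised for missing or incorrect padding
        raise ValueError("INVALID_BASE64") from exc
    # Round-trip to catch non-canonical encodings that decode without error
    # (e.g. stray padding bits), which can hide silent truncation.
    if base64.b64encode(decoded).decode("ascii") != value:
        raise ValueError("INVALID_BASE64")
    return decoded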

API Changes

  • New

    • POST /v1/embeddings/jobs
    • GET /v1/embeddings/jobs/{job_id}
    • DELETE /v1/embeddings/jobs/{job_id}
  • Behavioral

    • Synchronous /v1/embeddings (if retained) now performs strict validation and may return the new error codes listed above.
  • Error Schema

    • All endpoints emit the unified error object with code and optional details.

Observability

  • Metrics (Prometheus, optional): counters, histograms, and gauges as listed above; a metric-definition sketch follows this list.
  • Logging: JSON, single-line, stable keys; no payload contents; short hash of content for correlation when idempotency is used.
  • Tracing: spans around validation, enqueue, dequeue, model inference, and persistence (if OpenTelemetry is enabled).
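
A minimal sketch of how the optional metrics could be declared behind the PIPELINE_METRICS_ENABLED flag; the actual guard and metric objects in the code may differ:

import os

from prometheus_client import Counter, Gauge, Histogram

METRICS_ENABLED = os.environ.get("PIPELINE_METRICS_ENABLED", "false").lower() == "true"

if METRICS_ENABLED:
    PIPELINE_REQUESTS = Counter(
        "pipeline_requests_total", "Pipeline requests", ["route", "code"]
    )
    EMBEDDINGS_JOBS = Counter(
        "embeddings_jobs_total", "Embedding jobs by terminal state", ["state"]
    )
    REQUEST_DURATION = Histogram(
        "pipeline_request_duration_seconds", "Request duration", ["route"]
    )
    JOBS_INFLIGHT = Gauge("embeddings_jobs_inflight", "Embedding jobs currently running")
else:
    # Metrics are disabled by default; callers must null-check before use.
    PIPELINE_REQUESTS = EMBEDDINGS_JOBS = REQUEST_DURATION = JOBS_INFLIGHT = None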

Security & Privacy

  • Rejects unknown fields (extra="forbid") to prevent mass-assignment style issues.
  • Validates and limits decoded base64 length to avoid memory pressure.
  • Logs redact payloads; only derived metadata (sizes, hashes) is recorded.
  • Tenant-aware idempotency and isolation across queues (namespaced by tenant/model).

Performance Notes

  • Expected reduction in API tail latency for producers (roughly 30–60% under load), with job runtimes tracked separately.
  • Backpressure via queue depth; submit endpoints return quickly with job_id.

Migration Guide

  1. Enable metrics/logging (optional): set PIPELINE_METRICS_ENABLED=true and configure Prometheus scrape.
  2. Adopt job flow: prefer POST /v1/embeddings/jobs + poll GET /.../{job_id} rather than synchronous embedding calls (see the client sketch after these steps).
  3. Validate clients: ensure callers handle new error codes and do not send unknown fields.
  4. Set limits: configure MAX_BYTES and MAX_ITEMS_PER_REQUEST to match your workload.
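
A client-side sketch of the recommended flow, assuming the requests library; the host, auth header, and payload fields are placeholders:

import time

import requests

BASE = "https://api.example.internal"        # placeholder host
HEADERS = {"X-Internal-API-Key": "<key>"}    # placeholder auth header name

# 1. Submit the job; the endpoint returns immediately with a job_id.
resp = requests.post(
    f"{BASE}/v1/embeddings/jobs",
    json={"text": "some content to embed"},
    headers=HEADERS,
    timeout=10,
)
resp.raise_for_status()
job_id = resp.json()["job_id"]

# 2. Poll until the job reaches a terminal state.
while True:
    job = requests.get(f"{BASE}/v1/embeddings/jobs/{job_id}", headers=HEADERS, timeout=10).json()
    if job["state"] in ("succeeded", "failed"):
        break
    time.sleep(1.0)

print(job["state"], job.get("result") or job.get("error"))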

Summary by CodeRabbit

  • New Features
    • Introduced asynchronous background processing for material handling with job IDs.
    • Added job status endpoint to track progress and results.
    • Process endpoint now schedules work and returns a job_id immediately for faster responsiveness.
  • Improvements
    • Strengthened input validation: allowed file types (pdf/docx/text), size limits, and prompt length cap; clearer error messages.
  • Chores
    • Optional Prometheus metrics integration (counters, histograms) for observability when available.


coderabbitai bot commented Sep 25, 2025

Walkthrough

Introduces an asynchronous, job-based processing workflow in api/endpoints/internal.py. Adds background task scheduling, in-memory job store with locking, optional Prometheus metrics, input validation updates, a new job status endpoint, and modular helpers for job lifecycle and processing. The primary processing endpoint now schedules work and returns a job_id.

Changes

Cohort / File(s): Background job workflow and API updates (api/endpoints/internal.py)
Summary: Replaces synchronous processing with a background task system using an in-memory job store and locks; adds optional Prometheus counters/histograms; extends process endpoint to accept BackgroundTasks and return job_id; adds GET /job/{job_id} for status; enhances SourceMaterialRequest validation (file_type, size limits, prompt length); introduces helpers: _create_job_record, _update_job, _background_process; stubs VectorDB integration; maintains internal API key verification; structured job-status-based error handling and logging.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant API as Internal API
  participant BG as BackgroundTasks
  participant JB as JobStore (in-memory)
  participant PROC as _background_process

  Client->>API: POST /process_material (request)
  API->>JB: _create_job_record(status="queued", info)
  API->>BG: add_task(_background_process, job_id, request)
  API-->>Client: { status: "scheduled", job_id }

  par Background execution
    BG->>PROC: start(job_id, request)
    PROC->>JB: _update_job(status="running", step="decode")
    PROC->>PROC: decode input (base64/pdf/docx/text)
    PROC->>JB: _update_job(step="extract_text")
    PROC->>PROC: extract text (PDF/DOCX/text)
    PROC->>JB: _update_job(step="chunk_embed")
    PROC->>PROC: chunk + embed (VectorDB stub)
    PROC->>JB: _update_job(step="draft", progress)
    PROC->>PROC: draft generation
    PROC->>JB: _update_job(status="completed", result=metadata)
  and Error path
    PROC-->>JB: _update_job(status="failed", error)
  end

  Client->>API: GET /job/{job_id}
  API->>JB: read job state
  API-->>Client: { status, step, error?, result? }

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~55 minutes

Poem

I queued a thought in burrows deep,
A job to run while fields still sleep.
With hops of logs and metrics bright,
I check the status: green-lit night.
From chunks to drafts, I twitch my nose—
Async dreams where progress grows. 🥕🐇

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 12.50%, which is insufficient; the required threshold is 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title clearly references the core enhancements to the pipeline (strict Pydantic validation, background embedding jobs with a status API, optional Prometheus metrics, and structured logging), which accurately reflects the primary changes introduced in the pull request.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (6)
api/endpoints/internal.py (6)

19-23: Narrow Prometheus import exception to ImportError

Catching all Exceptions hides real import-time errors inside prometheus_client. Prefer ImportError.

-try:
-    from prometheus_client import Counter, Histogram
-except Exception:
+try:
+    from prometheus_client import Counter, Histogram
+except ImportError:
     Counter = None
     Histogram = None

200-205: Preserve exception context when raising HTTPException

Raise from the original exception to keep the traceback chain.

-    except Exception:
+    except Exception as e:
         logger.exception("Error scheduling background job")
         if INTERNAL_REQUESTS:
             INTERNAL_REQUESTS.labels(status="error").inc()
-        raise HTTPException(status_code=500, detail="Internal server error")
+        raise HTTPException(status_code=500, detail="Internal server error") from e

36-38: Avoid raising at import time if INTERNAL_API_KEY is missing

Import-time RuntimeError prevents app startup in tooling/tests and deviates from PR’s “secure key verification in the new flow.” Do the check in the dependency.

-if not INTERNAL_API_KEY:
-    raise RuntimeError("INTERNAL_API_KEY environment variable is required for internal API authentication.")
-def verify_internal_api_key(api_key: str = Depends(api_key_header)):
+def verify_internal_api_key(api_key: str = Depends(api_key_header)):
     # Do not log or expose the secret
-    if not api_key or not INTERNAL_API_KEY or not secrets.compare_digest(api_key, INTERNAL_API_KEY):
+    expected = os.environ.get("INTERNAL_API_KEY")
+    if not api_key or not expected or not secrets.compare_digest(api_key, expected):
         raise HTTPException(
             status_code=status.HTTP_401_UNAUTHORIZED,
             detail="Invalid or missing internal API key."
         )

If you require hard-fail at startup in production, consider moving the check to an app startup event instead of import time.

Also applies to: 90-97


155-166: Catch and surface known service errors explicitly

To improve observability and user feedback, consider catching specific exceptions from parsing/chunking/embedding services (e.g., DocumentParseError, DocumentChunkError, AuthenticationError, RateLimitError) and updating the job with a precise error string, instead of a blanket Exception.

Do you want a targeted error mapping aligned with the uniform error schema mentioned in the PR?
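
If so, the mapping could stay small; the import locations below and the _update_job keyword signature are assumptions built on the names already used in this file and comment:

# Hypothetical import locations for the exception types named above.
from services.parsing_service import DocumentParseError
from services.chunking_service import DocumentChunkError
from services.embedding_service import AuthenticationError, RateLimitError

def describe_failure(exc: Exception) -> str:
    """Map known service exceptions to precise, user-facing error strings."""
    messages = {
        DocumentParseError: "Failed to parse document",
        DocumentChunkError: "Failed to chunk extracted text",
        AuthenticationError: "Embedding provider rejected credentials",
        RateLimitError: "Embedding provider rate limit exceeded",
    }
    for exc_type, message in messages.items():
        if isinstance(exc, exc_type):
            return f"{message}: {exc}"
    return "Internal error"

_background_process would then call _update_job(job_id, status="failed", error=describe_failure(exc)) from a narrowed except clause instead of a blanket Exception handler.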


41-44: Thread-safety is OK; consider copying dict on read to avoid accidental mutation

job_status returns the internal dict; while FastAPI serializes it, returning a shallow copy can avoid accidental mutation if later code reuses the object.

-        return _jobs[job_id]
+        return dict(_jobs[job_id])

Also applies to: 207-213


52-52: Consider explicit response models

Using response_model=dict loses schema guarantees. Define small Pydantic response models for job creation and status to solidify your uniform error/response schema.

I can add JobStatus and JobAccepted models if you want.
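
For reference, a rough shape for those models (names and fields are a suggestion, not existing code):

from typing import Any, Dict, Optional

from pydantic import BaseModel

class JobAccepted(BaseModel):
    status: str        # e.g. "scheduled"
    job_id: str

class JobStatus(BaseModel):
    status: str        # queued | running | completed | failed
    step: Optional[str] = None
    error: Optional[str] = None
    result: Optional[Dict[str, Any]] = None

The routes would then declare response_model=JobAccepted and response_model=JobStatus instead of response_model=dict.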

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 77ca247 and ecd3483.

📒 Files selected for processing (1)
  • api/endpoints/internal.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
api/endpoints/internal.py (3)
services/parsing_service.py (2)
  • extract_text_from_pdf (10-24)
  • extract_text_from_docx (26-38)
services/chunking_service.py (1)
  • chunk_text (9-39)
services/embedding_service.py (1)
  • embed_texts (19-42)
🪛 Ruff (0.13.1)
api/endpoints/internal.py

21-21: Do not catch blind exception: Exception (BLE001)
65-65: Avoid specifying long messages outside the exception class (TRY003)
79-79: Abstract raise to an inner function (TRY301)
79-79: Avoid specifying long messages outside the exception class (TRY003)
82-82: Avoid specifying long messages outside the exception class (TRY003)
86-86: Avoid specifying long messages outside the exception class (TRY003)
197-197: Consider moving this statement to an else block (TRY300)
204-204: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling (B904)

🔇 Additional comments (2)
api/endpoints/internal.py (2)

181-197: Add idempotency support for job scheduling (content_hash, model, tenant)

PR summary mentions idempotent enqueueing and retry/backoff. The current endpoint always creates a new job. Recommend deriving an idempotency key (e.g., SHA-256 of source_material + model + tenant) to return existing job_id when re-submitted.

Would you like a follow-up patch that computes a stable key and de-duplicates in _create_job_record? If yes, confirm how to obtain tenant and model here.
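
For illustration, a stable key and de-duplication could look like this; the tenant/model plumbing and the record layout are assumptions layered on the existing _jobs/_jobs_lock store:

import hashlib
import time
import uuid

def idempotency_key(source_material: str, model: str, tenant: str) -> str:
    """Stable key so identical submissions map to the same job."""
    return hashlib.sha256(f"{tenant}:{model}:{source_material}".encode("utf-8")).hexdigest()

def get_or_create_job(source_material: str, model: str, tenant: str) -> str:
    key = idempotency_key(source_material, model, tenant)
    with _jobs_lock:  # module's existing in-memory store and lock
        for existing_id, record in _jobs.items():
            if record.get("idempotency_key") == key:
                return existing_id  # re-submission: reuse the existing job_id
        job_id = str(uuid.uuid4())
        _jobs[job_id] = {"status": "queued", "idempotency_key": key, "created_at": time.time()}
        return job_id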


11-11: Adopt Pydantic v2 validators with strict model config

  • Project uses Pydantic v2 (requirements.txt pins pydantic>=2.6.0), so replace @validator with field_validator and add a model_validator(mode="after") for cross-field source_material checks.
  • Add model_config = {"extra":"forbid","str_strip_whitespace":True} to enforce strict validation.
  • Read MAX_BYTES env var (fallback to INTERNAL_MAX_BYTES) when validating payload size.

Comment on lines 98 to +105
@router.post("/process-material", response_model=dict)

def process_material(
request: SourceMaterialRequest,
_: str = Depends(verify_internal_api_key)
):
def _create_job_record(status: str, info: Dict[str, Any]) -> str:
job_id = str(uuid.uuid4())
with _jobs_lock:
_jobs[job_id] = {"status": status, "info": info, "created_at": time.time()}
return job_id


⚠️ Potential issue | 🔴 Critical

Remove stray/duplicate route decorator for /process-material

@router.post("/process-material") above _create_job_record registers the wrong function as the endpoint and creates a duplicate route with the real process_material below. This will either conflict or bind the path to a private helper.

Apply this diff to remove the accidental decorator:

-@router.post("/process-material", response_model=dict)
 
 def _create_job_record(status: str, info: Dict[str, Any]) -> str:
     job_id = str(uuid.uuid4())
     with _jobs_lock:
         _jobs[job_id] = {"status": status, "info": info, "created_at": time.time()}
     return job_id

Also applies to: 181-187

🤖 Prompt for AI Agents
In api/endpoints/internal.py around lines 98-105 (and similarly at 181-187), a
stray @router.post("/process-material") decorator is applied to the private
helper _create_job_record, which unintentionally registers the helper as the
endpoint and creates a duplicate/incorrect route; remove the
@router.post("/process-material") decorator from _create_job_record (and the
duplicate occurrence at lines 181-187) so that only the actual process_material
function is registered for that path, leaving the helper as a plain function
used internally.

@fehranbit fehranbit self-requested a review September 30, 2025 05:43
@fehranbit fehranbit merged commit e759266 into main Sep 30, 2025
1 check passed