feat(sync): write-through Brain.correct() → /api/v1/ingest (day 3 of #194) #200

Merged
Gradata merged 2 commits into main from feat/brain-correct-write-through-day3 on May 17, 2026
@Gradata Gradata commented May 17, 2026

Day 3 of #194 — wire Brain.correct() to write-through cloud sync.

What

Every Brain.correct(draft, final) call now enqueues the correction into the local sync_queue table, and a background daemon thread POSTs each row to api.gradata.ai/api/v1/ingest.

Why

Removes the dependency on session-end hooks (which rarely fire for long-running agents on hermes --continue) and on the cron sync band-aid. See /home/olive/gradata-office-hours-memo.md for the full architectural rationale.

Env vars

  • GRADATA_API_KEY: required to start the worker
  • GRADATA_DISABLE_WRITE_THROUGH=1: opt-out
  • GRADATA_CLOUD_INGEST_URL (default https://api.gradata.ai/api/v1/ingest)
  • GRADATA_SYNC_TICK_SEC (default 30)
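The four variables above could be resolved into one settings object along these lines. This is a minimal sketch, not the SDK's actual code: `SyncConfig`, `load_sync_config`, and `worker_enabled` are hypothetical names, and only the variable names and defaults come from the list above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

DEFAULT_INGEST_URL = "https://api.gradata.ai/api/v1/ingest"


@dataclass(frozen=True)
class SyncConfig:
    """Hypothetical container for the write-through settings."""
    api_key: Optional[str]
    disabled: bool
    ingest_url: str
    tick_sec: float

    @property
    def worker_enabled(self) -> bool:
        # The worker starts only with an API key and without the opt-out flag.
        return bool(self.api_key) and not self.disabled


def load_sync_config(env: Mapping[str, str] = os.environ) -> SyncConfig:
    """Read the settings from an environment-like mapping."""
    return SyncConfig(
        api_key=env.get("GRADATA_API_KEY") or None,
        disabled=env.get("GRADATA_DISABLE_WRITE_THROUGH") == "1",
        ingest_url=env.get("GRADATA_CLOUD_INGEST_URL", DEFAULT_INGEST_URL),
        tick_sec=float(env.get("GRADATA_SYNC_TICK_SEC", "30")),
    )
```

Passing the environment as a mapping keeps the sketch testable without mutating `os.environ`.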

Failure modes

  • Cloud unreachable: row stays pending, retried next tick
  • 422 permanent: poison row marked synced+failed
  • 429: backoff, bail batch
  • No API key: worker doesn't start, local correct() unaffected
  • enqueue exception: caught, local correct() always succeeds
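The HTTP-related failure modes above amount to a mapping from response status to queue action. The sketch below shows one way to express that mapping as a pure function. `Outcome` and `classify_response` are hypothetical names, and treating every status not listed above (for example other 4xx codes) as retryable is an assumption, not something this PR states.

```python
from enum import Enum
from typing import Optional


class Outcome(Enum):
    SYNCED = "synced"    # 2xx: mark the row synced
    POISON = "poison"    # 422: mark failed and synced so it never blocks the batch
    BACKOFF = "backoff"  # 429: stop the current batch, try again next tick
    RETRY = "retry"      # 5xx / network error: row stays pending


def classify_response(status: Optional[int]) -> Outcome:
    """Map an HTTP status (None for a network error) to a queue action."""
    if status is None:
        return Outcome.RETRY
    if 200 <= status < 300:
        return Outcome.SYNCED
    if status == 422:
        return Outcome.POISON
    if status == 429:
        return Outcome.BACKOFF
    # Assumption: anything else is treated as transient.
    return Outcome.RETRY
```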

Tests

38 passed (test_sync_queue × 9, test_sync_worker × 9, test_brain_write_through × 5, test_brain × 15 regression).

Series

…194)

Every Brain.correct(draft, final) now enqueues the correction into a local sync_queue table and a background daemon thread drains the queue by POSTing each row to api.gradata.ai/api/v1/ingest.

Removes the dependency on session-end hooks and the cron sync band-aid. See /home/olive/gradata-office-hours-memo.md for the architectural rationale.

## Components
- src/gradata/_sync_queue.py: enqueue/peek/mark_synced/mark_failed CRUD + enqueue_correction helper
- src/gradata/_sync_worker.py: daemon-thread SyncWorker with start/stop(drain)/_tick, handles 2xx + dedup + 422 poison + 429 backoff + 5xx + network
- src/gradata/brain.py: Brain.__init__ starts worker when API key resolvable AND GRADATA_DISABLE_WRITE_THROUGH != 1; Brain.correct enqueues post-local-write; Brain.close drains and stops
- src/gradata/_migrations/__init__.py + onboard.py: sync_queue table + idx_sync_queue_pending (idempotent, no-op if #195 lands first)
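For readers unfamiliar with the queue pattern, here is a rough sketch of what the sync_queue primitives might look like on top of sqlite3. The table name, the index name idx_sync_queue_pending, and the function names enqueue, peek_pending, mark_synced, and mark_failed come from this PR. The column names, signatures, and the 500-character error cap are assumptions made for illustration.

```python
import json
import sqlite3
import time

# Column names are illustrative; the partial index covers only pending rows.
SCHEMA = """
CREATE TABLE IF NOT EXISTS sync_queue (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    kind        TEXT NOT NULL,
    payload     TEXT NOT NULL,
    enqueued_at REAL NOT NULL,
    synced_at   REAL,
    attempts    INTEGER NOT NULL DEFAULT 0,
    last_error  TEXT
);
CREATE INDEX IF NOT EXISTS idx_sync_queue_pending
    ON sync_queue (id) WHERE synced_at IS NULL;
"""


def init_queue(conn: sqlite3.Connection) -> None:
    """Idempotent: safe to run when the table already exists."""
    conn.executescript(SCHEMA)


def enqueue(conn: sqlite3.Connection, kind: str, payload: dict) -> int:
    cur = conn.execute(
        "INSERT INTO sync_queue (kind, payload, enqueued_at) VALUES (?, ?, ?)",
        (kind, json.dumps(payload), time.time()),
    )
    conn.commit()
    return cur.lastrowid


def peek_pending(conn: sqlite3.Connection, limit: int = 50) -> list:
    """Return pending rows in FIFO order without removing them."""
    rows = conn.execute(
        "SELECT id, kind, payload FROM sync_queue "
        "WHERE synced_at IS NULL ORDER BY id LIMIT ?",
        (limit,),
    ).fetchall()
    return [(row_id, kind, json.loads(body)) for row_id, kind, body in rows]


def mark_synced(conn: sqlite3.Connection, row_id: int) -> None:
    conn.execute(
        "UPDATE sync_queue SET synced_at = ? WHERE id = ?", (time.time(), row_id)
    )
    conn.commit()


def mark_failed(conn: sqlite3.Connection, row_id: int, error: str, max_len: int = 500) -> None:
    """Record a failed attempt; the error text is truncated to max_len."""
    conn.execute(
        "UPDATE sync_queue SET attempts = attempts + 1, last_error = ? WHERE id = ?",
        (error[:max_len], row_id),
    )
    conn.commit()
```

A row leaves the pending set only when synced_at is set, which is why a worker that crashes between the POST and mark_synced re-sends the row (at-least-once delivery).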

## Env vars
- GRADATA_API_KEY (existing): write-through requires it
- GRADATA_DISABLE_WRITE_THROUGH=1: opt-out, fall back to hook+cron path
- GRADATA_CLOUD_INGEST_URL (default https://api.gradata.ai/api/v1/ingest)
- GRADATA_SYNC_TICK_SEC (default 30)

## Failure modes
- Cloud unreachable: row stays pending, retried next tick
- 422 permanent: poison row marked synced + failed so it doesn't block batch
- 429: bail batch, retry next tick
- No API key: worker doesn't start, no enqueue, local correct() unaffected
- _write_through_enqueue exception: caught, local correct() always succeeds
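The last guarantee above (local correct() always succeeds) comes down to ordering plus a broad exception guard around the enqueue step. A minimal sketch, assuming hypothetical `apply_local` and `enqueue` callables in place of the real Brain internals:

```python
import logging
from typing import Callable

log = logging.getLogger("write_through_sketch")


def correct_with_write_through(
    draft: str,
    final: str,
    apply_local: Callable[[str, str], dict],
    enqueue: Callable[[dict], None],
) -> dict:
    """Run the local correction first, then enqueue on a best-effort basis."""
    # Local write happens first; its errors still propagate to the caller.
    result = apply_local(draft, final)
    try:
        enqueue({"draft": draft, "final": final})
    except Exception:
        # Cloud sync is secondary: log and carry on with the local result.
        log.warning("write-through enqueue failed; local correction kept", exc_info=True)
    return result
```

Catching `Exception` this broadly is usually discouraged, but it is a reasonable trade here because the enqueue path must never change the outcome of the local call.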

## Tests
- tests/test_sync_queue.py (9): existing CRUD primitives + enqueue_correction
- tests/test_sync_worker.py (9): HTTPServer stub /ingest with all status branches + at-least-once + stop-drain
- tests/test_brain_write_through.py (5): enqueue happy path, disabled, no-key, cloud-unreachable, sabotaged enqueue

38 passed locally. Series: #195 (day 1 SDK queue), Gradata/gradata-cloud#58 merged (day 2 cloud receiver), this PR (day 3 wire-up).

coderabbitai Bot commented May 17, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: a08a8bf9-1013-463a-be64-954a778f046e

📥 Commits

Reviewing files that changed from the base of the PR and between 0561c82 and 30d7dfe.

📒 Files selected for processing (9)
  • Gradata/src/gradata/_migrations/__init__.py
  • Gradata/src/gradata/_sync_queue.py
  • Gradata/src/gradata/_sync_worker.py
  • Gradata/src/gradata/brain.py
  • Gradata/src/gradata/onboard.py
  • Gradata/tests/test_brain_write_through.py
  • Gradata/tests/test_daemon_sync.py
  • Gradata/tests/test_sync_queue.py
  • Gradata/tests/test_sync_worker.py

📝 Walkthrough
  • Implements write-through cloud sync for Brain.correct(): each correction is enqueued to a local SQLite queue and posted to cloud ingest endpoint (default https://api.gradata.ai/api/v1/ingest) by a background daemon thread
  • Worker starts automatically when API key is resolvable (GRADATA_API_KEY env var or ~/.gradata/key) and can be disabled via GRADATA_DISABLE_WRITE_THROUGH=1
  • Configurable sync behavior via environment variables: GRADATA_API_KEY, GRADATA_DISABLE_WRITE_THROUGH, GRADATA_CLOUD_INGEST_URL, GRADATA_SYNC_TICK_SEC (default 30 seconds)
  • Differentiates failure modes: transient errors (5xx, network) retry next tick; 422 marked as poison/failed to prevent retries; 429 triggers backoff and batch abort; local correct() always succeeds even if cloud sync fails
  • Adds new sync_queue table to SQLite schema with columns for payload, kind, enqueue/sync timestamps, retry tracking, and partial index for pending rows
  • New modules: _sync_queue.py (enqueue/CRUD primitives), _sync_worker.py (daemon with tick/backoff/poison handling), updated brain.py with worker lifecycle management
  • No breaking changes to existing Brain public API; Brain.close() now gracefully drains pending sync queue
  • Comprehensive test coverage: 38 tests for queue primitives, worker HTTP handling (2xx, 422, 429, 5xx, network errors), and Brain write-through scenarios

Walkthrough

This PR implements write-through cloud sync for Brain corrections. A new sync_queue SQLite table buffers corrections with timestamps and retry metadata. The _sync_queue module provides enqueue, peek, and mark operations. A background SyncWorker daemon drains the queue by POSTing to a cloud ingest endpoint and handles HTTP statuses (2xx marks synced, 429 aborts batch, 422 marks permanent, 5xx/network retries). Brain integrates the worker during construction (if API key available and write-through enabled), enqueues corrections after local processing, and drains queued items on close.

Changes

Write-through cloud sync

| Layer / File(s) | Summary |
| --- | --- |
| Schema and migration setup: src/gradata/_migrations/__init__.py, src/gradata/onboard.py | Adds sync_queue table with payload, kind, enqueue/sync timestamps, retry attempts, and error columns; includes partial index on synced_at IS NULL for pending row lookups. |
| Sync queue primitives and tests: src/gradata/_sync_queue.py, tests/test_sync_queue.py | Implements enqueue (JSON serialization, row id return), enqueue_correction (convenience wrapper), peek_pending (FIFO with fallback JSON decode), mark_synced (timestamp update), and mark_failed (increment attempts, truncate error); unit tests cover FIFO ordering, state transitions, constraint validation, and error truncation. |
| Sync worker daemon and tests: src/gradata/_sync_worker.py, tests/test_sync_worker.py | Implements SyncWorker as a daemon thread that ticks repeatedly, fetching up to 50 pending rows and POSTing each to cloud ingest with bearer auth; differentiates status handling (2xx syncs, 429 batch-aborts, 422 permanent, 5xx/network transient); tests cover happy path, failure modes, retries across ticks, lifecycle (stop/drain), and at-least-once semantics when the DB write fails. |
| Brain write-through wiring and integration tests: src/gradata/brain.py, tests/test_brain_write_through.py | Brain resolves API key from env/file, starts worker on init if available and not disabled, enqueues corrections after local processing via _write_through_enqueue (builds cloud payload, computes event_id, manages fresh connection), stops worker on close with drain; integration tests verify worker start conditions, enqueue behavior, cloud error resilience, and local path correctness. |
| Daemon sync test update: tests/test_daemon_sync.py | Updates empty-brain sync test to expect heartbeat POST and non-fatal cloud failures per PR #198 behavior. |

Sequence Diagram(s)

Write-through sync workflow from correction to cloud delivery:

sequenceDiagram
    participant User
    participant Brain
    participant SyncWorker as SyncWorker<br/>(daemon)
    participant Queue as sync_queue<br/>(SQLite)
    participant Cloud
    
    User->>Brain: brain.correct(draft, final)
    Brain->>Brain: local correction logic
    Brain->>Queue: enqueue_correction(brain_id, correction, event_id)
    Queue-->>Brain: row_id
    Note over Brain,Queue: enqueue best-effort, errors logged
    
    Note over SyncWorker: background tick loop
    SyncWorker->>Queue: peek_pending(limit=50)
    Queue-->>SyncWorker: list of pending rows
    
    loop for each row
        SyncWorker->>Cloud: POST /api/v1/ingest<br/>payload + bearer token
        alt 2xx success
            Cloud-->>SyncWorker: 200/201
            SyncWorker->>Queue: mark_synced(id)
        else 429 rate limit
            Cloud-->>SyncWorker: 429
            SyncWorker->>Queue: mark_failed(id, error)
            SyncWorker->>SyncWorker: abort tick, sleep
        else 422 permanent
            Cloud-->>SyncWorker: 422
            SyncWorker->>Queue: mark_failed then mark_synced
        else 5xx/network
            Cloud-->>SyncWorker: 500 or timeout
            SyncWorker->>Queue: mark_failed(id, error)
            Note over SyncWorker,Queue: continues batch, retries next tick
        end
    end
    
    User->>Brain: brain.close()
    Brain->>SyncWorker: stop(drain=True)
    SyncWorker->>SyncWorker: final synchronous tick
    SyncWorker->>Brain: join thread
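
The lifecycle part of the diagram (background tick loop, then stop with a final drain) can be sketched with a threading.Event. `TickWorker` is a hypothetical, simplified stand-in for SyncWorker: it takes the per-tick work as a callable rather than talking to SQLite or the network. It also joins the thread before the final tick so two ticks never overlap, which is a choice made for this sketch and may differ from the actual ordering.

```python
import threading
from typing import Callable


class TickWorker:
    """Simplified daemon-thread worker: run tick() every tick_sec seconds."""

    def __init__(self, tick: Callable[[], None], tick_sec: float = 30.0) -> None:
        self._tick = tick
        self._tick_sec = tick_sec
        self._stop = threading.Event()
        # daemon=True so a forgotten worker never blocks interpreter exit.
        self._thread = threading.Thread(
            target=self._run, name="sync-worker-sketch", daemon=True
        )

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        while not self._stop.is_set():
            self._tick()
            # wait() returns early as soon as stop() sets the event.
            self._stop.wait(self._tick_sec)

    def stop(self, drain: bool = True, timeout: float = 5.0) -> None:
        self._stop.set()
        if self._thread.is_alive():
            self._thread.join(timeout)
        if drain:
            # Final synchronous tick picks up rows enqueued since the last tick.
            self._tick()
```

Using `Event.wait` instead of `time.sleep` is what lets `stop()` return promptly even with a 30-second tick interval.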

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Suggested labels

feature


Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 OpenGrep (1.20.0)

OpenGrep fatal error (exit code 2):
┌──────────────┐
│ Opengrep CLI │
└──────────────┘

✔ Opengrep OSS
✔ Basic security coverage for first-party code vulnerabilities.

Loading rules from local config...
[00.25][ERROR]: Error: exception Glob.Lexer.Syntax_error("malformed glob pattern: missing ']'")
Raised at Glob__Lexer.syntax_error in file "libs/glob/Lexer.mll", line 8, characters 2-26
Called from Glob__Lexer.__ocaml_lex_token_rec in file "libs/glob/Lexer.mll", line 29, characters 26-53
Cal


…heartbeat

PR #198 changed the empty /sync behavior to POST an empty heartbeat to cloud so that brains.last_sync_at advances. The pre-#198 test still asserted mock_post.assert_not_called(); the test now expects exactly one call.
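
The shape of that assertion change can be illustrated with unittest.mock. Everything here is a hypothetical stand-in: `sync_tick` is not the daemon's real handler, and the `{"events": []}` heartbeat payload is an assumed shape. Only the switch from assert_not_called() to a single expected call reflects the commit.

```python
from unittest import mock


def sync_tick(pending_events, post):
    """Hypothetical stand-in for the /sync handler after PR #198."""
    # An empty event list is still posted and acts as the heartbeat.
    body = {"events": list(pending_events)}
    try:
        post(body)
    except ConnectionError:
        # Cloud failures are non-fatal for the local daemon.
        return {"ok": False, "sent": 0}
    return {"ok": True, "sent": len(body["events"])}


def test_empty_sync_posts_heartbeat():
    mock_post = mock.Mock()
    result = sync_tick([], mock_post)
    # Before #198 this would have been mock_post.assert_not_called().
    mock_post.assert_called_once_with({"events": []})
    assert result == {"ok": True, "sent": 0}


def test_cloud_failure_is_non_fatal():
    mock_post = mock.Mock(side_effect=ConnectionError("cloud unreachable"))
    assert sync_tick([], mock_post) == {"ok": False, "sent": 0}
```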
@Gradata Gradata marked this pull request as ready for review May 17, 2026 00:28
@Gradata Gradata merged commit 8a9a1e4 into main May 17, 2026
9 checks passed
@Gradata Gradata deleted the feat/brain-correct-write-through-day3 branch May 17, 2026 00:28
Gradata pushed a commit that referenced this pull request May 17, 2026
…ve (day 4 of #194)

Brain.correct() write-through (PR #200) is now the default cloud sync path. The session_close hook's legacy cloud_sync_tick was double-writing every correction.

Default behavior: session_close skips cloud_sync_tick. Lesson graduation, pipeline, tree consolidation, lesson_applications resolution all still run as before — only the cloud sync portion is gated.

Opt-out: GRADATA_DISABLE_WRITE_THROUGH=1 restores the legacy hook-based cloud sync path AND disables write-through. The two paths are mutually exclusive: only one ever runs.
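
The mutual exclusion described above can be pictured as a single switch read by both code paths. This is a sketch under assumptions: both function names are hypothetical, and the real write-through path additionally needs a resolvable API key, which is left out here.

```python
import os
from typing import Mapping


def write_through_enabled(env: Mapping[str, str] = os.environ) -> bool:
    """Write-through is the default; the flag set to "1" opts out."""
    return env.get("GRADATA_DISABLE_WRITE_THROUGH") != "1"


def session_close_should_cloud_sync(env: Mapping[str, str] = os.environ) -> bool:
    """The legacy cloud_sync_tick runs only when write-through is disabled."""
    return not write_through_enabled(env)
```

Deriving the legacy decision from the same predicate, rather than reading the variable twice, makes it impossible for both paths to be active under one environment.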

Untouched:
- daemon /sync (dashboard 'Sync Now' button)
- /api/v1/sync (bulk endpoint, used by Brain.close() drain)
- /home/olive/.gradata/sync_cron.py (safety-net cron, keep for 7-day soak)

Follow-up: after 7 days of clean write-through telemetry in cloud, the cron and legacy /api/v1/sync code paths can be removed in a separate PR.
@coderabbitai coderabbitai Bot added the feature label May 17, 2026
Gradata added a commit that referenced this pull request May 17, 2026
…ve (day 4 of #194) (#201)

Co-authored-by: data-engineer <data-engineer@gradata.ai>