Wire batch enrich to Gemini Batch API preview flow (#243)
Conversation
📝 Walkthrough

This PR relocates and restructures the Gemini Batch re-enrichment pipeline: the full implementation previously in …

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    actor User
    participant CLI as CLI (enrich)
    participant CloudBackfill as cloud_backfill module
    participant VectorStore as VectorStore/DB
    participant GeminiBatch as Gemini Batch API
    participant Checkpoint as Checkpoint DB
    User->>CLI: enrich --mode batch --phase submit
    CLI->>CloudBackfill: run_full_backfill(db_path, model)
    CloudBackfill->>VectorStore: get unenriched/legacy chunks
    VectorStore-->>CloudBackfill: chunk rows
    CloudBackfill->>CloudBackfill: export to JSONL, sanitize PII
    CloudBackfill->>GeminiBatch: submit batch with JSONL
    GeminiBatch-->>CloudBackfill: batch_id
    CloudBackfill->>Checkpoint: save_checkpoint(batch_id, metadata)
    User->>CLI: enrich --mode batch --phase poll
    CLI->>CloudBackfill: poll_gemini_batch(batch_id)
    loop Poll until complete
        CloudBackfill->>GeminiBatch: check batch status
        GeminiBatch-->>CloudBackfill: status (processing/completed)
    end
    CloudBackfill->>GeminiBatch: download results JSONL
    GeminiBatch-->>CloudBackfill: enriched summaries
    CloudBackfill->>VectorStore: import_results(summaries)
    VectorStore->>VectorStore: UPDATE chunks SET summary_v2, enrichment_version
    VectorStore-->>CloudBackfill: import complete
    CloudBackfill->>Checkpoint: update checkpoint (completed)
    CLI-->>User: Import completed
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 271248920e
```python
    from google import genai
except ImportError:
    print("ERROR: google-genai not installed. Run: pip install google-genai")
    sys.exit(1)
```
Import `sys` before calling `sys.exit` in client bootstrap
`_get_genai_client()` calls `sys.exit(1)` on a missing SDK or API key, but this module no longer imports `sys`, so those error paths crash with `NameError` instead of printing the intended actionable message. This is user-visible whenever `google-genai` is absent or credentials are unset, and it makes CLI failure handling misleading.
```python
pending = get_pending_jobs(store)
summary["checked"] = len(pending)
for job in pending:
    status = get_gemini_batch_state(job["batch_id"])
```
Catch batch-state lookup errors in one-pass poller
`process_pending_jobs_once()` calls `get_gemini_batch_state()` without a per-job try/except, so a single API error (for example a 404 on an expired batch, or a transient transport failure) aborts the whole polling/import pass before the remaining submitted jobs are processed or checkpointed. This affects both `brainlayer enrich --mode batch --phase poll|import` and `scripts/monitor_batch_reenrichment.py`, which now depend on this function.
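A minimal sketch of that per-job isolation (function name and job shape here are stand-ins, not the project's actual `process_pending_jobs_once`):

```python
def poll_jobs_once(pending, get_state):
    """Check each pending job, isolating per-job lookup failures.

    `pending` is a list of dicts with a "batch_id" key; `get_state` is any
    callable that may raise (e.g. a 404 on an expired batch). One bad job
    must not abort the whole polling pass.
    """
    summary = {"checked": len(pending), "ok": 0, "errors": []}
    for job in pending:
        try:
            job["status"] = get_state(job["batch_id"])
            summary["ok"] += 1
        except Exception as exc:  # record and continue with the next job
            summary["errors"].append((job["batch_id"], str(exc)))
    return summary
```

Failed lookups are recorded in the summary so the caller can still checkpoint the jobs that did succeed.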
Actionable comments posted: 10
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/monitor_batch_reenrichment.py`:
- Line 111: Replace the hardcoded default DB path in parser.add_argument("--db", ...) (currently cloud_backfill.DEFAULT_DB_PATH) with the shared resolver: import get_db_path from paths (or whichever module already exports it) and set default=get_db_path() so the CLI resolves the environment-dependent DB location consistently.
In `@src/brainlayer/cli/__init__.py`:
- Around line 903-905: The Typer parameter validation that raises
typer.BadParameter (the checks using "if mode not in (\"realtime\",\"batch\"):
raise typer.BadParameter(...)" and the similar check at the other occurrence) is
being swallowed by a surrounding broad except Exception; update the code so
BadParameter is not caught: either move these validations outside the broad try
block, or add an explicit except typer.BadParameter: raise before the general
except Exception, ensuring typer.BadParameter is re-raised so Typer's normal
behavior occurs; apply the same change to the other validation block referenced.
In `@src/brainlayer/cloud_backfill.py`:
- Around line 25-32: The file is missing an import for the sys module which
causes NameError when _get_genai_client() calls sys.exit(1); add an import sys
at the top of the file alongside other imports so _get_genai_client can safely
call sys.exit and error handling works properly.
- Around line 1120-1123: The resume_backfill and show_status CLI entry points
construct VectorStore directly (VectorStore(...)) and therefore don't handle
lock contention BusyError consistently; change both to use the existing
open_backfill_store helper or wrap the VectorStore creation in the same
BusyError try/except logic used by open_backfill_store so a friendly CLI error
is shown instead of a traceback—update resume_backfill (and the similar block at
lines ~1190-1193) to call open_backfill_store(db_path) or replicate its
BusyError handling before calling ensure_checkpoint_table.
- Around line 816-894: The import_results function is missing WAL checkpoints
for this bulk update; before starting the loop run a PRAGMA wal_checkpoint(FULL)
on store.conn, then inside the loop perform periodic checkpoints (e.g., every
1000 processed or when (success+failed+skipped) % 1000 == 0) by executing the
same PRAGMA on store.conn to flush the WAL, and finally run one more PRAGMA
wal_checkpoint(FULL) after the loop completes (before save_checkpoint). Locate
import_results and use the existing store.conn (and its cursor) to execute the
checkpoint statements so bulk updates via store.update_reenrichment_preview and
calls like _clear_imported_preview/save_checkpoint won’t cause WAL bloat.
- Around line 1060-1063: process_pending_jobs_once currently constructs
VectorStore(db_path) directly which can raise apsw.BusyError if the DB is
locked; wrap the store open in a try/except that catches apsw.BusyError and
handles it gracefully (e.g., log and return empty counts) or reuse the existing
open_backfill_store helper used by run_full_backfill to get a store safely.
Update process_pending_jobs_once to either call open_backfill_store(db_path) or
to catch apsw.BusyError around VectorStore(...) and return/exit cleanly before
calling ensure_checkpoint_table and further processing.
- Around line 803-813: The UPDATE in _clear_imported_preview can raise
SQLITE_BUSY/BusyError and should be retried instead of failing; wrap the call to
store.conn.cursor().execute(...) (and the surrounding transaction/commit if any)
with a small retry loop that catches the database busy error (e.g.,
sqlite3.OperationalError or the BusyError type your DB layer exposes),
sleep/backoff between attempts, and re-raise only after exhausting retries so
the original exception is not masked and the chunk's preview fields get cleared
reliably for chunk_id; keep the retry limits and backoff configurable/constants
and ensure the cursor is re-obtained on each retry.
- Around line 660-679: Replace the string-based 429 detection with proper
exception typing: import google.genai.errors.ClientError (or from google.genai
import errors) and change the broad except block to first catch
errors.ClientError as e and check e.code == 429 (and attempt < max_retries - 1)
to run the retry/wait logic; keep a subsequent generic except Exception as e
branch to handle other failures and call save_checkpoint (used here with store,
jsonl_path, model, chunk_count) and return None as before. Ensure the retry
backoff (wait = 30 * (2 ** attempt)) and logging/printing behavior remain
unchanged.
In `@src/brainlayer/session_repo.py`:
- Around line 216-223: The UPDATE in the retry loop can be a no-op if the chunk
was deleted, but the code currently returns normally; change the logic in the
method containing the cursor.execute call (the loop using sets, params and
chunk_id in src/brainlayer/session_repo.py) to check cursor.rowcount after
executing the UPDATE, and if rowcount == 0 treat it as a failed update by
raising an exception (so the retry loop continues and eventual failure is
reported), and only call clear_hybrid_search_cache(getattr(self, "db_path",
None)) and return when rowcount > 0 (i.e., a real row was updated).
In `@tests/test_cloud_backfill.py`:
- Around line 384-389: The test currently uses set(exported_keys) which hides
duplicates; update the assertions in the block handling exported_keys (from
jsonl_files) to both verify the expected key set and that there are no
duplicates: assert set(exported_keys) == {"unenriched", "legacy"} and assert
len(exported_keys) == len(set(exported_keys)) (or use collections.Counter to
assert all counts == 1) so duplicate exports will fail the test.
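For illustration, the WAL-checkpoint cadence requested for `import_results` might look like this sketch; it assumes a plain `sqlite3` connection and a toy `chunks` table rather than the project's APSW-backed `VectorStore`:

```python
import sqlite3

def bulk_update_with_checkpoints(db_path, rows, every=1000):
    """Apply summary updates, flushing the WAL before, during, and after."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA wal_checkpoint(FULL)")  # flush before the bulk pass
    processed = 0
    for chunk_id, summary in rows:
        conn.execute("UPDATE chunks SET summary_v2 = ? WHERE id = ?", (summary, chunk_id))
        processed += 1
        if processed % every == 0:  # periodic flush keeps the WAL from ballooning
            conn.commit()
            conn.execute("PRAGMA wal_checkpoint(FULL)")
    conn.commit()
    conn.execute("PRAGMA wal_checkpoint(FULL)")  # final flush before saving checkpoint state
    conn.close()
    return processed
```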
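The `rowcount` guard requested for `session_repo.py` could be sketched like this (table name and exception type are illustrative, not the repo's actual code):

```python
import sqlite3

def update_chunk_or_fail(conn, chunk_id, summary):
    """UPDATE that treats a zero-row match (chunk already deleted) as a failure."""
    cursor = conn.execute(
        "UPDATE chunks SET summary_v2 = ? WHERE id = ?", (summary, chunk_id)
    )
    if cursor.rowcount == 0:
        # A no-op update means the chunk vanished; raise so a retry loop
        # sees a failure instead of silently "succeeding".
        raise LookupError(f"chunk {chunk_id} no longer exists")
    # cache invalidation / post-update work belongs here, after a real update
    return cursor.rowcount
```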
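The duplicate-safe test assertion suggested for `tests/test_cloud_backfill.py` can be expressed with `collections.Counter`, for example (the helper name is made up for this sketch):

```python
from collections import Counter

def assert_keys_exactly_once(exported_keys, expected):
    """Fail on missing keys *and* on duplicates, unlike a bare set comparison."""
    counts = Counter(exported_keys)
    assert set(counts) == expected, f"unexpected key set: {sorted(counts)}"
    dupes = [key for key, n in counts.items() if n > 1]
    assert not dupes, f"duplicate exports: {dupes}"
```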
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 38888ff0-4609-484c-b13f-a64c3bf7bdd2
📒 Files selected for processing (9)

- scripts/cloud_backfill.py
- scripts/monitor_batch_reenrichment.py
- src/brainlayer/cli/__init__.py
- src/brainlayer/cloud_backfill.py
- src/brainlayer/session_repo.py
- src/brainlayer/vector_store.py
- tests/test_batch_reenrichment_setup.py
- tests/test_cli_enrich.py
- tests/test_cloud_backfill.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Macroscope - Correctness Check
- GitHub Check: test (3.13)
- GitHub Check: test (3.11)
- GitHub Check: test (3.12)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce the one-write-at-a-time concurrency constraint; reads are safe but `brain_digest` is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; the current test suite has 929 tests
**/*.py: Use `paths.py:get_db_path()` for all database path resolution; all scripts and CLI must use this function rather than hardcoding paths
When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches
Files:
tests/test_cli_enrich.py, src/brainlayer/session_repo.py, src/brainlayer/vector_store.py, tests/test_batch_reenrichment_setup.py, scripts/cloud_backfill.py, scripts/monitor_batch_reenrichment.py, src/brainlayer/cli/__init__.py, tests/test_cloud_backfill.py, src/brainlayer/cloud_backfill.py
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/brainlayer/**/*.py: Use retry logic on `SQLITE_BUSY` errors; each worker must use its own database connection to handle concurrency safely
Classification must preserve `ai_code`, `stack_trace`, and `user_message` verbatim; skip `noise` entries entirely and summarize `build_log` and `dir_listing` entries (structure only)
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Implement chunk lifecycle columns `superseded_by`, `aggregated_into`, `archived_at` on the chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Implement `brain_supersede` with a safety gate for personal data (journals, notes, health/finance); use soft-delete for `brain_archive` with timestamp
Add a `supersedes` parameter to `brain_store` for atomic store-and-replace operations
Run linting and formatting with `ruff check src/ && ruff format src/`
Run tests with `pytest`
Use `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations to prevent WAL bloat
Files:
src/brainlayer/session_repo.py, src/brainlayer/vector_store.py, src/brainlayer/cli/__init__.py, src/brainlayer/cloud_backfill.py
🧠 Learnings (13)
📓 Common learnings
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-14T02:20:54.656Z
Learning: Request codex review, cursor review, and bugbot review for BrainLayer PRs
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T23:47:49.756Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`: `service_tier="flex"` is the intentional default for all Gemini enrichment calls. Pass-2 enrichment is asynchronous backlog work where 1–15 minute latency is acceptable, and the 50% Gemini Flex Inference discount materially reduces backlog cost. This is locked by R84b design (§8 Q2). The `BRAINLAYER_GEMINI_SERVICE_TIER` environment variable is purely an operational escape hatch (e.g. `standard`), not the intended runtime default. Do not flag `service_tier="flex"` as a concern on this code path.
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-01T01:24:44.281Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment backend priority: Groq (primary/cloud) → Gemini (fallback) → Ollama (offline last-resort), configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
📚 Learning: 2026-03-22T15:55:22.017Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 100
File: src/brainlayer/enrichment_controller.py:175-199
Timestamp: 2026-03-22T15:55:22.017Z
Learning: In `src/brainlayer/enrichment_controller.py`, the `parallel` parameter in `enrich_local()` is intentionally kept in the function signature (currently unused, suppressed with `# noqa: ARG001`) for API stability. Parallel local enrichment via a thread pool or process pool is planned for a future iteration. Do not flag this as dead code requiring removal.
Applied to files:
tests/test_cli_enrich.py, scripts/monitor_batch_reenrichment.py, src/brainlayer/cli/__init__.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Implement chunk lifecycle columns: `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Applied to files:
src/brainlayer/session_repo.py, src/brainlayer/vector_store.py, tests/test_batch_reenrichment_setup.py, src/brainlayer/cloud_backfill.py
📚 Learning: 2026-04-04T15:16:13.883Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-04T15:16:13.883Z
Learning: Applies to src/brainlayer/hooks/dedup_coordination.py : Session dedup coordination: SessionStart hook writes injected chunk_ids to `/tmp/brainlayer_session_{id}.json`; UserPromptSubmit hook skips already-injected chunks; skip auto-search on handoff prompts
Applied to files:
src/brainlayer/session_repo.py
📚 Learning: 2026-04-04T23:24:03.159Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-04T23:24:03.159Z
Learning: Applies to src/brainlayer/{vector_store,search}*.py : Chunk lifecycle: implement columns `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search
Applied to files:
src/brainlayer/session_repo.py, src/brainlayer/vector_store.py, tests/test_batch_reenrichment_setup.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Applied to files:
scripts/cloud_backfill.py, scripts/monitor_batch_reenrichment.py, src/brainlayer/cli/__init__.py, tests/test_cloud_backfill.py, src/brainlayer/cloud_backfill.py
📚 Learning: 2026-04-01T01:24:44.281Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-01T01:24:44.281Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment backend priority: Groq (primary/cloud) → Gemini (fallback) → Ollama (offline last-resort), configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
Applied to files:
scripts/cloud_backfill.py, scripts/monitor_batch_reenrichment.py, src/brainlayer/cli/__init__.py, tests/test_cloud_backfill.py, src/brainlayer/cloud_backfill.py
📚 Learning: 2026-04-02T23:32:14.543Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T23:32:14.543Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment rate configurable via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Applied to files:
scripts/monitor_batch_reenrichment.py, src/brainlayer/cli/__init__.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Applied to files:
scripts/monitor_batch_reenrichment.py, src/brainlayer/cli/__init__.py
📚 Learning: 2026-04-11T23:47:49.756Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T23:47:49.756Z
Learning: Applies to `src/brainlayer/enrichment_controller.py`: `service_tier="flex"` is the intentional default for all Gemini enrichment calls. Pass-2 enrichment is asynchronous backlog work where 1–15 minute latency is acceptable, and the 50% Gemini Flex Inference discount materially reduces backlog cost. This is locked by R84b design (§8 Q2). The `BRAINLAYER_GEMINI_SERVICE_TIER` environment variable is purely an operational escape hatch (e.g. `standard`), not the intended runtime default. Do not flag `service_tier="flex"` as a concern on this code path.
Applied to files:
scripts/monitor_batch_reenrichment.py, src/brainlayer/cli/__init__.py, src/brainlayer/cloud_backfill.py
📚 Learning: 2026-04-11T16:54:45.631Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-11T16:54:45.631Z
Learning: Applies to `src/brainlayer/enrichment_controller.py` and `src/brainlayer/pipeline/rate_limiter.py`: Gemini API calls in the enrichment pipeline are gated by a token bucket rate limiter. The rate is controlled by `BRAINLAYER_ENRICH_RATE` (default `5/s`, burst `10`) to keep throughput inside the Gemini Flex intended envelope. This default supersedes the earlier 0.2 (12 RPM) default for the Gemini Flex integration path.
Applied to files:
src/brainlayer/cli/__init__.py
📚 Learning: 2026-04-04T23:49:56.485Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-04T23:49:56.485Z
Learning: Applies to src/brainlayer/vector_store.py : Use sqlite-vec with APSW for vector storage and retrieval
Applied to files:
tests/test_cloud_backfill.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Implement `brain_supersede` with safety gate for personal data (journals, notes, health/finance); use soft-delete for `brain_archive` with timestamp
Applied to files:
src/brainlayer/cloud_backfill.py
🔇 Additional comments (10)
src/brainlayer/cloud_backfill.py (10)
269-309: LGTM! The `save_checkpoint` function correctly implements retry logic with exponential backoff for `apsw.BusyError`, which aligns with the coding guidelines for handling `SQLITE_BUSY` errors.
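The same retry-on-busy pattern (also requested above for `_clear_imported_preview`) can be sketched as follows, with `sqlite3.OperationalError` standing in for `apsw.BusyError` and illustrative retry constants:

```python
import sqlite3
import time

MAX_RETRIES = 5
BASE_DELAY = 0.05  # seconds; kept as module constants so limits stay tunable

def execute_with_retry(conn, sql, params=()):
    """Run a statement, retrying on 'database is locked' with exponential backoff.

    A fresh cursor is obtained on every attempt, and the busy error is
    re-raised only after retries are exhausted, so it is never masked.
    """
    for attempt in range(MAX_RETRIES):
        try:
            return conn.cursor().execute(sql, params)
        except sqlite3.OperationalError:
            if attempt == MAX_RETRIES - 1:
                raise
            time.sleep(BASE_DELAY * (2 ** attempt))
```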
492-590: LGTM! The export function correctly uses read operations and properly handles PII sanitization. The batch file splitting with `CHUNKS_PER_JOB` keeps individual exports within Gemini API limits.
331-372: LGTM! The `ReadOnlyBackfillStore` class provides a clean fallback for export-only runs when the main DB is locked. Using `sqlite3` with a `mode=ro` URI is appropriate for read-only access.
684-717: LGTM! The polling implementation correctly handles job state transitions, includes graceful handling for expired/not-found jobs (404), and uses adaptive backoff (capped at 5 minutes) to reduce API calls during long-running batch jobs.
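The adaptive-backoff shape described here can be sketched as follows; the base and cap values are illustrative, not the PR's actual constants:

```python
def next_poll_delay(attempt, base=15.0, cap=300.0):
    """Delay before the next batch-status poll: double per attempt, capped at 5 min."""
    return min(base * (2 ** attempt), cap)
```

A poller would sleep for `next_poll_delay(attempt)` after each incomplete status check, so long-running batches converge on one API call every five minutes.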
757-798: LGTM! The result download function correctly handles different `dest` formats and defensively skips malformed JSON lines without failing the entire batch import.
899-924: LGTM! Best-effort logging with a 5-second timeout and silent exception handling is appropriate here; logging should never block the primary backfill workflow.
390-418: LGTM! The filter logic correctly identifies re-enrichment candidates: genuinely unenriched chunks (`enriched_at IS NULL`) OR legacy summarized chunks needing `summary_v2` population (`summary IS NOT NULL`). The `pending_only` flag appropriately excludes already-previewed rows.
1233-1265: LGTM! The CLI argument parsing is clean and provides all documented modes (`--dry-run`, `--sample`, `--resume`, `--status`, `--submit-only`). The `--no-sanitize` flag includes an appropriate warning in its help text.
73-91: LGTM! The enrichment schema correctly defines 10 properties with `version_scope` as the only optional field, matching the "10-field JSON schema" description.
929-1057: LGTM! The main workflow correctly handles the export → submit → poll → import lifecycle with proper resource cleanup in the `finally` block. The `allow_read_only_fallback` option for dry-run and submit-only modes is a thoughtful design choice.
```python
def main() -> int:
    parser = argparse.ArgumentParser(description="Monitor BrainLayer Gemini Batch re-enrichment progress")
    parser.add_argument("--db", type=Path, default=cloud_backfill.DEFAULT_DB_PATH, help="BrainLayer DB path")
```
Resolve the default DB path via `get_db_path()`.
This script hardcodes its default DB location through `cloud_backfill.DEFAULT_DB_PATH` instead of using the shared path resolver, so it can monitor the wrong database if path resolution changes or becomes environment-dependent.
Suggested fix

```diff
 import brainlayer.cloud_backfill as cloud_backfill
+from brainlayer.paths import get_db_path
 from brainlayer.vector_store import VectorStore
@@
-    parser.add_argument("--db", type=Path, default=cloud_backfill.DEFAULT_DB_PATH, help="BrainLayer DB path")
+    parser.add_argument("--db", type=Path, default=None, help="BrainLayer DB path")
@@
-        args.db,
+        args.db or get_db_path(),
         interval_seconds=args.interval_seconds,
         log_path=args.log_path,
         once=args.once,
         notify=not args.no_notify,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/monitor_batch_reenrichment.py` at line 111, Replace the hardcoded
default DB path used in parser.add_argument (currently
cloud_backfill.DEFAULT_DB_PATH) with the shared resolver by calling
paths.get_db_path(); update the parser.add_argument("--db", ...) default to use
get_db_path() so the CLI uses the centralized path resolution. Locate the
parser.add_argument invocation in scripts/monitor_batch_reenrichment.py and
import get_db_path from paths (or the appropriate module already exporting it)
and set default=get_db_path() to ensure environment-dependent DB location is
resolved consistently.
```python
if mode not in ("realtime", "batch"):
    raise typer.BadParameter(f"Invalid mode: {mode}")
```
Don't swallow `typer.BadParameter`.
These validation errors are raised inside a broad `except Exception`, so users get the generic error path instead of Typer's normal parameter-specific message and exit semantics.
Suggested fix

```diff
-    except Exception as e:
+    except typer.BadParameter:
+        raise
+    except Exception as e:
         rprint(f"[bold red]Error:[/] {e}")
         raise typer.Exit(1)
```

Also applies to: 971-974
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/cli/__init__.py` around lines 903 - 905, The Typer parameter
validation that raises typer.BadParameter (the checks using "if mode not in
(\"realtime\",\"batch\"): raise typer.BadParameter(...)" and the similar check
at the other occurrence) is being swallowed by a surrounding broad except
Exception; update the code so BadParameter is not caught: either move these
validations outside the broad try block, or add an explicit except
typer.BadParameter: raise before the general except Exception, ensuring
typer.BadParameter is re-raised so Typer's normal behavior occurs; apply the
same change to the other validation block referenced.
```python
import argparse
import json
import os
import sqlite3
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
```
Missing `sys` import causes `NameError`.
`sys.exit(1)` is called in `_get_genai_client()` (lines 601 and 606), but `sys` is not imported. This will raise a `NameError` at runtime when the Google API key is missing or `google-genai` is not installed.
🐛 Proposed fix

```diff
 import argparse
 import json
 import os
 import sqlite3
+import sys
 import time
 from datetime import datetime, timezone
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/cloud_backfill.py` around lines 25 - 32, The file is missing
an import for the sys module which causes NameError when _get_genai_client()
calls sys.exit(1); add an import sys at the top of the file alongside other
imports so _get_genai_client can safely call sys.exit and error handling works
properly.
```python
except Exception as e:
    err = str(e)
    if "429" in err and attempt < max_retries - 1:
        wait = 30 * (2 ** attempt)  # 30s, 60s, 120s
        print(f"  429 RESOURCE_EXHAUSTED — waiting {wait}s before retry...")
        time.sleep(wait)
    else:
        print(f"  FAILED: {err[:120]}")
        if store:
            save_checkpoint(
                store,
                batch_id=f"failed-{jsonl_path.stem}",
                backend="gemini",
                model=model,
                status="failed",
                chunk_count=chunk_count,
                jsonl_path=str(jsonl_path),
                error=err[:500],
            )
        return None
```
🧩 Analysis chain
🌐 Web query:
google-genai python SDK rate limit 429 error handling
💡 Result:
To handle 429 rate-limit errors in the google-genai Python SDK, use the built-in retry configuration via `HttpOptions`/`HttpRetryOptions` when initializing the `Client`. The SDK raises `google.genai.errors.ClientError` with `code == 429` for rate limits. Retries use exponential backoff with jitter and can be configured to cover 429 and other transient errors (408, 500, 502, 503, 504).

Built-in retry configuration (recommended):

```python
import google.genai as genai
from google.genai import types

client = genai.Client(
    api_key="YOUR_API_KEY",  # or use Vertex AI env vars
    http_options=types.HttpOptions(
        retry_options=types.HttpRetryOptions(
            attempts=3,            # number of retry attempts
            initial_delay=1.0,     # initial delay in seconds
            max_delay=30.0,        # maximum delay
            exp_base=2.0,          # exponential backoff multiplier
            jitter=1,              # jitter factor (0-1)
            http_status_codes=[408, 429, 500, 502, 503, 504],
        )
    ),
)
# API calls now automatically retry on 429
response = client.models.generate_content(model="gemini-2.5-flash", contents="Hello!")
```

This uses tenacity under the hood with exponential jittered backoff. Manual error handling:

```python
import time
from google.genai import errors

try:
    response = client.models.generate_content(model="gemini-2.5-flash", contents="Hello!")
except errors.ClientError as e:
    if e.code == 429:
        print("Rate limit exceeded. Implement backoff.")
        time.sleep(2 ** attempt)  # exponential backoff, then retry
    elif e.code == 400:
        print("Bad request")
    else:
        raise
```

For more control, use the tenacity library directly with `retry_if_exception_type(ClientError)`, `stop_after_attempt`, and `wait_exponential_jitter`. Best practices: monitor quotas in Google AI Studio or the Vertex AI console and request increases if needed; use multiple API keys/projects for high volume; limit concurrency with semaphores in async code; check rate limits at https://ai.google.dev/gemini-api/docs/rate-limits (RPM/TPM per tier).
Citations:
- 1: https://googleapis-python-genai-70.mintlify.app/guides/error-handling
- 2: https://github.com/googleapis/python-genai/blob/main/google/genai/errors.py
- 3: Retry configuration options? googleapis/python-genai#336
- 4: https://github.com/googleapis/python-genai/blob/main/google/genai/_api_client.py
🏁 Script executed:

```shell
# Find the submit_gemini_batch function and surrounding context
rg -n "def submit_gemini_batch" src/brainlayer/cloud_backfill.py -A 50 | head -80
```

Repository: EtanHey/brainlayer
Length of output: 2083

🏁 Script executed:

```shell
# Check imports to see if ClientError is imported
rg -n "^import|^from" src/brainlayer/cloud_backfill.py | head -20
```

Repository: EtanHey/brainlayer
Length of output: 448

🏁 Script executed:

```shell
# Search for SDK initialization/client creation to see retry options
rg -n "Client\(|genai\.|http_options" src/brainlayer/cloud_backfill.py
```

Repository: EtanHey/brainlayer
Length of output: 106

🏁 Script executed:

```shell
# Check if ClientError or google.genai is imported anywhere
rg -n "google\.genai|ClientError|from google" src/brainlayer/cloud_backfill.py
```

Repository: EtanHey/brainlayer
Length of output: 241

🏁 Script executed:

```shell
# Get full imports section of cloud_backfill.py
sed -n '1,50p' src/brainlayer/cloud_backfill.py
```

Repository: EtanHey/brainlayer
Length of output: 1439
Use proper exception typing instead of string-based 429 detection.

The google-genai SDK raises google.genai.errors.ClientError with a .code attribute set to 429 for rate limiting. While the current string-based check (`"429" in str(e)`) works because the SDK includes the status code in error messages, it's fragile and non-idiomatic. Import ClientError and check the code directly:

```python
from google.genai import errors
# ...
except errors.ClientError as e:
    if e.code == 429 and attempt < max_retries - 1:
        # retry logic
    else:
        # handle other errors
```

This approach is more maintainable and aligns with SDK conventions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/cloud_backfill.py` around lines 660 - 679, Replace the
string-based 429 detection with proper exception typing: import
google.genai.errors.ClientError (or from google.genai import errors) and change
the broad except block to first catch errors.ClientError as e and check e.code
== 429 (and attempt < max_retries - 1) to run the retry/wait logic; keep a
subsequent generic except Exception as e branch to handle other failures and
call save_checkpoint (used here with store, jsonl_path, model, chunk_count) and
return None as before. Ensure the retry backoff (wait = 30 * (2 ** attempt)) and
logging/printing behavior remain unchanged.
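The typed-exception retry loop the prompt describes can be sketched generically. This is a hedged illustration, not the project's code: `RateLimitError` and `call_with_backoff` are hypothetical stand-ins, and a real implementation would catch `google.genai.errors.ClientError` and inspect its `.code` the same way.

```python
import time

class RateLimitError(Exception):
    """Stand-in for google.genai.errors.ClientError; carries a .code like the SDK error."""
    def __init__(self, code: int):
        super().__init__(f"HTTP {code}")
        self.code = code

def call_with_backoff(fn, max_retries: int = 3, base_delay: float = 0.01):
    """Retry fn() on 429-coded errors with exponential backoff; re-raise anything else."""
    for attempt in range(max_retries):
        try:
            return fn()
        except RateLimitError as e:
            if e.code == 429 and attempt < max_retries - 1:
                time.sleep(base_delay * (2 ** attempt))  # exponential backoff between attempts
            else:
                raise  # non-429 error, or retries exhausted
```

Checking `.code` keeps the retry decision independent of the error-message format, which is the point of the review finding.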
```python
def _clear_imported_preview(store: VectorStore, chunk_id: str) -> None:
    """Remove preview fields written by a failed remote import so the chunk can retry."""
    store.conn.cursor().execute(
        """
        UPDATE chunks
        SET summary_v2 = NULL,
            enrichment_version = NULL
        WHERE id = ?
        """,
        (chunk_id,),
    )
```
Missing retry logic for database write.

_clear_imported_preview performs a database UPDATE but lacks retry logic for SQLITE_BUSY/BusyError. Since this is called from an exception handler, a BusyError here could mask the original failure and leave the chunk in an inconsistent state.

🛠️ Proposed fix

```diff
 def _clear_imported_preview(store: VectorStore, chunk_id: str) -> None:
     """Remove preview fields written by a failed remote import so the chunk can retry."""
-    store.conn.cursor().execute(
-        """
-        UPDATE chunks
-        SET summary_v2 = NULL,
-            enrichment_version = NULL
-        WHERE id = ?
-        """,
-        (chunk_id,),
-    )
+    for attempt in range(3):
+        try:
+            store.conn.cursor().execute(
+                """
+                UPDATE chunks
+                SET summary_v2 = NULL,
+                    enrichment_version = NULL
+                WHERE id = ?
+                """,
+                (chunk_id,),
+            )
+            return
+        except apsw.BusyError:
+            if attempt < 2:
+                time.sleep(0.5 * (attempt + 1))
+    # On final failure, silently continue - logging already handled
```

As per coding guidelines: "Use retry logic on SQLITE_BUSY errors."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/cloud_backfill.py` around lines 803 - 813, The UPDATE in
_clear_imported_preview can raise SQLITE_BUSY/BusyError and should be retried
instead of failing; wrap the call to store.conn.cursor().execute(...) (and the
surrounding transaction/commit if any) with a small retry loop that catches the
database busy error (e.g., sqlite3.OperationalError or the BusyError type your
DB layer exposes), sleep/backoff between attempts, and re-raise only after
exhausting retries so the original exception is not masked and the chunk's
preview fields get cleared reliably for chunk_id; keep the retry limits and
backoff configurable/constants and ensure the cursor is re-obtained on each
retry.
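A generic shape for the retry loop this prompt describes, using stdlib `sqlite3` since the project's `apsw.BusyError` type is not assumed here; `retry_on_busy` and its parameters are illustrative names, not project API.

```python
import sqlite3
import time

def retry_on_busy(op, attempts: int = 3, base_delay: float = 0.01):
    """Run op() and retry when SQLite reports the database is locked/busy.

    Re-raises the last error after exhausting retries so failures are not masked.
    """
    for attempt in range(attempts):
        try:
            return op()
        except sqlite3.OperationalError as exc:
            if "locked" not in str(exc) and "busy" not in str(exc):
                raise  # unrelated operational error: surface immediately
            if attempt == attempts - 1:
                raise  # out of retries: let the busy error propagate
            time.sleep(base_delay * (attempt + 1))  # linear backoff, like the proposed fix
```

Wrapping the cursor call in such a helper keeps the retry limits and backoff in one configurable place, as the prompt suggests.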
```python
def import_results(
    store: VectorStore,
    results: List[Dict[str, Any]],
    batch_id: str,
) -> Dict[str, int]:
    """Import preview summaries back to DB without mutating the live summary."""
    success = 0
    failed = 0
    skipped = 0

    cursor = store.conn.cursor()

    for result in results:
        chunk_id = result.get("key")
        if not chunk_id:
            failed += 1
            continue

        # Skip if preview summary already exists.
        existing = list(
            cursor.execute(
                "SELECT summary_v2 FROM chunks WHERE id = ?",
                [chunk_id],
            )
        )
        if existing and existing[0][0] is not None:
            skipped += 1
            continue

        # Extract the generated text from Gemini response
        response_text = None
        try:
            response = result.get("response", result.get("output", {}))
            if isinstance(response, dict):
                candidates = response.get("candidates", [])
                if candidates:
                    parts = candidates[0].get("content", {}).get("parts", [])
                    if parts:
                        response_text = parts[0].get("text", "")
            elif isinstance(response, str):
                response_text = response
        except (KeyError, IndexError, TypeError):
            pass

        if not response_text:
            failed += 1
            continue

        enrichment = parse_enrichment(response_text)
        if enrichment:
            try:
                store.update_reenrichment_preview(
                    chunk_id=chunk_id,
                    summary_v2=enrichment.get("summary"),
                    enrichment_version=REENRICHMENT_VERSION,
                )
            except Exception as exc:
                print(f" WARNING: preview import failed for {chunk_id}: {exc}")
                _clear_imported_preview(store, chunk_id)
                failed += 1
                continue
            success += 1
        else:
            failed += 1

        if (success + failed + skipped) % 1000 == 0:
            print(f" Progress: {success} ok, {failed} fail, {skipped} skip")

    print(f" Import done: {success} ok, {failed} fail, {skipped} skip")

    # Update checkpoint
    save_checkpoint(
        store,
        batch_id=batch_id,
        status="imported",
        completed_at=datetime.now(timezone.utc).isoformat(),
    )

    return {"success": success, "failed": failed, "skipped": skipped}
```
Missing WAL checkpoint for bulk import operation.

The import_results function can update thousands of rows but doesn't perform WAL checkpoints. Per coding guidelines, bulk database operations should "checkpoint WAL before and after" to prevent WAL bloat. Consider adding checkpoints at the start, periodically during the loop, and at completion.

🛠️ Proposed fix

```diff
 def import_results(
     store: VectorStore,
     results: List[Dict[str, Any]],
     batch_id: str,
 ) -> Dict[str, int]:
     """Import preview summaries back to DB without mutating the live summary."""
     success = 0
     failed = 0
     skipped = 0
     cursor = store.conn.cursor()
+    # WAL checkpoint before bulk import
+    cursor.execute("PRAGMA wal_checkpoint(FULL)")
     for result in results:
         # ... existing loop code ...
         if (success + failed + skipped) % 1000 == 0:
             print(f" Progress: {success} ok, {failed} fail, {skipped} skip")
+            # Periodic WAL checkpoint during bulk import
+            cursor.execute("PRAGMA wal_checkpoint(FULL)")
     print(f" Import done: {success} ok, {failed} fail, {skipped} skip")
+    # WAL checkpoint after bulk import
+    cursor.execute("PRAGMA wal_checkpoint(FULL)")
     # Update checkpoint
     save_checkpoint(
```

As per coding guidelines: "Use PRAGMA wal_checkpoint(FULL) before and after bulk database operations to prevent WAL bloat."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/cloud_backfill.py` around lines 816 - 894, The import_results
function is missing WAL checkpoints for this bulk update; before starting the
loop run a PRAGMA wal_checkpoint(FULL) on store.conn, then inside the loop
perform periodic checkpoints (e.g., every 1000 processed or when
(success+failed+skipped) % 1000 == 0) by executing the same PRAGMA on store.conn
to flush the WAL, and finally run one more PRAGMA wal_checkpoint(FULL) after the
loop completes (before save_checkpoint). Locate import_results and use the
existing store.conn (and its cursor) to execute the checkpoint statements so
bulk updates via store.update_reenrichment_preview and calls like
_clear_imported_preview/save_checkpoint won’t cause WAL bloat.
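The checkpoint cadence the prompt asks for can be demonstrated end to end with stdlib `sqlite3` (the project code uses apsw, so this is only a shape sketch; the table layout and batch size here are made up for illustration):

```python
import os
import sqlite3
import tempfile

def bulk_update_with_checkpoints(db_path: str, rows, every: int = 1000) -> int:
    """Write rows with a WAL checkpoint before, periodically during, and after the loop."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("CREATE TABLE IF NOT EXISTS chunks (id TEXT PRIMARY KEY, summary_v2 TEXT)")
    conn.commit()
    conn.execute("PRAGMA wal_checkpoint(FULL)")  # flush WAL before bulk work
    done = 0
    for chunk_id, summary in rows:
        conn.execute(
            "INSERT OR REPLACE INTO chunks (id, summary_v2) VALUES (?, ?)",
            (chunk_id, summary),
        )
        done += 1
        if done % every == 0:
            conn.commit()
            conn.execute("PRAGMA wal_checkpoint(FULL)")  # keep the WAL from growing mid-run
    conn.commit()
    conn.execute("PRAGMA wal_checkpoint(FULL)")  # flush WAL after bulk work
    conn.close()
    return done
```

Committing before each checkpoint matters: `wal_checkpoint` cannot fully truncate the WAL while a write transaction is still open.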
```python
def process_pending_jobs_once(db_path: Path) -> Dict[str, int]:
    """Check each submitted job once, importing any completed results."""
    store = VectorStore(db_path)
    ensure_checkpoint_table(store)
```
🧹 Nitpick | 🔵 Trivial

Consider handling BusyError when opening the database.

process_pending_jobs_once opens VectorStore(db_path) directly without handling apsw.BusyError. If called from the monitor script while another process holds the lock, this will crash. Consider wrapping in try/except or using open_backfill_store for consistency with run_full_backfill.

♻️ Proposed fix

```diff
 def process_pending_jobs_once(db_path: Path) -> Dict[str, int]:
     """Check each submitted job once, importing any completed results."""
-    store = VectorStore(db_path)
+    try:
+        store = VectorStore(db_path)
+    except apsw.BusyError:
+        print("Database locked, skipping this poll cycle")
+        return {"checked": 0, "imported_jobs": 0, "failed_jobs": 0,
+                "still_pending": 0, "success": 0, "failed": 0, "skipped": 0}
     ensure_checkpoint_table(store)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/cloud_backfill.py` around lines 1060 - 1063,
process_pending_jobs_once currently constructs VectorStore(db_path) directly
which can raise apsw.BusyError if the DB is locked; wrap the store open in a
try/except that catches apsw.BusyError and handles it gracefully (e.g., log and
return empty counts) or reuse the existing open_backfill_store helper used by
run_full_backfill to get a store safely. Update process_pending_jobs_once to
either call open_backfill_store(db_path) or to catch apsw.BusyError around
VectorStore(...) and return/exit cleanly before calling ensure_checkpoint_table
and further processing.
```python
def resume_backfill(db_path: Path) -> None:
    """Resume polling/importing for any submitted but incomplete jobs."""
    store = VectorStore(db_path)
    ensure_checkpoint_table(store)
```
🧹 Nitpick | 🔵 Trivial
Consider consistent BusyError handling for CLI entry points.
Both resume_backfill and show_status open VectorStore without handling lock contention. For CLI commands, a clear error message is preferable to a stack trace. Consider using a shared helper or wrapping these calls similarly to open_backfill_store.
Also applies to: 1190-1193
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/cloud_backfill.py` around lines 1120 - 1123, The
resume_backfill and show_status CLI entry points construct VectorStore directly
(VectorStore(...)) and therefore don't handle lock contention BusyError
consistently; change both to use the existing open_backfill_store helper or wrap
the VectorStore creation in the same BusyError try/except logic used by
open_backfill_store so a friendly CLI error is shown instead of a
traceback—update resume_backfill (and the similar block at lines ~1190-1193) to
call open_backfill_store(db_path) or replicate its BusyError handling before
calling ensure_checkpoint_table.
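One way to centralize the friendly-error behavior across these CLI entry points is a small open helper. `open_store_gracefully` is an illustrative name, not project API, and the injected `busy_exc` stands in for `apsw.BusyError` in the real codebase:

```python
def open_store_gracefully(factory, busy_exc=Exception):
    """Call factory() to open a store; return None with a clear message if the DB is locked.

    CLI callers can check for None and exit cleanly instead of showing a traceback.
    """
    try:
        return factory()
    except busy_exc:
        print("Database is locked by another process; try again shortly.")
        return None
```

Both resume_backfill and show_status could then share one lock-contention message instead of duplicating try/except blocks.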
```python
        params.append(chunk_id)
        for attempt in range(3):
            try:
                cursor.execute(f"UPDATE chunks SET {', '.join(sets)} WHERE id = ?", params)
                from .search_repo import clear_hybrid_search_cache

                clear_hybrid_search_cache(getattr(self, "db_path", None))
                return
```
Treat a zero-row preview update as a failed import.

If the chunk disappears between export and import, this UPDATE is a no-op and still returns normally. The caller in src/brainlayer/cloud_backfill.py counts any normal return as a successful import, so the job can be marked successful without actually writing summary_v2 or enrichment_version.

Suggested fix

```diff
         params.append(chunk_id)
         for attempt in range(3):
             try:
+                exists = cursor.execute(
+                    "SELECT 1 FROM chunks WHERE id = ?",
+                    (chunk_id,),
+                ).fetchone()
+                if exists is None:
+                    raise KeyError(f"Chunk not found: {chunk_id}")
                 cursor.execute(f"UPDATE chunks SET {', '.join(sets)} WHERE id = ?", params)
                 from .search_repo import clear_hybrid_search_cache
                 clear_hybrid_search_cache(getattr(self, "db_path", None))
                 return
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/session_repo.py` around lines 216 - 223, The UPDATE in the
retry loop can be a no-op if the chunk was deleted, but the code currently
returns normally; change the logic in the method containing the cursor.execute
call (the loop using sets, params and chunk_id in
src/brainlayer/session_repo.py) to check cursor.rowcount after executing the
UPDATE, and if rowcount == 0 treat it as a failed update by raising an exception
(so the retry loop continues and eventual failure is reported), and only call
clear_hybrid_search_cache(getattr(self, "db_path", None)) and return when
rowcount > 0 (i.e., a real row was updated).
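The rowcount check the prompt describes translates directly to stdlib sqlite3, as sketched below. The project uses apsw, where an equivalent might rely on `Connection.changes()` instead, so treat this as a shape sketch; `update_preview_or_fail` is an illustrative name.

```python
import sqlite3

def update_preview_or_fail(conn, chunk_id: str, summary_v2: str) -> None:
    """UPDATE the chunk and treat zero affected rows as a hard failure."""
    cur = conn.execute(
        "UPDATE chunks SET summary_v2 = ? WHERE id = ?",
        (summary_v2, chunk_id),
    )
    if cur.rowcount == 0:
        # The chunk vanished between export and import: surface it instead of
        # silently reporting a successful write.
        raise KeyError(f"Chunk not found: {chunk_id}")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunks (id TEXT PRIMARY KEY, summary_v2 TEXT)")
conn.execute("INSERT INTO chunks (id) VALUES ('c1')")

update_preview_or_fail(conn, "c1", "new summary")  # one row updated, no error
```

Raising on a no-op UPDATE lets the caller's exception handler count the chunk as failed rather than imported.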
```python
    exported_keys = []
    for path in jsonl_files:
        for line in path.read_text().splitlines():
            exported_keys.append(json.loads(line)["key"])

    assert set(exported_keys) == {"unenriched", "legacy"}
```
🧹 Nitpick | 🔵 Trivial

Make the assertion detect duplicate exports.

set(exported_keys) hides duplicate keys, so this test still passes if the exporter emits the same chunk twice. That would mask duplicate batch work on the exact path this test is supposed to protect.

Suggested fix

```diff
-    assert set(exported_keys) == {"unenriched", "legacy"}
+    assert len(exported_keys) == 2
+    assert set(exported_keys) == {"unenriched", "legacy"}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_cloud_backfill.py` around lines 384 - 389, The test currently uses
set(exported_keys) which hides duplicates; update the assertions in the block
handling exported_keys (from jsonl_files) to both verify the expected key set
and that there are no duplicates: assert set(exported_keys) == {"unenriched",
"legacy"} and assert len(exported_keys) == len(set(exported_keys)) (or use
collections.Counter to assert all counts == 1) so duplicate exports will fail
the test.
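The Counter variant mentioned in the prompt, as a standalone sketch (the key list here is illustrative data, not the test's real export):

```python
from collections import Counter

exported_keys = ["unenriched", "legacy"]

counts = Counter(exported_keys)
# Every expected key appears exactly once: catches both missing and duplicated exports.
assert counts == {"unenriched": 1, "legacy": 1}

duplicates = [key for key, n in counts.items() if n > 1]
assert duplicates == []  # a duplicated export would be listed here
```

The equality against an exact count mapping is stricter than comparing sets: it fails both when a key is absent and when one is exported twice.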
Summary

- Move the batch implementation into `brainlayer.cloud_backfill` and keep `scripts/cloud_backfill.py` as a thin wrapper so the installed CLI no longer depends on `scripts` being importable
- Wire `brainlayer enrich --mode batch` to the real Gemini Batch API JSONL submit/poll/import flow, with non-destructive imports into `summary_v2` and `enrichment_version`
- Add `summary_v2` migration support, preview-write persistence, targeted batch tests, and `scripts/monitor_batch_reenrichment.py` for 30-minute polling/logging

Verification

- `pytest tests/test_cli_enrich.py tests/test_cloud_backfill.py tests/test_batch_reenrichment_setup.py -x -v` -> 21 passed
- `pytest tests/test_cli_enrich.py tests/test_cloud_backfill.py tests/test_batch_reenrichment_setup.py tests/test_enrichment_controller.py tests/test_session_enrichment_candidates.py tests/test_chunk_lifecycle.py -x -v` -> 136 passed
- `python3 -m py_compile src/brainlayer/cloud_backfill.py scripts/cloud_backfill.py scripts/monitor_batch_reenrichment.py src/brainlayer/cli/__init__.py src/brainlayer/session_repo.py src/brainlayer/vector_store.py` -> passed
- `cr review --plain` -> 1 finding, fixed by moving the batch implementation into the package and leaving the repo script as a wrapper

Live 10-chunk smoke

- DB: `/tmp/brainlayer-r86-batch-smoke.db`
- Batch job: `batches/i7xvbhk0i3aii4fy05vsbem6u4bs6ejch8nx`
- 2026-04-15T15:26:06+00:00: `JOB_STATE_PENDING`, summary_v2=0/10, summary=0/10, enrichment_version=2.0 => 0/10
- `python3 scripts/monitor_batch_reenrichment.py --db /tmp/brainlayer-r86-batch-smoke.db --once --no-notify` while the batch queue clears
Release Notes
- `status` phase to check batch job details
- `--model` option to override the Gemini model for batch operations
Note

Wire batch enrich CLI to Gemini Batch API via `cloud_backfill` module

- New `brainlayer.cloud_backfill` module implementing a full Gemini Batch API pipeline: export chunks to JSONL, submit batch jobs, poll for completion, import `summary_v2` results, and log usage telemetry.
- Wires the `enrich` CLI batch mode to `cloud_backfill`: `submit`/`run` call `run_full_backfill`, `poll`/`import` call `process_pending_jobs_once`, and `status` calls `show_status`; a new `--model` flag is forwarded to batch submit.
- Adds `summary_v2` (TEXT) and `enrichment_version` (TEXT DEFAULT '1.0') columns to the `chunks` table schema, and `update_reenrichment_preview` to `SessionMixin` for persisting preview fields without overwriting live `summary`.
- Adds a `monitor_batch_reenrichment.py` CLI script that polls pending jobs on an interval, logs progress, and optionally sends a local notification on completion.
- Checkpoints live in `enrichment_checkpoints.db`, with migration from any legacy checkpoint rows in the main DB.

Macroscope summarized 2a49cfd.