Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions scripts/cloud_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,15 @@
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))

from brainlayer.vector_store import VectorStore
from brainlayer.paths import get_db_path
from brainlayer.pipeline.enrichment import (
ENRICHMENT_PROMPT,
HIGH_VALUE_TYPES,
build_external_prompt,
build_prompt,
parse_enrichment,
)
from brainlayer.paths import get_db_path
from brainlayer.pipeline.sanitize import Sanitizer, SanitizeConfig
from brainlayer.pipeline.sanitize import SanitizeConfig, Sanitizer
from brainlayer.vector_store import VectorStore

# ── Config ──────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -136,10 +135,28 @@ def estimate_batch_cost_usd(input_tokens: int, output_tokens: int) -> float:

# ── DB helpers ──────────────────────────────────────────────────────────

def _open_checkpoint_conn() -> apsw.Connection:
def get_checkpoint_db_path(db_path: Path | str | None = None) -> Path:
"""Resolve the checkpoint sidecar path for a selected main DB."""
if db_path is None:
return CHECKPOINT_DB_PATH
return Path(db_path).with_name("enrichment_checkpoints.db")


def _store_db_path(store: Any | None) -> Path | None:
"""Best-effort DB path lookup from a store-like object."""
if store is None:
return None
db_path = getattr(store, "db_path", None)
if db_path is None:
return None
return Path(db_path)


def _open_checkpoint_conn(db_path: Path | str | None = None) -> apsw.Connection:
"""Open the sidecar checkpoint DB without VectorStore startup hooks."""
CHECKPOINT_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = apsw.Connection(str(CHECKPOINT_DB_PATH))
checkpoint_db_path = get_checkpoint_db_path(db_path)
checkpoint_db_path.parent.mkdir(parents=True, exist_ok=True)
conn = apsw.Connection(str(checkpoint_db_path))
conn.setbusytimeout(5000)
cursor = conn.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
Expand Down Expand Up @@ -203,12 +220,13 @@ def _migrate_checkpoints_from_main_db(store: Optional[VectorStore], conn: apsw.C
return len(rows)


def _get_recorded_jsonl_paths(statuses: tuple[str, ...]) -> set[str]:
def _get_recorded_jsonl_paths(statuses: tuple[str, ...], db_path: Path | str | None = None) -> set[str]:
"""Return JSONL paths already represented by stable checkpoint states."""
if not CHECKPOINT_DB_PATH.exists():
checkpoint_db_path = get_checkpoint_db_path(db_path)
if not checkpoint_db_path.exists():
return set()

conn = _open_checkpoint_conn()
conn = _open_checkpoint_conn(db_path)
try:
_ensure_checkpoint_table_in_conn(conn)
placeholders = ", ".join("?" for _ in statuses)
Expand All @@ -228,20 +246,20 @@ def _get_recorded_jsonl_paths(statuses: tuple[str, ...]) -> set[str]:
conn.close()


def get_unsubmitted_export_files(export_dir: Optional[Path] = None) -> List[Path]:
def get_unsubmitted_export_files(export_dir: Optional[Path] = None, db_path: Path | str | None = None) -> List[Path]:
"""Reuse existing JSONL exports and skip files already checkpointed."""
export_dir = export_dir or EXPORT_DIR
existing_files = sorted(export_dir.glob("batch_*.jsonl"))
if not existing_files:
return []

recorded_paths = _get_recorded_jsonl_paths(CHECKPOINT_STABLE_STATUSES)
recorded_paths = _get_recorded_jsonl_paths(CHECKPOINT_STABLE_STATUSES, db_path=db_path)
return [path for path in existing_files if str(path) not in recorded_paths]


def ensure_checkpoint_table(store: VectorStore) -> None:
"""Create the sidecar checkpoint DB and migrate legacy rows if present."""
conn = _open_checkpoint_conn()
conn = _open_checkpoint_conn(_store_db_path(store))
try:
_ensure_checkpoint_table_in_conn(conn)
_migrate_checkpoints_from_main_db(store, conn)
Expand All @@ -251,13 +269,13 @@ def ensure_checkpoint_table(store: VectorStore) -> None:

def save_checkpoint(store: VectorStore, batch_id: str, **kwargs) -> None:
"""Insert or update a checkpoint row in the sidecar checkpoint DB."""
del store # Legacy signature retained for callers; checkpoint writes avoid the main DB.
last_error: Exception | None = None
checkpoint_db_path = _store_db_path(store)

for attempt in range(CHECKPOINT_WRITE_MAX_RETRIES):
conn = None
try:
conn = _open_checkpoint_conn()
conn = _open_checkpoint_conn(checkpoint_db_path)
_ensure_checkpoint_table_in_conn(conn)
cursor = conn.cursor()
existing = list(
Expand Down Expand Up @@ -294,8 +312,7 @@ def save_checkpoint(store: VectorStore, batch_id: str, **kwargs) -> None:

def get_pending_jobs(store: VectorStore) -> List[Dict[str, Any]]:
"""Get jobs that need polling (submitted but not completed/failed)."""
del store # Retained for signature compatibility.
conn = _open_checkpoint_conn()
conn = _open_checkpoint_conn(_store_db_path(store))
try:
_ensure_checkpoint_table_in_conn(conn)
rows = list(
Expand Down Expand Up @@ -807,17 +824,20 @@ def run_full_backfill(
# Step 1: Export or reuse existing batch files
max_chunks = sample if sample > 0 else 0
if submit_only and sample == 0:
jsonl_files = get_unsubmitted_export_files()
jsonl_files = get_unsubmitted_export_files(db_path=db_path)
if jsonl_files:
print(
f"Reusing {len(jsonl_files)} existing JSONL batch files not yet checkpointed "
f"from {EXPORT_DIR}"
)
elif list(EXPORT_DIR.glob("batch_*.jsonl")):
print("All existing JSONL batch files are already checkpointed. No new submissions needed.")
return
else:
jsonl_files = export_unenriched_chunks(store, max_chunks=max_chunks, no_sanitize=no_sanitize)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if not jsonl_files:
if get_pending_jobs(store):
print("Existing batch jobs are already submitted. Run --resume to import them.")
elif list(EXPORT_DIR.glob("batch_*.jsonl")):
print("All existing JSONL batch files are already checkpointed. No new submissions needed.")
return
else:
jsonl_files = export_unenriched_chunks(store, max_chunks=max_chunks, no_sanitize=no_sanitize)

Expand Down
26 changes: 25 additions & 1 deletion src/brainlayer/search_repo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Search and retrieval methods for VectorStore (mixin)."""

import copy
import hashlib
import json
import math
import os
Expand All @@ -21,15 +22,36 @@
# - Filter-scoped: all query-affecting filters belong in the cache key.
# - Copy-on-read: callers enrich and mutate result metadata after search.
_HYBRID_CACHE_TTL = 60.0 # seconds
_HYBRID_CACHE_MAX = 128 # max entries (LRU eviction)
_HYBRID_CACHE_MAX = 128 # max entries (LRU eviction)

# Module-level LRU cache: {cache_key: (result, timestamp)}
_hybrid_cache: "OrderedDict[tuple, tuple[dict, float]]" = OrderedDict()


def clear_hybrid_search_cache(store_key: Any = None) -> None:
"""Clear cached hybrid search results, optionally scoped to a single DB."""
if store_key is None:
_hybrid_cache.clear()
return

normalized_store_key = os.fspath(store_key)
stale_keys = [key for key in _hybrid_cache if key and key[0] == normalized_store_key]
for key in stale_keys:
_hybrid_cache.pop(key, None)


def _hybrid_embedding_key(query_embedding: Optional[List[float]]) -> bytes:
"""Hash embeddings so cache keys stay stable across equivalent iterables."""
if query_embedding is None:
return b""
embedding_bytes = serialize_f32([float(value) for value in query_embedding])
return hashlib.sha256(embedding_bytes).digest()


def _hybrid_cache_key(
store_key: str,
query_text: str,
query_embedding: Optional[List[float]],
n_results: int,
project_filter: Optional[str],
content_type_filter: Optional[str],
Expand All @@ -48,6 +70,7 @@ def _hybrid_cache_key(
return (
store_key,
query_text,
_hybrid_embedding_key(query_embedding),
n_results,
project_filter,
content_type_filter,
Expand Down Expand Up @@ -424,6 +447,7 @@ def hybrid_search(
cache_key = _hybrid_cache_key(
store_key,
query_text,
query_embedding,
n_results,
Comment on lines +447 to +451
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include query embedding in the hybrid search cache key

The cache key excludes query_embedding even though semantic ranking is computed from it, so two calls with identical text/filters but different embeddings will incorrectly reuse the first result for up to 60 seconds. This can return stale or wrong rankings during embedding model changes, alternate embedding pipelines, or recovery from a bad first embedding.

Useful? React with 👍 / 👎.

project_filter,
content_type_filter,
Expand Down
3 changes: 3 additions & 0 deletions src/brainlayer/session_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ def update_enrichment(
for attempt in range(3):
try:
cursor.execute(f"UPDATE chunks SET {', '.join(sets)} WHERE id = ?", params)
from .search_repo import clear_hybrid_search_cache

clear_hybrid_search_cache(getattr(self, "db_path", None))
return
except apsw.BusyError:
if attempt < 2:
Expand Down
4 changes: 4 additions & 0 deletions src/brainlayer/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ def store_memory(
context=f"Stored via brain_store: {memory_type}",
)

from .search_repo import clear_hybrid_search_cache

clear_hybrid_search_cache(getattr(store, "db_path", None))

return {
"id": chunk_id,
"related": related,
Expand Down
6 changes: 6 additions & 0 deletions src/brainlayer/vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,9 @@ def update_chunk(
"INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)",
(chunk_id, serialize_f32(embedding)),
)
from .search_repo import clear_hybrid_search_cache

clear_hybrid_search_cache(getattr(self, "db_path", None))
return True

def archive_chunk(self, chunk_id: str) -> bool:
Expand All @@ -712,6 +715,9 @@ def archive_chunk(self, chunk_id: str) -> bool:
return False
cursor.execute("UPDATE chunks SET value_type = 'ARCHIVED' WHERE id = ?", (chunk_id,))
cursor.execute("DELETE FROM chunk_vectors WHERE chunk_id = ?", (chunk_id,))
from .search_repo import clear_hybrid_search_cache

clear_hybrid_search_cache(getattr(self, "db_path", None))
return True

def get_chunk(self, chunk_id: str) -> Optional[Dict[str, Any]]:
Expand Down
85 changes: 83 additions & 2 deletions tests/test_cloud_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import apsw
import pytest

from scripts import cloud_backfill
from brainlayer.vector_store import VectorStore
from scripts import cloud_backfill


def _insert_unenriched_chunk(store: VectorStore, chunk_id: str, content: str, content_type: str = "assistant_text") -> None:
def _insert_unenriched_chunk(
store: VectorStore, chunk_id: str, content: str, content_type: str = "assistant_text"
) -> None:
"""Insert a minimal unenriched chunk eligible for export."""
cursor = store.conn.cursor()
cursor.execute(
Expand Down Expand Up @@ -246,3 +248,82 @@ def locked_vector_store(_db_path):
assert isinstance(store, cloud_backfill.ReadOnlyBackfillStore)
finally:
store.close()


def test_get_pending_jobs_is_scoped_to_the_selected_db(tmp_path, monkeypatch):
    """Pending jobs for one DB must not leak into another DB's resume path."""
    # Point the legacy shared sidecar somewhere harmless so per-DB scoping
    # (not the global default) is what this test actually exercises.
    monkeypatch.setattr(cloud_backfill, "CHECKPOINT_DB_PATH", tmp_path / "shared-sidecar.db", raising=False)

    dir_first, dir_second = tmp_path / "db-a", tmp_path / "db-b"
    for directory in (dir_first, dir_second):
        directory.mkdir()

    first_store = VectorStore(dir_first / "brainlayer.db")
    second_store = VectorStore(dir_second / "brainlayer.db")
    try:
        # Record a submitted (i.e. still pending) job against the first DB only.
        cloud_backfill.save_checkpoint(
            first_store,
            batch_id="batch-a",
            backend="gemini",
            model="models/gemini-2.5-flash",
            status="submitted",
            chunk_count=10,
            jsonl_path="/tmp/a.jsonl",
        )

        pending_for_first = cloud_backfill.get_pending_jobs(first_store)
        assert [job["batch_id"] for job in pending_for_first] == ["batch-a"]
        assert cloud_backfill.get_pending_jobs(second_store) == []
    finally:
        first_store.close()
        second_store.close()


def test_submit_only_exports_new_chunks_when_old_files_are_checkpointed(tmp_path, monkeypatch):
    """submit-only should export new unenriched chunks instead of returning early."""
    # Redirect both the export directory and the checkpoint sidecar into
    # tmp_path so the test never touches real state.
    export_dir = tmp_path / "exports"
    export_dir.mkdir()
    checkpoint_db = tmp_path / "enrichment_checkpoints.db"
    monkeypatch.setattr(cloud_backfill, "EXPORT_DIR", export_dir)
    monkeypatch.setattr(cloud_backfill, "CHECKPOINT_DB_PATH", checkpoint_db, raising=False)

    # One stale JSONL that is already checkpointed, plus the path a fresh
    # export is expected to produce.
    old_file = export_dir / "batch_001.jsonl"
    old_file.write_text("{}\n")
    new_file = export_dir / "batch_002.jsonl"

    # Mark the old file as fully imported so the resume path should skip it.
    cloud_backfill.save_checkpoint(
        None,
        batch_id="imported-batch",
        backend="gemini",
        model="models/gemini-2.5-flash",
        status="imported",
        chunk_count=500,
        jsonl_path=str(old_file),
    )

    exported_paths: list[Path] = []
    submitted_paths: list[Path] = []

    # Stand-in for export_unenriched_chunks: records the call and fabricates
    # the new batch file on disk.
    def fake_export(*args, **kwargs):
        exported_paths.append(new_file)
        new_file.write_text("{}\n")
        return [new_file]

    # Stand-in for submit_gemini_batch: records what got submitted.
    def fake_submit(jsonl_path, model, store):
        submitted_paths.append(Path(jsonl_path))
        return f"job-{Path(jsonl_path).stem}"

    monkeypatch.setattr(cloud_backfill, "export_unenriched_chunks", fake_export)
    monkeypatch.setattr(cloud_backfill, "submit_gemini_batch", fake_submit)
    # Neutralize polling delays so the backfill loop runs instantly.
    monkeypatch.setattr(cloud_backfill.time, "sleep", lambda *_args, **_kwargs: None)

    # Seed a chunk that still needs enrichment so there is genuinely new work.
    store = VectorStore(tmp_path / "brainlayer.db")
    try:
        _insert_unenriched_chunk(store, "chunk-1", "new unenriched content that still needs export")
    finally:
        store.close()

    cloud_backfill.run_full_backfill(tmp_path / "brainlayer.db", submit_only=True)

    # Only the fresh export should have been produced and submitted; the
    # checkpointed old file must not trigger a resubmission.
    assert exported_paths == [new_file]
    assert submitted_paths == [new_file]
4 changes: 4 additions & 0 deletions tests/test_eval_baselines.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,8 +928,12 @@ def _call_hook(self, prompt: str) -> str:
"""Run the hook subprocess and return its stdout."""
import subprocess

from brainlayer.paths import get_db_path

if not self.HOOK_PATH.exists():
pytest.skip(f"Hook not found at {self.HOOK_PATH}")
if not get_db_path().exists():
pytest.skip("Production DB not found — hook tests require a live DB")

result = subprocess.run(
["python3", str(self.HOOK_PATH)],
Expand Down
Loading
Loading