Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/brainlayer/dedupe.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ def _loads_tags(value: Any) -> set[str]:
return set()


def _tags_json_equivalent(left: Any, right: Any) -> bool:
return _loads_tags(left) == _loads_tags(right)


def _max_optional_number(left: Any, right: Any) -> Any:
values = []
for value in (left, right):
Expand Down Expand Up @@ -640,11 +644,13 @@ def merge_existing_chunk_seen(conn: Any, *, chunk_id: str, incoming: dict[str, A
last_seen_at = _latest_timestamp(
existing_last, existing_created, incoming.get("last_seen_at"), incoming.get("created_at")
)
merged_tags_json = json.dumps(merged_tags) if merged_tags else None
updates: dict[str, Any] = {
"tags": json.dumps(merged_tags) if merged_tags else None,
"seen_count": int(existing_seen or 1) + int(incoming.get("seen_count") or 1),
"last_seen_at": last_seen_at,
}
if not _tags_json_equivalent(existing_tags, merged_tags_json):
updates["tags"] = merged_tags_json
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if merged_importance is not None:
updates["importance"] = merged_importance
if merged_half_life is not None:
Expand Down
28 changes: 27 additions & 1 deletion src/brainlayer/drain.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,21 @@ def _columns(conn: apsw.Connection, table: str) -> set[str]:
return {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}


def _content_hash(content: str) -> str:
return hashlib.sha256(content.strip().encode("utf-8")).hexdigest()


def _preview_text(values: dict[str, Any]) -> str:
summary = str(values.get("summary") or "").strip()
content = str(values.get("content") or "").strip()
source = summary or content
return source.replace("\n", " ").replace("\r", " ").replace("\t", " ")[:220]


def _insert_chunk(conn: apsw.Connection, values: dict[str, Any]) -> None:
cols = _columns(conn, "chunks")
if "preview_text" in cols and not str(values.get("preview_text") or "").strip():
values = {**values, "preview_text": _preview_text(values)}
if "content" in values:
fields = compute_dedupe_fields(str(values["content"]), values.get("created_at"))
values = {
Expand Down Expand Up @@ -226,6 +239,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult:
"summary": content[:200],
"tags": json.dumps(tags) if tags else None,
"importance": float(event["importance"]) if event.get("importance") is not None else None,
"content_hash": _content_hash(content),
"chunk_origin": detect_chunk_origin(content, event.get("chunk_origin")),
},
)
Expand Down Expand Up @@ -296,6 +310,7 @@ def _apply_watcher(conn: apsw.Connection, event: dict[str, Any]) -> None:
"conversation_id": event.get("conversation_id"),
"sender": event.get("sender"),
"tags": json.dumps(tags) if tags else None,
"content_hash": _content_hash(content),
"chunk_origin": detect_chunk_origin(content, event.get("chunk_origin")),
},
)
Expand Down Expand Up @@ -344,6 +359,7 @@ def _apply_hook(conn: apsw.Connection, event: dict[str, Any]) -> None:
"created_at": datetime.fromtimestamp(timestamp, timezone.utc).isoformat(),
"conversation_id": session_id,
"importance": 5,
"content_hash": _content_hash(content),
"chunk_origin": detect_chunk_origin(content, event.get("chunk_origin")),
},
)
Expand All @@ -356,6 +372,14 @@ def _apply_enrichment(conn: apsw.Connection, event: dict[str, Any]) -> None:
return
enrichment = event.get("enrichment") or {}
cols = _columns(conn, "chunks")
if "content_hash" in cols and event.get("content_hash"):
row = conn.execute("SELECT content_hash, content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
if not row:
return
current_hash = row[0] or _content_hash(str(row[1] or ""))
if current_hash and current_hash != event["content_hash"]:
Comment on lines +379 to +380
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Verify actual content when rejecting stale enrichment

When a queued watcher/store update rewrites an existing chunk through _insert_or_merge_chunk (the same-id content merge or duplicate merge paths), those merge helpers update content but do not update content_hash; this new guard then trusts the stale stored hash instead of hashing the current content. In that state, an older enrichment-* event whose content_hash matches the pre-merge content will still pass this check and overwrite summary/tags/resolved-query metadata for the merged content, so the freshness check should either keep content_hash in sync on content-changing merges or compare against _content_hash(content) here. Fresh evidence is that this commit added the hash guard here, while merge_existing_chunk_content/merge_duplicate_chunk still omit content_hash from their content update sets.

Useful? React with 👍 / 👎.

logger.warning("Skipping stale enrichment for chunk_id=%s content_hash mismatch", chunk_id)
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale enrichment hash trusts column

High Severity

_apply_enrichment treats a matching content_hash column as proof the enrichment matches the row, but same-id watcher content merges update content without updating content_hash. After a rewrite, an queued enrichment for the prior body can still apply summary/tags to the new content.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 2749cd5. Configure here.

updates: dict[str, Any] = {}
mappings = {
"summary": "summary",
Expand Down Expand Up @@ -507,7 +531,9 @@ def drain_once(

lock_fd = _acquire_queue_lock(queue_dir)
try:
files = sorted(queue_dir.glob("*.jsonl"))[:batch_size]
files = sorted(queue_dir.glob("*.jsonl"), key=lambda path: (path.name.startswith("enrichment-"), path.name))[
:batch_size
Comment on lines +534 to +535
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Prevent stale enrichment from applying after newer writes

When an enrichment-*.jsonl file was queued for a chunk before a later watcher/store update for the same chunk_id, this ordering now drains the newer write first and then applies the older enrichment afterward. _apply_enrichment unconditionally updates summary/tags/resolved queries and only stores content_hash without checking it against the current row, so a stale enrichment result can overwrite retrieval metadata for the newly merged content; this is especially likely for same-id watcher updates, which the existing arbitration tests show are merged into the canonical chunk.

Useful? React with 👍 / 👎.

]
Comment on lines +534 to +536
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Prevent enrichment starvation under sustained watcher load

When the queue has at least batch_size non-enrichment-* files, this ordering slices the batch before any enrichment file is selected, so a steady watcher/hook/store producer can keep enrichment updates out of every drain pass indefinitely. That keeps new chunks visible, but it also means summaries/tags/resolved queries for older chunks may never be applied while the realtime write rate stays above the drain capacity; consider reserving some batch capacity or aging in enrichment files rather than making the priority absolute.

Useful? React with 👍 / 👎.

Comment thread
coderabbitai[bot] marked this conversation as resolved.
if not files:
return 0
_log(log_path, f"queue_depth={len(files)}")
Expand Down
46 changes: 42 additions & 4 deletions src/brainlayer/vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,22 @@ def _init_db(self) -> None:
)
""")
self._trigram_fts_available = True
cursor.execute("""
CREATE TABLE IF NOT EXISTS chunk_fts_rowids (
chunk_id TEXT PRIMARY KEY,
fts_rowid INTEGER,
trigram_rowid INTEGER
)
""")
cursor.execute("""
INSERT OR IGNORE INTO chunk_fts_rowids(chunk_id, fts_rowid)
SELECT chunk_id, rowid FROM chunks_fts WHERE chunk_id IS NOT NULL
""")
cursor.execute("""
INSERT INTO chunk_fts_rowids(chunk_id, trigram_rowid)
SELECT chunk_id, rowid FROM chunks_fts_trigram WHERE chunk_id IS NOT NULL
ON CONFLICT(chunk_id) DO UPDATE SET trigram_rowid = excluded.trigram_rowid
""")

# FTS5 sync triggers — keep summary/tags/resolved_query in sync
cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_insert")
Expand All @@ -816,6 +832,9 @@ def _init_db(self) -> None:
new.resolved_queries,
new.id
);
INSERT INTO chunk_fts_rowids(chunk_id, fts_rowid)
VALUES (new.id, last_insert_rowid())
ON CONFLICT(chunk_id) DO UPDATE SET fts_rowid = excluded.fts_rowid;
END
""")
cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_trigram_insert")
Expand All @@ -831,25 +850,37 @@ def _init_db(self) -> None:
new.resolved_queries,
new.id
);
INSERT INTO chunk_fts_rowids(chunk_id, trigram_rowid)
VALUES (new.id, last_insert_rowid())
ON CONFLICT(chunk_id) DO UPDATE SET trigram_rowid = excluded.trigram_rowid;
END
""")
cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_delete")
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS chunks_fts_delete AFTER DELETE ON chunks BEGIN
DELETE FROM chunks_fts WHERE chunk_id = old.id;
DELETE FROM chunks_fts
WHERE rowid = (SELECT fts_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id);
DELETE FROM chunks_fts_trigram
WHERE rowid = (SELECT trigram_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id);
DELETE FROM chunk_fts_rowids WHERE chunk_id = old.id;
END
""")
cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_trigram_delete")
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS chunks_fts_trigram_delete AFTER DELETE ON chunks BEGIN
DELETE FROM chunks_fts_trigram WHERE chunk_id = old.id;
DELETE FROM chunks_fts
WHERE rowid = (SELECT fts_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id);
DELETE FROM chunks_fts_trigram
WHERE rowid = (SELECT trigram_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id);
DELETE FROM chunk_fts_rowids WHERE chunk_id = old.id;
END
""")
cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_update")
Comment on lines 876 to 878
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep rowid map in sync during FTS backfills

When chunks_fts is populated outside the insert triggers, such as the startup empty-FTS backfill at src/brainlayer/vector_store.py:1483-1490 or the repair/rebuild paths, chunk_fts_rowids is not updated. A later chunk update then reaches this new mapped-rowid delete with a NULL or stale fts_rowid, so the old FTS row is not removed before inserting the replacement; searches can keep matching stale terms and may return duplicate rows after any repair/fresh backfill. The same synchronization gap applies to the trigram mapping when repair_fts() rebuilds chunks_fts_trigram.

Useful? React with 👍 / 👎.

cursor.execute("""
CREATE TRIGGER IF NOT EXISTS chunks_fts_update
AFTER UPDATE OF content, summary, tags, resolved_query, key_facts, resolved_queries ON chunks BEGIN
DELETE FROM chunks_fts WHERE chunk_id = old.id;
DELETE FROM chunks_fts
WHERE rowid = (SELECT fts_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id);
INSERT INTO chunks_fts(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id)
VALUES (
new.content,
Expand All @@ -860,13 +891,17 @@ def _init_db(self) -> None:
new.resolved_queries,
new.id
);
INSERT INTO chunk_fts_rowids(chunk_id, fts_rowid)
VALUES (new.id, last_insert_rowid())
ON CONFLICT(chunk_id) DO UPDATE SET fts_rowid = excluded.fts_rowid;
END
""")
cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_trigram_update")
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS chunks_fts_trigram_update
AFTER UPDATE OF content, summary, tags, resolved_query, key_facts, resolved_queries ON chunks BEGIN
DELETE FROM chunks_fts_trigram WHERE chunk_id = old.id;
DELETE FROM chunks_fts_trigram
WHERE rowid = (SELECT trigram_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id);
INSERT INTO chunks_fts_trigram(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id)
VALUES (
new.content,
Expand All @@ -877,6 +912,9 @@ def _init_db(self) -> None:
new.resolved_queries,
new.id
);
INSERT INTO chunk_fts_rowids(chunk_id, trigram_rowid)
VALUES (new.id, last_insert_rowid())
ON CONFLICT(chunk_id) DO UPDATE SET trigram_rowid = excluded.trigram_rowid;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repair breaks trigram rowid map

Medium Severity

repair_fts rebuilds chunks_fts_trigram with new rowids but does not refresh chunk_fts_rowids.trigram_rowid. Trigram update/delete triggers then target wrong or missing rowids, leaving orphan FTS rows and broken trigram sync after BRAINLAYER_REPAIR=1.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8e4639a. Configure here.

END
""")

Expand Down
Loading
Loading