From 8f56de0bf0994a1c15d096399af0de7a25729ea9 Mon Sep 17 00:00:00 2001 From: Yad Konrad Date: Wed, 29 Apr 2026 10:40:10 -0400 Subject: [PATCH 1/2] feat(ingest): auto-reindex search/tags/qa after ingest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run_ingest now refreshes the FTS5 search index, tag index, and Q&A index for every project that gained new messages, so users no longer have to POST to /api/{search,tags,qa}/reindex after new sessions land. Per-project re-index, not full reindex_all — only touched slugs are touched. Each service is invoked in its own try/except so a beta service failure (tags/qa) cannot break ingest, and search itself fails soft. Gated by a new auto_reindex_on_ingest setting (default True, env AUTO_REINDEX_ON_INGEST) for opt-out. The hook lives in run_ingest itself (single source of truth) so all four call sites — server.py startup, cli.py reindex, two routes/data.py refresh handlers — pick it up automatically. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 3 + stackunderflow/ingest/__init__.py | 86 +++++++- stackunderflow/settings.py | 1 + .../ingest/test_auto_reindex.py | 193 ++++++++++++++++++ 4 files changed, 281 insertions(+), 2 deletions(-) create mode 100644 tests/stackunderflow/ingest/test_auto_reindex.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 48aa9f6..1769252 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- **Auto-reindex after ingest.** `run_ingest` now refreshes the search, tag, and Q&A indexes for every project that gained new messages, so users no longer have to POST to `/api/search/reindex`, `/api/tags/reindex`, `/api/qa/reindex` after ingest. Each service is invoked in its own try/except — a beta-service failure (tags or Q&A) cannot break ingest, and search itself fails soft. Gated by a new `auto_reindex_on_ingest` setting (default `True`, env `AUTO_REINDEX_ON_INGEST`) for power users who want to disable it. Per-project re-index, not full `reindex_all` — only the touched projects are touched. + ## [0.3.5] - 2026-04-25 ### Fixed diff --git a/stackunderflow/ingest/__init__.py b/stackunderflow/ingest/__init__.py index 9fac9fc..42144fc 100644 --- a/stackunderflow/ingest/__init__.py +++ b/stackunderflow/ingest/__init__.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging import sqlite3 from stackunderflow.adapters.base import SourceAdapter @@ -9,7 +10,9 @@ from .enumerate import iter_refs from .writer import ingest_file -__all__ = ["iter_refs", "ingest_file", "run_ingest"] +__all__ = ["iter_refs", "ingest_file", "run_ingest", "auto_reindex_touched"] + +_logger = logging.getLogger(__name__) def run_ingest(conn: sqlite3.Connection, adapters: list[SourceAdapter]) -> dict[str, int]: @@ -18,8 +21,15 @@ def run_ingest(conn: sqlite3.Connection, adapters: list[SourceAdapter]) -> dict[ For each file, compare (mtime, size) against ingest_log and either skip, tail-read, or full-reparse. Returns per-provider new-record counts (handy for logging). + + After all files are processed, automatically refreshes the search, + tag, and Q&A indexes for any project that gained new messages, + unless the ``auto_reindex_on_ingest`` setting is disabled. Each + service is called in its own try/except so a beta-feature failure + cannot break ingest. """ counts: dict[str, int] = {} + touched_slugs: set[str] = set() for ref in iter_refs(adapters): prior = conn.execute( "SELECT mtime, size, processed_offset FROM ingest_log WHERE file_path = ?", @@ -40,11 +50,83 @@ def run_ingest(conn: sqlite3.Connection, adapters: list[SourceAdapter]) -> dict[ pre = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] ingest_file(conn, adapter, ref, since_offset=since) post = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] - counts[ref.provider] = counts.get(ref.provider, 0) + (post - pre) + added = post - pre + counts[ref.provider] = counts.get(ref.provider, 0) + added + if added: + touched_slugs.add(ref.project_slug) + + if touched_slugs: + auto_reindex_touched(conn, touched_slugs) return counts +def auto_reindex_touched( + conn: sqlite3.Connection, + slugs: set[str] | list[str], +) -> None: + """Refresh search/tag/Q&A indexes for the given project slugs. + + Each service is invoked independently — a failure in one (e.g. the + beta tag/qa services) must not block the others. No-op when the + ``auto_reindex_on_ingest`` setting is disabled or the corresponding + services are not initialised on ``deps``. + """ + import stackunderflow.deps as deps + from stackunderflow.store import queries + + if not deps.config.get("auto_reindex_on_ingest"): + return + + slug_list = list(slugs) + if not slug_list: + return + + project_rows = queries.list_projects(conn) + by_slug: dict[str, list[int]] = {} + for prow in project_rows: + if prow.slug in slug_list: + by_slug.setdefault(prow.slug, []).append(prow.id) + + prior_flag = getattr(deps, "is_reindexing", False) + deps.is_reindexing = True + try: + for slug in slug_list: + ids = by_slug.get(slug, []) + if not ids: + continue + # The schema has UNIQUE(provider, slug) so the same slug can map + # to multiple project rows (claude + codex). Concatenate before + # indexing — index_project does a DELETE-by-slug first, so naive + # iteration would let pass 2 wipe pass 1. + messages: list[dict] = [] + for pid in ids: + messages.extend(queries.get_project_messages(conn, project_id=pid)) + + for svc, name, mode in ( + (getattr(deps, "search_service", None), "search", "with_project"), + (getattr(deps, "qa_service", None), "qa", "with_project"), + (getattr(deps, "tag_service", None), "tags", "messages_only"), + ): + if svc is None: + continue + try: + if mode == "messages_only": + svc.index_project(messages) + else: + svc.index_project(slug, messages) + _logger.info( + "auto-reindex %s ok: project=%s messages=%d", + name, slug, len(messages), + ) + except Exception as e: + _logger.warning( + "auto-reindex %s failed for %s: %s", name, slug, e, + ) + finally: + deps.is_reindexing = prior_flag + + def _lookup(adapters: list[SourceAdapter], name: str) -> SourceAdapter: for a in adapters: if a.name == name: diff --git a/stackunderflow/settings.py b/stackunderflow/settings.py index 1fe9483..1821f4f 100644 --- a/stackunderflow/settings.py +++ b/stackunderflow/settings.py @@ -77,6 +77,7 @@ class Settings: max_date_range_days = _Opt(30, "MAX_DATE_RANGE_DAYS") messages_initial_load = _Opt(500, "MESSAGES_INITIAL_LOAD") log_level = _Opt("INFO","LOG_LEVEL") + auto_reindex_on_ingest = _Opt(True, "AUTO_REINDEX_ON_INGEST") # ── public helpers (used by server.py / cli.py) ────────────────────── diff --git a/tests/stackunderflow/ingest/test_auto_reindex.py b/tests/stackunderflow/ingest/test_auto_reindex.py new file mode 100644 index 0000000..0675b2d --- /dev/null +++ b/tests/stackunderflow/ingest/test_auto_reindex.py @@ -0,0 +1,193 @@ +"""Tests for the post-ingest auto-reindex hook. + +After run_ingest finishes, the search/tag/qa services should be invoked +once per touched project. Each must be in its own try/except so a beta +service failure cannot break ingest. An opt-out setting must skip the +hook entirely. +""" +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +import stackunderflow.deps as deps +from stackunderflow.adapters.base import Record, SessionRef +from stackunderflow.ingest import run_ingest +from stackunderflow.store import db, schema + + +class _StubAdapter: + name = "stub" + + def __init__(self, refs, records_per_ref): + self._refs = refs + self._records = records_per_ref + + def enumerate(self): + yield from self._refs + + def read(self, ref, *, since_offset=0): + yield from self._records.get(ref.session_id, []) + + +def _rec(seq: int, content: str = "hello world") -> Record: + # raw payload mirrors the Claude JSONL shape so the classifier can + # surface message content end-to-end (matters for the FTS test). + return Record( + provider="stub", session_id="s1", seq=seq, + timestamp="2026-01-01T00:00:00+00:00", role="user", model=None, + input_tokens=0, output_tokens=0, + cache_create_tokens=0, cache_read_tokens=0, + content_text=content, tools=(), cwd=None, + is_sidechain=False, uuid=f"u{seq}", parent_uuid=None, + raw={ + "type": "user", + "uuid": f"u{seq}", + "sessionId": "s1", + "timestamp": "2026-01-01T00:00:00+00:00", + "message": {"role": "user", "content": content}, + }, + ) + + +def _ref(tmp_path: Path, slug: str = "-a", mtime: float = 1.0, size: int = 100) -> SessionRef: + fp = tmp_path / f"{slug}.jsonl" + fp.write_bytes(b"x" * size) + return SessionRef("stub", slug, "s1", fp, file_mtime=mtime, file_size=size) + + +@pytest.fixture +def conn(tmp_path: Path) -> sqlite3.Connection: + c = db.connect(tmp_path / "store.db") + schema.apply(c) + yield c + c.close() + + +class _RecordingService: + """Captures index_project calls. ``mode`` mirrors the three services.""" + + def __init__(self, mode: str = "with_project"): + self.mode = mode + self.calls: list[tuple] = [] + + def index_project(self, *args): + self.calls.append(args) + + +class _BoomService: + """Always raises — exercises the per-service try/except.""" + + def __init__(self): + self.called = False + + def index_project(self, *args): + self.called = True + raise RuntimeError("boom") + + +@pytest.fixture +def reset_deps(): + """Snapshot/restore deps so tests don't leak service stubs.""" + saved = { + "search_service": deps.search_service, + "tag_service": deps.tag_service, + "qa_service": deps.qa_service, + } + deps.search_service = None + deps.tag_service = None + deps.qa_service = None + try: + yield + finally: + for k, v in saved.items(): + setattr(deps, k, v) + + +def test_run_ingest_auto_reindexes_touched_project(conn, tmp_path, reset_deps): + search = _RecordingService(mode="with_project") + tag = _RecordingService(mode="messages_only") + qa = _RecordingService(mode="with_project") + deps.search_service = search + deps.tag_service = tag + deps.qa_service = qa + + ref = _ref(tmp_path) + run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0), _rec(1)]})]) + + assert len(search.calls) == 1 + assert search.calls[0][0] == "-a" + assert len(qa.calls) == 1 + assert qa.calls[0][0] == "-a" + # Tags receives messages-only (no project name positional). + assert len(tag.calls) == 1 + + +def test_run_ingest_does_not_reindex_when_no_new_messages(conn, tmp_path, reset_deps): + search = _RecordingService() + deps.search_service = search + + ref = _ref(tmp_path) + adapter = _StubAdapter([ref], {"s1": [_rec(0)]}) + run_ingest(conn, [adapter]) # first run — should index + assert len(search.calls) == 1 + + run_ingest(conn, [adapter]) # second run — file unchanged, no reindex + assert len(search.calls) == 1 + + +def test_failing_service_does_not_break_others(conn, tmp_path, reset_deps): + search = _BoomService() + tag = _RecordingService(mode="messages_only") + qa = _RecordingService() + deps.search_service = search + deps.tag_service = tag + deps.qa_service = qa + + ref = _ref(tmp_path) + counts = run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0)]})]) + + assert search.called # search blew up + assert len(tag.calls) == 1 # tag still ran + assert len(qa.calls) == 1 # qa still ran + assert counts == {"stub": 1} # ingest still reported success + + +def test_opt_out_skips_reindex(conn, tmp_path, reset_deps, monkeypatch): + search = _RecordingService() + deps.search_service = search + + monkeypatch.setenv("AUTO_REINDEX_ON_INGEST", "false") + + ref = _ref(tmp_path) + run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0)]})]) + + assert search.calls == [] + + +def test_search_service_actually_indexes_after_ingest(tmp_path, reset_deps, monkeypatch): + """End-to-end: a fresh ingest should leave the FTS index queryable + without a manual /api/search/reindex POST.""" + from stackunderflow.services.search_service import SearchService + + search_db = tmp_path / "search.db" + monkeypatch.setattr( + "stackunderflow.services.search_service.SEARCH_DB_PATH", search_db + ) + deps.search_service = SearchService(db_path=search_db) + + store_db_path = tmp_path / "store.db" + monkeypatch.setattr(deps, "store_path", store_db_path) + conn = db.connect(store_db_path) + schema.apply(conn) + try: + ref = _ref(tmp_path) + run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0, "alpha bravo charlie")]})]) + finally: + conn.close() + + results = deps.search_service.search(query="bravo") + assert results["total"] >= 1 + assert any("bravo" in r.get("content", "").lower() for r in results["results"]) From 9e58be246d2266cfa8308e80f50951c66c54acbe Mon Sep 17 00:00:00 2001 From: Yad Konrad Date: Wed, 29 Apr 2026 10:55:30 -0400 Subject: [PATCH 2/2] test(ingest): annotate conn fixture as Iterator[Connection] Pyright flagged the generator-yielding fixture as returning Connection when it actually yields one. Use Iterator[sqlite3.Connection] to match. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/stackunderflow/ingest/test_auto_reindex.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/stackunderflow/ingest/test_auto_reindex.py b/tests/stackunderflow/ingest/test_auto_reindex.py index 0675b2d..fa6f3c9 100644 --- a/tests/stackunderflow/ingest/test_auto_reindex.py +++ b/tests/stackunderflow/ingest/test_auto_reindex.py @@ -8,6 +8,7 @@ from __future__ import annotations import sqlite3 +from collections.abc import Iterator from pathlib import Path import pytest @@ -59,7 +60,7 @@ def _ref(tmp_path: Path, slug: str = "-a", mtime: float = 1.0, size: int = 100) @pytest.fixture -def conn(tmp_path: Path) -> sqlite3.Connection: +def conn(tmp_path: Path) -> Iterator[sqlite3.Connection]: c = db.connect(tmp_path / "store.db") schema.apply(c) yield c