Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- **Auto-reindex after ingest.** `run_ingest` now refreshes the search, tag, and Q&A indexes for every project that gained new messages, so users no longer have to POST to `/api/search/reindex`, `/api/tags/reindex`, `/api/qa/reindex` after ingest. Each service is invoked in its own try/except — a beta-service failure (tags or Q&A) cannot break ingest, and search itself fails soft. Gated by a new `auto_reindex_on_ingest` setting (default `True`, env `AUTO_REINDEX_ON_INGEST`) for power users who want to disable it. Per-project re-index, not full `reindex_all` — only the touched projects are touched.

## [0.3.5] - 2026-04-25

### Fixed
Expand Down
86 changes: 84 additions & 2 deletions stackunderflow/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

from __future__ import annotations

import logging
import sqlite3

from stackunderflow.adapters.base import SourceAdapter

from .enumerate import iter_refs
from .writer import ingest_file

__all__ = ["iter_refs", "ingest_file", "run_ingest"]
__all__ = ["iter_refs", "ingest_file", "run_ingest", "auto_reindex_touched"]

_logger = logging.getLogger(__name__)


def run_ingest(conn: sqlite3.Connection, adapters: list[SourceAdapter]) -> dict[str, int]:
Expand All @@ -18,8 +21,15 @@ def run_ingest(conn: sqlite3.Connection, adapters: list[SourceAdapter]) -> dict[
For each file, compare (mtime, size) against ingest_log and either
skip, tail-read, or full-reparse. Returns per-provider new-record
counts (handy for logging).

After all files are processed, automatically refreshes the search,
tag, and Q&A indexes for any project that gained new messages,
unless the ``auto_reindex_on_ingest`` setting is disabled. Each
service is called in its own try/except so a beta-feature failure
cannot break ingest.
"""
counts: dict[str, int] = {}
touched_slugs: set[str] = set()
for ref in iter_refs(adapters):
prior = conn.execute(
"SELECT mtime, size, processed_offset FROM ingest_log WHERE file_path = ?",
Expand All @@ -40,11 +50,83 @@ def run_ingest(conn: sqlite3.Connection, adapters: list[SourceAdapter]) -> dict[
pre = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
ingest_file(conn, adapter, ref, since_offset=since)
post = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
counts[ref.provider] = counts.get(ref.provider, 0) + (post - pre)
added = post - pre
counts[ref.provider] = counts.get(ref.provider, 0) + added
if added:
touched_slugs.add(ref.project_slug)

if touched_slugs:
auto_reindex_touched(conn, touched_slugs)

return counts


def auto_reindex_touched(
conn: sqlite3.Connection,
slugs: set[str] | list[str],
) -> None:
"""Refresh search/tag/Q&A indexes for the given project slugs.

Each service is invoked independently — a failure in one (e.g. the
beta tag/qa services) must not block the others. No-op when the
``auto_reindex_on_ingest`` setting is disabled or the corresponding
services are not initialised on ``deps``.
"""
import stackunderflow.deps as deps
from stackunderflow.store import queries

if not deps.config.get("auto_reindex_on_ingest"):
return

slug_list = list(slugs)
if not slug_list:
return

project_rows = queries.list_projects(conn)
by_slug: dict[str, list[int]] = {}
for prow in project_rows:
if prow.slug in slug_list:
by_slug.setdefault(prow.slug, []).append(prow.id)

prior_flag = getattr(deps, "is_reindexing", False)
deps.is_reindexing = True
try:
for slug in slug_list:
ids = by_slug.get(slug, [])
if not ids:
continue
# The schema has UNIQUE(provider, slug) so the same slug can map
# to multiple project rows (claude + codex). Concatenate before
# indexing — index_project does a DELETE-by-slug first, so naive
# iteration would let pass 2 wipe pass 1.
messages: list[dict] = []
for pid in ids:
messages.extend(queries.get_project_messages(conn, project_id=pid))

for svc, name, mode in (
(getattr(deps, "search_service", None), "search", "with_project"),
(getattr(deps, "qa_service", None), "qa", "with_project"),
(getattr(deps, "tag_service", None), "tags", "messages_only"),
):
if svc is None:
continue
try:
if mode == "messages_only":
svc.index_project(messages)
else:
svc.index_project(slug, messages)
_logger.info(
"auto-reindex %s ok: project=%s messages=%d",
name, slug, len(messages),
)
except Exception as e:
_logger.warning(
"auto-reindex %s failed for %s: %s", name, slug, e,
)
finally:
deps.is_reindexing = prior_flag


def _lookup(adapters: list[SourceAdapter], name: str) -> SourceAdapter:
for a in adapters:
if a.name == name:
Expand Down
1 change: 1 addition & 0 deletions stackunderflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class Settings:
max_date_range_days = _Opt(30, "MAX_DATE_RANGE_DAYS")
messages_initial_load = _Opt(500, "MESSAGES_INITIAL_LOAD")
log_level = _Opt("INFO","LOG_LEVEL")
auto_reindex_on_ingest = _Opt(True, "AUTO_REINDEX_ON_INGEST")

# ── public helpers (used by server.py / cli.py) ──────────────────────

Expand Down
194 changes: 194 additions & 0 deletions tests/stackunderflow/ingest/test_auto_reindex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Tests for the post-ingest auto-reindex hook.

After run_ingest finishes, the search/tag/qa services should be invoked
once per touched project. Each must be in its own try/except so a beta
service failure cannot break ingest. An opt-out setting must skip the
hook entirely.
"""
from __future__ import annotations

import sqlite3
from collections.abc import Iterator
from pathlib import Path

import pytest

import stackunderflow.deps as deps
from stackunderflow.adapters.base import Record, SessionRef
from stackunderflow.ingest import run_ingest
from stackunderflow.store import db, schema


class _StubAdapter:
name = "stub"

def __init__(self, refs, records_per_ref):
self._refs = refs
self._records = records_per_ref

def enumerate(self):
yield from self._refs

def read(self, ref, *, since_offset=0):
yield from self._records.get(ref.session_id, [])


def _rec(seq: int, content: str = "hello world") -> Record:
# raw payload mirrors the Claude JSONL shape so the classifier can
# surface message content end-to-end (matters for the FTS test).
return Record(
provider="stub", session_id="s1", seq=seq,
timestamp="2026-01-01T00:00:00+00:00", role="user", model=None,
input_tokens=0, output_tokens=0,
cache_create_tokens=0, cache_read_tokens=0,
content_text=content, tools=(), cwd=None,
is_sidechain=False, uuid=f"u{seq}", parent_uuid=None,
raw={
"type": "user",
"uuid": f"u{seq}",
"sessionId": "s1",
"timestamp": "2026-01-01T00:00:00+00:00",
"message": {"role": "user", "content": content},
},
)


def _ref(tmp_path: Path, slug: str = "-a", mtime: float = 1.0, size: int = 100) -> SessionRef:
fp = tmp_path / f"{slug}.jsonl"
fp.write_bytes(b"x" * size)
return SessionRef("stub", slug, "s1", fp, file_mtime=mtime, file_size=size)


@pytest.fixture
def conn(tmp_path: Path) -> Iterator[sqlite3.Connection]:
c = db.connect(tmp_path / "store.db")
schema.apply(c)
yield c
c.close()


class _RecordingService:
"""Captures index_project calls. ``mode`` mirrors the three services."""

def __init__(self, mode: str = "with_project"):
self.mode = mode
self.calls: list[tuple] = []

def index_project(self, *args):
self.calls.append(args)


class _BoomService:
"""Always raises — exercises the per-service try/except."""

def __init__(self):
self.called = False

def index_project(self, *args):
self.called = True
raise RuntimeError("boom")


@pytest.fixture
def reset_deps():
"""Snapshot/restore deps so tests don't leak service stubs."""
saved = {
"search_service": deps.search_service,
"tag_service": deps.tag_service,
"qa_service": deps.qa_service,
}
deps.search_service = None
deps.tag_service = None
deps.qa_service = None
try:
yield
finally:
for k, v in saved.items():
setattr(deps, k, v)


def test_run_ingest_auto_reindexes_touched_project(conn, tmp_path, reset_deps):
search = _RecordingService(mode="with_project")
tag = _RecordingService(mode="messages_only")
qa = _RecordingService(mode="with_project")
deps.search_service = search
deps.tag_service = tag
deps.qa_service = qa

ref = _ref(tmp_path)
run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0), _rec(1)]})])

assert len(search.calls) == 1
assert search.calls[0][0] == "-a"
assert len(qa.calls) == 1
assert qa.calls[0][0] == "-a"
# Tags receives messages-only (no project name positional).
assert len(tag.calls) == 1


def test_run_ingest_does_not_reindex_when_no_new_messages(conn, tmp_path, reset_deps):
search = _RecordingService()
deps.search_service = search

ref = _ref(tmp_path)
adapter = _StubAdapter([ref], {"s1": [_rec(0)]})
run_ingest(conn, [adapter]) # first run — should index
assert len(search.calls) == 1

run_ingest(conn, [adapter]) # second run — file unchanged, no reindex
assert len(search.calls) == 1


def test_failing_service_does_not_break_others(conn, tmp_path, reset_deps):
search = _BoomService()
tag = _RecordingService(mode="messages_only")
qa = _RecordingService()
deps.search_service = search
deps.tag_service = tag
deps.qa_service = qa

ref = _ref(tmp_path)
counts = run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0)]})])

assert search.called # search blew up
assert len(tag.calls) == 1 # tag still ran
assert len(qa.calls) == 1 # qa still ran
assert counts == {"stub": 1} # ingest still reported success


def test_opt_out_skips_reindex(conn, tmp_path, reset_deps, monkeypatch):
search = _RecordingService()
deps.search_service = search

monkeypatch.setenv("AUTO_REINDEX_ON_INGEST", "false")

ref = _ref(tmp_path)
run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0)]})])

assert search.calls == []


def test_search_service_actually_indexes_after_ingest(tmp_path, reset_deps, monkeypatch):
"""End-to-end: a fresh ingest should leave the FTS index queryable
without a manual /api/search/reindex POST."""
from stackunderflow.services.search_service import SearchService

search_db = tmp_path / "search.db"
monkeypatch.setattr(
"stackunderflow.services.search_service.SEARCH_DB_PATH", search_db
)
deps.search_service = SearchService(db_path=search_db)

store_db_path = tmp_path / "store.db"
monkeypatch.setattr(deps, "store_path", store_db_path)
conn = db.connect(store_db_path)
schema.apply(conn)
try:
ref = _ref(tmp_path)
run_ingest(conn, [_StubAdapter([ref], {"s1": [_rec(0, "alpha bravo charlie")]})])
finally:
conn.close()

results = deps.search_service.search(query="bravo")
assert results["total"] >= 1
assert any("bravo" in r.get("content", "").lower() for r in results["results"])
Loading