diff --git a/rag/pipelines/emit_manifest.py b/rag/pipelines/emit_manifest.py
new file mode 100644
index 0000000..a5d0cc7
--- /dev/null
+++ b/rag/pipelines/emit_manifest.py
@@ -0,0 +1,231 @@
+"""RAG corpus manifest emitter.
+
+Aggregates the live pgvector corpus into a single JSON snapshot and writes
+it to S3 as the public-safe RAG inventory artifact. The presentation layer
+(public Knowledge Base panel + private dashboard inventory page) reads from
+this manifest — it never queries pgvector directly. Per Decision 11 of the
+presentation revamp plan, presentation surfaces are *views* of upstream
+outputs, not new measurement layers.
+
+What lands in the manifest:
+
+- ``by_source``: per ``doc_type`` rollup (10-K, 10-Q, 8-K,
+  earnings_transcript, thesis) — document count, ticker count, chunk count
+- ``by_ticker_coverage``: how many tickers are covered + per-ticker depth
+  percentiles (p25 / p50 / p75)
+- ``totals``: documents, chunks, tickers
+- ``embedding``: model name + vector dimension (module-level constants; not
+  read from the ``rag.chunks.embedding`` column)
+- ``ingestion``: latest ``ingested_at`` overall + per ``doc_type``
+
+What is intentionally *not* in the manifest (disclosure boundary):
+per-ticker doc lists, individual document titles, chunk content. Those
+stay private and only surface on dashboard.nousergon.ai under Cloudflare
+Access during interview screenshare.
+
+Usage::
+
+    python -m rag.pipelines.emit_manifest --output-s3
+    python -m rag.pipelines.emit_manifest --output-local /tmp/manifest.json
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+from datetime import date, datetime, timezone
+from typing import Any
+
+from alpha_engine_lib.rag.db import execute_query
+
+logger = logging.getLogger(__name__)
+
+# Hardcoded (not read from the DB) so manifest consumers don't have to re-derive it.
+# NOTE(review): this pairs voyage-3-lite with 1024d, but Voyage's published spec
+# appears to list voyage-3-lite as 512d — confirm against ``rag.embeddings``.
+_EMBEDDING_MODEL = "voyage-3-lite"
+_EMBEDDING_DIMENSION = 1024
+
+
+def _by_source() -> dict[str, dict[str, int]]:
+    """Return per-``doc_type`` counts of documents, distinct tickers and chunks.
+
+    The LEFT JOIN keeps doc types whose documents have no chunks (``chunks`` is
+    then 0); ``COUNT(DISTINCT d.id)`` undoes the row fan-out the join introduces.
+    """
+    rows = execute_query(
+        """
+        SELECT
+            d.doc_type,
+            COUNT(DISTINCT d.id) AS documents,
+            COUNT(DISTINCT d.ticker) AS tickers,
+            COUNT(c.id) AS chunks
+        FROM rag.documents d
+        LEFT JOIN rag.chunks c ON c.document_id = d.id
+        GROUP BY d.doc_type
+        ORDER BY d.doc_type
+        """
+    )
+    return {
+        r["doc_type"]: {
+            "documents": int(r["documents"]),
+            "tickers": int(r["tickers"]),
+            "chunks": int(r["chunks"]),
+        }
+        for r in rows
+    }
+
+
+def _by_ticker_coverage() -> dict[str, Any]:
+    """Return the covered-ticker count and docs-per-ticker quartiles.
+
+    Rows with a NULL ticker are excluded so ``tickers_with_any_doc`` agrees
+    with the ``COUNT(DISTINCT ticker)`` figures in ``totals`` and
+    ``by_source``, which ignore NULLs too. NULL aggregates (e.g. nothing to
+    aggregate) and an empty result set are both reported as 0.
+    """
+    rows = execute_query(
+        """
+        WITH per_ticker AS (
+            SELECT ticker, COUNT(*) AS doc_count
+            FROM rag.documents
+            WHERE ticker IS NOT NULL
+            GROUP BY ticker
+        )
+        SELECT
+            COUNT(*) AS tickers_with_any_doc,
+            PERCENTILE_DISC(0.25) WITHIN GROUP (ORDER BY doc_count) AS p25_docs,
+            PERCENTILE_DISC(0.50) WITHIN GROUP (ORDER BY doc_count) AS p50_docs,
+            PERCENTILE_DISC(0.75) WITHIN GROUP (ORDER BY doc_count) AS p75_docs
+        FROM per_ticker
+        """
+    )
+    if not rows:
+        return {
+            "tickers_with_any_doc": 0,
+            "p25_docs_per_ticker": 0,
+            "p50_docs_per_ticker": 0,
+            "p75_docs_per_ticker": 0,
+        }
+    r = rows[0]
+    return {
+        "tickers_with_any_doc": int(r["tickers_with_any_doc"] or 0),
+        "p25_docs_per_ticker": int(r["p25_docs"] or 0),
+        "p50_docs_per_ticker": int(r["p50_docs"] or 0),
+        "p75_docs_per_ticker": int(r["p75_docs"] or 0),
+    }
+
+
+def _totals() -> dict[str, int]:
+    """Return corpus-wide document, chunk and distinct-ticker counts."""
+    rows = execute_query(
+        """
+        SELECT
+            (SELECT COUNT(*) FROM rag.documents) AS documents,
+            (SELECT COUNT(*) FROM rag.chunks) AS chunks,
+            (SELECT COUNT(DISTINCT ticker) FROM rag.documents) AS tickers
+        """
+    )
+    r = rows[0] if rows else {"documents": 0, "chunks": 0, "tickers": 0}
+    return {
+        "documents": int(r["documents"] or 0),
+        "chunks": int(r["chunks"] or 0),
+        "tickers": int(r["tickers"] or 0),
+    }
+
+
+def _ingestion() -> dict[str, Any]:
+    """Return the newest ``ingested_at`` overall and per ``doc_type`` as ISO-8601.
+
+    Doc types whose newest timestamp is NULL are omitted; ``last_run_ts`` is
+    ``None`` when no doc type has a timestamp.
+    """
+    rows = execute_query(
+        """
+        SELECT doc_type, MAX(ingested_at) AS last_ts
+        FROM rag.documents
+        GROUP BY doc_type
+        ORDER BY doc_type
+        """
+    )
+    latest = {r["doc_type"]: r["last_ts"] for r in rows if r["last_ts"]}
+    # Compare datetimes, not their ISO strings: string order stops matching
+    # chronological order as soon as two values carry different UTC offsets.
+    overall = max(latest.values(), default=None)
+    return {
+        "last_run_ts": overall.isoformat() if overall is not None else None,
+        "by_source_last_ts": {k: ts.isoformat() for k, ts in latest.items()},
+    }
+
+
+def build_manifest() -> dict[str, Any]:
+    """Assemble the manifest dict by querying pgvector.
+
+    Each section comes from its own ``execute_query`` call.
+    NOTE(review): whether those calls share one transaction/snapshot is not
+    visible here — confirm if cross-section consistency matters.
+    """
+    return {
+        "generated_at": datetime.now(timezone.utc).isoformat(),
+        "schema_version": "1.0.0",
+        "totals": _totals(),
+        "by_source": _by_source(),
+        "by_ticker_coverage": _by_ticker_coverage(),
+        "embedding": {
+            "model": _EMBEDDING_MODEL,
+            "dimension": _EMBEDDING_DIMENSION,
+        },
+        "ingestion": _ingestion(),
+    }
+
+
+def main():
+    """CLI entry point: build the manifest and write it to the requested targets.
+
+    ``--output-local`` writes a JSON file; ``--output-s3`` uploads the same
+    payload to ``--bucket`` twice: a dated key and ``rag/manifest/latest.json``.
+    """
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+
+    parser = argparse.ArgumentParser(description="Emit RAG corpus manifest")
+    parser.add_argument("--output-s3", action="store_true", help="Write manifest to S3 (date + latest pointer)")
+    parser.add_argument("--output-local", type=str, help="Write manifest to local file")
+    parser.add_argument("--bucket", type=str, default="alpha-engine-research")
+    args = parser.parse_args()
+
+    if not (args.output_local or args.output_s3):
+        # Nothing will be persisted; say so instead of finishing silently.
+        logger.warning(
+            "No output target given (--output-s3 / --output-local); manifest will not be written"
+        )
+
+    manifest = build_manifest()
+
+    if args.output_local:
+        with open(args.output_local, "w", encoding="utf-8") as f:
+            json.dump(manifest, f, indent=2, default=str)
+        logger.info("Written to %s", args.output_local)
+
+    if args.output_s3:
+        import boto3
+        s3 = boto3.client("s3")
+        body = json.dumps(manifest, indent=2, default=str).encode()
+        # Key the snapshot by the UTC date of ``generated_at`` so the object
+        # name always matches the timestamp inside it (``date.today()`` is
+        # host-local and can be a day off around midnight).
+        snapshot_day: date = datetime.fromisoformat(manifest["generated_at"]).date()
+        dated_key = f"rag/manifest/{snapshot_day.isoformat()}.json"
+        s3.put_object(
+            Bucket=args.bucket, Key=dated_key,
+            Body=body, ContentType="application/json",
+        )
+        s3.put_object(
+            Bucket=args.bucket, Key="rag/manifest/latest.json",
+            Body=body, ContentType="application/json",
+        )
+        logger.info("Written to s3://%s/%s (+ latest)", args.bucket, dated_key)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/rag/pipelines/run_weekly_ingestion.sh b/rag/pipelines/run_weekly_ingestion.sh
index 8d57b34..3a491f7 100755
--- a/rag/pipelines/run_weekly_ingestion.sh
+++ b/rag/pipelines/run_weekly_ingestion.sh
@@ -93,19 +93,28 @@ $PYTHON_BIN -m rag.pipelines.ingest_earnings_finnhub --from-signals --max-per-ti
 
 # ── Step 4: Thesis history (v2 quant/qual from signals.json) ─────────────────
 echo ""
-echo "==> Step 4/5: Thesis history..."
+echo "==> Step 4/6: Thesis history..."
 SINCE=$(date -u -d '14 days ago' '+%Y-%m-%d' 2>/dev/null || date -u -v-14d '+%Y-%m-%d')
 $PYTHON_BIN -m rag.pipelines.ingest_theses --signals --since "$SINCE" $DRY_RUN
 
 # ── Step 5: Filing change detection ──────────────────────────────────────────
 echo ""
-echo "==> Step 5/5: Filing change detection..."
+echo "==> Step 5/6: Filing change detection..."
 if [ -z "$DRY_RUN" ]; then
     $PYTHON_BIN -m rag.pipelines.filing_change_detection --output-s3
 else
     echo " SKIPPED in dry-run mode"
 fi
 
+# ── Step 6: Manifest emit (presentation-layer source of truth) ───────────────
+echo ""
+echo "==> Step 6/6: Emit corpus manifest..."
+if [ -z "$DRY_RUN" ]; then
+    $PYTHON_BIN -m rag.pipelines.emit_manifest --output-s3
+else
+    echo " SKIPPED in dry-run mode"
+fi
+
 echo ""
 echo "========================================"
 echo "RAG Weekly Ingestion Complete — $(date -u '+%Y-%m-%d %H:%M UTC')"
diff --git a/tests/test_emit_manifest.py b/tests/test_emit_manifest.py
new file mode 100644
index 0000000..9e261bd
--- /dev/null
+++ b/tests/test_emit_manifest.py
@@ -0,0 +1,134 @@
+"""Unit tests for ``rag.pipelines.emit_manifest``.
+
+Mocks ``alpha_engine_lib.rag.db.execute_query`` so the manifest assembly
+runs without a live pgvector connection. Verifies the manifest schema
+shape, the per-source rollup math, and the S3 put-object key pattern.
+"""
+
+from __future__ import annotations
+
+import json
+from datetime import datetime, timezone
+from unittest.mock import patch
+
+import pytest
+
+
+# Canned result sets, one per query issued by ``emit_manifest``.
+# ``_fake_execute_query`` picks one by matching a substring unique to that query's SQL.
+_FAKE_BY_SOURCE = [
+    {"doc_type": "10-K", "documents": 432, "tickers": 430, "chunks": 12459},
+    {"doc_type": "10-Q", "documents": 1287, "tickers": 428, "chunks": 38110},
+    {"doc_type": "8-K", "documents": 845, "tickers": 380, "chunks": 5230},
+    {"doc_type": "earnings_transcript", "documents": 612, "tickers": 410, "chunks": 18450},
+    {"doc_type": "thesis", "documents": 73, "tickers": 22, "chunks": 1850},
+]
+_FAKE_COVERAGE = [
+    {"tickers_with_any_doc": 882, "p25_docs": 8, "p50_docs": 14, "p75_docs": 21}
+]
+_FAKE_TOTALS = [{"documents": 3249, "chunks": 76099, "tickers": 882}]
+_FAKE_INGESTION = [
+    {"doc_type": "10-K", "last_ts": datetime(2026, 5, 2, 9, 23, 34, tzinfo=timezone.utc)},
+    {"doc_type": "10-Q", "last_ts": datetime(2026, 5, 2, 9, 24, 1, tzinfo=timezone.utc)},
+    {"doc_type": "earnings_transcript", "last_ts": datetime(2026, 5, 2, 9, 25, 12, tzinfo=timezone.utc)},
+]
+
+
+def _fake_execute_query(sql: str, *args, **kwargs):
+    if "GROUP BY d.doc_type" in sql:
+        return _FAKE_BY_SOURCE
+    if "WITHIN GROUP (ORDER BY doc_count)" in sql:
+        return _FAKE_COVERAGE
+    if "SELECT COUNT(*) FROM rag.documents" in sql.replace("\n", " "):
+        return _FAKE_TOTALS
+    if "MAX(ingested_at)" in sql:
+        return _FAKE_INGESTION
+    raise AssertionError(f"unexpected query: {sql[:80]!r}")
+
+
+@pytest.fixture
+def manifest():
+    from rag.pipelines import emit_manifest
+    with patch.object(emit_manifest, "execute_query", side_effect=_fake_execute_query):
+        return emit_manifest.build_manifest()
+
+
+def test_top_level_keys(manifest):
+    assert set(manifest) == {
+        "generated_at", "schema_version", "totals", "by_source",
+        "by_ticker_coverage", "embedding", "ingestion",
+    }
+    assert manifest["schema_version"] == "1.0.0"
+
+
+def test_totals_match_fake(manifest):
+    assert manifest["totals"] == {"documents": 3249, "chunks": 76099, "tickers": 882}
+
+
+def test_by_source_rollup_shape(manifest):
+    by_source = manifest["by_source"]
+    assert set(by_source) == {"10-K", "10-Q", "8-K", "earnings_transcript", "thesis"}
+    assert by_source["10-K"] == {"documents": 432, "tickers": 430, "chunks": 12459}
+    # All values must be plain ints (JSON-serializable, no Decimal leakage).
+    for entry in by_source.values():
+        for v in entry.values():
+            assert isinstance(v, int)
+
+
+def test_coverage_percentiles(manifest):
+    cov = manifest["by_ticker_coverage"]
+    assert cov == {
+        "tickers_with_any_doc": 882,
+        "p25_docs_per_ticker": 8,
+        "p50_docs_per_ticker": 14,
+        "p75_docs_per_ticker": 21,
+    }
+
+
+def test_embedding_metadata(manifest):
+    assert manifest["embedding"] == {"model": "voyage-3-lite", "dimension": 1024}
+
+
+def test_ingestion_overall_picks_max(manifest):
+    # Overall last_run_ts must be the max across per-source timestamps.
+    assert manifest["ingestion"]["last_run_ts"] == "2026-05-02T09:25:12+00:00"
+    # Per-source map preserves all reported sources.
+    assert set(manifest["ingestion"]["by_source_last_ts"]) == {
+        "10-K", "10-Q", "earnings_transcript",
+    }
+
+
+def test_manifest_is_json_serializable(manifest):
+    # default=str mirrors main(); build_manifest() already converts timestamps to ISO strings.
+    payload = json.dumps(manifest, default=str)
+    reloaded = json.loads(payload)
+    assert reloaded["totals"]["documents"] == 3249
+
+
+def test_s3_keys_use_dated_path_and_latest_pointer():
+    """Verify the CLI writes to both `rag/manifest/{date}.json` and `latest.json`."""
+    from rag.pipelines import emit_manifest
+
+    captured = []
+
+    class FakeS3:
+        def put_object(self, **kwargs):
+            captured.append(kwargs)
+
+    with patch.object(emit_manifest, "execute_query", side_effect=_fake_execute_query):
+        with patch("boto3.client", return_value=FakeS3()):
+            import sys
+            argv_save = sys.argv
+            sys.argv = ["emit_manifest", "--output-s3", "--bucket", "test-bucket"]
+            try:
+                emit_manifest.main()
+            finally:
+                sys.argv = argv_save
+
+    assert len(captured) == 2
+    keys = sorted(c["Key"] for c in captured)
+    assert keys[0].startswith("rag/manifest/")
+    assert keys[0].endswith(".json")
+    assert keys[1] == "rag/manifest/latest.json"
+    assert all(c["Bucket"] == "test-bucket" for c in captured)
+    assert all(c["ContentType"] == "application/json" for c in captured)