Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions rag/pipelines/emit_manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""RAG corpus manifest emitter.

Aggregates the live pgvector corpus into a single JSON snapshot and writes
it to S3 as the public-safe RAG inventory artifact. The presentation layer
(public Knowledge Base panel + private dashboard inventory page) reads from
this manifest — it never queries pgvector directly. Per Decision 11 of the
presentation revamp plan, presentation surfaces are *views* of upstream
outputs, not new measurement layers.

What lands in the manifest:

- ``by_source``: per ``doc_type`` rollup (10-K, 10-Q, 8-K,
earnings_transcript, thesis) — document count, ticker count, chunk count
- ``by_ticker_coverage``: how many tickers are covered + per-ticker depth
percentiles (p25 / p50 / p75)
- ``totals``: documents, chunks, tickers
- ``embedding``: model name + dimension + chunk vector dimension from the
``rag.chunks.embedding`` column
- ``ingestion``: latest ``ingested_at`` overall + per ``doc_type``

What is intentionally *not* in the manifest (disclosure boundary):
per-ticker doc lists, individual document titles, chunk content. Those
stay private and only surface on dashboard.nousergon.ai under Cloudflare
Access during interview screenshare.

Usage::

python -m rag.pipelines.emit_manifest --output-s3
python -m rag.pipelines.emit_manifest --output-local /tmp/manifest.json
"""

from __future__ import annotations

import argparse
import json
import logging
from datetime import date, datetime, timezone
from typing import Any

from alpha_engine_lib.rag.db import execute_query

logger = logging.getLogger(__name__)

# Hardcoded; ``rag.embeddings.embed_*`` defaults to voyage-3-lite (1024d).
# Surfaced in the manifest so consumers don't have to re-derive it.
_EMBEDDING_MODEL = "voyage-3-lite"
_EMBEDDING_DIMENSION = 1024


def _by_source() -> dict[str, dict[str, int]]:
"""Per ``doc_type``: documents, distinct tickers, chunks."""
rows = execute_query(
"""
SELECT
d.doc_type,
COUNT(DISTINCT d.id) AS documents,
COUNT(DISTINCT d.ticker) AS tickers,
COUNT(c.id) AS chunks
FROM rag.documents d
LEFT JOIN rag.chunks c ON c.document_id = d.id
GROUP BY d.doc_type
ORDER BY d.doc_type
"""
)
return {
r["doc_type"]: {
"documents": int(r["documents"]),
"tickers": int(r["tickers"]),
"chunks": int(r["chunks"]),
}
for r in rows
}


def _by_ticker_coverage() -> dict[str, Any]:
"""Universe coverage rollup: how many tickers, depth percentiles."""
rows = execute_query(
"""
WITH per_ticker AS (
SELECT ticker, COUNT(*) AS doc_count
FROM rag.documents
GROUP BY ticker
)
SELECT
COUNT(*) AS tickers_with_any_doc,
PERCENTILE_DISC(0.25) WITHIN GROUP (ORDER BY doc_count) AS p25_docs,
PERCENTILE_DISC(0.50) WITHIN GROUP (ORDER BY doc_count) AS p50_docs,
PERCENTILE_DISC(0.75) WITHIN GROUP (ORDER BY doc_count) AS p75_docs
FROM per_ticker
"""
)
if not rows:
return {
"tickers_with_any_doc": 0,
"p25_docs_per_ticker": 0,
"p50_docs_per_ticker": 0,
"p75_docs_per_ticker": 0,
}
r = rows[0]
return {
"tickers_with_any_doc": int(r["tickers_with_any_doc"] or 0),
"p25_docs_per_ticker": int(r["p25_docs"] or 0),
"p50_docs_per_ticker": int(r["p50_docs"] or 0),
"p75_docs_per_ticker": int(r["p75_docs"] or 0),
}


def _totals() -> dict[str, int]:
rows = execute_query(
"""
SELECT
(SELECT COUNT(*) FROM rag.documents) AS documents,
(SELECT COUNT(*) FROM rag.chunks) AS chunks,
(SELECT COUNT(DISTINCT ticker) FROM rag.documents) AS tickers
"""
)
r = rows[0] if rows else {"documents": 0, "chunks": 0, "tickers": 0}
return {
"documents": int(r["documents"] or 0),
"chunks": int(r["chunks"] or 0),
"tickers": int(r["tickers"] or 0),
}


def _ingestion() -> dict[str, Any]:
rows = execute_query(
"""
SELECT doc_type, MAX(ingested_at) AS last_ts
FROM rag.documents
GROUP BY doc_type
ORDER BY doc_type
"""
)
by_source_last_ts = {r["doc_type"]: r["last_ts"].isoformat() for r in rows if r["last_ts"]}
overall = max(by_source_last_ts.values(), default=None)
return {
"last_run_ts": overall,
"by_source_last_ts": by_source_last_ts,
}


def build_manifest() -> dict[str, Any]:
"""Assemble the manifest dict by querying pgvector."""
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"schema_version": "1.0.0",
"totals": _totals(),
"by_source": _by_source(),
"by_ticker_coverage": _by_ticker_coverage(),
"embedding": {
"model": _EMBEDDING_MODEL,
"dimension": _EMBEDDING_DIMENSION,
},
"ingestion": _ingestion(),
}


def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

parser = argparse.ArgumentParser(description="Emit RAG corpus manifest")
parser.add_argument("--output-s3", action="store_true", help="Write manifest to S3 (date + latest pointer)")
parser.add_argument("--output-local", type=str, help="Write manifest to local file")
parser.add_argument("--bucket", type=str, default="alpha-engine-research")
args = parser.parse_args()

manifest = build_manifest()

if args.output_local:
with open(args.output_local, "w") as f:
json.dump(manifest, f, indent=2, default=str)
logger.info("Written to %s", args.output_local)

if args.output_s3:
import boto3
s3 = boto3.client("s3")
body = json.dumps(manifest, indent=2, default=str).encode()
dated_key = f"rag/manifest/{date.today().isoformat()}.json"
s3.put_object(
Bucket=args.bucket, Key=dated_key,
Body=body, ContentType="application/json",
)
s3.put_object(
Bucket=args.bucket, Key="rag/manifest/latest.json",
Body=body, ContentType="application/json",
)
logger.info("Written to s3://%s/%s (+ latest)", args.bucket, dated_key)


if __name__ == "__main__":
main()
13 changes: 11 additions & 2 deletions rag/pipelines/run_weekly_ingestion.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,28 @@ $PYTHON_BIN -m rag.pipelines.ingest_earnings_finnhub --from-signals --max-per-ti

# ── Step 4: Thesis history (v2 quant/qual from signals.json) ─────────────────
echo ""
echo "==> Step 4/5: Thesis history..."
echo "==> Step 4/6: Thesis history..."
SINCE=$(date -u -d '14 days ago' '+%Y-%m-%d' 2>/dev/null || date -u -v-14d '+%Y-%m-%d')
$PYTHON_BIN -m rag.pipelines.ingest_theses --signals --since "$SINCE" $DRY_RUN

# ── Step 5: Filing change detection ──────────────────────────────────────────
echo ""
echo "==> Step 5/5: Filing change detection..."
echo "==> Step 5/6: Filing change detection..."
if [ -z "$DRY_RUN" ]; then
$PYTHON_BIN -m rag.pipelines.filing_change_detection --output-s3
else
echo " SKIPPED in dry-run mode"
fi

# ── Step 6: Manifest emit (presentation-layer source of truth) ───────────────
echo ""
echo "==> Step 6/6: Emit corpus manifest..."
if [ -z "$DRY_RUN" ]; then
$PYTHON_BIN -m rag.pipelines.emit_manifest --output-s3
else
echo " SKIPPED in dry-run mode"
fi

echo ""
echo "========================================"
echo "RAG Weekly Ingestion Complete — $(date -u '+%Y-%m-%d %H:%M UTC')"
Expand Down
134 changes: 134 additions & 0 deletions tests/test_emit_manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""Unit tests for ``rag.pipelines.emit_manifest``.

Mocks ``alpha_engine_lib.rag.db.execute_query`` so the manifest assembly
runs without a live pgvector connection. Verifies the manifest schema
shape, the per-source rollup math, and the S3 put-object key pattern.
"""

from __future__ import annotations

import json
from datetime import datetime, timezone
from unittest.mock import patch

import pytest


# Each query in the module is identified by the substring that uniquely
# distinguishes it. ``_fake_execute_query`` dispatches off these substrings.
_FAKE_BY_SOURCE = [
{"doc_type": "10-K", "documents": 432, "tickers": 430, "chunks": 12459},
{"doc_type": "10-Q", "documents": 1287, "tickers": 428, "chunks": 38110},
{"doc_type": "8-K", "documents": 845, "tickers": 380, "chunks": 5230},
{"doc_type": "earnings_transcript", "documents": 612, "tickers": 410, "chunks": 18450},
{"doc_type": "thesis", "documents": 73, "tickers": 22, "chunks": 1850},
]
_FAKE_COVERAGE = [
{"tickers_with_any_doc": 882, "p25_docs": 8, "p50_docs": 14, "p75_docs": 21}
]
_FAKE_TOTALS = [{"documents": 3249, "chunks": 76099, "tickers": 882}]
_FAKE_INGESTION = [
{"doc_type": "10-K", "last_ts": datetime(2026, 5, 2, 9, 23, 34, tzinfo=timezone.utc)},
{"doc_type": "10-Q", "last_ts": datetime(2026, 5, 2, 9, 24, 1, tzinfo=timezone.utc)},
{"doc_type": "earnings_transcript", "last_ts": datetime(2026, 5, 2, 9, 25, 12, tzinfo=timezone.utc)},
]


def _fake_execute_query(sql: str, *args, **kwargs):
if "GROUP BY d.doc_type" in sql:
return _FAKE_BY_SOURCE
if "WITHIN GROUP (ORDER BY doc_count)" in sql:
return _FAKE_COVERAGE
if "SELECT COUNT(*) FROM rag.documents" in sql.replace("\n", " "):
return _FAKE_TOTALS
if "MAX(ingested_at)" in sql:
return _FAKE_INGESTION
raise AssertionError(f"unexpected query: {sql[:80]!r}")


@pytest.fixture
def manifest():
from rag.pipelines import emit_manifest
with patch.object(emit_manifest, "execute_query", side_effect=_fake_execute_query):
return emit_manifest.build_manifest()


def test_top_level_keys(manifest):
assert set(manifest) == {
"generated_at", "schema_version", "totals", "by_source",
"by_ticker_coverage", "embedding", "ingestion",
}
assert manifest["schema_version"] == "1.0.0"


def test_totals_match_fake(manifest):
assert manifest["totals"] == {"documents": 3249, "chunks": 76099, "tickers": 882}


def test_by_source_rollup_shape(manifest):
by_source = manifest["by_source"]
assert set(by_source) == {"10-K", "10-Q", "8-K", "earnings_transcript", "thesis"}
assert by_source["10-K"] == {"documents": 432, "tickers": 430, "chunks": 12459}
# All values must be plain ints (JSON-serializable, no Decimal leakage).
for entry in by_source.values():
for v in entry.values():
assert isinstance(v, int)


def test_coverage_percentiles(manifest):
cov = manifest["by_ticker_coverage"]
assert cov == {
"tickers_with_any_doc": 882,
"p25_docs_per_ticker": 8,
"p50_docs_per_ticker": 14,
"p75_docs_per_ticker": 21,
}


def test_embedding_metadata(manifest):
assert manifest["embedding"] == {"model": "voyage-3-lite", "dimension": 1024}


def test_ingestion_overall_picks_max(manifest):
# Overall last_run_ts must be the max across per-source timestamps.
assert manifest["ingestion"]["last_run_ts"] == "2026-05-02T09:25:12+00:00"
# Per-source map preserves all reported sources.
assert set(manifest["ingestion"]["by_source_last_ts"]) == {
"10-K", "10-Q", "earnings_transcript",
}


def test_manifest_is_json_serializable(manifest):
# default=str handles datetime; the build itself should round-trip cleanly.
payload = json.dumps(manifest, default=str)
reloaded = json.loads(payload)
assert reloaded["totals"]["documents"] == 3249


def test_s3_keys_use_dated_path_and_latest_pointer():
"""Verify the CLI writes to both `rag/manifest/{date}.json` and `latest.json`."""
from rag.pipelines import emit_manifest

captured = []

class FakeS3:
def put_object(self, **kwargs):
captured.append(kwargs)

with patch.object(emit_manifest, "execute_query", side_effect=_fake_execute_query):
with patch("boto3.client", return_value=FakeS3()):
import sys
argv_save = sys.argv
sys.argv = ["emit_manifest", "--output-s3", "--bucket", "test-bucket"]
try:
emit_manifest.main()
finally:
sys.argv = argv_save

assert len(captured) == 2
keys = sorted(c["Key"] for c in captured)
assert keys[0].startswith("rag/manifest/")
assert keys[0].endswith(".json")
assert keys[1] == "rag/manifest/latest.json"
assert all(c["Bucket"] == "test-bucket" for c in captured)
assert all(c["ContentType"] == "application/json" for c in captured)
Loading