Skip to content

feat(derived): Wave 1 PR A.2 — news structured aggregates writer (per-ticker per-day parquet)#228

Merged
cipher813 merged 1 commit into
mainfrom
feat/wave1-news-aggregates-writer
May 13, 2026
Merged

feat(derived): Wave 1 PR A.2 — news structured aggregates writer (per-ticker per-day parquet)#228
cipher813 merged 1 commit into
mainfrom
feat/wave1-news-aggregates-writer

Conversation

@cipher813
Copy link
Copy Markdown
Owner

Summary

Wave 1 PR A.2 of the institutional data-revamp arc. Joins the 3 NLP-pipeline output streams (sentiment_scores, entity_mentions, event_flags) with the aggregator's source-provenance information and produces one row per (ticker, aggregate_date) in S3 parquet.

This is the canonical structured signal that downstream consumers (research's fetch_data) read into input_data_snapshot for thesis_update + sector_quant + sector_qual agents.

What's in

New module data/derived/news_aggregates.py:

NewsTickerDailyAggregate row shape (19 columns)

Group Columns
Identity ticker, aggregate_date, schema_version
Volume n_articles, n_articles_trusted_weighted, n_articles_by_source_json
LM sentiment lm_sentiment_mean/max/min/trusted_mean, lm_positive/negative/uncertainty_words_total, lm_total_tokens
Events event_count, event_severity_max/mean, event_categories (sorted comma-joined), top_event_descriptions (top-N by severity desc, |-joined)
Entities entity_mentions_count

schema_version=1 pinned to column so consumers can shim through future evolution.

Aggregation semantics

  • Multi-ticker article emits one row per ticker (each row sees the full article)
  • event.tickers gates per-ticker event attachment; empty tickers list means "inherit from article"
  • Trust-weighted sentiment weights each article by max trust across its source variants
  • n_articles_trusted_weighted sums trust per VARIANT (so 3-source dedup counts 3× its trust contribution)
  • Top-3 event descriptions chosen by severity desc; stored as " | "-joined string

S3 layout

  • write_news_aggregates_parquet() / read_news_aggregates_parquet() — I/O at s3://alpha-engine-research/data/news_aggregates/{YYYY-MM-DD}.parquet
  • Idempotent overwrite (single file per day vs per-ticker — cheaper LIST, atomic write, simpler schema migration)
  • aggregate_and_write() — orchestrator-friendly end-to-end helper returning (key, df)

What's deferred

  • PR A.3 — RAG ingest path (full article text → chunked → embedded → pgvector alongside SEC filings)
  • PR D — Async + S3 cache + per-vendor rate limiters
  • PR F — Wire substrate into research's fetch_data (supersedes fix(infra): drop inline EB-SFN role IAM writes #170's per-ticker pre-fetch)
  • PRs B / C / E — Filings expansion, analyst substrate, RAG-as-tool in research

Test plan

  • +21 unit tests covering: per-ticker aggregation (one-article-one-ticker / multi-ticker emits per-ticker / multiple articles per ticker / empty produces empty-with-schema), trust weighting (trusted mean / weighted count / per-variant source counts JSON), event aggregation (ticker-filter / no-tickers-inherits / top-N sorted desc / categories sorted+joined / severity mean), S3 round-trip (write→read / missing-parquet returns empty-schema-df / overwrite / key format / custom prefix), end-to-end aggregate_and_write + datetime tolerance, schema_version pinned to int constant + on every row.
  • In-memory S3 mock used for round-trip tests — no moto dep added.
  • Full data suite: 911 passing (1 skipped) in 4s.

🤖 Generated with Claude Code

…-ticker per-day parquet)

Wave 1 PR A.2 of the institutional data-revamp arc (plan doc:
~/Development/alpha-engine-docs/private/data-revamp-260513.md).

Joins the 3 NLP-pipeline output streams (sentiment_scores,
entity_mentions, event_flags) with the aggregator's source-provenance
information and produces one row per (ticker, aggregate_date) in S3
parquet. This is the canonical structured signal that downstream
consumers (research's fetch_data) read into input_data_snapshot for
thesis_update + sector_quant + sector_qual agents.

New module: data/derived/news_aggregates.py

  NewsTickerDailyAggregate dataclass (frozen) — canonical row shape
  with schema_version pinned to column. 19 columns covering:
    - identity: ticker, aggregate_date, schema_version
    - volume: n_articles, n_articles_trusted_weighted,
              n_articles_by_source_json
    - LM sentiment: lm_sentiment_mean/max/min/trusted_mean +
                    LM word counts (positive, negative, uncertainty,
                    total_tokens)
    - events: event_count, severity_max/mean, categories (sorted
              comma-joined), top_event_descriptions (top-N by
              severity desc, " | "-joined)
    - entities: entity_mentions_count

  build_news_aggregates_df() — pure aggregation, no I/O. Accepts
  NewsNLPOutput + AggregatedNewsArticle list + aggregate_date + optional
  NewsAggregator (for trust-weighted means).

  write_news_aggregates_parquet() / read_news_aggregates_parquet()
    — S3 parquet I/O. Idempotent overwrite at
    ``s3://alpha-engine-research/data/news_aggregates/{date}.parquet``.

  aggregate_and_write() — orchestrator-friendly end-to-end helper
    returning (key, df) so callers can log row counts / emit CW
    metrics without re-reading from S3.

Aggregation semantics:

- Multi-ticker article emits one row per ticker (each row sees the
  full article).
- Event.tickers gates per-ticker event attachment; empty tickers list
  means "inherit from article" (extractor didn't gate).
- Trust-weighted sentiment mean weights each article by max trust
  across its source variants. n_articles_trusted_weighted sums trust
  per VARIANT (so a 3-source dedup counts 3× its trust contribution).
- Top-3 event descriptions chosen by severity desc; ties broken by
  appearance order. Stored as " | "-joined string for parquet
  column friendliness.

Why one parquet per date (vs per ticker):
- ~25 held + ~50 universe tickers = ~75 rows/day. Single parquet
  reads faster than 75 separate files.
- Single S3 object per day = cheaper LIST, atomic overwrite.
- Schema migration simpler — one file per day, not 75.

What's deferred:
  PR A.3 — RAG ingest path (full article text → chunked → embedded
           → pgvector alongside SEC filings corpus)
  PR D   — Async + S3 cache + per-vendor rate limiters
  PR F   — Wire substrate into research's fetch_data (supersedes
           #170's per-ticker pre-fetch)

+21 unit tests:
  - Per-ticker aggregation: one-article-one-ticker / multi-ticker
    article emits one row per / multiple-articles-per-ticker /
    empty-articles-produces-empty-df-with-schema
  - Trust weighting: trusted-mean / n_articles_trusted_weighted /
    source counts json per variant
  - Event aggregation: tickers-set filters to those tickers /
    no-tickers inherits from article / top-N sorted by severity desc /
    categories sorted+joined / severity_mean computed
  - S3 round-trip: write-read preserves rows / missing parquet
    returns empty schema df / overwrite / s3 key format /
    custom prefix
  - End-to-end aggregate_and_write helper + accepts datetime
  - Schema version pinned to int constant + on every row

In-memory S3 mock used for round-trip tests (no moto dep added).

Suite: 911 passing (1 skipped).

Composes with:
  - PR β (#226) — AggregatedNewsArticle input shape
  - PR A.1 (#227) — NewsNLPOutput shape
  - PR α (lib v0.15.0) — NewsArticle base shape
  - data-revamp-260513.md plan doc

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@cipher813 cipher813 merged commit b5404b2 into main May 13, 2026
1 check passed
@cipher813 cipher813 deleted the feat/wave1-news-aggregates-writer branch May 13, 2026 18:08
cipher813 added a commit that referenced this pull request May 13, 2026
Wave 1 PR A.3 of the institutional data-revamp arc (plan doc:
~/Development/alpha-engine-docs/private/data-revamp-260513.md).

Indexes aggregated news articles into the existing pgvector RAG
corpus alongside SEC filings. Consumer agents (thesis_update,
sector_quant/qual) retrieve relevant news at inference time via the
same hybrid-retrieval API the qual analyst's query_filings tool
already uses.

Pairs with PRs β/A.1/A.2 — completes the Wave 1 producer-side news
substrate. Consumer-side tool wiring (PR E) and fetch_data
integration (PR F) follow.

New module: rag/pipelines/ingest_news.py

  ingest_articles(articles, *, filed_date, ticker_to_sector,
                  embed_texts_fn, document_exists_fn,
                  ingest_document_fn, dry_run) -> stats dict

  Architecture:
    - Mirrors ingest_8k_filings.py pattern (canonical RAG-pipeline shape)
    - One document per (ticker, article) pair — multi-ticker articles
      index once per ticker so the ticker-keyed RAG schema surfaces
      them when the qual agent queries by any constituent
    - Idempotent via document_exists pre-check — re-runs skip the
      embedding call entirely (saves vector-API cost)
    - Chunking: one chunk per article (title + longest body excerpt).
      Polygon/GDELT/Yahoo bodies are short (typically <500 tokens);
      multi-chunk splitting can land in a follow-up if Benzinga or
      a full-body adapter joins
    - Source labeling: RAG `source` field prefixed `news_` so
      consumer queries can filter "news only" vs "filings only" by
      source-prefix without enumerating vendors

  Canonical source selection:
    Picks alphabetically across variants so re-ingests produce the
    same document (deterministic across runs). Composes with the
    aggregator's source-provenance preservation in PR β.

  Failure isolation:
    Per-document failures (transient pgvector / embed-API hiccup)
    isolated to that document; batch continues. ingest_document
    returning None (lib's failure signal) counts as a failure
    without crashing.

Composition chain (full Wave 1 dataflow):

  PolygonNewsAdapter + GdeltNewsAdapter + YahooRssNewsAdapter (PR β)
                              │
                              ▼
                  NewsAggregator.fetch() — fan-in + dedup + trust
                              │
                              ▼
                    AggregatedNewsArticle list
                              │
            ┌─────────────────┼─────────────────┐
            ▼                 ▼                 ▼
  NewsNLPPipeline       aggregate_and_write    ingest_articles
  (PR A.1)              (PR A.2 — parquet)     (PR A.3 — RAG)
            │                 │                 │
            ▼                 ▼                 ▼
  sentiment+events       per-ticker per-day    pgvector docs
  +entities streams      structured parquet    alongside filings

+18 unit tests:
  - _rag_source prefix
  - _chunk_text combines title + longest body
  - _chunk_text handles missing title / all empty
  - _canonical_source deterministic alphabetical / single variant
  - Single-ticker happy path: embed + ingest called with right shape
  - Idempotency: document_exists short-circuits embed AND ingest
  - Empty / too-short bodies skipped (counter increments)
  - Multi-ticker: emits one doc per ticker
  - Multi-ticker per-ticker existence check (AAPL exists, MSFT new)
  - Sector lookup: ticker_to_sector passed through; missing -> None
  - dry_run mode skips embed/ingest but counts
  - Failure isolation: one bad doc continues batch
  - ingest returning None counts as failure not crash
  - Stats dict shape pinned to 6 canonical keys

Suite: 929 passing (1 skipped).

Composes with:
  - PR β (#226) — AggregatedNewsArticle input shape
  - PR A.1 (#227) — NewsNLPOutput shape (joined to article fingerprints)
  - PR A.2 (#228) — structured aggregates writer (parallel write path)
  - PR α (lib v0.15.0) — NewsArticle base shape
  - alpha_engine_lib.rag (embed_texts + document_exists + ingest_document)
  - data-revamp-260513.md plan doc

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant