From 986e2445dad4e5b22edc1c02d2325ec9f6ee2dcc Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 13 May 2026 10:57:10 -0700 Subject: [PATCH] =?UTF-8?q?feat(nlp):=20Wave=201=20PR=20A.1=20=E2=80=94=20?= =?UTF-8?q?news=20NLP=20pipeline=20(Loughran-McDonald=20+=20LLM=20event=20?= =?UTF-8?q?extraction)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wave 1 PR A.1 of the institutional data-revamp arc (plan doc: ~/Development/alpha-engine-docs/private/data-revamp-260513.md). Producer-side NLP layer that consumes AggregatedNewsArticle output from PR β's aggregator and emits three parallel structured streams: sentiment scores, entity mentions, and event flags. Output is ready for the PR A.2 parquet writer + PR A.3 RAG ingest pass. Architecture: each NLP dimension is a Protocol with one or more concrete implementations. The pipeline orchestrator (NewsNLPPipeline) composes them without knowing which concrete classes are wired — upgrade paths drop in as new adapter classes without touching the orchestrator or downstream consumers. New modules: collectors/nlp/protocols.py SentimentScore, EntityMention, EventFlag Pydantic shapes (frozen + extra='forbid' so parquet writer has stable column schema) SentimentScorer, EntityExtractor, EventExtractor Protocols (runtime_checkable; structural subtyping) collectors/nlp/loughran_mcdonald.py LoughranMcDonaldScorer — finance-domain dictionary sentiment, the academic gold standard (Loughran & McDonald 2011). Composite = (positive - negative) / total_tokens clipped to [-clip, +clip]. Uncertainty counted separately from polarity. load_lm_master_dict() CSV parser — tolerates missing file (logs a warning and returns an empty dict; the scorer then logs its own warning and yields all-zero scores, so a missing dictionary degrades to neutral sentiment rather than raising — callers that need a hard failure must check for the empty dict themselves). collectors/nlp/event_extraction.py AnthropicEventExtractor — Haiku-tier structured event extraction via tool_use API. 
Closed taxonomy of 18 event categories (DEFAULT_EVENT_CATEGORIES — earnings, M&A, IPO, management change, regulatory action, FDA action, etc.). Cost-tracked under agent_id='news_event_extractor'. Tolerates transient failures (returns empty) + malformed entries (drops single entry, keeps others). Handles tool_use.input as either dict or JSON string. collectors/nlp/pipeline.py NewsNLPPipeline(sentiment_scorers, entity_extractors, event_extractors).process(articles) → NewsNLPOutput. Composes any number of components per stream. Per-component exceptions are isolated — one broken scorer can't take down a batch. Article text uses canonical_title + longest body_excerpt across variants. scripts/download_lm_dict.py Operator script. Fetches the canonical Loughran-McDonald 2022 Master Dictionary CSV via the download link published on Notre Dame's research page (the default URL is a Google Drive link). Idempotent (overwrites). Pin via VCS commit if reproducibility matters. URL is overridable with --url when the source moves. What's deferred to subsequent sub-PRs: PR A.1.1 — FinBERT scorer (HF transformer; heavier deps; will live in collectors/nlp/finbert.py as a drop-in) PR A.1.2 — spaCy NER entity extractor PR A.2 — Structured aggregates writer (S3 parquet per (ticker, date) joining sentiment + entity + event streams) PR A.3 — RAG ingest path (raw article text → chunked → embedded → pgvector alongside SEC filings) +42 unit tests: - Pydantic shape construction + frozen + extra='forbid' (3 shapes) - Protocol structural-subtyping (3 protocols, structural matches) - Tokenization (alphabetic only, lowercase, drops digits) - _truthy helper (year-stamp / 0 / blank / non-numeric) - LM scorer: pure positive / pure negative / balanced / dilution-by-neutral / uncertainty-counted-separately / empty-text / clipped-to-range / empty-dict-warns-and-yields-zero - load_lm_master_dict: canonical-CSV / missing-file-warns / blank-rows-skipped - Anthropic event extractor: tool-spec-includes-default-categories / happy-path / empty-text-skips-call / transient-LLM-failure / 
malformed-entry-dropped / tool_use-input-as-JSON-string / no-tool-use-block-returns-empty - Pipeline: empty / sentiment-per-article / multiple-scorers / scorer-exception-isolated / event-extractor-receives-tickers / empty-article-text-skipped / uses-longest-excerpt-across-variants Suite: 890 passing (1 skipped). Composes with: - PR α (alpha-engine-lib #46, v0.15.0) — NewsArticle Pydantic shape - PR β (this repo #226) — AggregatedNewsArticle input - data-revamp-260513.md plan doc — full 4-wave arc context Co-Authored-By: Claude Opus 4.7 (1M context) --- collectors/nlp/__init__.py | 64 ++++ collectors/nlp/event_extraction.py | 233 ++++++++++++ collectors/nlp/loughran_mcdonald.py | 195 ++++++++++ collectors/nlp/pipeline.py | 161 ++++++++ collectors/nlp/protocols.py | 173 +++++++++ scripts/download_lm_dict.py | 87 +++++ tests/test_nlp_pipeline.py | 553 ++++++++++++++++++++++++++++ 7 files changed, 1466 insertions(+) create mode 100644 collectors/nlp/__init__.py create mode 100644 collectors/nlp/event_extraction.py create mode 100644 collectors/nlp/loughran_mcdonald.py create mode 100644 collectors/nlp/pipeline.py create mode 100644 collectors/nlp/protocols.py create mode 100644 scripts/download_lm_dict.py create mode 100644 tests/test_nlp_pipeline.py diff --git a/collectors/nlp/__init__.py b/collectors/nlp/__init__.py new file mode 100644 index 0000000..0c235b0 --- /dev/null +++ b/collectors/nlp/__init__.py @@ -0,0 +1,64 @@ +"""News NLP pipeline — sentiment, entities, structured events. + +Wave 1 PR A.1 of the institutional data-revamp arc (plan doc: +``~/Development/alpha-engine-docs/private/data-revamp-260513.md``). + +Runs over ``AggregatedNewsArticle`` output from +``collectors/news_aggregator.py`` to produce structured per-(ticker, +date) aggregates that feed both the snapshot (PR A.2 — S3 parquet +writer) and the RAG corpus chunks (PR A.3). 
+ +Three independent NLP components, each implementing a Protocol so +upgrade paths (FinBERT, spaCy transformer NER, larger LLMs) drop in +without changing the pipeline orchestrator: + + SentimentScorer: AggregatedNewsArticle → SentimentScore + EntityExtractor: AggregatedNewsArticle → list[EntityMention] + EventExtractor: AggregatedNewsArticle → list[EventFlag] + +Today's free-tier implementations: + + loughran_mcdonald.LoughranMcDonaldScorer — finance-domain dictionary + sentiment, the academic + gold standard + event_extraction.AnthropicEventExtractor — Haiku-tier structured + event flag extraction + (we already pay Anthropic) + +Heavier free upgrades that drop in as new adapter classes (Phase 3+): + + finbert.FinBERTScorer — HF yiyanghkust/finbert-tone + spacy_ner.SpacyEntityExtractor — en_core_web_sm or larger +""" + +from collectors.nlp.event_extraction import ( + AnthropicEventExtractor, + DEFAULT_EVENT_CATEGORIES, +) +from collectors.nlp.loughran_mcdonald import ( + LoughranMcDonaldScorer, + load_lm_master_dict, +) +from collectors.nlp.pipeline import NewsNLPPipeline +from collectors.nlp.protocols import ( + EntityExtractor, + EntityMention, + EventExtractor, + EventFlag, + SentimentScore, + SentimentScorer, +) + +__all__ = [ + "EntityMention", + "EntityExtractor", + "EventFlag", + "EventExtractor", + "SentimentScore", + "SentimentScorer", + "LoughranMcDonaldScorer", + "load_lm_master_dict", + "AnthropicEventExtractor", + "DEFAULT_EVENT_CATEGORIES", + "NewsNLPPipeline", +] diff --git a/collectors/nlp/event_extraction.py b/collectors/nlp/event_extraction.py new file mode 100644 index 0000000..ec6d9bf --- /dev/null +++ b/collectors/nlp/event_extraction.py @@ -0,0 +1,233 @@ +"""LLM-based structured event extraction (Anthropic Haiku-tier). + +Reads a news article + its associated tickers and emits a list of +structured ``EventFlag`` records, one per identified event. 
Uses +Anthropic's structured-output API (``tool_use``) to enforce the +schema at the model boundary — invalid outputs fail validation and +the article is logged + skipped rather than producing malformed data. + +Why LLM over rule-based regex / NER: + +- Finance events are highly heterogeneous in surface form. "Files + for IPO", "S-1 filing announced", "begins trading on NYSE today" + all describe the same IPO_FILING event. Maintaining regex rules + across this space is brittle and recall-bounded. +- We already pay Anthropic; Haiku-tier extraction is ~$0.001 per + article at typical lengths. +- Structured output via tool_use gives schema validation for free. + +Cost telemetry routes through the standard cost-tracking callback so +this extractor is billed under ``agent_id="news_event_extractor"``. + +Categories are a closed taxonomy (see DEFAULT_EVENT_CATEGORIES). The +extractor prompt names the full list — model returns at most one of +these per event. Open-vocabulary events ("management mood shift on +earnings call") map to the nearest category or are dropped. +""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone +from typing import Any + +from collectors.nlp.protocols import EventFlag + +logger = logging.getLogger(__name__) + + +DEFAULT_EVENT_CATEGORIES: tuple[str, ...] 
= ( + "earnings_release", # quarterly or annual earnings results + "earnings_guidance", # forward guidance update (raise/lower/maintain) + "merger_or_acquisition", # M&A announcement (any side) + "ipo_or_secondary", # IPO filing, secondary offering, direct listing + "spinoff_or_divestiture", + "management_change", # CEO/CFO/exec departure or appointment + "board_change", + "buyback_or_dividend", # capital return announcements + "regulatory_action", # SEC/DOJ/CFTC investigation, lawsuit + "fda_action", # drug approval, denial, recall, adverse events + "product_launch", + "partnership_or_contract", # major customer/supplier/JV deal + "credit_rating_change", + "analyst_action", # upgrade/downgrade/price-target change + "insider_transaction", # 10b5-1 sale, insider buying disclosure + "macro_or_sector", # company-tangential macro/sector commentary + "operational_disruption", # outage, cyberattack, supply-chain breakage + "other", # fallback — should be rare +) + + +# Tool spec for Anthropic structured-output. The schema mirrors the +# EventFlag Pydantic shape minus the fields the extractor doesn't fill +# (extractor name + article_fingerprint + extracted_at — those are +# stamped by the wrapper). +_EVENT_TOOL_NAME = "EmitEventFlags" + + +def _build_tool_spec( + categories: tuple[str, ...] = DEFAULT_EVENT_CATEGORIES, +) -> dict[str, Any]: + return { + "name": _EVENT_TOOL_NAME, + "description": ( + "Emit structured event flags for the news article. Use one " + "event per distinct material event. Return an empty list if " + "no events qualify." 
+ ), + "input_schema": { + "type": "object", + "required": ["events"], + "properties": { + "events": { + "type": "array", + "items": { + "type": "object", + "required": ["category", "description", "tickers", "severity"], + "properties": { + "category": { + "type": "string", + "enum": list(categories), + }, + "description": {"type": "string"}, + "tickers": { + "type": "array", + "items": {"type": "string"}, + }, + "severity": { + "type": "number", + "minimum": 0, + "maximum": 1, + }, + }, + }, + }, + }, + }, + } + + +_SYSTEM_PROMPT = """You are a financial event extractor. + +Given a news article, identify each material event it reports and emit one +structured record per event via the EmitEventFlags tool. + +Severity guide: + 0.9-1.0 market-moving (M&A, FDA approval, major earnings miss/beat, + investigation announcement, CEO departure mid-cycle) + 0.6-0.8 meaningful (guidance change, analyst upgrade/downgrade + by major firm, dividend change, product launch in core market) + 0.3-0.5 routine (small partnerships, mid-tier analyst notes, + secondary product launches, scheduled events) + 0.0-0.2 background / atmospheric (macro commentary, peer mentions, + re-reports of stale events) + +Use the closed category taxonomy. If an event genuinely doesn't fit, +use 'other' — but prefer the closest category. + +Tickers should reflect WHICH companies the event directly concerns. +For a merger between A and B, list both. For an A-acquires-B with A +named in 1 ticker, list A only if B isn't tradeable. + +Return an empty events list if the article describes no material event +(e.g. pure macro commentary not tied to any single company).""" + + +class AnthropicEventExtractor: + """Haiku-tier structured event extraction. Implements ``EventExtractor``. + + ``client`` is the Anthropic SDK client. Tests inject a mock. Production + uses ``anthropic.Anthropic(api_key=...)``. 
+ """ + + name = "anthropic_haiku" + + def __init__( + self, + client: Any, + *, + model: str = "claude-haiku-4-5", + max_tokens: int = 1024, + categories: tuple[str, ...] = DEFAULT_EVENT_CATEGORIES, + ) -> None: + self._client = client + self._model = model + self._max_tokens = max_tokens + self._categories = categories + self._tool_spec = _build_tool_spec(categories) + + def extract( + self, *, text: str, article_fingerprint: str, + article_tickers: tuple[str, ...], + ) -> list[EventFlag]: + if not text or not text.strip(): + return [] + try: + response = self._client.messages.create( + model=self._model, + max_tokens=self._max_tokens, + system=_SYSTEM_PROMPT, + tools=[self._tool_spec], + tool_choice={"type": "tool", "name": _EVENT_TOOL_NAME}, + messages=[{ + "role": "user", + "content": ( + f"Article tickers: {list(article_tickers)}\n\n" + f"Article text:\n{text}" + ), + }], + ) + except Exception as e: + logger.warning( + "[event_extraction] anthropic call failed for fingerprint " + "%s: %s", article_fingerprint, e, + ) + return [] + + events_payload = _extract_tool_input(response) + if events_payload is None: + return [] + + out: list[EventFlag] = [] + now = datetime.now(timezone.utc) + for entry in events_payload.get("events", []): + try: + out.append(EventFlag( + extractor=self.name, + article_fingerprint=article_fingerprint, + category=entry["category"], + description=entry["description"], + tickers=tuple(entry.get("tickers") or ()), + severity=float(entry.get("severity", 0.5)), + extracted_at=now, + )) + except Exception as e: + logger.warning( + "[event_extraction] dropping malformed event from " + "fingerprint %s: %s (entry=%r)", + article_fingerprint, e, entry, + ) + return out + + +def _extract_tool_input(response: Any) -> dict | None: + """Pull the EmitEventFlags tool's ``input`` dict out of the + Anthropic response. Anthropic's response.content is a list of + content blocks; we want the one with .type == 'tool_use'. 
+ + Returns None if the response shape is unexpected (logged, not raised). + """ + try: + for block in (response.content or []): + if getattr(block, "type", None) == "tool_use": + if getattr(block, "name", None) == _EVENT_TOOL_NAME: + raw = block.input + if isinstance(raw, str): + return json.loads(raw) + return raw + except Exception as e: + logger.warning( + "[event_extraction] response parse error: %s", e + ) + return None diff --git a/collectors/nlp/loughran_mcdonald.py b/collectors/nlp/loughran_mcdonald.py new file mode 100644 index 0000000..0c94d9e --- /dev/null +++ b/collectors/nlp/loughran_mcdonald.py @@ -0,0 +1,195 @@ +"""Loughran-McDonald finance-domain sentiment scoring. + +The Loughran-McDonald (2011, Journal of Finance) sentiment dictionary +is the academic gold standard for finance-domain sentiment. General- +domain dictionaries (e.g. Harvard IV) systematically misclassify +finance vocabulary — "liability", "depreciation", "tax", "vice +president" all read as negative in general-domain but are neutral +finance terms. LM was built specifically on 10-K filings to correct +this. + +Dictionary categories (we use the first 3 for composite scoring): + + positive — clearly positive sentiment in finance context + negative — clearly negative sentiment in finance context + uncertainty — hedging language ("approximately", "perhaps") + litigious — legal-action vocabulary + modal_strong — assertive language ("must", "will") + modal_weak — hedged language ("may", "could") + constraining — restriction vocabulary ("required to", "limited") + +License: Loughran-McDonald Master Dictionary is freely available for +academic and research use from Bill McDonald's site at Notre Dame +(https://sraf.nd.edu/loughranmcdonald-master-dictionary/). Bundle the +canonical CSV under ``collectors/nlp/data/lm_master_dict.csv`` via +the operator-run ``scripts/download_lm_dict.py`` script. The constructor +accepts the loaded dict so tests use a synthetic 5-word fixture. 
+ +Composite formula (standard LM-paper convention): + + composite = (positive_count - negative_count) / max(total_tokens, 1) + +clipped to [-1, +1]. Some practitioners normalize by (positive + +negative) for a "polarity-among-sentiment-words" reading; we use the +denominator-over-total convention because it preserves "neutral +articles get small magnitude" semantics that compose better with +trust-weighted aggregation. +""" + +from __future__ import annotations + +import csv +import logging +import re +from pathlib import Path + +from collectors.nlp.protocols import SentimentScore + +logger = logging.getLogger(__name__) + + +# ── Lexicon I/O ──────────────────────────────────────────────────────── + + +_DEFAULT_LM_PATH = ( + Path(__file__).parent / "data" / "lm_master_dict.csv" +) + + +def load_lm_master_dict( + path: Path | None = None, +) -> dict[str, dict[str, bool]]: + """Load the Loughran-McDonald Master Dictionary CSV into a per-word + category-flags dict. + + Returns a mapping like:: + + { + "good": {"positive": True, "negative": False, ...}, + "bad": {"positive": False, "negative": True, ...}, + "approximately": {"uncertainty": True, ...}, + ... + } + + The canonical CSV has the columns: Word, Sequence Number, Word + Count, Word Proportion, Average Proportion, Std Dev, Doc Count, + Negative, Positive, Uncertainty, Litigious, Strong_Modal, + Weak_Modal, Constraining, Syllables, Source. We only care about + Word + the category flag columns; the cell value is non-zero + (typically a year-stamp) when the word is in that category. + + Returns an empty dict if the file doesn't exist — caller should + fall back to a stub or fail loud per their own policy. Production + deploy must run ``scripts/download_lm_dict.py`` once. + """ + path = path or _DEFAULT_LM_PATH + if not path.exists(): + logger.warning( + "[loughran_mcdonald] master dict not found at %s — pipeline " + "will return all-zero sentiment. 
Run scripts/download_lm_dict.py.", + path, + ) + return {} + + out: dict[str, dict[str, bool]] = {} + with path.open("r", encoding="utf-8") as fh: + reader = csv.DictReader(fh) + for row in reader: + word = (row.get("Word") or "").strip().lower() + if not word: + continue + out[word] = { + "positive": _truthy(row.get("Positive")), + "negative": _truthy(row.get("Negative")), + "uncertainty": _truthy(row.get("Uncertainty")), + "litigious": _truthy(row.get("Litigious")), + "strong_modal": _truthy(row.get("Strong_Modal")), + "weak_modal": _truthy(row.get("Weak_Modal")), + "constraining": _truthy(row.get("Constraining")), + } + return out + + +def _truthy(cell: str | None) -> bool: + """LM CSV uses non-zero year-stamps as the truthy flag and 0 as + falsy. Tolerate empty strings.""" + if cell is None: + return False + cell = cell.strip() + if not cell: + return False + try: + return int(float(cell)) != 0 + except ValueError: + return False + + +# ── Tokenization ────────────────────────────────────────────────────── + + +_WORD_RE = re.compile(r"[A-Za-z]+") + + +def _tokenize(text: str) -> list[str]: + """Lowercase tokenization on alphabetic-only spans. + + LM's lexicon is alphabetic; numbers and punctuation contribute no + sentiment. Inline numerals + dates + currency symbols are + intentionally dropped at tokenization time.""" + return [m.group(0).lower() for m in _WORD_RE.finditer(text)] + + +# ── Scorer ───────────────────────────────────────────────────────────── + + +class LoughranMcDonaldScorer: + """LM-dict-based sentiment scorer. Implements ``SentimentScorer``.""" + + name = "loughran_mcdonald" + + def __init__( + self, + lm_dict: dict[str, dict[str, bool]] | None = None, + *, + composite_clip: float = 1.0, + ) -> None: + self._lm = lm_dict if lm_dict is not None else load_lm_master_dict() + self._clip = composite_clip + if not self._lm: + logger.warning( + "[loughran_mcdonald] initialized with empty dict — all " + "scores will be 0. 
Provide a dict via constructor or " + "place the canonical CSV at " + "collectors/nlp/data/lm_master_dict.csv." + ) + + def score(self, *, text: str, article_fingerprint: str) -> SentimentScore: + tokens = _tokenize(text) + total = len(tokens) + pos = neg = unc = 0 + for tok in tokens: + cats = self._lm.get(tok) + if cats is None: + continue + if cats.get("positive"): + pos += 1 + if cats.get("negative"): + neg += 1 + if cats.get("uncertainty"): + unc += 1 + + if total == 0: + composite = 0.0 + else: + raw = (pos - neg) / total + composite = max(-self._clip, min(self._clip, raw)) + + return SentimentScore( + scorer=self.name, + article_fingerprint=article_fingerprint, + composite=composite, + positive_word_count=pos, + negative_word_count=neg, + uncertainty_word_count=unc, + total_token_count=total, + ) diff --git a/collectors/nlp/pipeline.py b/collectors/nlp/pipeline.py new file mode 100644 index 0000000..a6a13db --- /dev/null +++ b/collectors/nlp/pipeline.py @@ -0,0 +1,161 @@ +"""News NLP pipeline orchestrator — composes scorers + extractors. + +Reads a list of ``AggregatedNewsArticle`` records from the aggregator +and produces three parallel output streams: + + sentiment_scores: one per (article, scorer) + entity_mentions: list (flat) across all articles + extractors + event_flags: list (flat) across all articles + extractors + +The orchestrator runs scorers/extractors independently per article; +each article's failure (transient LLM error, malformed text) drops +that article's contribution from the affected stream but doesn't fail +the batch — matches the producer-side "graceful degrade" policy. + +Output is structured-ready for the PR A.2 parquet writer (one row per +record per stream) and the PR A.3 RAG ingest pass. + +Cost telemetry: LLM-based components (event_extraction) emit cost +records under their own agent_id; the pipeline itself doesn't add +overhead. Run-level aggregate cost is summed from the cost telemetry +stream downstream. 
+""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Sequence + +from collectors.news_aggregator import AggregatedNewsArticle +from collectors.nlp.protocols import ( + EntityExtractor, + EntityMention, + EventExtractor, + EventFlag, + SentimentScore, + SentimentScorer, +) + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class NewsNLPOutput: + """Aggregate output across all articles in one pipeline run. + + Each stream is a flat list — downstream parquet writer pivots to + long-form rows; RAG-ingest path joins back to articles via the + ``article_fingerprint`` foreign key. + """ + + sentiment_scores: list[SentimentScore] = field(default_factory=list) + entity_mentions: list[EntityMention] = field(default_factory=list) + event_flags: list[EventFlag] = field(default_factory=list) + n_articles_processed: int = 0 + n_articles_failed: int = 0 + + +class NewsNLPPipeline: + """Compose sentiment scorers + entity extractors + event extractors. + + Multiple scorers may be wired (e.g. LoughranMcDonald + FinBERT for + ensemble) — each emits a per-article SentimentScore tagged with + its ``name``. Downstream aggregation chooses how to combine them + (or keeps them separate per scorer). + + Empty extractor lists are valid — the pipeline only runs the + components it's given. 
+ """ + + def __init__( + self, + *, + sentiment_scorers: Sequence[SentimentScorer] = (), + entity_extractors: Sequence[EntityExtractor] = (), + event_extractors: Sequence[EventExtractor] = (), + ) -> None: + self._sentiment_scorers = tuple(sentiment_scorers) + self._entity_extractors = tuple(entity_extractors) + self._event_extractors = tuple(event_extractors) + + def process( + self, articles: Sequence[AggregatedNewsArticle], + ) -> NewsNLPOutput: + sentiment: list[SentimentScore] = [] + entities: list[EntityMention] = [] + events: list[EventFlag] = [] + n_processed = 0 + n_failed = 0 + + for article in articles: + text = _article_text(article) + if not text.strip(): + n_failed += 1 + continue + fp = article.canonical_fingerprint + + for scorer in self._sentiment_scorers: + try: + sentiment.append(scorer.score( + text=text, article_fingerprint=fp, + )) + except Exception as e: + logger.warning( + "[nlp_pipeline] %s sentiment failed on %s: %s", + scorer.name, fp, e, + ) + + for extractor in self._entity_extractors: + try: + entities.extend(extractor.extract( + text=text, article_fingerprint=fp, + )) + except Exception as e: + logger.warning( + "[nlp_pipeline] %s entity-extract failed on %s: %s", + extractor.name, fp, e, + ) + + for extractor in self._event_extractors: + try: + events.extend(extractor.extract( + text=text, article_fingerprint=fp, + article_tickers=article.tickers, + )) + except Exception as e: + logger.warning( + "[nlp_pipeline] %s event-extract failed on %s: %s", + extractor.name, fp, e, + ) + + n_processed += 1 + + logger.info( + "[nlp_pipeline] processed=%d failed=%d " + "sentiment_scores=%d entity_mentions=%d event_flags=%d", + n_processed, n_failed, + len(sentiment), len(entities), len(events), + ) + + return NewsNLPOutput( + sentiment_scores=sentiment, + entity_mentions=entities, + event_flags=events, + n_articles_processed=n_processed, + n_articles_failed=n_failed, + ) + + +def _article_text(article: AggregatedNewsArticle) -> str: + 
"""Concatenate canonical title + the longest body_excerpt across + variants. Multiple vendors syndicating the same wire story can + contribute different excerpt lengths; we use the longest available + so the scorers/extractors have maximum text to work with.""" + longest_excerpt = "" + for v in article.variants: + excerpt = v.body_excerpt or "" + if len(excerpt) > len(longest_excerpt): + longest_excerpt = excerpt + pieces = [article.canonical_title or "", longest_excerpt] + return "\n\n".join(p for p in pieces if p) diff --git a/collectors/nlp/protocols.py b/collectors/nlp/protocols.py new file mode 100644 index 0000000..d20184a --- /dev/null +++ b/collectors/nlp/protocols.py @@ -0,0 +1,173 @@ +"""NLP pipeline Protocols + Pydantic output shapes. + +Component pattern: each NLP analysis dimension (sentiment, entities, +events) is a Protocol with one or more concrete implementations. The +pipeline orchestrator (``pipeline.NewsNLPPipeline``) composes them +without knowing which concrete classes are wired — upgrade paths +(FinBERT for sentiment, transformer NER for entities, larger LLMs for +events) drop in as new classes without touching the orchestrator or +the consumer side. + +Shapes are Pydantic with ``frozen=True`` + ``extra='forbid'`` so a +downstream Pandas/Parquet writer (PR A.2) can rely on a fixed column +schema. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Protocol, runtime_checkable + +from pydantic import BaseModel, ConfigDict, Field + + +# ── Sentiment ────────────────────────────────────────────────────────── + + +class SentimentScore(BaseModel): + """One sentiment-scorer's output for one article. + + Loughran-McDonald output populates ``positive``/``negative`` raw + counts + the normalized ``composite`` in [-1, +1]. Transformer- + based scorers (FinBERT) leave the raw counts None and fill + ``composite`` from the model's softmax. 
+ """ + + model_config = ConfigDict(frozen=True, extra="forbid") + + scorer: str = Field( + description="Scorer slug: 'loughran_mcdonald' | 'finbert' | " + "'vader'. Joins onto per-scorer aggregation rules " + "downstream — different scorers carry different " + "calibration." + ) + article_fingerprint: str = Field( + description="The aggregated article's canonical fingerprint " + "(from NewsAggregator) — joins back to the article " + "set without re-fingerprinting." + ) + composite: float = Field( + description="Normalized score in [-1, +1]. -1 = maximally " + "negative; +1 = maximally positive. 0 = neutral / " + "no signal." + ) + positive_word_count: int | None = Field( + default=None, + description="Loughran-McDonald-style raw positive-word count. " + "None for transformer scorers that don't expose it.", + ) + negative_word_count: int | None = Field( + default=None, + description="LM-style raw negative-word count.", + ) + uncertainty_word_count: int | None = Field( + default=None, + description="LM 'uncertainty' category — distinct from sentiment " + "polarity; useful for risk gating.", + ) + total_token_count: int | None = Field( + default=None, + description="Total tokens analyzed (denominator for word-count " + "ratios).", + ) + + +@runtime_checkable +class SentimentScorer(Protocol): + """Sentiment-scorer Protocol. + + Implementations: lexicon (Loughran-McDonald, VADER), transformer + (FinBERT, FinBERT-tone), LLM (Anthropic Haiku via structured- + output). Pipeline can compose multiple scorers for ensemble. + """ + + name: str + + def score(self, *, text: str, article_fingerprint: str) -> SentimentScore: ... + + +# ── Entities ─────────────────────────────────────────────────────────── + + +class EntityMention(BaseModel): + """One entity surfaced from an article.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + extractor: str = Field( + description="Extractor slug: 'regex_ticker' | 'spacy_en' | " + "'spacy_trf' | 'llm_haiku'." 
+ ) + article_fingerprint: str + text: str = Field(description="Surface form as it appeared in the article.") + label: str = Field( + description="Entity type: 'ORG' | 'PERSON' | 'GPE' | 'MONEY' | " + "'PERCENT' | 'TICKER' | 'PRODUCT' | etc." + ) + canonical_ticker: str | None = Field( + default=None, + description="When the entity resolves to a tradeable ticker via " + "the ticker→name map, the canonical exchange symbol. " + "Otherwise None.", + ) + + +@runtime_checkable +class EntityExtractor(Protocol): + name: str + + def extract( + self, *, text: str, article_fingerprint: str, + ) -> list[EntityMention]: ... + + +# ── Events ───────────────────────────────────────────────────────────── + + +class EventFlag(BaseModel): + """One structured event flagged from the article.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + extractor: str = Field( + description="Extractor slug: 'anthropic_haiku' | " + "'gdelt_native' (when we use vendor-native event " + "codes) | 'rule_based'." + ) + article_fingerprint: str + category: str = Field( + description="Event category from a closed taxonomy (see " + "event_extraction.DEFAULT_EVENT_CATEGORIES). " + "Categorical for downstream aggregation by type." + ) + description: str = Field( + description="Free-text 1-sentence description of the specific " + "event ('SEC investigation announced into accounting " + "practices'). Indexable into the RAG corpus for " + "downstream agent retrieval.", + ) + tickers: tuple[str, ...] = Field( + default_factory=tuple, + description="Tickers this event directly concerns. May be a " + "subset of the article's tickers — e.g. an M&A " + "article between A and B may flag only A as the " + "acquirer." + ) + severity: float = Field( + default=0.5, + description="Subjective importance in [0, 1]. Used by downstream " + "alerting + thesis_update trigger weights.", + ) + extracted_at: datetime = Field( + description="UTC timestamp when the extractor produced this flag." 
+ ) + + +@runtime_checkable +class EventExtractor(Protocol): + name: str + + def extract( + self, *, text: str, article_fingerprint: str, + article_tickers: tuple[str, ...], + ) -> list[EventFlag]: ... diff --git a/scripts/download_lm_dict.py b/scripts/download_lm_dict.py new file mode 100644 index 0000000..4ef9d0b --- /dev/null +++ b/scripts/download_lm_dict.py @@ -0,0 +1,87 @@ +"""Download the Loughran-McDonald Master Dictionary CSV. + +Run once at install time. The canonical source is Bill McDonald's +research page at Notre Dame: +https://sraf.nd.edu/loughranmcdonald-master-dictionary/ + +The CSV is freely available for academic and research use. + +Usage:: + + python scripts/download_lm_dict.py + # writes to collectors/nlp/data/lm_master_dict.csv + +The URL has changed between LM dict revisions historically; the +operator may need to update ``LM_DICT_URL`` if Notre Dame relocates +the file. Validate the download by sampling: the CSV should have +column "Word" and rows like "good" with Positive flag = a year +(non-zero) and Negative flag = 0. + +Idempotent: the script writes to a fixed path; re-running overwrites +with the latest version. Pin via vcs commit if reproducibility is a +concern. +""" + +from __future__ import annotations + +import argparse +import sys +import urllib.request +from pathlib import Path + + +# Update this URL when Bill McDonald rotates the dict version. As of +# 2026-05-13 the latest stable release is the 2022 Master Dictionary. +# The redirect-friendly URL pattern is preferred over the direct +# Dropbox link which has changed multiple times. 
LM_DICT_URL = (
    "https://drive.google.com/uc?export=download&id="
    "17CmUZM9hGUdGUkXKZpaSPMUNVxlgnVfP"
    # ^ 2022 Master Dictionary; if this 404s, fetch the latest URL from
    # https://sraf.nd.edu/loughranmcdonald-master-dictionary/
)


def main(argv: list[str] | None = None) -> int:
    """Download the LM Master Dictionary CSV and install it at ``--out``.

    The payload is first written to a sibling ``<out>.part`` staging file
    and only moved over the destination once the transfer finished and the
    first line contains a ``Word`` column. A failed, truncated, or non-CSV
    download (e.g. an HTML error page) therefore never replaces an existing
    dictionary and never leaves a partial file at the destination.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Returns:
        ``0`` when the file was installed (a warning is still printed if it
        is suspiciously small), ``1`` when the download failed or the
        payload does not look like the dictionary CSV.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--out", type=Path,
        default=Path(__file__).resolve().parent.parent
        / "collectors" / "nlp" / "data" / "lm_master_dict.csv",
        help="Destination CSV path (default: bundled location).",
    )
    parser.add_argument(
        "--url", type=str, default=LM_DICT_URL,
        help="Override the source URL if Notre Dame relocates the dict.",
    )
    args = parser.parse_args(argv)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    # Stage next to the destination so the final rename stays on one
    # filesystem and is atomic.
    staging = args.out.with_name(args.out.name + ".part")

    print(f"Fetching from {args.url}")
    print(f"Writing to {args.out}")
    try:
        urllib.request.urlretrieve(args.url, staging)
    except Exception as e:
        # Top-level script boundary: report any failure and exit non-zero,
        # but first drop whatever partial payload was written.
        staging.unlink(missing_ok=True)
        print(f"ERROR: download failed: {e}", file=sys.stderr)
        print(
            "If the URL has rotated, fetch the latest from "
            "https://sraf.nd.edu/loughranmcdonald-master-dictionary/ "
            "and pass --url.",
            file=sys.stderr,
        )
        return 1

    # Sanity-check the header before touching the destination; utf-8-sig
    # strips a BOM if the CSV carries one.
    with staging.open("r", encoding="utf-8-sig", errors="replace") as fh:
        header = fh.readline()
    columns = {col.strip().strip('"').lower() for col in header.split(",")}
    if "word" not in columns:
        staging.unlink(missing_ok=True)
        print(
            "ERROR: downloaded file has no 'Word' column in its header; "
            "it does not look like the LM Master Dictionary CSV. "
            "Verify the source URL.",
            file=sys.stderr,
        )
        return 1

    staging.replace(args.out)

    size = args.out.stat().st_size
    print(f"Wrote {size:,} bytes to {args.out}")
    if size < 1_000_000:
        print(
            "WARNING: file is suspiciously small — the canonical LM "
            "Master Dictionary is ~10MB. Verify the source URL.",
            file=sys.stderr,
        )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+ +Covers: + - Pydantic shapes (frozen + extra='forbid') + - Protocol structural-subtyping for SentimentScorer / EntityExtractor / EventExtractor + - Loughran-McDonald scorer (tokenization + composite formula + edge cases) + - load_lm_master_dict CSV parser + - Anthropic event extractor (tool_use parsing + transient failure + malformed entries) + - NewsNLPPipeline orchestrator (composition + graceful degrade) +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from pydantic import ValidationError + +from alpha_engine_lib.sources import NewsArticle + +from collectors.news_aggregator import AggregatedNewsArticle +from collectors.nlp.event_extraction import ( + DEFAULT_EVENT_CATEGORIES, + AnthropicEventExtractor, + _build_tool_spec, +) +from collectors.nlp.loughran_mcdonald import ( + LoughranMcDonaldScorer, + _tokenize, + _truthy, + load_lm_master_dict, +) +from collectors.nlp.pipeline import NewsNLPPipeline, NewsNLPOutput +from collectors.nlp.protocols import ( + EntityExtractor, + EntityMention, + EventExtractor, + EventFlag, + SentimentScore, + SentimentScorer, +) + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +# ── Pydantic shapes ──────────────────────────────────────────────────── + + +class TestSentimentScoreShape: + def test_construction(self): + s = SentimentScore( + scorer="loughran_mcdonald", + article_fingerprint="abc123", + composite=0.25, + positive_word_count=4, + negative_word_count=2, + total_token_count=80, + ) + assert s.composite == 0.25 + assert s.uncertainty_word_count is None + + def test_frozen(self): + s = SentimentScore( + scorer="x", article_fingerprint="abc", composite=0.0, + ) + with pytest.raises(ValidationError): + s.composite = 0.5 # type: ignore[misc] + + def test_extra_forbidden(self): + with pytest.raises(ValidationError, match="Extra inputs are not"): + SentimentScore( + scorer="x", 
class TestEventFlagShape:
    """Construction and default-value checks for ``EventFlag``."""

    def test_construction(self):
        # Every field supplied explicitly, including the optional ones.
        fields = dict(
            extractor="anthropic_haiku",
            article_fingerprint="abc",
            category="merger_or_acquisition",
            description="Acquirer announces all-stock deal.",
            tickers=("AAPL",),
            severity=0.9,
            extracted_at=_now(),
        )
        flag = EventFlag(**fields)
        assert (flag.severity, flag.tickers) == (0.9, ("AAPL",))

    def test_severity_default(self):
        # Only the required fields: severity must fall back to its default.
        required_only = dict(
            extractor="x",
            article_fingerprint="a",
            category="other",
            description="d",
            extracted_at=_now(),
        )
        flag = EventFlag(**required_only)
        assert flag.severity == 0.5
class TestLoughranMcDonaldScorer:
    """Checks for ``LoughranMcDonaldScorer.score`` against a tiny synthetic dictionary.

    The assertions pin the composite as (positive - negative) / total tokens,
    optionally clipped via ``composite_clip``.
    """

    # Two positive words, two negative words, one uncertainty-only word.
    SYNTHETIC_DICT = {
        "good": {"positive": True, "negative": False, "uncertainty": False},
        "great": {"positive": True, "negative": False, "uncertainty": False},
        "bad": {"positive": False, "negative": True, "uncertainty": False},
        "loss": {"positive": False, "negative": True, "uncertainty": False},
        "approximately": {"positive": False, "negative": False, "uncertainty": True},
    }

    def test_pure_positive(self):
        # 3 positive tokens out of 3 — composite = 3/3 = 1.0
        s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score(
            text="good good great", article_fingerprint="fp1",
        )
        assert s.positive_word_count == 3
        assert s.negative_word_count == 0
        assert s.composite == pytest.approx(1.0)

    def test_pure_negative(self):
        # 3 negative tokens out of 3 — composite = -3/3 = -1.0
        s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score(
            text="bad bad loss", article_fingerprint="fp1",
        )
        assert s.negative_word_count == 3
        assert s.composite == pytest.approx(-1.0)

    def test_balanced_yields_zero(self):
        # One positive and one negative token cancel out.
        s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score(
            text="good bad", article_fingerprint="fp1",
        )
        assert s.composite == 0.0

    def test_dilution_by_neutral_words(self):
        # 1 positive in 6 tokens — composite = 1/6 ≈ 0.167
        s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score(
            text="the quarterly report was good overall",
            article_fingerprint="fp1",
        )
        assert s.positive_word_count == 1
        assert s.total_token_count == 6
        assert s.composite == pytest.approx(1 / 6)

    def test_uncertainty_counted_separately_from_polarity(self):
        # An uncertainty word bumps its own counter but leaves polarity
        # counts, and therefore the composite, untouched.
        s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score(
            text="results approximately matched estimates",
            article_fingerprint="fp1",
        )
        assert s.uncertainty_word_count == 1
        assert s.positive_word_count == 0
        assert s.negative_word_count == 0
        assert s.composite == 0.0

    def test_empty_text_returns_zero(self):
        # Zero tokens must yield a zero composite rather than an error.
        s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score(
            text="", article_fingerprint="fp1",
        )
        assert s.composite == 0.0
        assert s.total_token_count == 0

    def test_clipped_to_range(self):
        # With clip=0.5, all-positive text caps at 0.5
        s = LoughranMcDonaldScorer(
            lm_dict=self.SYNTHETIC_DICT, composite_clip=0.5,
        ).score(text="good great", article_fingerprint="fp1")
        assert s.composite == 0.5

    def test_empty_dict_warns_and_yields_zero(self, caplog):
        # Constructing with an empty dict must log a warning; scoring then
        # finds no dictionary hits, so the composite is zero.
        with caplog.at_level("WARNING"):
            scorer = LoughranMcDonaldScorer(lm_dict={})
        assert any("empty dict" in r.message for r in caplog.records)
        s = scorer.score(text="good great", article_fingerprint="fp1")
        assert s.composite == 0.0
csv.write_text( + "Word,Negative,Positive\n" + ",0,0\n" # blank word + "good,0,2009\n" + ) + out = load_lm_master_dict(csv) + assert "good" in out + assert "" not in out + + +# ── Anthropic event extractor ────────────────────────────────────────── + + +def _make_tool_use_response(events: list[dict]) -> object: + """Build a mock Anthropic response with a tool_use content block.""" + block = MagicMock() + block.type = "tool_use" + block.name = "EmitEventFlags" + block.input = {"events": events} + response = MagicMock() + response.content = [block] + return response + + +class TestAnthropicEventExtractor: + def test_tool_spec_built_with_default_categories(self): + spec = _build_tool_spec() + cats = spec["input_schema"]["properties"]["events"]["items"][ + "properties" + ]["category"]["enum"] + # All categories present in the enum + for cat in DEFAULT_EVENT_CATEGORIES: + assert cat in cats + + def test_happy_path_parses_events(self): + client = MagicMock() + client.messages.create.return_value = _make_tool_use_response([ + { + "category": "merger_or_acquisition", + "description": "Acquirer announces all-stock deal for X.", + "tickers": ["AAPL"], + "severity": 0.9, + }, + ]) + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="Apple announces acquisition...", + article_fingerprint="fp1", + article_tickers=("AAPL",), + ) + assert len(out) == 1 + assert out[0].category == "merger_or_acquisition" + assert out[0].severity == 0.9 + + def test_empty_text_short_circuits_without_calling_llm(self): + client = MagicMock() + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="", article_fingerprint="fp1", article_tickers=(), + ) + assert out == [] + client.messages.create.assert_not_called() + + def test_transient_llm_failure_returns_empty(self): + client = MagicMock() + client.messages.create.side_effect = RuntimeError("anthropic 500") + extractor = AnthropicEventExtractor(client=client) + out = 
extractor.extract( + text="some text", article_fingerprint="fp1", article_tickers=(), + ) + assert out == [] + + def test_malformed_event_entry_dropped_others_kept(self): + client = MagicMock() + client.messages.create.return_value = _make_tool_use_response([ + {"category": "earnings_release"}, # missing required description + { + "category": "earnings_release", + "description": "Q4 results released.", + "tickers": ["AAPL"], + "severity": 0.6, + }, + ]) + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="x", article_fingerprint="fp1", article_tickers=("AAPL",), + ) + # Malformed entry dropped; good entry kept + assert len(out) == 1 + assert out[0].description == "Q4 results released." + + def test_tool_use_input_as_json_string(self): + """Anthropic SDK can return tool_use.input as either a dict or + a JSON string depending on stream-vs-message mode. Tolerate + both.""" + client = MagicMock() + block = MagicMock() + block.type = "tool_use" + block.name = "EmitEventFlags" + block.input = json.dumps({"events": [{ + "category": "other", "description": "x", + "tickers": [], "severity": 0.1, + }]}) + response = MagicMock() + response.content = [block] + client.messages.create.return_value = response + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="x", article_fingerprint="fp1", article_tickers=(), + ) + assert len(out) == 1 + assert out[0].category == "other" + + def test_no_tool_use_block_returns_empty(self): + client = MagicMock() + response = MagicMock() + text_block = MagicMock() + text_block.type = "text" + response.content = [text_block] + client.messages.create.return_value = response + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="x", article_fingerprint="fp1", article_tickers=(), + ) + assert out == [] + + +# ── Pipeline orchestrator ────────────────────────────────────────────── + + +def _make_aggregated( + fingerprint: str = "fp1", + title: str = 
class TestPipelineOrchestrator:
    """Exercise ``NewsNLPPipeline.process``: composition, pass-through, isolation."""

    def _make_lm_scorer(self):
        """Build an LM scorer over a three-word synthetic dictionary."""
        polarity = {
            "strong": (True, False),
            "results": (False, False),
            "weak": (False, True),
        }
        lexicon = {
            word: {"positive": pos, "negative": neg, "uncertainty": False}
            for word, (pos, neg) in polarity.items()
        }
        return LoughranMcDonaldScorer(lm_dict=lexicon)

    def test_empty_pipeline_returns_empty_output(self):
        result = NewsNLPPipeline().process([_make_aggregated()])
        assert isinstance(result, NewsNLPOutput)
        # With no components wired, every output stream stays empty.
        assert result.sentiment_scores == []
        assert result.entity_mentions == []
        assert result.event_flags == []
        assert result.n_articles_processed == 1

    def test_sentiment_scorer_runs_per_article(self):
        nlp = NewsNLPPipeline(sentiment_scorers=[self._make_lm_scorer()])
        upbeat = _make_aggregated(
            fingerprint="a",
            title="strong results",
            body="firm reported strong execution",
        )
        downbeat = _make_aggregated(
            fingerprint="b",
            title="weak results",
            body="firm reported weak execution",
        )
        result = nlp.process([upbeat, downbeat])
        assert len(result.sentiment_scores) == 2
        composite_by_fp = {
            score.article_fingerprint: score.composite
            for score in result.sentiment_scores
        }
        assert composite_by_fp["a"] > 0
        assert composite_by_fp["b"] < 0

    def test_multiple_scorers_emit_per_scorer_per_article(self):
        lm = self._make_lm_scorer()
        canned = MagicMock(spec=["name", "score"])
        canned.name = "fake_other"
        canned.score.return_value = SentimentScore(
            scorer="fake_other",
            article_fingerprint="dummy",
            composite=0.42,
        )
        nlp = NewsNLPPipeline(sentiment_scorers=[lm, canned])
        result = nlp.process([_make_aggregated(fingerprint="a")])
        # 1 article × 2 scorers = 2 scores
        assert len(result.sentiment_scores) == 2
        assert {score.scorer for score in result.sentiment_scores} == {
            "loughran_mcdonald", "fake_other",
        }

    def test_scorer_exception_skips_that_score_but_continues(self):
        exploding = MagicMock(spec=["name", "score"])
        exploding.name = "broken"
        exploding.score.side_effect = RuntimeError("boom")
        nlp = NewsNLPPipeline(
            sentiment_scorers=[exploding, self._make_lm_scorer()],
        )
        article = _make_aggregated(fingerprint="a", title="strong")
        result = nlp.process([article])
        # Only the LM score makes it through
        assert len(result.sentiment_scores) == 1
        (survivor,) = result.sentiment_scores
        assert survivor.scorer == "loughran_mcdonald"

    def test_event_extractor_receives_article_tickers(self):
        spy = MagicMock(spec=["name", "extract"])
        spy.name = "fake_event"
        canned_flag = EventFlag(
            extractor="fake_event",
            article_fingerprint="a",
            category="earnings_release",
            description="d",
            tickers=("AAPL",),
            severity=0.5,
            extracted_at=_now(),
        )
        spy.extract.return_value = [canned_flag]
        nlp = NewsNLPPipeline(event_extractors=[spy])
        article = _make_aggregated(fingerprint="a", tickers=("AAPL", "NVDA"))
        result = nlp.process([article])
        # Pipeline passes article tickers through verbatim
        passed = spy.extract.call_args.kwargs
        assert passed["article_tickers"] == ("AAPL", "NVDA")
        assert passed["article_fingerprint"] == "a"
        assert len(result.event_flags) == 1

    def test_empty_article_text_skipped_as_failed(self):
        # Construct an aggregated article with no title or body
        blank_variant = NewsArticle(
            tickers=("AAPL",),
            title="",
            body_excerpt="",
            url="https://x.com/a",
            published_at=_now(),
            source="polygon",
            fetched_at=_now(),
        )
        blank_article = AggregatedNewsArticle(
            canonical_title="",
            canonical_url="https://x.com/a",
            tickers=("AAPL",),
            earliest_published_at=_now(),
            variants=(blank_variant,),
            canonical_fingerprint="empty",
        )
        nlp = NewsNLPPipeline(sentiment_scorers=[self._make_lm_scorer()])
        result = nlp.process([blank_article])
        assert result.n_articles_failed == 1
        assert result.n_articles_processed == 0
        assert result.sentiment_scores == []

    def test_pipeline_uses_longest_excerpt_across_variants(self):
        terse = NewsArticle(
            tickers=("AAPL",),
            title="t",
            body_excerpt="brief",
            url="https://x.com/a",
            published_at=_now(),
            source="polygon",
            fetched_at=_now(),
        )
        verbose = NewsArticle(
            tickers=("AAPL",),
            title="t",
            body_excerpt="a much longer body containing strong words",
            url="https://x.com/a",
            published_at=_now(),
            source="gdelt",
            fetched_at=_now(),
        )
        article = AggregatedNewsArticle(
            canonical_title="t",
            canonical_url="https://x.com/a",
            tickers=("AAPL",),
            earliest_published_at=_now(),
            variants=(terse, verbose),
            canonical_fingerprint="fp",
        )
        nlp = NewsNLPPipeline(sentiment_scorers=[self._make_lm_scorer()])
        result = nlp.process([article])
        # The longer body has "strong" — should produce positive composite
        first_score = result.sentiment_scores[0]
        assert first_score.composite > 0