diff --git a/collectors/nlp/__init__.py b/collectors/nlp/__init__.py new file mode 100644 index 0000000..0c235b0 --- /dev/null +++ b/collectors/nlp/__init__.py @@ -0,0 +1,64 @@ +"""News NLP pipeline — sentiment, entities, structured events. + +Wave 1 PR A.1 of the institutional data-revamp arc (plan doc: +``~/Development/alpha-engine-docs/private/data-revamp-260513.md``). + +Runs over ``AggregatedNewsArticle`` output from +``collectors/news_aggregator.py`` to produce structured per-(ticker, +date) aggregates that feed both the snapshot (PR A.2 — S3 parquet +writer) and the RAG corpus chunks (PR A.3). + +Three independent NLP components, each implementing a Protocol so +upgrade paths (FinBERT, spaCy transformer NER, larger LLMs) drop in +without changing the pipeline orchestrator: + + SentimentScorer: AggregatedNewsArticle → SentimentScore + EntityExtractor: AggregatedNewsArticle → list[EntityMention] + EventExtractor: AggregatedNewsArticle → list[EventFlag] + +Today's free-tier implementations: + + loughran_mcdonald.LoughranMcDonaldScorer — finance-domain dictionary + sentiment, the academic + gold standard + event_extraction.AnthropicEventExtractor — Haiku-tier structured + event flag extraction + (we already pay Anthropic) + +Heavier free upgrades that drop in as new adapter classes (Phase 3+): + + finbert.FinBERTScorer — HF yiyanghkust/finbert-tone + spacy_ner.SpacyEntityExtractor — en_core_web_sm or larger +""" + +from collectors.nlp.event_extraction import ( + AnthropicEventExtractor, + DEFAULT_EVENT_CATEGORIES, +) +from collectors.nlp.loughran_mcdonald import ( + LoughranMcDonaldScorer, + load_lm_master_dict, +) +from collectors.nlp.pipeline import NewsNLPPipeline +from collectors.nlp.protocols import ( + EntityExtractor, + EntityMention, + EventExtractor, + EventFlag, + SentimentScore, + SentimentScorer, +) + +__all__ = [ + "EntityMention", + "EntityExtractor", + "EventFlag", + "EventExtractor", + "SentimentScore", + "SentimentScorer", + "LoughranMcDonaldScorer", + "load_lm_master_dict", + "AnthropicEventExtractor", + "DEFAULT_EVENT_CATEGORIES", + "NewsNLPPipeline", +] diff --git a/collectors/nlp/event_extraction.py b/collectors/nlp/event_extraction.py new file mode 100644 index 0000000..ec6d9bf --- /dev/null +++ b/collectors/nlp/event_extraction.py @@ -0,0 +1,233 @@ +"""LLM-based structured event extraction (Anthropic Haiku-tier). + +Reads a news article + its associated tickers and emits a list of +structured ``EventFlag`` records, one per identified event. Uses +Anthropic's structured-output API (``tool_use``) to enforce the +schema at the model boundary — invalid outputs fail validation and +the article is logged + skipped rather than producing malformed data. + +Why LLM over rule-based regex / NER: + +- Finance events are highly heterogeneous in surface form. "Files + for IPO", "S-1 filing announced", "begins trading on NYSE today" + all describe the same IPO_FILING event. Maintaining regex rules + across this space is brittle and recall-bounded. +- We already pay Anthropic; Haiku-tier extraction is ~$0.001 per + article at typical lengths. +- Structured output via tool_use gives schema validation for free. + +Cost telemetry routes through the standard cost-tracking callback so +this extractor is billed under ``agent_id="news_event_extractor"``. + +Categories are a closed taxonomy (see DEFAULT_EVENT_CATEGORIES). The +extractor prompt names the full list — model returns at most one of +these per event. Open-vocabulary events ("management mood shift on +earnings call") map to the nearest category or are dropped. +""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone +from typing import Any + +from collectors.nlp.protocols import EventFlag + +logger = logging.getLogger(__name__) + + +DEFAULT_EVENT_CATEGORIES: tuple[str, ...] = ( + "earnings_release", # quarterly or annual earnings results + "earnings_guidance", # forward guidance update (raise/lower/maintain) + "merger_or_acquisition", # M&A announcement (any side) + "ipo_or_secondary", # IPO filing, secondary offering, direct listing + "spinoff_or_divestiture", + "management_change", # CEO/CFO/exec departure or appointment + "board_change", + "buyback_or_dividend", # capital return announcements + "regulatory_action", # SEC/DOJ/CFTC investigation, lawsuit + "fda_action", # drug approval, denial, recall, adverse events + "product_launch", + "partnership_or_contract", # major customer/supplier/JV deal + "credit_rating_change", + "analyst_action", # upgrade/downgrade/price-target change + "insider_transaction", # 10b5-1 sale, insider buying disclosure + "macro_or_sector", # company-tangential macro/sector commentary + "operational_disruption", # outage, cyberattack, supply-chain breakage + "other", # fallback — should be rare +) + + +# Tool spec for Anthropic structured-output. The schema mirrors the +# EventFlag Pydantic shape minus the fields the extractor doesn't fill +# (extractor name + article_fingerprint + extracted_at — those are +# stamped by the wrapper). +_EVENT_TOOL_NAME = "EmitEventFlags" + + +def _build_tool_spec( + categories: tuple[str, ...] = DEFAULT_EVENT_CATEGORIES, +) -> dict[str, Any]: + return { + "name": _EVENT_TOOL_NAME, + "description": ( + "Emit structured event flags for the news article. Use one " + "event per distinct material event. Return an empty list if " + "no events qualify." + ), + "input_schema": { + "type": "object", + "required": ["events"], + "properties": { + "events": { + "type": "array", + "items": { + "type": "object", + "required": ["category", "description", "tickers", "severity"], + "properties": { + "category": { + "type": "string", + "enum": list(categories), + }, + "description": {"type": "string"}, + "tickers": { + "type": "array", + "items": {"type": "string"}, + }, + "severity": { + "type": "number", + "minimum": 0, + "maximum": 1, + }, + }, + }, + }, + }, + }, + } + + +_SYSTEM_PROMPT = """You are a financial event extractor. + +Given a news article, identify each material event it reports and emit one +structured record per event via the EmitEventFlags tool. + +Severity guide: + 0.9-1.0 market-moving (M&A, FDA approval, major earnings miss/beat, + investigation announcement, CEO departure mid-cycle) + 0.6-0.8 meaningful (guidance change, analyst upgrade/downgrade + by major firm, dividend change, product launch in core market) + 0.3-0.5 routine (small partnerships, mid-tier analyst notes, + secondary product launches, scheduled events) + 0.0-0.2 background / atmospheric (macro commentary, peer mentions, + re-reports of stale events) + +Use the closed category taxonomy. If an event genuinely doesn't fit, +use 'other' — but prefer the closest category. + +Tickers should reflect WHICH companies the event directly concerns. +For a merger between A and B, list both. For an A-acquires-B with A +named in 1 ticker, list A only if B isn't tradeable. + +Return an empty events list if the article describes no material event +(e.g. pure macro commentary not tied to any single company).""" + + +class AnthropicEventExtractor: + """Haiku-tier structured event extraction. Implements ``EventExtractor``. + + ``client`` is the Anthropic SDK client. Tests inject a mock. Production + uses ``anthropic.Anthropic(api_key=...)``. + """ + + name = "anthropic_haiku" + + def __init__( + self, + client: Any, + *, + model: str = "claude-haiku-4-5", + max_tokens: int = 1024, + categories: tuple[str, ...] = DEFAULT_EVENT_CATEGORIES, + ) -> None: + self._client = client + self._model = model + self._max_tokens = max_tokens + self._categories = categories + self._tool_spec = _build_tool_spec(categories) + + def extract( + self, *, text: str, article_fingerprint: str, + article_tickers: tuple[str, ...], + ) -> list[EventFlag]: + if not text or not text.strip(): + return [] + try: + response = self._client.messages.create( + model=self._model, + max_tokens=self._max_tokens, + system=_SYSTEM_PROMPT, + tools=[self._tool_spec], + tool_choice={"type": "tool", "name": _EVENT_TOOL_NAME}, + messages=[{ + "role": "user", + "content": ( + f"Article tickers: {list(article_tickers)}\n\n" + f"Article text:\n{text}" + ), + }], + ) + except Exception as e: + logger.warning( + "[event_extraction] anthropic call failed for fingerprint " + "%s: %s", article_fingerprint, e, + ) + return [] + + events_payload = _extract_tool_input(response) + if events_payload is None: + return [] + + out: list[EventFlag] = [] + now = datetime.now(timezone.utc) + for entry in events_payload.get("events", []): + try: + out.append(EventFlag( + extractor=self.name, + article_fingerprint=article_fingerprint, + category=entry["category"], + description=entry["description"], + tickers=tuple(entry.get("tickers") or ()), + severity=float(entry.get("severity", 0.5)), + extracted_at=now, + )) + except Exception as e: + logger.warning( + "[event_extraction] dropping malformed event from " + "fingerprint %s: %s (entry=%r)", + article_fingerprint, e, entry, + ) + return out + + +def _extract_tool_input(response: Any) -> dict | None: + """Pull the EmitEventFlags tool's ``input`` dict out of the + Anthropic response. Anthropic's response.content is a list of + content blocks; we want the one with .type == 'tool_use'. + + Returns None if the response shape is unexpected (logged, not raised). + """ + try: + for block in (response.content or []): + if getattr(block, "type", None) == "tool_use": + if getattr(block, "name", None) == _EVENT_TOOL_NAME: + raw = block.input + if isinstance(raw, str): + return json.loads(raw) + return raw + except Exception as e: + logger.warning( + "[event_extraction] response parse error: %s", e + ) + return None diff --git a/collectors/nlp/loughran_mcdonald.py b/collectors/nlp/loughran_mcdonald.py new file mode 100644 index 0000000..0c94d9e --- /dev/null +++ b/collectors/nlp/loughran_mcdonald.py @@ -0,0 +1,195 @@ +"""Loughran-McDonald finance-domain sentiment scoring. + +The Loughran-McDonald (2011, Journal of Finance) sentiment dictionary +is the academic gold standard for finance-domain sentiment. General- +domain dictionaries (e.g. Harvard IV) systematically misclassify +finance vocabulary — "liability", "depreciation", "tax", "vice +president" all read as negative in general-domain but are neutral +finance terms. LM was built specifically on 10-K filings to correct +this. + +Dictionary categories (we use the first 3 for composite scoring): + + positive — clearly positive sentiment in finance context + negative — clearly negative sentiment in finance context + uncertainty — hedging language ("approximately", "perhaps") + litigious — legal-action vocabulary + modal_strong — assertive language ("must", "will") + modal_weak — hedged language ("may", "could") + constraining — restriction vocabulary ("required to", "limited") + +License: Loughran-McDonald Master Dictionary is freely available for +academic and research use from Bill McDonald's site at Notre Dame +(https://sraf.nd.edu/loughranmcdonald-master-dictionary/). Bundle the +canonical CSV under ``collectors/nlp/data/lm_master_dict.csv`` via +the operator-run ``scripts/download_lm_dict.py`` script. The constructor +accepts the loaded dict so tests use a synthetic 5-word fixture. + +Composite formula (standard LM-paper convention): + + composite = (positive_count - negative_count) / max(total_tokens, 1) + +clipped to [-1, +1]. Some practitioners normalize by (positive + +negative) for a "polarity-among-sentiment-words" reading; we use the +denominator-over-total convention because it preserves "neutral +articles get small magnitude" semantics that compose better with +trust-weighted aggregation. +""" + +from __future__ import annotations + +import csv +import logging +import re +from pathlib import Path + +from collectors.nlp.protocols import SentimentScore + +logger = logging.getLogger(__name__) + + +# ── Lexicon I/O ──────────────────────────────────────────────────────── + + +_DEFAULT_LM_PATH = ( + Path(__file__).parent / "data" / "lm_master_dict.csv" +) + + +def load_lm_master_dict( + path: Path | None = None, +) -> dict[str, dict[str, bool]]: + """Load the Loughran-McDonald Master Dictionary CSV into a per-word + category-flags dict. + + Returns a mapping like:: + + { + "good": {"positive": True, "negative": False, ...}, + "bad": {"positive": False, "negative": True, ...}, + "approximately": {"uncertainty": True, ...}, + ... + } + + The canonical CSV has the columns: Word, Sequence Number, Word + Count, Word Proportion, Average Proportion, Std Dev, Doc Count, + Negative, Positive, Uncertainty, Litigious, Strong_Modal, + Weak_Modal, Constraining, Syllables, Source. We only care about + Word + the category flag columns; the cell value is non-zero + (typically a year-stamp) when the word is in that category. + + Returns an empty dict if the file doesn't exist — caller should + fall back to a stub or fail loud per their own policy. Production + deploy must run ``scripts/download_lm_dict.py`` once. + """ + path = path or _DEFAULT_LM_PATH + if not path.exists(): + logger.warning( + "[loughran_mcdonald] master dict not found at %s — pipeline " + "will return all-zero sentiment. Run scripts/download_lm_dict.py.", + path, + ) + return {} + + out: dict[str, dict[str, bool]] = {} + with path.open("r", encoding="utf-8") as fh: + reader = csv.DictReader(fh) + for row in reader: + word = (row.get("Word") or "").strip().lower() + if not word: + continue + out[word] = { + "positive": _truthy(row.get("Positive")), + "negative": _truthy(row.get("Negative")), + "uncertainty": _truthy(row.get("Uncertainty")), + "litigious": _truthy(row.get("Litigious")), + "strong_modal": _truthy(row.get("Strong_Modal")), + "weak_modal": _truthy(row.get("Weak_Modal")), + "constraining": _truthy(row.get("Constraining")), + } + return out + + +def _truthy(cell: str | None) -> bool: + """LM CSV uses non-zero year-stamps as the truthy flag and 0 as + falsy. Tolerate empty strings.""" + if cell is None: + return False + cell = cell.strip() + if not cell: + return False + try: + return int(float(cell)) != 0 + except ValueError: + return False + + +# ── Tokenization ────────────────────────────────────────────────────── + + +_WORD_RE = re.compile(r"[A-Za-z]+") + + +def _tokenize(text: str) -> list[str]: + """Lowercase tokenization on alphabetic-only spans. + + LM's lexicon is alphabetic; numbers and punctuation contribute no + sentiment. Inline numerals + dates + currency symbols are + intentionally dropped at tokenization time.""" + return [m.group(0).lower() for m in _WORD_RE.finditer(text)] + + +# ── Scorer ───────────────────────────────────────────────────────────── + + +class LoughranMcDonaldScorer: + """LM-dict-based sentiment scorer. Implements ``SentimentScorer``.""" + + name = "loughran_mcdonald" + + def __init__( + self, + lm_dict: dict[str, dict[str, bool]] | None = None, + *, + composite_clip: float = 1.0, + ) -> None: + self._lm = lm_dict if lm_dict is not None else load_lm_master_dict() + self._clip = composite_clip + if not self._lm: + logger.warning( + "[loughran_mcdonald] initialized with empty dict — all " + "scores will be 0. Provide a dict via constructor or " + "place the canonical CSV at " + "collectors/nlp/data/lm_master_dict.csv." + ) + + def score(self, *, text: str, article_fingerprint: str) -> SentimentScore: + tokens = _tokenize(text) + total = len(tokens) + pos = neg = unc = 0 + for tok in tokens: + cats = self._lm.get(tok) + if cats is None: + continue + if cats.get("positive"): + pos += 1 + if cats.get("negative"): + neg += 1 + if cats.get("uncertainty"): + unc += 1 + + if total == 0: + composite = 0.0 + else: + raw = (pos - neg) / total + composite = max(-self._clip, min(self._clip, raw)) + + return SentimentScore( + scorer=self.name, + article_fingerprint=article_fingerprint, + composite=composite, + positive_word_count=pos, + negative_word_count=neg, + uncertainty_word_count=unc, + total_token_count=total, + ) diff --git a/collectors/nlp/pipeline.py b/collectors/nlp/pipeline.py new file mode 100644 index 0000000..a6a13db --- /dev/null +++ b/collectors/nlp/pipeline.py @@ -0,0 +1,161 @@ +"""News NLP pipeline orchestrator — composes scorers + extractors. + +Reads a list of ``AggregatedNewsArticle`` records from the aggregator +and produces three parallel output streams: + + sentiment_scores: one per (article, scorer) + entity_mentions: list (flat) across all articles + extractors + event_flags: list (flat) across all articles + extractors + +The orchestrator runs scorers/extractors independently per article; +each article's failure (transient LLM error, malformed text) drops +that article's contribution from the affected stream but doesn't fail +the batch — matches the producer-side "graceful degrade" policy. + +Output is structured-ready for the PR A.2 parquet writer (one row per +record per stream) and the PR A.3 RAG ingest pass. + +Cost telemetry: LLM-based components (event_extraction) emit cost +records under their own agent_id; the pipeline itself doesn't add +overhead. Run-level aggregate cost is summed from the cost telemetry +stream downstream. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Sequence + +from collectors.news_aggregator import AggregatedNewsArticle +from collectors.nlp.protocols import ( + EntityExtractor, + EntityMention, + EventExtractor, + EventFlag, + SentimentScore, + SentimentScorer, +) + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class NewsNLPOutput: + """Aggregate output across all articles in one pipeline run. + + Each stream is a flat list — downstream parquet writer pivots to + long-form rows; RAG-ingest path joins back to articles via the + ``article_fingerprint`` foreign key. + """ + + sentiment_scores: list[SentimentScore] = field(default_factory=list) + entity_mentions: list[EntityMention] = field(default_factory=list) + event_flags: list[EventFlag] = field(default_factory=list) + n_articles_processed: int = 0 + n_articles_failed: int = 0 + + +class NewsNLPPipeline: + """Compose sentiment scorers + entity extractors + event extractors. + + Multiple scorers may be wired (e.g. LoughranMcDonald + FinBERT for + ensemble) — each emits a per-article SentimentScore tagged with + its ``name``. Downstream aggregation chooses how to combine them + (or keeps them separate per scorer). + + Empty extractor lists are valid — the pipeline only runs the + components it's given. + """ + + def __init__( + self, + *, + sentiment_scorers: Sequence[SentimentScorer] = (), + entity_extractors: Sequence[EntityExtractor] = (), + event_extractors: Sequence[EventExtractor] = (), + ) -> None: + self._sentiment_scorers = tuple(sentiment_scorers) + self._entity_extractors = tuple(entity_extractors) + self._event_extractors = tuple(event_extractors) + + def process( + self, articles: Sequence[AggregatedNewsArticle], + ) -> NewsNLPOutput: + sentiment: list[SentimentScore] = [] + entities: list[EntityMention] = [] + events: list[EventFlag] = [] + n_processed = 0 + n_failed = 0 + + for article in articles: + text = _article_text(article) + if not text.strip(): + n_failed += 1 + continue + fp = article.canonical_fingerprint + + for scorer in self._sentiment_scorers: + try: + sentiment.append(scorer.score( + text=text, article_fingerprint=fp, + )) + except Exception as e: + logger.warning( + "[nlp_pipeline] %s sentiment failed on %s: %s", + scorer.name, fp, e, + ) + + for extractor in self._entity_extractors: + try: + entities.extend(extractor.extract( + text=text, article_fingerprint=fp, + )) + except Exception as e: + logger.warning( + "[nlp_pipeline] %s entity-extract failed on %s: %s", + extractor.name, fp, e, + ) + + for extractor in self._event_extractors: + try: + events.extend(extractor.extract( + text=text, article_fingerprint=fp, + article_tickers=article.tickers, + )) + except Exception as e: + logger.warning( + "[nlp_pipeline] %s event-extract failed on %s: %s", + extractor.name, fp, e, + ) + + n_processed += 1 + + logger.info( + "[nlp_pipeline] processed=%d failed=%d " + "sentiment_scores=%d entity_mentions=%d event_flags=%d", + n_processed, n_failed, + len(sentiment), len(entities), len(events), + ) + + return NewsNLPOutput( + sentiment_scores=sentiment, + entity_mentions=entities, + event_flags=events, + n_articles_processed=n_processed, + n_articles_failed=n_failed, + ) + + +def _article_text(article: AggregatedNewsArticle) -> str: + """Concatenate canonical title + the longest body_excerpt across + variants. Multiple vendors syndicating the same wire story can + contribute different excerpt lengths; we use the longest available + so the scorers/extractors have maximum text to work with.""" + longest_excerpt = "" + for v in article.variants: + excerpt = v.body_excerpt or "" + if len(excerpt) > len(longest_excerpt): + longest_excerpt = excerpt + pieces = [article.canonical_title or "", longest_excerpt] + return "\n\n".join(p for p in pieces if p) diff --git a/collectors/nlp/protocols.py b/collectors/nlp/protocols.py new file mode 100644 index 0000000..d20184a --- /dev/null +++ b/collectors/nlp/protocols.py @@ -0,0 +1,173 @@ +"""NLP pipeline Protocols + Pydantic output shapes. + +Component pattern: each NLP analysis dimension (sentiment, entities, +events) is a Protocol with one or more concrete implementations. The +pipeline orchestrator (``pipeline.NewsNLPPipeline``) composes them +without knowing which concrete classes are wired — upgrade paths +(FinBERT for sentiment, transformer NER for entities, larger LLMs for +events) drop in as new classes without touching the orchestrator or +the consumer side. + +Shapes are Pydantic with ``frozen=True`` + ``extra='forbid'`` so a +downstream Pandas/Parquet writer (PR A.2) can rely on a fixed column +schema. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Protocol, runtime_checkable + +from pydantic import BaseModel, ConfigDict, Field + + +# ── Sentiment ────────────────────────────────────────────────────────── + + +class SentimentScore(BaseModel): + """One sentiment-scorer's output for one article. + + Loughran-McDonald output populates ``positive``/``negative`` raw + counts + the normalized ``composite`` in [-1, +1]. Transformer- + based scorers (FinBERT) leave the raw counts None and fill + ``composite`` from the model's softmax. + """ + + model_config = ConfigDict(frozen=True, extra="forbid") + + scorer: str = Field( + description="Scorer slug: 'loughran_mcdonald' | 'finbert' | " + "'vader'. Joins onto per-scorer aggregation rules " + "downstream — different scorers carry different " + "calibration." + ) + article_fingerprint: str = Field( + description="The aggregated article's canonical fingerprint " + "(from NewsAggregator) — joins back to the article " + "set without re-fingerprinting." + ) + composite: float = Field( + description="Normalized score in [-1, +1]. -1 = maximally " + "negative; +1 = maximally positive. 0 = neutral / " + "no signal." + ) + positive_word_count: int | None = Field( + default=None, + description="Loughran-McDonald-style raw positive-word count. " + "None for transformer scorers that don't expose it.", + ) + negative_word_count: int | None = Field( + default=None, + description="LM-style raw negative-word count.", + ) + uncertainty_word_count: int | None = Field( + default=None, + description="LM 'uncertainty' category — distinct from sentiment " + "polarity; useful for risk gating.", + ) + total_token_count: int | None = Field( + default=None, + description="Total tokens analyzed (denominator for word-count " + "ratios).", + ) + + +@runtime_checkable +class SentimentScorer(Protocol): + """Sentiment-scorer Protocol. + + Implementations: lexicon (Loughran-McDonald, VADER), transformer + (FinBERT, FinBERT-tone), LLM (Anthropic Haiku via structured- + output). Pipeline can compose multiple scorers for ensemble. + """ + + name: str + + def score(self, *, text: str, article_fingerprint: str) -> SentimentScore: ... + + +# ── Entities ─────────────────────────────────────────────────────────── + + +class EntityMention(BaseModel): + """One entity surfaced from an article.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + extractor: str = Field( + description="Extractor slug: 'regex_ticker' | 'spacy_en' | " + "'spacy_trf' | 'llm_haiku'." + ) + article_fingerprint: str + text: str = Field(description="Surface form as it appeared in the article.") + label: str = Field( + description="Entity type: 'ORG' | 'PERSON' | 'GPE' | 'MONEY' | " + "'PERCENT' | 'TICKER' | 'PRODUCT' | etc." + ) + canonical_ticker: str | None = Field( + default=None, + description="When the entity resolves to a tradeable ticker via " + "the ticker→name map, the canonical exchange symbol. " + "Otherwise None.", + ) + + +@runtime_checkable +class EntityExtractor(Protocol): + name: str + + def extract( + self, *, text: str, article_fingerprint: str, + ) -> list[EntityMention]: ... + + +# ── Events ───────────────────────────────────────────────────────────── + + +class EventFlag(BaseModel): + """One structured event flagged from the article.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + extractor: str = Field( + description="Extractor slug: 'anthropic_haiku' | " + "'gdelt_native' (when we use vendor-native event " + "codes) | 'rule_based'." + ) + article_fingerprint: str + category: str = Field( + description="Event category from a closed taxonomy (see " + "event_extraction.DEFAULT_EVENT_CATEGORIES). " + "Categorical for downstream aggregation by type." + ) + description: str = Field( + description="Free-text 1-sentence description of the specific " + "event ('SEC investigation announced into accounting " + "practices'). Indexable into the RAG corpus for " + "downstream agent retrieval.", + ) + tickers: tuple[str, ...] = Field( + default_factory=tuple, + description="Tickers this event directly concerns. May be a " + "subset of the article's tickers — e.g. an M&A " + "article between A and B may flag only A as the " + "acquirer." + ) + severity: float = Field( + default=0.5, + description="Subjective importance in [0, 1]. Used by downstream " + "alerting + thesis_update trigger weights.", + ) + extracted_at: datetime = Field( + description="UTC timestamp when the extractor produced this flag." + ) + + +@runtime_checkable +class EventExtractor(Protocol): + name: str + + def extract( + self, *, text: str, article_fingerprint: str, + article_tickers: tuple[str, ...], + ) -> list[EventFlag]: ... diff --git a/scripts/download_lm_dict.py b/scripts/download_lm_dict.py new file mode 100644 index 0000000..4ef9d0b --- /dev/null +++ b/scripts/download_lm_dict.py @@ -0,0 +1,87 @@ +"""Download the Loughran-McDonald Master Dictionary CSV. + +Run once at install time. The canonical source is Bill McDonald's +research page at Notre Dame: +https://sraf.nd.edu/loughranmcdonald-master-dictionary/ + +The CSV is freely available for academic and research use. + +Usage:: + + python scripts/download_lm_dict.py + # writes to collectors/nlp/data/lm_master_dict.csv + +The URL has changed between LM dict revisions historically; the +operator may need to update ``LM_DICT_URL`` if Notre Dame relocates +the file. Validate the download by sampling: the CSV should have +column "Word" and rows like "good" with Positive flag = a year +(non-zero) and Negative flag = 0. + +Idempotent: the script writes to a fixed path; re-running overwrites +with the latest version. Pin via vcs commit if reproducibility is a +concern. +""" + +from __future__ import annotations + +import argparse +import sys +import urllib.request +from pathlib import Path + + +# Update this URL when Bill McDonald rotates the dict version. As of +# 2026-05-13 the latest stable release is the 2022 Master Dictionary. +# The redirect-friendly URL pattern is preferred over the direct +# Dropbox link which has changed multiple times. +LM_DICT_URL = ( + "https://drive.google.com/uc?export=download&id=" + "17CmUZM9hGUdGUkXKZpaSPMUNVxlgnVfP" + # ^ 2022 Master Dictionary; if this 404s, fetch the latest URL from + # https://sraf.nd.edu/loughranmcdonald-master-dictionary/ +) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--out", type=Path, + default=Path(__file__).resolve().parent.parent + / "collectors" / "nlp" / "data" / "lm_master_dict.csv", + help="Destination CSV path (default: bundled location).", + ) + parser.add_argument( + "--url", type=str, default=LM_DICT_URL, + help="Override the source URL if Notre Dame relocates the dict.", + ) + args = parser.parse_args() + + args.out.parent.mkdir(parents=True, exist_ok=True) + + print(f"Fetching from {args.url}") + print(f"Writing to {args.out}") + try: + urllib.request.urlretrieve(args.url, args.out) + except Exception as e: + print(f"ERROR: download failed: {e}", file=sys.stderr) + print( + "If the URL has rotated, fetch the latest from " + "https://sraf.nd.edu/loughranmcdonald-master-dictionary/ " + "and pass --url.", + file=sys.stderr, + ) + return 1 + + size = args.out.stat().st_size + print(f"Wrote {size:,} bytes to {args.out}") + if size < 1_000_000: + print( + "WARNING: file is suspiciously small — the canonical LM " + "Master Dictionary is ~10MB. Verify the source URL.", + file=sys.stderr, + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_nlp_pipeline.py b/tests/test_nlp_pipeline.py new file mode 100644 index 0000000..c34342b --- /dev/null +++ b/tests/test_nlp_pipeline.py @@ -0,0 +1,553 @@ +"""Tests for the news NLP pipeline (Wave 1 PR A.1). + +Covers: + - Pydantic shapes (frozen + extra='forbid') + - Protocol structural-subtyping for SentimentScorer / EntityExtractor / EventExtractor + - Loughran-McDonald scorer (tokenization + composite formula + edge cases) + - load_lm_master_dict CSV parser + - Anthropic event extractor (tool_use parsing + transient failure + malformed entries) + - NewsNLPPipeline orchestrator (composition + graceful degrade) +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from pydantic import ValidationError + +from alpha_engine_lib.sources import NewsArticle + +from collectors.news_aggregator import AggregatedNewsArticle +from collectors.nlp.event_extraction import ( + DEFAULT_EVENT_CATEGORIES, + AnthropicEventExtractor, + _build_tool_spec, +) +from collectors.nlp.loughran_mcdonald import ( + LoughranMcDonaldScorer, + _tokenize, + _truthy, + load_lm_master_dict, +) +from collectors.nlp.pipeline import NewsNLPPipeline, NewsNLPOutput +from collectors.nlp.protocols import ( + EntityExtractor, + EntityMention, + EventExtractor, + EventFlag, + SentimentScore, + SentimentScorer, +) + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +# ── Pydantic shapes ──────────────────────────────────────────────────── + + +class TestSentimentScoreShape: + def test_construction(self): + s = SentimentScore( + scorer="loughran_mcdonald", + article_fingerprint="abc123", + composite=0.25, + positive_word_count=4, + negative_word_count=2, + total_token_count=80, + ) + assert s.composite == 0.25 + assert s.uncertainty_word_count is None + + def test_frozen(self): + s = SentimentScore( + scorer="x", article_fingerprint="abc", composite=0.0, + ) + with pytest.raises(ValidationError): + s.composite = 0.5 # type: ignore[misc] + + def test_extra_forbidden(self): + with pytest.raises(ValidationError, match="Extra inputs are not"): + SentimentScore( + scorer="x", article_fingerprint="abc", composite=0.0, + some_unknown_field="oops", # type: ignore[call-arg] + ) + + +class TestEntityMentionShape: + def test_construction(self): + e = EntityMention( + extractor="regex_ticker", + article_fingerprint="abc", + text="NVDA", + label="TICKER", + canonical_ticker="NVDA", + ) + assert e.canonical_ticker == "NVDA" + + +class TestEventFlagShape: + def test_construction(self): + f = EventFlag( + extractor="anthropic_haiku", + article_fingerprint="abc", + category="merger_or_acquisition", + description="Acquirer announces all-stock deal.", + tickers=("AAPL",), + severity=0.9, + extracted_at=_now(), + ) + assert f.severity == 0.9 + assert f.tickers == ("AAPL",) + + def test_severity_default(self): + f = EventFlag( + extractor="x", article_fingerprint="a", + category="other", description="d", + extracted_at=_now(), + ) + assert f.severity == 0.5 + + +# ── Protocol structural subtyping ────────────────────────────────────── + + +class TestProtocolSubtyping: + def test_lm_scorer_satisfies_sentiment_protocol(self): + assert isinstance(LoughranMcDonaldScorer(lm_dict={}), SentimentScorer) + + def test_anthropic_extractor_satisfies_event_protocol(self): + extractor = AnthropicEventExtractor(client=MagicMock()) + assert isinstance(extractor, EventExtractor) + + def test_entity_protocol_structural_match(self): + class FakeExtractor: + name = "fake" + + def extract(self, *, text, article_fingerprint): + return [] + + assert isinstance(FakeExtractor(), EntityExtractor) + + +# ── LM tokenization + composite ──────────────────────────────────────── + + +class TestTokenization: + def test_alphabetic_only_lowercased(self): + assert _tokenize("The Stock Rose 5% in Q4!") == [ + "the", "stock", "rose", "in", "q", + ] + + def test_empty_text(self): + assert _tokenize("") == [] + + def test_dropped_punctuation_and_digits(self): + # Numbers + currency drop out + toks = _tokenize("revenue $1.2B beat estimates") + assert toks == ["revenue", "b", "beat", "estimates"] + + +class TestTruthyHelper: + def test_year_stamp_truthy(self): + assert _truthy("2009") is True + + def test_zero_falsy(self): + assert _truthy("0") is False + + def test_empty_string_falsy(self): + assert _truthy("") is False + + def test_none_falsy(self): + assert _truthy(None) is False + + def test_non_numeric_falsy(self): + assert _truthy("abc") is False + + +class TestLoughranMcDonaldScorer: + SYNTHETIC_DICT = { + "good": {"positive": True, "negative": False, "uncertainty": False}, + "great": {"positive": True, "negative": False, "uncertainty": False}, + "bad": {"positive": False, "negative": True, "uncertainty": False}, + "loss": {"positive": False, "negative": True, "uncertainty": False}, + "approximately": {"positive": False, "negative": False, "uncertainty": True}, + } + + def test_pure_positive(self): + s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score( + text="good good great", article_fingerprint="fp1", + ) + assert s.positive_word_count == 3 + assert s.negative_word_count == 0 + assert s.composite == pytest.approx(1.0) + + def test_pure_negative(self): + s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score( + text="bad bad loss", article_fingerprint="fp1", + ) + assert s.negative_word_count == 3 + assert s.composite == pytest.approx(-1.0) + + def test_balanced_yields_zero(self): + s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score( + text="good bad", article_fingerprint="fp1", + ) + assert s.composite == 0.0 + + def test_dilution_by_neutral_words(self): + # 1 positive in 5 tokens — composite = 1/5 = 0.2 + s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score( + text="the quarterly report was good overall", + article_fingerprint="fp1", + ) + assert s.positive_word_count == 1 + assert s.total_token_count == 6 + assert s.composite == pytest.approx(1 / 6) + + def test_uncertainty_counted_separately_from_polarity(self): + s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score( + text="results approximately matched estimates", + article_fingerprint="fp1", + ) + assert s.uncertainty_word_count == 1 + assert s.positive_word_count == 0 + assert s.negative_word_count == 0 + assert s.composite == 0.0 + + def test_empty_text_returns_zero(self): + s = LoughranMcDonaldScorer(lm_dict=self.SYNTHETIC_DICT).score( + text="", article_fingerprint="fp1", + ) + assert s.composite == 0.0 + assert s.total_token_count == 0 + + def test_clipped_to_range(self): + # With clip=0.5, all-positive text caps at 0.5 + s = LoughranMcDonaldScorer( + lm_dict=self.SYNTHETIC_DICT, composite_clip=0.5, + ).score(text="good great", article_fingerprint="fp1") + assert s.composite == 0.5 + + def test_empty_dict_warns_and_yields_zero(self, caplog): + with caplog.at_level("WARNING"): + scorer = LoughranMcDonaldScorer(lm_dict={}) + assert any("empty dict" in r.message for r in caplog.records) + s = scorer.score(text="good great", article_fingerprint="fp1") + assert s.composite == 0.0 + + +# ── load_lm_master_dict CSV parsing ──────────────────────────────────── + + +class TestLoadLmMasterDict: + def test_parses_canonical_csv_format(self, tmp_path: Path): + csv = tmp_path / "lm.csv" + csv.write_text( + "Word,Sequence Number,Negative,Positive,Uncertainty," + "Litigious,Strong_Modal,Weak_Modal,Constraining," + "Syllables,Source\n" + "good,1,0,2009,0,0,0,0,0,1,LM\n" + "bad,2,2009,0,0,0,0,0,0,1,LM\n" + "approximately,3,0,0,2009,0,0,0,0,4,LM\n" + ) + out = load_lm_master_dict(csv) + assert out["good"]["positive"] is True + assert out["good"]["negative"] is False + assert out["bad"]["negative"] is True + assert out["approximately"]["uncertainty"] is True + + def test_missing_file_returns_empty_dict_with_warning(self, tmp_path, caplog): + with caplog.at_level("WARNING"): + out = load_lm_master_dict(tmp_path / "missing.csv") + assert out == {} + assert any("not found" in r.message for r in caplog.records) + + def test_blank_rows_skipped(self, tmp_path: Path): + csv = tmp_path / "lm.csv" + csv.write_text( + "Word,Negative,Positive\n" + ",0,0\n" # blank word + "good,0,2009\n" + ) + out = load_lm_master_dict(csv) + assert "good" in out + assert "" not in out + + +# ── Anthropic event extractor ────────────────────────────────────────── + + +def _make_tool_use_response(events: list[dict]) -> object: + """Build a mock Anthropic response with a tool_use content block.""" + block = MagicMock() + block.type = "tool_use" + block.name = "EmitEventFlags" + block.input = {"events": events} + response = MagicMock() + response.content = [block] + return response + + +class TestAnthropicEventExtractor: + def test_tool_spec_built_with_default_categories(self): + spec = _build_tool_spec() + cats = spec["input_schema"]["properties"]["events"]["items"][ + "properties" + ]["category"]["enum"] + # All categories present in the enum + for cat in DEFAULT_EVENT_CATEGORIES: + assert cat in cats + + def test_happy_path_parses_events(self): + client = MagicMock() + client.messages.create.return_value = _make_tool_use_response([ + { + "category": "merger_or_acquisition", + "description": "Acquirer announces all-stock deal for X.", + "tickers": ["AAPL"], + "severity": 0.9, + }, + ]) + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="Apple announces acquisition...", + article_fingerprint="fp1", + article_tickers=("AAPL",), + ) + assert len(out) == 1 + assert out[0].category == "merger_or_acquisition" + assert out[0].severity == 0.9 + + def test_empty_text_short_circuits_without_calling_llm(self): + client = MagicMock() + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="", article_fingerprint="fp1", article_tickers=(), + ) + assert out == [] + client.messages.create.assert_not_called() + + def test_transient_llm_failure_returns_empty(self): + client = MagicMock() + client.messages.create.side_effect = RuntimeError("anthropic 500") + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="some text", article_fingerprint="fp1", article_tickers=(), + ) + assert out == [] + + def test_malformed_event_entry_dropped_others_kept(self): + client = MagicMock() + client.messages.create.return_value = _make_tool_use_response([ + {"category": "earnings_release"}, # missing required description + { + "category": "earnings_release", + "description": "Q4 results released.", + "tickers": ["AAPL"], + "severity": 0.6, + }, + ]) + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="x", article_fingerprint="fp1", article_tickers=("AAPL",), + ) + # Malformed entry dropped; good entry kept + assert len(out) == 1 + assert out[0].description == "Q4 results released." + + def test_tool_use_input_as_json_string(self): + """Anthropic SDK can return tool_use.input as either a dict or + a JSON string depending on stream-vs-message mode. Tolerate + both.""" + client = MagicMock() + block = MagicMock() + block.type = "tool_use" + block.name = "EmitEventFlags" + block.input = json.dumps({"events": [{ + "category": "other", "description": "x", + "tickers": [], "severity": 0.1, + }]}) + response = MagicMock() + response.content = [block] + client.messages.create.return_value = response + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="x", article_fingerprint="fp1", article_tickers=(), + ) + assert len(out) == 1 + assert out[0].category == "other" + + def test_no_tool_use_block_returns_empty(self): + client = MagicMock() + response = MagicMock() + text_block = MagicMock() + text_block.type = "text" + response.content = [text_block] + client.messages.create.return_value = response + extractor = AnthropicEventExtractor(client=client) + out = extractor.extract( + text="x", article_fingerprint="fp1", article_tickers=(), + ) + assert out == [] + + +# ── Pipeline orchestrator ────────────────────────────────────────────── + + +def _make_aggregated( + fingerprint: str = "fp1", + title: str = "Apple Q4 results", + body: str = "Apple reported strong results.", + tickers: tuple[str, ...] = ("AAPL",), +) -> AggregatedNewsArticle: + variant = NewsArticle( + tickers=tickers, title=title, body_excerpt=body, + url="https://x.com/a", published_at=_now(), + source="polygon", fetched_at=_now(), + ) + return AggregatedNewsArticle( + canonical_title=title, + canonical_url="https://x.com/a", + tickers=tickers, + earliest_published_at=_now(), + variants=(variant,), + canonical_fingerprint=fingerprint, + ) + + +class TestPipelineOrchestrator: + def _make_lm_scorer(self): + return LoughranMcDonaldScorer(lm_dict={ + "strong": {"positive": True, "negative": False, "uncertainty": False}, + "results": {"positive": False, "negative": False, "uncertainty": False}, + "weak": {"positive": False, "negative": True, "uncertainty": False}, + }) + + def test_empty_pipeline_returns_empty_output(self): + pipeline = NewsNLPPipeline() + out = pipeline.process([_make_aggregated()]) + assert isinstance(out, NewsNLPOutput) + assert out.sentiment_scores == [] + assert out.entity_mentions == [] + assert out.event_flags == [] + assert out.n_articles_processed == 1 + + def test_sentiment_scorer_runs_per_article(self): + pipeline = NewsNLPPipeline( + sentiment_scorers=[self._make_lm_scorer()], + ) + out = pipeline.process([ + _make_aggregated( + fingerprint="a", title="strong results", + body="firm reported strong execution", + ), + _make_aggregated( + fingerprint="b", title="weak results", + body="firm reported weak execution", + ), + ]) + assert len(out.sentiment_scores) == 2 + by_fp = {s.article_fingerprint: s for s in out.sentiment_scores} + assert by_fp["a"].composite > 0 + assert by_fp["b"].composite < 0 + + def test_multiple_scorers_emit_per_scorer_per_article(self): + scorer1 = self._make_lm_scorer() + scorer2 = MagicMock(spec=["name", "score"]) + scorer2.name = "fake_other" + scorer2.score.return_value = SentimentScore( + scorer="fake_other", article_fingerprint="dummy", + composite=0.42, + ) + pipeline = NewsNLPPipeline(sentiment_scorers=[scorer1, scorer2]) + out = pipeline.process([_make_aggregated(fingerprint="a")]) + # 1 article × 2 scorers = 2 scores + assert len(out.sentiment_scores) == 2 + names = {s.scorer for s in out.sentiment_scores} + assert names == {"loughran_mcdonald", "fake_other"} + + def test_scorer_exception_skips_that_score_but_continues(self): + bad_scorer = MagicMock(spec=["name", "score"]) + bad_scorer.name = "broken" + bad_scorer.score.side_effect = RuntimeError("boom") + pipeline = NewsNLPPipeline( + sentiment_scorers=[bad_scorer, self._make_lm_scorer()], + ) + out = pipeline.process([_make_aggregated(fingerprint="a", title="strong")]) + # Only the LM score makes it through + assert len(out.sentiment_scores) == 1 + assert out.sentiment_scores[0].scorer == "loughran_mcdonald" + + def test_event_extractor_receives_article_tickers(self): + extractor = MagicMock(spec=["name", "extract"]) + extractor.name = "fake_event" + extractor.extract.return_value = [EventFlag( + extractor="fake_event", article_fingerprint="a", + category="earnings_release", description="d", + tickers=("AAPL",), severity=0.5, extracted_at=_now(), + )] + pipeline = NewsNLPPipeline(event_extractors=[extractor]) + out = pipeline.process([ + _make_aggregated(fingerprint="a", tickers=("AAPL", "NVDA")), + ]) + # Pipeline passes article tickers through verbatim + kwargs = extractor.extract.call_args.kwargs + assert kwargs["article_tickers"] == ("AAPL", "NVDA") + assert kwargs["article_fingerprint"] == "a" + assert len(out.event_flags) == 1 + + def test_empty_article_text_skipped_as_failed(self): + # Construct an aggregated article with no title or body + variant = NewsArticle( + tickers=("AAPL",), title="", body_excerpt="", + url="https://x.com/a", published_at=_now(), + source="polygon", fetched_at=_now(), + ) + agg = AggregatedNewsArticle( + canonical_title="", + canonical_url="https://x.com/a", + tickers=("AAPL",), + earliest_published_at=_now(), + variants=(variant,), + canonical_fingerprint="empty", + ) + pipeline = NewsNLPPipeline( + sentiment_scorers=[self._make_lm_scorer()], + ) + out = pipeline.process([agg]) + assert out.n_articles_failed == 1 + assert out.n_articles_processed == 0 + assert out.sentiment_scores == [] + + def test_pipeline_uses_longest_excerpt_across_variants(self): + short = NewsArticle( + tickers=("AAPL",), title="t", body_excerpt="brief", + url="https://x.com/a", published_at=_now(), + source="polygon", fetched_at=_now(), + ) + long = NewsArticle( + tickers=("AAPL",), title="t", + body_excerpt="a much longer body containing strong words", + url="https://x.com/a", published_at=_now(), + source="gdelt", fetched_at=_now(), + ) + agg = AggregatedNewsArticle( + canonical_title="t", + canonical_url="https://x.com/a", + tickers=("AAPL",), + earliest_published_at=_now(), + variants=(short, long), + canonical_fingerprint="fp", + ) + pipeline = NewsNLPPipeline( + sentiment_scorers=[self._make_lm_scorer()], + ) + out = pipeline.process([agg]) + # The longer body has "strong" — should produce positive composite + assert out.sentiment_scores[0].composite > 0