# News Agent Debug Notebook

This notebook mirrors the Python modules so you can tinker with the agent, adjust providers, and try out queries interactively.

In [None]:
from __future__ import annotations

import os
import re
from collections import Counter
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Mapping, Optional
from urllib.parse import urlparse

import feedparser
import requests

In [None]:
def _split_csv(value: Optional[str]) -> List[str]:
    """Split a comma-separated string into trimmed, non-empty entries.

    Args:
        value: Raw comma-separated text, or ``None``.

    Returns:
        The entries in their original order with surrounding whitespace
        removed; blank entries are dropped. ``None`` or an empty string
        yields an empty list.
    """
    entries: List[str] = []
    if value:
        for chunk in value.split(","):
            cleaned = chunk.strip()
            if cleaned:
                entries.append(cleaned)
    return entries


@dataclass(slots=True)
class AgentConfig:
    """Runtime configuration for the news agent."""

    newsapi_key: Optional[str] = None
    default_limit: int = 10
    allowed_domains: List[str] = field(default_factory=list)

    @classmethod
    def from_env(cls) -> "AgentConfig":
        return cls(
            newsapi_key=os.getenv("NEWSAPI_KEY"),
            default_limit=int(os.getenv("NEWS_AGENT_DEFAULT_LIMIT", "10")),
            allowed_domains=_split_csv(os.getenv("NEWS_AGENT_ALLOWED_DOMAINS")),
        )

In [None]:
@dataclass(slots=True)
class RawArticle:
    """Raw article data collected from a provider.

    Providers yield these from ``fetch``; ``NewsAgent`` turns each one into a
    ``NewsItem``.
    """

    # Headline; the providers in this notebook substitute a placeholder when the feed has none.
    title: str
    # Link to the article; providers use an empty string when no link is available.
    url: str
    # Human-readable name of the outlet or feed the article came from.
    source: str
    # Publication timestamp, or None when the provider could not determine one.
    published_at: Optional[datetime]
    # Article body text, if the provider supplies it.
    content: Optional[str]
    # Short description or teaser, if the provider supplies it.
    description: Optional[str]


@dataclass(slots=True)
class NewsItem:
    """Structured representation of a processed article.

    Built by ``NewsAgent._process`` from a ``RawArticle``.
    """

    # Headline, copied from the raw article.
    title: str
    # Article link, copied from the raw article (may be an empty string).
    url: str
    # Name of the outlet or feed, copied from the raw article.
    source: str
    # Publication timestamp, or None when unknown.
    published_at: Optional[datetime]
    # Extractive summary of the article text, or None when there was no text.
    summary: Optional[str]
    # Sentiment label: "positive", "negative" or "neutral".
    sentiment: str
    # Signed sentiment score; 0.0 when no sentiment keywords were found.
    sentiment_score: float
    # Description (or content) shortened to at most 280 characters, if any.
    excerpt: Optional[str] = None

In [None]:
# Matches runs of ASCII letters and apostrophes; used to tokenize sentences into words.
WORD_RE = re.compile(r"[A-Za-z']+")


def summarize(text: Optional[str], max_sentences: int = 2) -> Optional[str]:
    """Produce a short extractive summary of ``text``.

    The text is split into sentences. When there are more than
    ``max_sentences`` of them, the highest-scoring ones are kept and emitted
    in their original order; otherwise all sentences are returned.

    Args:
        text: Text to summarize; ``None`` or empty yields ``None``.
        max_sentences: Maximum number of sentences to keep.

    Returns:
        The selected sentences joined by single spaces, or ``None`` when the
        text contains no sentences.
    """
    sentences = _split_sentences(text) if text else []
    if not sentences:
        return None
    if len(sentences) > max_sentences:
        scores = _score_sentences(sentences)
        # Rank sentence positions by score (ties keep document order) ...
        by_score = sorted(
            range(len(sentences)),
            key=lambda pos: scores.get(pos, 0.0),
            reverse=True,
        )
        # ... then restore document order for the winners.
        keep = sorted(by_score[:max_sentences])
        sentences = [sentences[pos] for pos in keep]
    return " ".join(sentences)


def _split_sentences(text: str) -> List[str]:
    """Split ``text`` into sentences.

    A sentence boundary is any whitespace run that follows ``.``, ``!`` or
    ``?``. Returned sentences are stripped and empty fragments are dropped.
    """
    pieces: List[str] = []
    for fragment in re.split(r"(?<=[.!?])\s+", text.strip()):
        trimmed = fragment.strip()
        if trimmed:
            pieces.append(trimmed)
    return pieces


def _score_sentences(sentences: Iterable[str]) -> dict[int, float]:
    """Score each sentence by the average normalized frequency of its words.

    Word frequencies are counted case-insensitively across all sentences and
    divided by the count of the most frequent word. A sentence's score is the
    mean of those normalized frequencies over its words.

    Args:
        sentences: Sentences to score. Any iterable is accepted, including
            one-shot iterators and generators.

    Returns:
        A mapping from sentence position to score. Sentences without any
        word tokens are omitted; the mapping is empty when no words are
        found at all.
    """
    # Tokenize in a single pass so a generator argument is consumed only once
    # (iterating the input twice would leave the second pass empty).
    tokenized = [
        [word.lower() for word in WORD_RE.findall(sentence)]
        for sentence in sentences
    ]
    freq = Counter(word for tokens in tokenized for word in tokens)
    if not freq:
        return {}
    max_freq = max(freq.values())
    scores: dict[int, float] = {}
    for idx, tokens in enumerate(tokenized):
        if not tokens:
            continue
        scores[idx] = sum(freq[word] / max_freq for word in tokens) / len(tokens)
    return scores

In [None]:
# Word stems counted as positive signals by score_sentiment().
POSITIVE_TOKENS = {
    "growth",
    "improve",
    "improving",
    "surge",
    "strong",
    "beat",
    "record",
    "gain",
    "positive",
    "optimistic",
    "upbeat",
    "increase",
    "exceed",
    "sustainable",
    "sustainability",
    "expansion",
}

# Word stems counted as negative signals by score_sentiment().
NEGATIVE_TOKENS = {
    "loss",
    "decline",
    "drop",
    "warning",
    "weak",
    "downturn",
    "concern",
    "miss",
    "lawsuit",
    "negative",
    "risk",
    "regulatory",
    "penalty",
    "fraud",
    "downgrade",
}


def score_sentiment(text: Optional[str]) -> tuple[str, float]:
    """Classify ``text`` as positive, negative or neutral by keyword hits.

    The text is lower-cased and split into words. A word counts as a hit when
    it *starts with* one of the stems in ``POSITIVE_TOKENS`` or
    ``NEGATIVE_TOKENS``, so inflections such as "gains" or "risks" still
    match. Matching at the start of a word (rather than anywhere in the
    text) avoids false hits like "miss" inside "emissions" or "gain" inside
    "again", and stops "upbeat" from being counted a second time as "beat".

    Args:
        text: Text to score; ``None`` or empty is treated as neutral.

    Returns:
        ``(label, score)`` where ``score`` is
        ``(positive_hits - negative_hits) / total_hits`` (0.0 when there are
        no hits) and ``label`` is ``"positive"`` above 0.2, ``"negative"``
        below -0.2 and ``"neutral"`` otherwise.
    """
    if not text:
        return "neutral", 0.0
    words = re.findall(r"[a-z']+", text.lower())
    # str.startswith accepts a tuple of prefixes, giving one check per word.
    positive_stems = tuple(POSITIVE_TOKENS)
    negative_stems = tuple(NEGATIVE_TOKENS)
    pos_hits = sum(word.startswith(positive_stems) for word in words)
    neg_hits = sum(word.startswith(negative_stems) for word in words)
    total = pos_hits + neg_hits
    if total == 0:
        return "neutral", 0.0
    score = (pos_hits - neg_hits) / total
    if score > 0.2:
        return "positive", score
    if score < -0.2:
        return "negative", score
    return "neutral", score

In [None]:
class BaseProvider:
    """Abstract base class for content providers.

    Subclasses override :meth:`fetch` to return articles for a query.
    """

    def fetch(self, query: str, limit: int = 10, **kwargs: Mapping[str, object]) -> Iterable[RawArticle]:
        """Return articles matching ``query``.

        Args:
            query: Search text.
            limit: Maximum number of articles requested.
            **kwargs: Provider-specific options.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError

In [None]:
class NewsAPIProvider(BaseProvider):
    """Fetch articles from newsapi.org when an API key is available."""

    BASE_URL = "https://newsapi.org/v2/everything"
    # Largest page size the endpoint accepts; larger values are rejected by the API.
    MAX_PAGE_SIZE = 100

    def __init__(self, api_key: str) -> None:
        """Store the API key.

        Raises:
            ValueError: If ``api_key`` is empty.
        """
        if not api_key:
            raise ValueError("NewsAPIProvider requires an API key")
        self._api_key = api_key

    def fetch(self, query: str, limit: int = 10, **kwargs: Mapping[str, object]) -> Iterable[RawArticle]:
        """Yield articles matching ``query`` from the NewsAPI "everything" endpoint.

        This is a generator: the HTTP request is only sent once iteration
        starts, so request errors surface at that point.

        Args:
            query: Search text sent as the ``q`` parameter.
            limit: Requested number of articles; clamped to
                ``1..MAX_PAGE_SIZE`` before being sent as ``pageSize``.
            **kwargs: Optional ``language`` (default ``"en"``) and
                ``sort_by`` (default ``"publishedAt"``).

        Yields:
            One :class:`RawArticle` per article in the response.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.RequestException: On connection problems or timeouts.
        """
        params = {
            "q": query,
            # Keep pageSize within the range the API accepts.
            "pageSize": max(1, min(limit, self.MAX_PAGE_SIZE)),
            "language": kwargs.get("language", "en"),
            "sortBy": kwargs.get("sort_by", "publishedAt"),
        }
        response = requests.get(
            self.BASE_URL,
            params=params,
            headers={"Authorization": self._api_key},
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()
        # "articles" may be missing or null; treat both as "no results".
        for article in payload.get("articles") or []:
            yield RawArticle(
                title=article.get("title") or "Untitled",
                url=article.get("url") or "",
                source=(article.get("source") or {}).get("name") or "Unknown",
                published_at=_parse_date(article.get("publishedAt")),
                content=article.get("content"),
                description=article.get("description"),
            )


def _parse_date(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 timestamp string.

    Any ``Z`` in the string is rewritten to ``+00:00`` before parsing so that
    a UTC "Zulu" suffix is understood.

    Args:
        value: Timestamp text, or ``None``.

    Returns:
        The parsed ``datetime``, or ``None`` when ``value`` is empty or is
        not a valid ISO 8601 timestamp.
    """
    if value:
        iso_text = value.replace("Z", "+00:00")
        try:
            parsed = datetime.fromisoformat(iso_text)
        except ValueError:
            parsed = None
        return parsed
    return None

In [None]:
class WiredRSSProvider(BaseProvider):
    """Fetches and filters articles from Wired RSS feeds."""

    # Section name -> feed URL used when no custom mapping is supplied.
    DEFAULT_SECTIONS = {
        "business": "https://www.wired.com/feed/category/business/latest/rss",
        "science": "https://www.wired.com/feed/category/science/latest/rss",
    }

    def __init__(self, sections: Mapping[str, str] | None = None) -> None:
        """Configure the feeds to read.

        Args:
            sections: Mapping of section name to feed URL. Defaults to
                ``DEFAULT_SECTIONS`` when omitted or empty.
        """
        self._sections = dict(sections or self.DEFAULT_SECTIONS)

    def fetch(self, query: str, limit: int = 10, **kwargs: Mapping[str, object]) -> Iterable[RawArticle]:
        """Return feed entries whose title, summary or content mention ``query``.

        Matching is a case-insensitive substring test. Feeds are read in the
        configured order; a feed that cannot be downloaded is reported and
        skipped so the remaining feeds are still searched.

        Args:
            query: Text to look for; surrounding whitespace is ignored.
            limit: Maximum number of articles to return. A non-positive
                limit yields an empty list.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            A list of at most ``limit`` :class:`RawArticle` objects.
        """
        results: List[RawArticle] = []
        if limit <= 0:
            return results
        normalized_query = query.strip().lower()
        for section, url in self._sections.items():
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
            except requests.RequestException as exc:
                # Best effort: one unreachable feed must not hide the others,
                # but say so instead of failing silently.
                print(f"Skipping Wired {section} feed: {exc}")
                continue
            feed = feedparser.parse(response.content)
            for entry in feed.entries or []:
                if normalized_query not in _entry_text(entry):
                    continue
                results.append(
                    RawArticle(
                        title=entry.get("title") or f"Wired {section.title()} Update",
                        url=entry.get("link") or "",
                        source=f"Wired {section.title()}",
                        published_at=_parse_published(entry),
                        content=_get_content(entry),
                        description=entry.get("summary"),
                    )
                )
                if len(results) >= limit:
                    return results
        return results


def _entry_text(entry: Mapping[str, object]) -> str:
    """Build the lower-cased text of a feed entry used for query matching.

    The result is ``"<title> <summary> <content>"`` in lower case, where
    ``content`` joins the ``value`` of every mapping found under the entry's
    ``content`` key. Content that cannot be joined is treated as empty.
    """
    heading = str(entry.get("title", ""))
    blurb = str(entry.get("summary", ""))
    body = ""
    raw_parts = entry.get("content")
    if raw_parts:
        try:
            fragments = [
                part.get("value", "")
                for part in raw_parts
                if isinstance(part, Mapping)
            ]
            body = " ".join(fragments)
        except Exception:
            # Malformed content (non-iterable, non-string values) is ignored.
            body = ""
    return " ".join((heading, blurb, body)).lower()


def _get_content(entry: Mapping[str, object]) -> Optional[str]:
    """Extract the body text of a feed entry.

    Returns the string ``value`` fields of the entry's ``content`` parts
    joined by blank lines. When there are none, falls back to the entry's
    ``summary`` if it is a string, otherwise ``None``.
    """
    raw_parts = entry.get("content")
    if raw_parts:
        values = (part.get("value") for part in raw_parts if isinstance(part, Mapping))
        texts = [value for value in values if isinstance(value, str)]
        if texts:
            return "\n\n".join(texts)
    fallback = entry.get("summary")
    if isinstance(fallback, str):
        return fallback
    return None


def _parse_published(entry: Mapping[str, object]) -> Optional[datetime]:
    """Derive a UTC timestamp from a feed entry's parsed date fields.

    Tries ``published_parsed`` first and ``updated_parsed`` second. The first
    six items of the field (year through second) are used to build a
    ``datetime`` tagged as UTC.

    Returns:
        The timestamp, or ``None`` when neither field is present and usable.
    """
    for key in ("published_parsed", "updated_parsed"):
        stamp = entry.get(key)
        if not stamp:
            continue
        try:
            return datetime(*stamp[:6], tzinfo=timezone.utc)
        except Exception:
            # Unusable value; fall through to the next candidate field.
            continue
    return None

In [None]:
class MockProvider(BaseProvider):
    """Returns hard-coded articles for offline development."""

    def fetch(self, query: str, limit: int = 10, **kwargs) -> Iterable[RawArticle]:
        """Return up to ``limit`` canned articles that mention ``query``.

        Args:
            query: Text interpolated into the sample titles and bodies.
            limit: Maximum number of samples to return; a non-positive limit
                yields an empty list.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            A list of at most two :class:`RawArticle` objects with
            timezone-aware (UTC) timestamps relative to the current time.
        """
        # Use an aware UTC timestamp, matching the other providers;
        # datetime.utcnow() returns a naive value and is deprecated.
        now = datetime.now(timezone.utc)
        sample = [
            RawArticle(
                title=f"{query.title()} expands sustainability efforts",
                url="https://example.com/sustainability",
                source="Example News",
                published_at=now - timedelta(hours=2),
                content=(
                    f"{query} announced new sustainability targets aimed at reducing emissions by 30% "
                    "over the next five years. The initiative includes investments in renewable energy "
                    "and supply chain transparency."
                ),
                description="Company targets lower emissions and greener supply chains.",
            ),
            RawArticle(
                title=f"Analysts debate {query} quarterly earnings",
                url="https://example.com/earnings",
                source="Market Watchers",
                published_at=now - timedelta(days=1),
                content=(
                    f"Market analysts offered mixed reactions to {query}'s latest earnings report, citing "
                    "flat revenue growth but improving operating margins. Investor sentiment appears "
                    "cautious heading into the next quarter."
                ),
                description="Mixed analyst sentiment following the latest results.",
            ),
        ]
        # A negative limit would otherwise slice from the end of the list.
        return sample[:max(limit, 0)]

In [None]:
class NewsAgent:
    """Aggregates, summarizes, and scores news articles.

    Articles are pulled from each configured provider in order, filtered by
    the configured domain allow-list, de-duplicated by URL and by normalized
    title, and converted into ``NewsItem`` objects.
    """

    def __init__(self, config: Optional[AgentConfig] = None, providers: Optional[Iterable[BaseProvider]] = None) -> None:
        """Create an agent.

        Args:
            config: Agent settings; read from the environment when omitted.
            providers: Providers to query, in priority order. When omitted a
                default set is built from the configuration.

        Raises:
            RuntimeError: If the resulting provider list is empty.
        """
        self.config = config or AgentConfig.from_env()
        if providers is not None:
            self.providers = list(providers)
        else:
            self.providers = self._build_providers()
        if not self.providers:
            raise RuntimeError("No providers configured for NewsAgent")

    def _build_providers(self) -> List[BaseProvider]:
        """Build the default provider chain: NewsAPI (if a key is set), Wired RSS, mock data."""
        providers: List[BaseProvider] = []
        if getattr(self.config, "newsapi_key", None):
            try:
                providers.append(NewsAPIProvider(self.config.newsapi_key))
            except Exception as exc:
                print(f"Skipping NewsAPI provider: {exc}")
        providers.append(WiredRSSProvider())
        providers.append(MockProvider())
        return providers

    def search(self, query: str, limit: Optional[int] = None, **kwargs) -> List[NewsItem]:
        """Search all providers and return processed, de-duplicated results.

        Args:
            query: Search text; must contain non-whitespace characters.
            limit: Maximum number of results. ``None`` or ``0`` selects the
                configured default.
            **kwargs: Extra options forwarded to every provider's ``fetch``.

        Returns:
            Up to ``limit`` :class:`NewsItem` objects, in provider order.

        Raises:
            ValueError: If ``query`` is blank or ``limit`` is not a positive
                integer.
        """
        if not query or not query.strip():
            raise ValueError("Query must be provided")
        limit = self._resolve_limit(limit)
        seen_urls: set[str] = set()
        seen_titles: set[str] = set()
        results: List[NewsItem] = []
        for provider in self.providers:
            for raw in self._fetch_safely(provider, query, limit, kwargs):
                if raw.url:
                    if raw.url in seen_urls:
                        continue
                    if not self._is_allowed_domain(raw.url):
                        continue
                    seen_urls.add(raw.url)
                dedupe_key = self._dedupe_key(raw)
                if dedupe_key and dedupe_key in seen_titles:
                    continue
                item = self._process(raw)
                results.append(item)
                if dedupe_key:
                    seen_titles.add(dedupe_key)
                if len(results) >= limit:
                    return results
        return results

    def _resolve_limit(self, limit: Optional[int]) -> int:
        """Return the effective result limit, validating caller input."""
        if not limit:
            limit = self.config.default_limit
        try:
            resolved = int(limit)
        except (TypeError, ValueError):
            raise ValueError("`limit` must be a positive integer") from None
        if resolved < 1:
            raise ValueError("`limit` must be a positive integer")
        return resolved

    @staticmethod
    def _fetch_safely(provider: BaseProvider, query: str, limit: int, options: Mapping[str, object]) -> Iterable[RawArticle]:
        """Yield a provider's articles, stopping quietly if the provider fails.

        Only errors raised by the provider itself are caught here; errors in
        the agent's own processing still propagate to the caller.
        """
        try:
            yield from provider.fetch(query=query, limit=limit, **options)
        except Exception as exc:
            # One failing source must not prevent the fallback providers from
            # being consulted.
            print(f"Skipping {type(provider).__name__}: {exc}")

    @staticmethod
    def _dedupe_key(article: RawArticle) -> Optional[str]:
        """Return a normalized title used to spot the same story under different URLs.

        The title is case-folded and reduced to its alphanumeric words joined
        by single spaces, so punctuation and spacing differences are ignored.
        Returns ``None`` when the title has no alphanumeric content.
        """
        words = re.findall(r"[^\W_]+", (article.title or "").casefold())
        return " ".join(words) or None

    def _process(self, article: RawArticle) -> NewsItem:
        """Summarize and score a raw article and build its :class:`NewsItem`."""
        text = article.content or article.description
        summary = summarize(text)
        sentiment_label, sentiment_score = score_sentiment(text or "")
        excerpt = article.description or article.content
        # Cap the excerpt at 280 characters including the ellipsis.
        if excerpt and len(excerpt) > 280:
            excerpt = excerpt[:277].rstrip() + "..."
        return NewsItem(
            title=article.title,
            url=article.url,
            source=article.source,
            published_at=article.published_at,
            summary=summary,
            sentiment=sentiment_label,
            sentiment_score=sentiment_score,
            excerpt=excerpt,
        )

    def _is_allowed_domain(self, url: str) -> bool:
        """Check ``url`` against the configured domain allow-list.

        Returns ``True`` when no allow-list is configured, when the URL has
        no network location, or when its host equals an allowed domain or is
        a subdomain of one. URLs that cannot be parsed are rejected.
        """
        if not self.config.allowed_domains:
            return True
        try:
            parsed = urlparse(url)
        except ValueError:
            return False
        if not parsed.netloc:
            return True
        # Use hostname rather than netloc so a port or user-info part of the
        # URL does not defeat the comparison.
        hostname = (parsed.hostname or "").lower()
        for domain in self.config.allowed_domains:
            domain = domain.lower()
            if hostname == domain or hostname.endswith(f".{domain}"):
                return True
        return False

    def to_dict(self, item: NewsItem) -> dict:
        """Convert ``item`` to a plain dict with ``published_at`` as an ISO 8601 string."""
        data = asdict(item)
        if item.published_at is not None:
            data["published_at"] = item.published_at.isoformat()
        return data

In [None]:
# Build the configuration from environment variables and create the agent.
config = AgentConfig.from_env()
agent = NewsAgent(config=config)
# Bare expression: as the last line of a notebook cell this displays the agent's repr.
agent

In [None]:
# Smoke test: run a small search and print each result in a readable form.
sample_items = agent.search("INFY", limit=3)
for idx, item in enumerate(sample_items, start=1):
    print(f"[{idx}] {item.title} - {item.sentiment} ({item.sentiment_score:.2f})")
    print(f"    Source: {item.source}")
    # Prefer the generated summary; fall back to the excerpt when there is none.
    if item.summary:
        print(f"    Summary: {item.summary}")
    elif item.excerpt:
        print(f"    Excerpt: {item.excerpt}")
    if item.url:
        print(f"    URL: {item.url}")
    # Blank line between results.
    print()

## Optional: Flask App Snippet

You can adapt the snippet below to run the Flask API inside the notebook by uncommenting the `app.run(...)` line at the end of the cell and running it. The call blocks the kernel until the server is stopped, so it's typically better to keep using `app.py` for serving requests.

In [None]:
from flask import Flask, jsonify, request

# Flask application object that the route decorators below register against.
app = Flask(__name__)
# Reuse the agent built earlier in the notebook instead of constructing a new one.
_agent = agent

@app.get("/health")
def healthcheck():
    """Liveness probe: always responds with ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.post("/news")
def fetch_news():
    """Search for news and return the results as a JSON array.

    Expects a JSON object body with a required string ``query`` and an
    optional integer ``limit``. A missing, unparsable or non-object body is
    treated as empty.

    Returns:
        A JSON list of serialized news items, or a JSON ``{"error": ...}``
        object with status 400 when the input is invalid.
    """
    payload = request.get_json(silent=True)
    # A JSON array or scalar body has no .get(); treat it like an empty object.
    if not isinstance(payload, dict):
        payload = {}
    query = payload.get("query")
    limit = payload.get("limit")
    if not isinstance(query, str) or not query.strip():
        return jsonify({"error": "`query` is required"}), 400
    # Reject non-integer limits up front so they produce a 400 rather than a
    # server error; bool is excluded because it is a subclass of int.
    if limit is not None and (isinstance(limit, bool) or not isinstance(limit, int) or limit < 0):
        return jsonify({"error": "`limit` must be a non-negative integer"}), 400
    try:
        items = _agent.search(query=query, limit=limit)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    return jsonify([_agent.to_dict(item) for item in items])

# To run inside notebook (blocks execution):
# app.run(host="0.0.0.0", port=8008, debug=True)