In [1]:
import os
from typing import TypedDict, Annotated, List
from langgraph.graph import Graph, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from langchain_core.runnables.graph import MermaidDrawMethod
from datetime import datetime
import re
from dotenv import load_dotenv
from newsapi import NewsApiClient
import requests
from bs4 import BeautifulSoup
from IPython.display import display, Image as IPImage
from langchain_core.messages import SystemMessage

In [2]:
# Load variables from a local .env file into the process environment.
# NEWS_API_KEY is read later via os.getenv; presumably the OpenAI key used by
# ChatOpenAI comes from here as well — TODO confirm.
load_dotenv()

# Use GPT-4o-mini
# Shared chat model instance used by the LLM-backed functions in this notebook.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.5
)

class GraphState(TypedDict):
    """State dictionary handed from one workflow function to the next.

    Each key carries an ``Annotated`` description string. The functions in this
    notebook additionally write ``last_newsapi_request``,
    ``last_newsapi_error`` and ``last_scrape_error`` into the state; those
    diagnostic keys are not declared here.
    """
    news_query: Annotated[str, "Input query to extract news search parameters from."]
    num_searches_remaining: Annotated[int, "Number of searches remaining."]
    newsapi_params: Annotated[dict, "Structured argument for the News API."]
    past_searches: Annotated[List[dict], "List of search params already used."]
    articles_metadata: Annotated[List[dict], "Article metadata response from the News API."]
    scraped_urls: Annotated[List[str], "List of urls already scraped."]
    num_articles_tldr: Annotated[int, "Number of articles to create TL;DR for."]
    potential_articles: Annotated[List[dict], "Articles with full text to consider summarizing."]
    tldr_articles: Annotated[List[dict], "Selected article TL;DRs."]
    formatted_results: Annotated[str, "Formatted results to display."]


In [3]:
# Schema handed to llm.with_structured_output() in generate_newsapi_params.
# The Field descriptions are the instructions the model sees for each field,
# so treat them as prompt text rather than documentation.
# NOTE(review): no class docstring is added on purpose — a model docstring
# presumably ends up in the schema description sent to the LLM; confirm before
# adding one.
class NewsApiParams(BaseModel):
    # Search keywords.
    q: str = Field(description="1-3 concise keyword search terms that are not too specific")
    # Comma-separated source ids; validated later in _clean_and_validate_params.
    sources: str =Field(description="comma-separated list of sources from: 'abc-news,abc-news-au,associated-press,australian-financial-review,axios,bbc-news,bbc-sport,bloomberg,business-insider,cbc-news,cbs-news,cnn,financial-post,fortune'")
    # Start of the date window; clamped later by _clamp_30_days.
    from_param: str = Field(description="date in format 'YYYY-MM-DD' Two days ago minimum. Extend up to 30 days on second and subsequent requests.")
    # End of the date window; clamped later by _clamp_30_days.
    to: str = Field(description="date in format 'YYYY-MM-DD' today's date unless specified")
    # Article language code.
    language: str = Field(description="language of articles 'en' unless specified one of ['ar', 'de', 'en', 'es', 'fr', 'he', 'it', 'nl', 'no', 'pt', 'ru', 'se', 'ud', 'zh']")
    # Result ordering; values outside VALID_SORT are replaced downstream.
    sort_by: str = Field(description="sort by 'relevancy', 'popularity', or 'publishedAt'")

In [25]:
# 4. generate_newsapi_params
def generate_newsapi_params(state: dict):
    """Generate News API params from the user's query, then enforce a tight, fresh search.

    Reads ``news_query``, ``num_searches_remaining`` and ``past_searches`` from
    ``state`` (plus ``last_newsapi_request`` when present), asks the LLM for a
    ``NewsApiParams`` object and post-processes it:

    * a query that looks like a model name (letters followed by a digit) is
      searched as an exact phrase OR its first word, with no source filter;
    * when ``q`` and ``sources`` equal the previous request, the source filter
      is dropped and the raw user query is OR-ed in to broaden the search;
    * ``sort_by`` is always forced to ``'publishedAt'`` (most recent first).

    Args:
        state: Workflow state dict; it is mutated in place.

    Returns:
        The same ``state`` dict with ``state['newsapi_params']`` set.

    Raises:
        KeyError: if ``news_query``, ``num_searches_remaining`` or
            ``past_searches`` is missing from ``state``.
    """
    today_date = datetime.now().strftime("%Y-%m-%d")
    news_query = state['news_query']
    num_searches_remaining = state['num_searches_remaining']
    past_searches = state['past_searches']
    last_req = state.get("last_newsapi_request")
    sys_prompt = """
    Today's date is {today_date}.
    Create a param dict for the News API on the user query:
    {query}

    These searches have already been made. Loosen the search terms to get more results:
    {past_searches}

    Including this one, you have {num_searches_remaining} searches remaining.
    If this is your last search, use all news resources and a 30-day search range.
    """
    sys_msg = sys_prompt.format(
        today_date=today_date,
        query=news_query,
        past_searches=past_searches,
        num_searches_remaining=num_searches_remaining
    )

    llm_with_news_structured_output = llm.with_structured_output(NewsApiParams)
    result = llm_with_news_structured_output.invoke([SystemMessage(content=sys_msg)])

    # Base params from LLM
    params = {
        'q': result.q,
        'sources': result.sources,
        'from_param': result.from_param,
        'to': result.to,
        'language': result.language,
        'sort_by': result.sort_by
    }

    q_lower = news_query.lower()

    # --- Smart quoting for model-like queries (contains both letters & numbers)
    if re.search(r"[a-zA-Z]+\s*\d", q_lower):
        params["q"] = f"\"{news_query}\" OR {news_query.split()[0]}"
        params.pop("sources", None)

    # --- Broaden if we're repeating the same params as last time
    if last_req and params.get("q") == last_req.get("q") and params.get("sources") == last_req.get("sources"):
        params.pop("sources", None)
        if "OR" not in params["q"]:
            params["q"] = f"{params['q']} OR {news_query}"

    # Prefer most-recent. This override used to sit after an early `return`
    # and therefore never ran; it is applied before the single return now.
    params['sort_by'] = 'publishedAt'

    state['newsapi_params'] = params
    return state


In [5]:
from datetime import datetime, timedelta, UTC

# sort_by values accepted by _clean_and_validate_params; any other value is
# replaced with "publishedAt" there.
VALID_SORT = {"relevancy", "popularity", "publishedAt"}

def _parse_date_yyyy_mm_dd(d):
    if not d:
        return None
    s = str(d)[:10]
    try:
        return datetime.fromisoformat(s).date()
    except Exception:
        return None

def _clamp_30_days(from_param, to_param):
    today = datetime.now(UTC).date()
    max_age = today - timedelta(days=30)
    t = _parse_date_yyyy_mm_dd(to_param) or today
    if t > today:
        t = today
    f = _parse_date_yyyy_mm_dd(from_param) or max_age
    if f < max_age:
        f = max_age
    if f > t:
        f = t - timedelta(days=1)
    return f.isoformat(), t.isoformat()

def _clean_and_validate_params(newsapi, params: dict) -> dict:
    q = (params.get("q") or "").strip()
    language = (params.get("language") or "en").strip()
    sort_by = (params.get("sort_by") or "publishedAt").strip()
    if sort_by not in VALID_SORT:
        sort_by = "publishedAt"

    from_param, to_param = _clamp_30_days(params.get("from_param"), params.get("to"))

    requested_sources = params.get("sources")
    cleaned_sources = None
    if requested_sources:
        if isinstance(requested_sources, (list, tuple, set)):
            requested_sources = ",".join([s for s in requested_sources if s])
        elif isinstance(requested_sources, str):
            requested_sources = ",".join([s.strip() for s in requested_sources.split(",") if s.strip()])
        else:
            requested_sources = None

        if requested_sources:
            try:
                src_resp = newsapi.get_sources(language=language)
                valid_ids = {s["id"] for s in src_resp.get("sources", []) if s.get("id")}
                keep = [s for s in requested_sources.split(",") if s in valid_ids]
                if keep:
                    cleaned_sources = ",".join(keep)
            except Exception:
                cleaned_sources = None

    # Python SDK param names
    safe = {
        "language": language,
        "sort_by": sort_by,
        "from_param": from_param,
        "to": to_param,
        "page_size": 50,
    }
    if q:
        safe["q"] = q
    # keep sources only if validated; otherwise omit for broader coverage
    if cleaned_sources:
        safe["sources"] = cleaned_sources

    if "q" not in safe and "sources" not in safe:
        safe["q"] = "news"
    return safe

def _dec(state: dict) -> None:
    state["num_searches_remaining"] = max(0, int(state.get("num_searches_remaining", 0)) - 1)

def retrieve_article_metadata(state: dict) -> dict:
    """Run one News API search and store the fresh article metadata in ``state``.

    The search uses ``state['newsapi_params']`` after cleaning by
    ``_clean_and_validate_params``. Articles whose URL is missing or already in
    ``scraped_urls`` are skipped, and at most ``10 - len(potential_articles)``
    new articles are kept. Every call, successful or not, consumes one search
    via ``_dec``.

    Diagnostic keys written: ``last_newsapi_request`` (the cleaned params, once
    a client exists) and ``last_newsapi_error`` (``None`` on success, otherwise
    a message; ``articles_metadata`` is emptied on failure).

    Returns:
        The same ``state`` dict, mutated in place.
    """
    for key, default in (
        ("articles_metadata", []),
        ("past_searches", []),
        ("potential_articles", []),
        ("scraped_urls", set()),
    ):
        state.setdefault(key, default)

    def _abort(reason: str) -> dict:
        # Shared failure path: record why, clear results, consume the search.
        state["last_newsapi_error"] = reason
        state["articles_metadata"] = []
        _dec(state)
        return state

    requested = dict(state.get("newsapi_params", {}))
    seen_urls = set(state.get("scraped_urls") or [])
    already_collected = len(state.get("potential_articles") or [])
    history = state.get("past_searches") or []

    key_value = os.getenv("NEWS_API_KEY")
    if not key_value:
        return _abort("NEWS_API_KEY not set")
    client = NewsApiClient(api_key=key_value)

    query_params = _clean_and_validate_params(client, requested)
    state["last_newsapi_request"] = dict(query_params)

    try:
        response = client.get_everything(**query_params)
    except Exception as exc:
        return _abort(f"Request exception: {exc}")

    if not isinstance(response, dict):
        detail = "Invalid response"
        return _abort(f"API error: {detail}")
    if response.get("status") != "ok":
        detail = response.get("message", "Unknown NewsAPI error")
        return _abort(f"API error: {detail}")

    # Record the search unless it repeats the most recent one.
    snapshot = dict(query_params)
    if not history or history[-1] != snapshot:
        history.append(snapshot)

    capacity = max(0, 10 - already_collected)
    fresh = []
    for item in response.get("articles", []):
        link = (item or {}).get("url")
        if not link or link in seen_urls:
            continue
        if len(fresh) >= capacity:
            break
        fresh.append(item)

    state["articles_metadata"] = fresh
    state["past_searches"] = history
    state["last_newsapi_error"] = None
    _dec(state)
    return state


In [6]:
from typing import Dict, Any
import requests
from bs4 import BeautifulSoup

# Soften filters
# Domains that retrieve_articles_text never fetches over the network; articles
# from them rely on the NewsAPI content/description fallback instead.
PAYWALLED_OR_FUSSY = {
    "bloomberg.com", "wsj.com", "ft.com", "nytimes.com",
    "economist.com", "washingtonpost.com"
}
# Lower-case phrases that make _looks_bad() reject scraped page text.
# NOTE(review): "cookie policy", "terms of service" and "subscribe to" are
# common footer boilerplate, and retrieve_articles_text scans the text of the
# whole page, so these markers may reject most legitimate articles — confirm
# this is intended.
ANTI_BOT_MARKERS = [
    "are you a robot", "enable javascript", "turn on javascript",
    "cookie policy", "terms of service", "subscribe to", "paywall"
]
# Minimum number of characters of scraped text for a page to count as usable.
MIN_TEXT_LEN = 200   # was 800

def _looks_bad(text: str) -> bool:
    t = text.lower()
    return any(marker in t for marker in ANTI_BOT_MARKERS)

def _domain(url: str) -> str:
    try:
        return url.split("//", 1)[1].split("/", 1)[0].replace("www.", "")
    except Exception:
        return ""

def retrieve_articles_text(state: Dict[str, Any]) -> Dict[str, Any]:
    """Collect article text for the entries in ``articles_metadata``.

    Works on a shallow copy of ``state``. For each article not seen before, the
    page is downloaded (unless its domain is in ``PAYWALLED_OR_FUSSY``) and the
    visible strings are joined into one text; the text is accepted when it is
    at least ``MIN_TEXT_LEN`` characters and ``_looks_bad`` does not reject it.
    Otherwise the NewsAPI ``content`` + ``description`` fields are used if
    together they reach 150 characters. Processed URLs, usable or not, are
    added to ``scraped_urls``. The total of ``potential_articles`` is capped
    at 10.

    Returns:
        The copied state with ``potential_articles`` extended, ``scraped_urls``
        as a list and ``last_scrape_error`` set (``None`` unless a download
        raised or ``articles_metadata`` was not a list).
    """
    state = dict(state or {})
    metadata = state.get("articles_metadata") or []
    seen = set(state.get("scraped_urls") or [])
    existing = state.get("potential_articles") or []
    budget = max(0, 10 - len(existing))

    state["last_scrape_error"] = None

    if not isinstance(metadata, list):
        state["articles_metadata"] = []
        state["last_scrape_error"] = "articles_metadata not list"
        return state

    # Browser-like headers for the page download.
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/121.0.0.0 Safari/537.36"
        ),
        "Accept-Language": "en-US,en;q=0.9",
    }

    collected = []
    for meta in metadata:
        if len(collected) >= budget:
            break
        link = (meta or {}).get("url")
        if not link or link in seen:
            continue

        body = ""
        usable = False

        # Network scrape, skipped for domains known to block or paywall.
        if _domain(link) not in PAYWALLED_OR_FUSSY:
            try:
                response = requests.get(link, timeout=10, headers=headers)
                if response.status_code == 200 and response.text:
                    soup = BeautifulSoup(response.text, "html.parser")
                    body = " ".join(piece.strip() for piece in soup.stripped_strings)
                    usable = bool(body) and not _looks_bad(body) and len(body) >= MIN_TEXT_LEN
            except Exception as exc:
                state["last_scrape_error"] = f"scrape error for {link}: {exc}"

        # Fall back to the NewsAPI-provided snippet when scraping gave nothing usable.
        if not usable:
            backup = ((meta.get("content") or "") + "\n" + (meta.get("description") or "")).strip()
            if len(backup) >= 150:  # was 400
                body, usable = backup, True

        # The URL counts as processed whether or not text was obtained.
        seen.add(link)
        if not usable:
            continue

        collected.append({
            "title": meta.get("title") or "",
            "url": link,
            "text": body[:20000],
            "publishedAt": meta.get("publishedAt", ""),
            "description": meta.get("description", "")
        })

    state["potential_articles"] = existing + collected
    state["scraped_urls"] = list(seen)
    return state


In [7]:
def articles_text_decision(state: Dict[str, Any]) -> str:
    """Pick the next step after article text retrieval.

    NOTE: a later cell in this notebook defines ``articles_text_decision``
    again; once that cell has run, it replaces this definition.

    Routing:
        * at least one article with text -> ``"select_top_urls"``
        * no text yet but searches remain -> ``"generate_newsapi_params"``
        * otherwise -> ``"END"``

    Fixes over the previous version: metadata without any usable text no
    longer routes to selection with nothing to select, the unreachable
    ``else "END"`` is gone, and the last remaining search is actually used
    (the counter has already been decremented for the search just made, so
    ``> 0`` — not ``> 1`` — means one is left).

    Args:
        state: Workflow state; reads ``potential_articles`` and
            ``num_searches_remaining``.

    Returns:
        One of ``"select_top_urls"``, ``"generate_newsapi_params"``, ``"END"``.
    """
    # If we have at least one scraped article, proceed to select top urls
    if state.get("potential_articles"):
        return "select_top_urls"

    # Nothing usable yet: broaden and retry while searches remain
    if int(state.get("num_searches_remaining", 0) or 0) > 0:
        return "generate_newsapi_params"

    return "END"

In [8]:
def select_top_urls(state: GraphState) -> GraphState:
    """Based on the article synopses, choose the top-n articles to summarize.

    The LLM is shown the URL and description of every candidate in
    ``potential_articles`` and asked to reply with the relevant URLs. Candidates
    whose URL appears in the reply are stored in ``state['tldr_articles']`` in
    their original order.

    Robustness notes:
        * With no candidate that has a URL, the LLM is not called and
          ``tldr_articles`` is set to ``[]``.
        * URLs extracted from the reply are also matched with trailing quote /
          bracket / punctuation characters removed, so replies such as
          ``['https://a', 'https://b']`` or markdown links still match.
        * The selection is capped at ``num_articles_tldr`` when that is an int.

    Args:
        state: Workflow state; reads ``news_query``, ``num_articles_tldr`` and
            ``potential_articles``.

    Returns:
        The same ``state`` dict with ``tldr_articles`` set.

    Raises:
        KeyError: if ``news_query`` or ``num_articles_tldr`` is missing.
    """
    news_query = state["news_query"]
    num_articles_tldr = state["num_articles_tldr"]

    # processed articles that can actually be referenced by URL
    potential_articles = state.get("potential_articles") or []
    candidates = [article for article in potential_articles if article.get("url")]

    if not candidates:
        # nothing to choose from: skip the LLM round-trip
        state["tldr_articles"] = []
        return state

    # format the metadata
    formatted_metadata = "\n".join(
        f"{article.get('url','')}\n{article.get('description','')}\n"
        for article in candidates
    )

    prompt = f"""
    Based on the user news query:
    {news_query}

    Reply with a list of strings of up to {num_articles_tldr} relevant urls.
    Don't add any urls that are not relevant or aren't listed specifically.
    {formatted_metadata}
    """
    result = llm.invoke(prompt).content

    # extract URLs; keep both the raw match and a copy without trailing
    # quote/bracket/punctuation noise picked up from the reply formatting
    url_pattern = r'(https?://[^\s",]+)'
    found = re.findall(url_pattern, result)
    trailing_noise = "'\"`)]}>.;"
    urls = set(found) | {match.rstrip(trailing_noise) for match in found}

    # keep only selected articles, never more than requested
    tldr_articles = [article for article in candidates if article["url"] in urls]
    if isinstance(num_articles_tldr, int):
        tldr_articles = tldr_articles[:max(0, num_articles_tldr)]

    state["tldr_articles"] = tldr_articles
    return state


In [9]:
import asyncio

async def summarize_articles_parallel(state: GraphState) -> GraphState:
    """Produce a bulleted TL;DR for every selected article, all requests in flight at once.

    Each article dict in ``state['tldr_articles']`` gets a ``summary`` key
    holding the LLM reply text (the dicts are updated in place). When there is
    nothing to summarize, the value is written back unchanged.

    Returns:
        The same ``state`` dict with ``tldr_articles`` holding the summarized
        articles in their original order.
    """
    selected = state.get("tldr_articles", [])

    prompt_template = """
    Create a * bulleted summarizing tldr for the article:
    {text}

    Be sure to follow the following format exactly with nothing else:
    {title}
    {url}
    * tl;dr bulleted summary
    * use bullet points for each sentence
    """

    async def _attach_summary(entry):
        # Fill the template for one article and store the model's reply on it.
        filled = prompt_template.format(
            title=entry.get("title", ""),
            url=entry.get("url", ""),
            text=entry.get("text", ""),
        )
        reply = await llm.ainvoke(filled)
        entry["summary"] = reply.content
        return entry

    if selected:
        pending = [_attach_summary(entry) for entry in selected]
        selected = await asyncio.gather(*pending)

    state["tldr_articles"] = selected
    return state


In [10]:
def format_results(state: GraphState) -> GraphState:
    """Format the results for display.

    NOTE: a later cell in this notebook defines ``format_results`` again; once
    that cell has run, it replaces this definition.

    Builds a header listing the ``q`` term of every past search, followed by
    the article summaries separated by blank lines, and stores the text in
    ``state['formatted_results']``. Real newline characters are used; the
    previous version wrote ``"\\\\n"`` and so emitted a literal backslash-n.

    Args:
        state: Workflow state; reads ``past_searches`` and ``tldr_articles``
            (each article needs a ``summary`` key).

    Returns:
        The same ``state`` dict with ``formatted_results`` set.

    Raises:
        KeyError: if ``tldr_articles`` is missing or an article has no
            ``summary``.
    """
    # load a list of past search queries
    q = [newsapi_params.get("q", "") for newsapi_params in state.get("past_searches") or []]
    formatted_results = (
        f"Here are the top {len(state['tldr_articles'])} articles based on search terms:\n"
        f"{', '.join(q)}\n\n"
    )

    # load the summarized articles
    tldr_articles = state["tldr_articles"]

    # format article tl;dr summaries
    tldr_articles = "\n\n".join([f"{article['summary']}" for article in tldr_articles])

    # concatenate summaries to the formatted results
    formatted_results += tldr_articles

    state["formatted_results"] = formatted_results

    return state

In [11]:
def format_results(state: GraphState) -> GraphState:
    """Render the final report text into ``state['formatted_results']``.

    The report is a header naming the number of summarized articles and the
    ``q`` term of each past search, followed by the article summaries separated
    by blank lines.
    """
    # search terms used so far
    queries = [params.get("q", "") for params in state.get("past_searches", [])]

    # summarized articles
    summaries = state["tldr_articles"]

    header = (
        f"Here are the top {len(summaries)} articles based on search terms:\n"
        f"{', '.join(queries)}\n\n"
    )
    body = "\n\n".join(f"{article['summary']}" for article in summaries)

    state["formatted_results"] = header + body
    return state


In [12]:
def articles_text_decision(state: GraphState) -> str:
    """Choose the next step once article text has been retrieved.

    * enough articles with text (at least ``num_articles_tldr``, minimum one)
      -> ``"select_top_urls"``
    * searches exhausted but some text exists -> ``"select_top_urls"``
    * searches remain -> ``"generate_newsapi_params"`` (broaden and retry)
    * searches exhausted and nothing found -> ``"END"``, after writing a
      "nothing found" message into ``state['formatted_results']``
    """
    searches_left = int(state.get("num_searches_remaining", 0))
    collected = len(state.get("potential_articles", []) or [])
    wanted = int(state.get("num_articles_tldr", 0))

    enough = collected >= max(1, wanted)
    if enough or (searches_left <= 0 and collected > 0):
        return "select_top_urls"

    if searches_left > 0:
        return "generate_newsapi_params"

    # out of searches with nothing to show
    state["formatted_results"] = "No articles with text found."
    return "END"


In [None]:
from langgraph.graph import START, END

def make_app() -> Graph:
    """Wire the workflow functions into a graph and return the compiled app.

    Flow: START -> generate_newsapi_params -> retrieve_articles_metadata ->
    retrieve_articles_text, then ``articles_text_decision`` routes to a retry,
    to selection, or to END. After selection the articles are summarized and,
    if any summaries exist, formatted before END.
    """
    workflow = Graph()

    # nodes, registered in pipeline order
    node_table = (
        ("generate_newsapi_params", generate_newsapi_params),
        ("retrieve_articles_metadata", retrieve_article_metadata),
        ("retrieve_articles_text", retrieve_articles_text),
        ("select_top_urls", select_top_urls),
        ("summarize_articles_parallel", summarize_articles_parallel),
        ("format_results", format_results),
    )
    for node_name, node_fn in node_table:
        workflow.add_node(node_name, node_fn)

    # entrypoint and the unconditional front half of the pipeline
    workflow.add_edge(START, "generate_newsapi_params")
    workflow.add_edge("generate_newsapi_params", "retrieve_articles_metadata")
    workflow.add_edge("retrieve_articles_metadata", "retrieve_articles_text")

    # after text retrieval: retry the search, move on, or stop
    after_text_routes = {
        "generate_newsapi_params": "generate_newsapi_params",
        "select_top_urls": "select_top_urls",
        "END": END,
    }
    workflow.add_conditional_edges(
        "retrieve_articles_text", articles_text_decision, after_text_routes
    )

    workflow.add_edge("select_top_urls", "summarize_articles_parallel")

    # after summarizing: only format when there is something to show
    after_summary_routes = {
        "format_results": "format_results",
        "END": END,
    }
    workflow.add_conditional_edges(
        "summarize_articles_parallel",
        lambda state: "format_results" if len(state.get("tldr_articles", [])) > 0 else "END",
        after_summary_routes,
    )

    workflow.add_edge("format_results", END)

    compiled = workflow.compile()
    return compiled

# Build the app (this must run before calling run_workflow)
# run_workflow looks up the module-level name `app` at call time.
app = make_app()


In [26]:
async def run_workflow(query: str, num_searches_remaining: int = 3, num_articles_tldr: int = 2):
    """Run the compiled workflow for ``query`` and return the formatted report.

    Args:
        query: The user's news query.
        num_searches_remaining: Search budget placed in the initial state.
        num_articles_tldr: Number of articles to summarize.

    Returns:
        ``formatted_results`` from the final state, or ``None`` when anything
        raised along the way (the error message is printed).
    """
    seed_state = dict(
        news_query=query,
        num_searches_remaining=num_searches_remaining,
        newsapi_params={},
        past_searches=[],
        articles_metadata=[],
        scraped_urls=[],
        num_articles_tldr=num_articles_tldr,
        potential_articles=[],
        tldr_articles=[],
        formatted_results="No articles with text found.",
    )
    # recursion_limit bounds the number of graph steps to prevent infinite loops
    run_config = {"recursion_limit": 50}
    try:
        final_state = await app.ainvoke(seed_state, config=run_config)
        return final_state["formatted_results"]
    except Exception as exc:
        print(f"An error occurred: {str(exc)}")
        return None


In [27]:
# Diagnostic run: invoke the compiled graph directly (instead of going through
# run_workflow) so the whole final state, including the last_* diagnostic keys
# written by the retrieval functions, can be inspected.
diag_state = await app.ainvoke({
    "news_query": "delhi red fort blast",
    "num_searches_remaining": 2,
    "newsapi_params": {},
    "past_searches": [],
    "articles_metadata": [],
    "scraped_urls": [],
    "num_articles_tldr": 3,
    "potential_articles": [],
    "tldr_articles": [],
    "formatted_results": ""
}, config={"recursion_limit": 40})

# Last request / errors, followed by how many items survived each stage.
print("last_newsapi_request =", diag_state.get("last_newsapi_request"))
print("last_newsapi_error   =", diag_state.get("last_newsapi_error"))
print("last_scrape_error    =", diag_state.get("last_scrape_error"))
print("metadata len         =", len(diag_state.get("articles_metadata", [])))
print("potential len        =", len(diag_state.get("potential_articles", [])))
print("tldr len             =", len(diag_state.get("tldr_articles", [])))


last_newsapi_request = {'language': 'en', 'sort_by': 'publishedAt', 'from_param': '2025-10-11', 'to': '2025-11-10', 'page_size': 50, 'q': 'Delhi Red Fort incident', 'sources': 'abc-news,bbc-news,cnn,associated-press'}
last_newsapi_error   = None
last_scrape_error    = None
metadata len         = 0
potential len        = 0
tldr len             = 0


In [22]:
# End-to-end run through run_workflow, using its default budget of three
# searches; run_workflow returns None if the workflow raised.
query = "delhi red fort blast"
result=await run_workflow(query, num_articles_tldr=3)
print(result)

No articles with text found.
