In [1]:
from __future__ import annotations
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import SystemMessage, HumanMessage
from pydantic import BaseModel, Field
from typing import TypedDict, List, Literal, Annotated, Optional
from dotenv import load_dotenv
from pathlib import Path
from datetime import date, timedelta
import os
import re
import operator
load_dotenv()


  from pydantic.v1.fields import FieldInfo as FieldInfoV1


True

In [2]:
# Schemas
class Task(BaseModel):
    # One planned section of the blog post, produced by the orchestrator as part
    # of a Plan and handed to a worker by fanout().
    #
    # Documented with comments rather than a class docstring on purpose: pydantic
    # copies a model docstring into the generated JSON schema, which would change
    # what the structured-output LLM call is shown.

    # Section identifier. Presumably used to restore section order after the
    # parallel workers finish -- confirm against the reducer.
    id: int
    title: str
    goal: str = Field(
        ...,
        description="One sentence describing what the reader should be able to do/understand after this section."
    )
    # Length bounds here are enforced by validation (3 to 6 items).
    bullets: List[str] = Field(
        ...,
        min_length=3,
        max_length=6,
        description="3-6 concrete, non-overlapping subpoints to cover in this section."
    )
    # The 120-550 range is only stated in the description; it is not validated.
    target_words: int = Field(
        ...,
        description="Target word count for this section (120-550)."
    )
    tags: List[str] = Field(default_factory=list)
    # Flags the orchestrator prompt asks the planner to set; the worker prompt
    # keys its citation and code-snippet rules off them.
    requires_research: bool = False
    requires_citations: bool = False
    requires_code: bool = False


In [3]:
# Schema for plan
class Plan(BaseModel):
    # Full outline for one blog post: post-level metadata plus the ordered list
    # of section tasks. orchestrator_node obtains it via structured output and
    # fanout() iterates `plan.tasks` to spawn one worker per section.
    #
    # Documented with comments rather than a class docstring on purpose: pydantic
    # copies a model docstring into the generated JSON schema, which would change
    # what the structured-output LLM call is shown.
    blog_title: str
    audience: str
    tone: str
    blog_kind: Literal["explainer","tutorial","news_roundup","comparison","system_design"]="explainer"
    constraints: List[str] = Field(default_factory=list)
    # Plural on purpose: fanout() reads `.tasks` and the orchestrator prompt
    # speaks of "sections (tasks)".
    tasks: List[Task]

    @property
    def task(self) -> List[Task]:
        # Read-only alias so code written against the former singular field
        # name keeps working; it is not a model field and is not serialized.
        return self.tasks




In [4]:
# Schema EvidenceItem
class EvidenceItem(BaseModel):
    # One web source kept as supporting evidence. Instances are produced by the
    # structured-output call in research_node and later serialized with
    # model_dump() for the orchestrator and worker prompts.
    title: str
    # research_node drops items whose url is empty and de-duplicates on this value.
    url: str
    # Date string, or None when the search result carried no explicit date
    # (RESEARCH_SYSTEM tells the model not to guess).
    published_at: Optional[str] = None
    snippet: Optional[str] = None
    source: Optional[str] = None

In [5]:
# Router
class RouterDecision(BaseModel):
    # Structured output of the routing LLM call in router_node; its three fields
    # are copied verbatim into the graph state.
    needs_research: bool
    # Mode names match the ones described in ROUTER_SYSTEM.
    mode: Literal["closed_book","hybrid","open_book"]
    # Search queries to run; defaults to an empty list.
    queries: List[str] = Field(default_factory=list)



In [6]:
# Pack
class EvidencePack(BaseModel):
    # Wrapper model so research_node can request a list of EvidenceItem objects
    # through a single structured-output call.
    evidence: List[EvidenceItem]= Field(default_factory=list)

In [7]:
# Image Specification
class ImageSpec(BaseModel):
    # Description of one image to generate for the post. The consumer is not
    # visible in this part of the notebook.
    placeholder: str = Field(..., description="e.g. [[IMAGE_1]]")
    filename: str = Field(..., description="Save under images/,e.g. qkv_flow.png")
    alt: str
    caption: str
    prompt: str = Field(...,description="Prompt to send to the image model.")
    # NOTE(review): sizes are spelled with '*' here; image APIs commonly expect
    # WIDTHxHEIGHT (e.g. "1024x1024") -- confirm the consumer converts the value
    # before passing it on.
    size: Literal["1024*1024", "1024*1536", "1536*1024"]="1024*1024"
    quality: Literal["low","medium","high"]="medium"

class GlobalImagePlan(BaseModel):
    # Markdown text plus the list of images planned for it. Presumably the
    # markdown contains the ImageSpec.placeholder tokens -- confirm against the
    # image-planning node, which is not visible here.
    md_with_placeholders: str
    images: List[ImageSpec]=Field(default_factory=list)


In [8]:
# state
class blog_state(TypedDict):
    """Shared state passed between the nodes of the blog-writing graph.

    Each node returns a partial dict whose keys are merged into this state.
    """
    # User-supplied topic; read by router_node, orchestrator_node and fanout.
    topic: str
    # Evidence / research: mode, needs_research and queries are written by
    # router_node; evidence is written by research_node; plan by orchestrator_node.
    mode: str
    needs_research: bool
    queries: List[str]
    evidence: List[EvidenceItem]
    plan: Optional[Plan]
    # Workers: annotated with operator.add so lists returned by several nodes
    # are concatenated instead of overwriting each other. Presumably each tuple
    # is (task id, section markdown) -- confirm against worker_node.
    sections: Annotated[List[tuple[int,str]], operator.add]

    # Reducer / image stage outputs; the nodes that write these are not visible
    # in this part of the notebook.
    merged_md: str
    md_with_placeholders: str
    image_specs: List[dict]
    final: str

In [9]:
# Shared chat model: router_node, research_node and orchestrator_node each derive
# a structured-output runnable from it. Credentials are presumably picked up
# from the environment populated by load_dotenv() -- confirm.
llm=ChatOpenAI(model="gpt-4.1-mini")
# Router: system prompt for router_node. It defines the three modes that
# RouterDecision.mode may take and how search queries should be written.
ROUTER_SYSTEM="""you are a routing module for a technical blog planner.
Decide whether web research is needed before planning
Modes:
- closed_book (needs_research=false)
Evergreen topics where correctness does not depend on the recent facts (concept, fundamentals).
- hybrid (needs_research=true)
Mostly evergreen but needs up-to-date examples/tools/models to be useful
- open_book (needs_research=true)
Mostly volatile: weekly roundups, "this_week", "latest", rankings, pricing, policy/regulation.

if needs_research=true:
- Output 3-10 high signal queries.
- Queries should be scoped and specific (avoid generic queries like just "AI" or "LLM").
- If the user asks for "this week/last_week/latest", reflect that constraint in the queries.
"""
def router_node(state: blog_state):
    """Ask the LLM whether the topic needs web research before planning.

    Reads ``state["topic"]`` and returns a partial state update holding the
    ``needs_research``, ``mode`` and ``queries`` fields of the RouterDecision.
    """
    messages=[
        SystemMessage(content=ROUTER_SYSTEM),
        HumanMessage(content=f"Topic: {state['topic']}"),
    ]
    verdict=llm.with_structured_output(RouterDecision).invoke(messages)
    return dict(
        needs_research=verdict.needs_research,
        mode=verdict.mode,
        queries=verdict.queries,
    )
def route_next(state: blog_state)->str:
    """Pick the branch that follows the router.

    Returns:
        "research" when the router flagged the topic as needing web research,
        otherwise "orchestrator". A missing ``needs_research`` key is treated
        as "no research needed".
    """
    if state.get("needs_research", False):
        return "research"
    # The original else-branch was a bare string expression, so the function
    # returned None and the conditional edge had no valid target.
    return "orchestrator"

In [10]:
def _tavily_search(query: str, max_results: int = 5)-> List[dict]:
    """Run one Tavily web search and normalize the hits.

    Args:
        query: Search string sent to Tavily.
        max_results: Maximum number of hits requested from the tool.

    Returns:
        A list of dicts with the keys ``title``, ``url``, ``snippet``,
        ``published_at`` and ``source``. Empty when the tool returned nothing
        usable.
    """
    tool=TavilySearchResults(max_results=max_results)
    results=tool.invoke({"query":query})
    # The tool reports request failures by returning an error string instead of
    # raising; iterating that string would yield characters and crash on .get().
    if not isinstance(results, list):
        return []
    normalized: List[dict]=[]
    for r in results:
        if not isinstance(r, dict):
            continue
        normalized.append({
            "title":r.get("title") or "",
            "url":r.get("url") or "",
            # Tavily hits carry the text under "content"; "snippet" is accepted too.
            "snippet":r.get("snippet") or r.get("content") or "",
            "published_at":r.get("published_date") or r.get("published_at"),
            "source": r.get("source"),
        })
    return normalized

# System prompt for research_node: turns raw search hits into EvidenceItem
# objects. The date format must read YYYY-MM-DD (four-digit year).
RESEARCH_SYSTEM="""you are a research synthesizer for technical writing.
Given raw web search results, produce a deduplicated list of EvidenceItem objects.

Rules:
- Only include items with a non-empty url.
- Prefer relevant + authoritative sources (company_blogs, docs, reputable outlets).
- If a published date is explicitly present in the result payload, keep it as YYYY-MM-DD.
  If missing or unclear, set published_at=null. Do NOT guess.
- Keep snippets short
- Deduplicate by URL.
"""

def research_node(state: blog_state)->dict:
    """Search the web for every routed query and distil the hits into evidence.

    Returns a partial state update ``{"evidence": [...]}``. The list is empty
    when there are no queries or no search hits; otherwise it holds the
    EvidenceItem objects extracted by the LLM, one per distinct non-empty URL.
    """
    per_query_limit=6
    hits: List[dict]=[
        hit
        for query in (state.get("queries",[]) or [])
        for hit in _tavily_search(query, max_results=per_query_limit)
    ]
    if not hits:
        return {"evidence":[]}

    pack=llm.with_structured_output(EvidencePack).invoke([
        SystemMessage(content=RESEARCH_SYSTEM),
        HumanMessage(content=f"Raw Results:\n {hits}")
    ])
    # Deduplicate on URL: a later item with the same URL replaces the earlier
    # one, and items without a URL are dropped.
    by_url={item.url: item for item in pack.evidence if item.url}
    return {"evidence":list(by_url.values())}


In [11]:
# Orchestrator: system prompt for orchestrator_node. It asks for an outline that
# matches the Plan schema and explains how each routing mode (closed_book,
# hybrid, open_book) should shape the sections and their requires_* flags.
ORCH_SYSTEM="""You are a senior technical writer and developer advocate.
Your job is to produce a highly actionable outline for a technical blog post.

Hard requirements:
- Create 5–9 sections (tasks) suitable for the topic and audience.
- Each task must include:
  1) goal (1 sentence)
  2) 3–6 bullets that are concrete, specific, and non-overlapping
  3) target word count (120–550)

Quality bar:
- Assume the reader is a developer; use correct terminology.
- Bullets must be actionable: build/compare/measure/verify/debug.
- Ensure the overall plan includes at least 2 of these somewhere:
  * minimal code sketch / MWE (set requires_code=True for that section)
  * edge cases / failure modes
  * performance/cost considerations
  * security/privacy considerations (if relevant)
  * debugging/observability tips

Grounding rules:
- Mode closed_book: keep it evergreen; do not depend on evidence.
- Mode hybrid:
  - Use evidence for up-to-date examples (models/tools/releases) in bullets.
  - Mark sections using fresh info as requires_research=True and requires_citations=True.
- Mode open_book:
  - Set blog_kind = "news_roundup".
  - Every section is about summarizing events + implications.
  - DO NOT include tutorial/how-to sections unless user explicitly asked for that.
  - If evidence is empty or insufficient, create a plan that transparently says "insufficient sources"
    and includes only what can be supported.

Output must strictly match the Plan schema.
"""
def orchestrator_node(state: blog_state)-> dict:
    """Ask the LLM for a Plan covering the topic.

    Reads ``topic`` (required), ``mode`` (defaults to "closed_book") and
    ``evidence`` (defaults to empty) from the state.

    Returns:
        A partial state update ``{"plan": Plan}``.
    """
    # Pulled into a local so the f-string below does not nest double quotes,
    # which is a syntax error on Python versions before 3.12.
    topic=state["topic"]
    mode=state.get("mode","closed_book")
    evidence=state.get("evidence") or []
    # Cap at 16 items before serializing, so only the items that are actually
    # sent get dumped.
    evidence_payload=[e.model_dump() for e in evidence[:16]]
    planner=llm.with_structured_output(Plan)
    plan=planner.invoke([
        SystemMessage(content=ORCH_SYSTEM),
        HumanMessage(
            content=(
                f"Topic: {topic}\n"
                f"Mode: {mode}\n\n"
                "Evidence (Only use for the fresh claims; may be empty):\n"
                f"{evidence_payload}"
            )
        )
    ])
    return {"plan":plan}
    



In [13]:
def fanout(state: blog_state):
    """Create one ``Send`` to the "worker" node per planned section.

    Each payload is a plain dict holding the serialized task, the topic, the
    routing mode, the serialized plan and the serialized evidence list.

    Raises:
        ValueError: if the state holds no plan yet.
    """
    plan=state.get("plan")
    if plan is None:
        raise ValueError("fanout requires state['plan']; run the orchestrator node first")
    topic=state["topic"]
    # Same default as orchestrator_node uses when the router did not set a mode.
    mode=state.get("mode","closed_book")
    evidence=state.get("evidence") or []
    # `Send` is the class imported from langgraph.types; the lowercase `send`
    # used before is not defined anywhere and raised NameError.
    return [
        Send(
            "worker",
            {
                "task":task.model_dump(),
                "topic":topic,
                "mode": mode,
                # Dumped per task so no two workers share a mutable payload.
                "plan": plan.model_dump(),
                "evidence":[e.model_dump() for e in evidence],
            }
        )
        for task in plan.tasks
    ]

In [None]:
# Worker: system prompt for the per-section writer. Its rules refer to
# blog_kind, mode, requires_citations and requires_code, which the worker is
# expected to supply in its human message -- worker_node's body is not visible
# here, confirm.
WORKER_SYSTEM="""You are a senior technical writer and developer advocate.
Write ONE section of a technical blog post in Markdown.

Hard constraints:
- Follow the provided Goal and cover ALL Bullets in order (do not skip or merge bullets).
- Stay close to Target words (±15%).
- Output ONLY the section content in Markdown (no blog title H1, no extra commentary).
- Start with a '## <Section Title>' heading.

Scope guard:
- If blog_kind == "news_roundup": do NOT turn this into a tutorial/how-to guide.
  Do NOT teach web scraping, RSS, automation, or "how to fetch news" unless bullets explicitly ask for it.
  Focus on summarizing events and implications.

Grounding policy:
- If mode == open_book:
  - Do NOT introduce any specific event/company/model/funding/policy claim unless it is supported by provided Evidence URLs.
  - For each event claim, attach a source as a Markdown link: ([Source](URL)).
  - Only use URLs provided in Evidence. If not supported, write: "Not found in provided sources."
- If requires_citations == true:
  - For outside-world claims, cite Evidence URLs the same way.
- Evergreen reasoning is OK without citations unless requires_citations is true.

Code:
- If requires_code == true, include at least one minimal, correct code snippet relevant to the bullets.

Style:
- Short paragraphs, bullets where helpful, code fences for code.
- Avoid fluff/marketing. Be precise and implementation-oriented.
"""
def worker_node(payload: dict)-> dict:
    
