In [2]:
from __future__ import annotations

import operator
import re
from typing import TypedDict, List, Annotated

from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START , END
from langgraph.types import Send

from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage, HumanMessage

load_dotenv()

True

In [3]:
class Task(BaseModel):
    # One unit of writing work: a single blog section for a worker to draft.
    # NOTE(review): documented with comments rather than a class docstring,
    # since a docstring would presumably surface in the schema handed to the
    # model through Plan / with_structured_output -- confirm before adding one.

    # Task identifier; not read by any code visible in this notebook.
    id: int
    # Section heading; the worker inserts it into its prompt as "Section: ...".
    title: str
    # Guidance for the section; the worker inserts it as "Brief: ...".
    brief: str = Field(..., description="What to cover")
    

In [4]:
class Plan(BaseModel):
    # Structured plan the orchestrator requests from the model via
    # llm.with_structured_output(Plan).

    # Title of the whole post: used by the worker prompt ("Blog: ..."), and by
    # the reducer for the H1 heading and the output file name.
    blog_title: str
    # Sections to write; fanout dispatches one worker per entry.
    tasks: List[Task]

In [5]:
class State(TypedDict):
    """Shared graph state passed between the orchestrator, workers and reducer."""

    # Subject of the blog post; read by the orchestrator and forwarded to workers.
    topic: str
    # Plan returned by the orchestrator node.
    plan: Plan
    # reducer: results from workers get concatenated automatically
    # (each worker returns a one-element list that operator.add appends).
    sections: Annotated[List[str], operator.add]
    # Complete Markdown document produced by the reducer node.
    final: str

In [6]:
# Single chat model instance shared by the orchestrator and worker nodes.
# NOTE(review): presumably relies on API credentials loaded by load_dotenv() -- confirm.
llm = init_chat_model("gpt-4.1-mini")

In [7]:
# orchestrator node

def orchestrator(state: State) -> dict:
    """Ask the model for a structured blog plan covering the requested topic.

    Args:
        state: Graph state; only ``state["topic"]`` is read.

    Returns:
        A partial state update ``{"plan": <Plan>}`` holding the model's
        structured output.
    """
    # Bind the Plan schema so the model's reply is parsed into a Plan.
    planner = llm.with_structured_output(Plan)
    messages = [
        SystemMessage(content=("Create a blog with 5-7 sections on the following topic")),
        HumanMessage(content=f"Topic: {state['topic']}"),
    ]
    return {"plan": planner.invoke(messages)}

In [8]:
'''
The `Send` class is used within a `StateGraph`'s conditional edges to
    dynamically invoke a node with a custom state at the next step.

    Importantly, the sent state can differ from the core graph's state,
    allowing for flexible and dynamic workflow management.

    Send is LangGraph's way of saying "spawn a new parallel execution to this node with this specific data."
    
    ## Visual Representation

                    ┌─── Send("Worker", {task_1, topic, plan}) ──► Worker
                    │
                    ├─── Send("Worker", {task_2, topic, plan}) ──► Worker
                    │
fanout() returns ───┼─── Send("Worker", {task_3, topic, plan}) ──► Worker
                    │
                    ├─── Send("Worker", {task_4, topic, plan}) ──► Worker
                    │
                    └─── Send("Worker", {task_5, topic, plan}) ──► Worker

'''


def fanout(state: State):
    """Build one ``Send`` to the "Worker" node for every task in the plan.

    Args:
        state: Graph state; reads ``state["plan"]`` and ``state["topic"]``.

    Returns:
        A list of ``Send`` objects, one per task, each carrying that task
        together with the topic and the full plan.
    """
    plan = state["plan"]
    dispatches = []
    for task in plan.tasks:
        worker_input = {"task": task, "topic": state["topic"], "plan": plan}
        dispatches.append(Send("Worker", worker_input))
    return dispatches

In [9]:
def worker(payload: dict) -> dict:
    """Write the Markdown for a single planned section.

    Args:
        payload: Dict with the keys ``"task"`` (the section to write),
            ``"topic"`` (the blog subject) and ``"plan"`` (the full plan,
            used here only for its ``blog_title``).

    Returns:
        ``{"sections": [<markdown>]}`` -- a one-element list, so the
        ``operator.add`` reducer on ``State.sections`` can concatenate the
        results of all workers.
    """
    task = payload["task"]
    topic = payload["topic"]
    blog_title = payload["plan"].blog_title

    # Adjacent string literals (no commas) concatenate into ONE prompt string.
    # With commas between them the parentheses would build a tuple of
    # fragments instead of the intended single text message.
    prompt = (
        f"Blog: {blog_title}\n"
        f"Topic: {topic}\n"
        f"Section: {task.title}\n"
        f"Brief: {task.brief}\n\n"
        "Return only the section content in Markdown"
    )

    response = llm.invoke(
        [
            SystemMessage(content="Write one clean Markdown Section"),
            HumanMessage(content=prompt),
        ]
    )
    section_md = response.content.strip()

    return {
        "sections": [section_md]
    }


In [None]:
from pathlib import Path


def reducer(state: State) -> dict:
    """Join the worker sections into one Markdown document and save it to disk.

    Args:
        state: Graph state; reads ``state["plan"].blog_title`` and
            ``state["sections"]``.

    Returns:
        ``{"final": <markdown>}`` where the document is an H1 with the blog
        title followed by the sections separated by blank lines.

    Side effects:
        Writes the document as UTF-8 to ``<slug>.md`` in the current working
        directory, where ``<slug>`` is the lower-cased title reduced to word
        characters, hyphens and underscores.
    """
    title = state["plan"].blog_title
    body = "\n\n".join(state["sections"]).strip()

    final_md = f"# {title}\n\n{body}\n"

    # The title is model-generated, so it may contain characters that are
    # invalid in file names or that would change the target directory
    # ("/", ":", "?", ...). Collapse every such run into "_" and fall back to
    # a fixed name when nothing usable is left.
    slug = re.sub(r"[^\w-]+", "_", title.lower()).strip("_") or "blog"
    output_path = Path(f"{slug}.md")
    output_path.write_text(final_md, encoding="utf-8")

    return {
        "final" : final_md
    }