In [None]:
from typing import Annotated, List
import operator 
from typing_extensions import Literal 
from pydantic import BaseModel, Field 
from langchain_core.messages import SystemMessage, HumanMessage
from typing_extensions import TypedDict
from langchain_groq import ChatGroq 

# Shared chat model: used directly by the section-writing worker (llm_call)
# and wrapped for structured output to build the planner.
# NOTE(review): presumably ChatGroq reads its API key from the environment —
# confirm it is configured before running this cell.
llm = ChatGroq(model="openai/gpt-oss-20b")

In [None]:
# Schema for structured output: one planned section of the report.
# Instances are produced by the planner (inside Sections) and later read by
# the worker node, which uses both `name` and `description` in its prompt.
# NOTE(review): the Field descriptions presumably reach the model as part of
# the structured-output schema — treat them as prompt text when editing.
class Section(BaseModel):
    name: str = Field(description="Name of this section of the report")
    description: str = Field(description="Brief overview of the main topics and concepts of this section")

# Top-level structured-output schema: the whole report plan as a list of
# Section entries. This is the type handed to llm.with_structured_output.
class Sections(BaseModel):
    sections: List[Section] = Field(description="Sections of the report")

# Bind the Sections schema to the LLM for structured output; the orchestrator
# reads `.sections` off whatever planner.invoke(...) returns.
planner = llm.with_structured_output(Sections)

In [None]:
from langgraph.types import Send 

# Graph state shared by the orchestrator, the worker fan-out and the synthesizer.
class State(TypedDict):
    topic: str  # Report topic; supplied by the caller and read by the orchestrator
    sections: list[Section]  # Planned report sections; written by the orchestrator
    completed_sections: Annotated[
        list, operator.add 
    ] # All workers write to this key in parallel; operator.add (list concatenation) is attached — presumably used by LangGraph to merge the workers' lists
    final_report: str  # Joined report text; written by the synthesizer

# Worker state: what a single llm_call worker sees. assign_workers sends each
# worker a payload containing only `section`.
class WorkerState(TypedDict):
    section: Section  # The one section this worker is asked to write
    completed_sections: Annotated[list, operator.add]  # Same key and annotation as State.completed_sections; the worker returns its text here

In [None]:
def orchestrator(state: State):
    """Generate the report plan for the requested topic.

    Sends the topic to ``planner`` (the LLM bound to the ``Sections`` schema)
    and returns the planned sections as a partial state update.

    Args:
        state: Graph state; only ``state["topic"]`` is read.

    Returns:
        ``{"sections": ...}`` where the value is the ``sections`` attribute of
        the planner's result.
    """
    # Pull the topic into a local first: reusing double quotes inside a
    # double-quoted f-string only parses on Python 3.12+ (PEP 701).
    topic = state["topic"]

    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for the report"),
            HumanMessage(content=f"Here is the report topic {topic}"),
        ]
    )
    # Kept from the original notebook: show the raw plan in the cell output.
    print("Report sections: ", report_sections)

    return {"sections": report_sections.sections}

In [None]:
def llm_call(state: WorkerState):
    """Worker node: write one section of the report.

    Args:
        state: Worker state; ``state["section"]`` must expose ``name`` and
            ``description`` attributes, which are interpolated into the prompt.

    Returns:
        ``{"completed_sections": [text]}`` — a one-element list holding the
        model's reply content, so it can be combined with the other workers'
        lists under the ``operator.add`` annotation on that key.
    """
    # Read the section once into a local: nesting double quotes inside a
    # double-quoted f-string is a SyntaxError before Python 3.12 (PEP 701).
    section = state["section"]

    response = llm.invoke(
        [
            SystemMessage(
                content="Write a report section following the name and description. Don't include any introduction for each section."
            ),
            HumanMessage(
                content=f"Here is the section name: {section.name} and description: {section.description}"
            ),
        ]
    )
    # Hand the written text back under the shared completed_sections key.
    return {"completed_sections": [response.content]}

In [None]:
# Conditional-edge function: fans the plan out to llm_call workers, one per section
def assign_workers(state: State):
    """Build one ``Send`` to the ``llm_call`` node for every planned section.

    Each ``Send`` carries a payload with a single ``section`` key, so a worker
    only receives the section it is responsible for.
    """
    dispatches = []
    for planned_section in state["sections"]:
        # One Send per section, all targeting the same worker node.
        dispatches.append(Send("llm_call", {"section": planned_section}))
    return dispatches

In [None]:
def synthesizer(state: State):
    """Assemble the final report from the completed sections.

    Reads ``state["completed_sections"]`` and returns a partial state update
    whose ``final_report`` is those strings joined with a ``----`` divider
    surrounded by blank lines.
    """
    divider = "\n\n----\n\n"
    return {"final_report": divider.join(state["completed_sections"])}

In [None]:
# Build workflow 

from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display

# Graph builder parameterised on the shared State schema.
orchestrator_worker_builder = StateGraph(State)

# Register the three nodes: planner, per-section worker, and final joiner.
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

# Wire the flow: START -> orchestrator -> llm_call (fan-out) -> synthesizer -> END.
orchestrator_worker_builder.add_edge(START, "orchestrator")
# assign_workers decides the fan-out after the orchestrator; it returns Send
# objects targeting "llm_call", which is declared here as the possible destination.
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)

# Compile the builder into the runnable graph object used by the cells below.
orchestrator_worker = orchestrator_worker_builder.compile()

# Show the compiled graph. PNG rendering is a nice-to-have and can fail
# (it depends on an external renderer / optional dependencies), so fall back
# to the Mermaid source text instead of aborting the notebook run.
try:
    display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))
except Exception as render_error:
    print(f"Could not render the graph as PNG ({render_error}); Mermaid source follows:")
    print(orchestrator_worker.get_graph().draw_mermaid())

In [None]:
from IPython.display import Markdown

# Run the whole graph for one topic; "topic" is the only key supplied as input.
result = orchestrator_worker.invoke({"topic": "Create a report on Agentic AI RAG"})
# Last expression of the cell: render the assembled report as Markdown.
Markdown(result["final_report"])