In [21]:
from dotenv import load_dotenv


from langchain_groq import ChatGroq


from langgraph.graph import StateGraph , START , END
from langgraph.prebuilt import ToolNode , tools_condition
from pydantic import BaseModel , Field

from typing_extensions import TypedDict
from langgraph.graph.message import add_messages , Annotated 
from langchain.messages import HumanMessage , AIMessage , AnyMessage , SystemMessage

from langgraph.types import Send

In [22]:
class Section(BaseModel):
    # One planned section of the report: a title plus a short brief that the
    # worker later expands into full text.
    # NOTE: documented with `#` comments rather than a class docstring so the
    # schema derived from this model (and handed to the LLM) stays unchanged.
    name: str = Field(
        description="name of the section",
    )
    description: str = Field(
        description="brief overview of the section",
    )


class Sections(BaseModel):
    # Container for the whole plan; this is the schema the planner is asked to
    # fill in via structured output.
    # NOTE: `#` comments instead of a docstring so the generated schema is unchanged.
    sections: list[Section] = Field(
        description="sections of the report",
    )

In [23]:
# Pull variables from a local .env file (if one exists) into the environment
# before the client is created. `load_dotenv` was imported but never called,
# so credentials kept in .env were not being picked up. The call is a no-op
# when no .env file is found and does not override variables already set.
load_dotenv()

# Shared chat model used by both the planner and the section-writing workers.
# NOTE(review): ChatGroq is expected to read its API key from the environment
# (GROQ_API_KEY) — confirm against the langchain-groq docs.
llm = ChatGroq(model="openai/gpt-oss-120b")

In [24]:
# Planner: the same chat model, constrained to return its answer as a
# `Sections` object (the orchestrator reads `.sections` from the result).
planner = llm.with_structured_output(Sections)

In [25]:
import operator

In [31]:
# Shared state carried through the whole orchestrator–worker graph.
class State(TypedDict):
    # Report topic supplied by the caller when the graph is invoked.
    topic: str
    # Plan written by the orchestrator node.
    sections: list[Section]
    # Worker outputs; the `operator.add` reducer concatenates the lists
    # returned by the individual workers instead of overwriting the key.
    completed_sections: Annotated[list, operator.add]
    # Joined report text written by the synthesizer node.
    final_report: str


In [27]:
# Per-worker state: the payload each `llm_call` task receives via Send().
class WorkerState(TypedDict):
    # The single planned section this worker has to write.
    section: Section
    # Same key and reducer as in the graph state, so each worker's one-element
    # list is appended to the shared list.
    completed_sections: Annotated[list, operator.add]

In [29]:
# Defining nodes

def orchestrator(state: State):
    """Plan the report by asking the planner for its list of sections.

    Reads ``state["topic"]``, sends it to the structured-output planner and
    returns a state update whose ``"sections"`` key holds the planned
    sections.
    """
    planning_prompt = [
        SystemMessage(content="Generate a plan for the report."),
        HumanMessage(content=f"Here is the report topic : {state['topic']}"),
    ]

    # The planner returns a `Sections` object; only its list is stored in state.
    plan = planner.invoke(planning_prompt)
    return {"sections": plan.sections}

In [30]:
def llm_call(state: WorkerState):
    """Write one report section with the LLM.

    Builds a prompt from the name and description of ``state["section"]`` and
    returns the model's text wrapped in a one-element list under
    ``"completed_sections"``, so the state reducer can append it to the
    results of the other workers.
    """
    writer_prompt = [
        SystemMessage(content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting"),
        HumanMessage(content=f"Here is the section name : {state['section'].name} and description : {state['section'].description}"),
    ]

    reply = llm.invoke(writer_prompt)

    # Wrapped in a list on purpose: `completed_sections` is merged with operator.add.
    return {"completed_sections": [reply.content]}

In [32]:
def assign_workers(state: State):
    """Fan out the plan: produce one ``llm_call`` task per planned section.

    Used as the conditional-edge function after the orchestrator. Each
    ``Send`` names the target node and carries the partial state
    (``{"section": ...}``) that the worker will receive.
    """
    dispatches = [
        Send("llm_call", {"section": planned_section})
        for planned_section in state["sections"]
    ]
    return dispatches

In [33]:
def synthesizer(state: State):
    """Assemble the final report from the sections written by the workers.

    Joins every entry of ``state["completed_sections"]`` with a markdown
    horizontal rule between them and returns the result under
    ``"final_report"``.
    """
    # Blank line, `---` rule, blank line between consecutive sections.
    divider = "\n\n---\n\n"
    report_text = divider.join(state["completed_sections"])

    return {"final_report": report_text}


In [53]:
# Graph builder for the orchestrator–worker workflow; `State` is the schema of
# the state shared by its nodes.
orchestrator_worker_builder = StateGraph(State)

In [54]:
# Register the three nodes under the names the edges and Send() calls refer to:
# the planner ("orchestrator"), the per-section writer ("llm_call") and the
# report assembler ("synthesizer").
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call" , llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)


<langgraph.graph.state.StateGraph at 0x1622e60e0>

In [None]:
# Wire the graph: START -> orchestrator -> (one llm_call per section) -> synthesizer -> END

orchestrator_worker_builder.add_edge(START , "orchestrator" )
# After the orchestrator, `assign_workers` returns a list of Send objects and
# each Send already names its target node. The third argument, ["llm_call"],
# declares which nodes this conditional edge may reach.
# NOTE(review): per the LangGraph docs this list is optional and mainly informs
# graph rendering/validation rather than scheduling — confirm for the installed version.
orchestrator_worker_builder.add_conditional_edges("orchestrator" , assign_workers, ["llm_call"] )

# Worker results flow into the synthesizer, which ends the run.
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)



<langgraph.graph.state.StateGraph at 0x1622e60e0>

In [56]:
# Turn the builder into a runnable graph; `graph` is what gets invoked below.
graph = orchestrator_worker_builder.compile()

In [57]:
# Run the whole workflow for one topic. Only `topic` has to be supplied; the
# result is the state dict after the run (the output below shows keys such as
# 'topic' and 'sections').
graph.invoke({"topic": "Create a report on Agentic AI RAGs"})

{'topic': 'Create a report on Agentic AI RAGs',
 'sections': [Section(name='Executive Summary', description='A concise overview of the report, summarizing the purpose, key findings, and recommendations regarding Agentic AI Retrieval-Augmented Generators (RAGs).'),
  Section(name='Introduction to Agentic AI', description='Defines Agentic AI, its core principles, and how it differs from traditional AI models. Sets the stage for discussing its integration with Retrieval‑Augmented Generation.'),
  Section(name='Fundamentals of Retrieval‑Augmented Generation (RAG)', description='Explains the RAG architecture, the role of external knowledge bases, and why augmentation improves factuality and relevance.'),
  Section(name='Merging Agentic AI with RAG: Conceptual Framework', description='Describes how autonomous agents can orchestrate retrieval, reasoning, and generation steps, creating a closed‑loop system that can plan, act, and self‑correct.'),
  Section(name='Key Architectural Components', 

## What happens under the hood

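After the orchestrator node finishes, LangGraph calls the conditional-edge function assign_workers(state), which returns a list of Send objects. For illustration, in a hypothetical graph with two kinds of worker nodes (summary_worker and details_worker) it could return: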

[
  Send("summary_worker", {"section": s1}),
  Send("details_worker", {"section": s2}),
  Send("details_worker", {"section": s3}),
]


LangGraph iterates over this list.

For each Send():

Extracts the node name (summary_worker or details_worker)

Creates a worker instance for that node

Injects the partial state ({"section": s1}, etc.)

Schedules the worker in parallel if multiple Send() objects exist.

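## Why the routing does not depend on the node names passed to add_conditional_edges

When you return a list of Send() objects, LangGraph reads the target node name from inside each Send().

So it can schedule them automatically, even if there are multiple target nodes. The list passed as the third argument of add_conditional_edges (e.g. ["llm_call"]) only declares which nodes the edge may reach; it does not decide where each task goes.

This is what allows dynamic routing — each task can go to a different node.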

# That's why assign_workers returns a list:

def assign_workers(state: State):

    """Assign a worker to each section in the plan"""
    
    return [Send("llm_call", {"section": s}) for s in state["sections"]]


# Key Point

Even though llm_call tasks were “sent” from the orchestrator, the workers do not automatically return to the orchestrator node.

• The orchestrator node is mainly used to plan and dispatch work.
• Once the workers finish, you usually want the graph to proceed to the next logical step, which is the synthesizer node.

### After all llm_call workers finish, the graph moves on to synthesizer. LangGraph advances to the next node only once every dispatched worker has completed.

# When does synthesizer run?

• Synthesizer only runs after all llm_call workers finish.

{
  "summary_worker": "summary_worker",
  "details_worker": "details_worker"
}

In this case (two kinds of worker nodes), each worker node gets its own edge to the synthesizer:

orchestrator_worker_builder.add_edge("summary_worker", "synthesizer")

orchestrator_worker_builder.add_edge("details_worker", "synthesizer")

• Edge meaning: once all instances of summary_worker finish and all instances of details_worker finish, LangGraph moves to synthesizer.

• The synthesizer node will see the merged state containing outputs from both types of workers.

