In [9]:
from langgraph.graph import StateGraph, END, START
from langgraph.types import Send

In [10]:
from typing import TypedDict, Annotated, List, Literal
from pydantic import BaseModel, Field
import operator
from pprint import pprint

In [11]:
from IPython.display import Image, display

In [16]:
from langchain_openai import ChatOpenAI
from langchain_community.chat_models import ChatOllama
from langchain_core.prompts import ChatPromptTemplate

# Initialize the language model - using gpt-4o-mini for cost-effective experimentation
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)


In [13]:
# Travel destination schema
class Destination(BaseModel):
    name: str = Field(
        description="Name of the travel destination (e.g., Paris, Tokyo, New York)."
    )
    activities: List[str] = Field(
        description="List of suggested activities or attractions at this destination."
    )
    region: str = Field(
        description="Region or country of the destination (e.g., France, Japan, USA)."
    )

class Destinations(BaseModel):
    sections: List[Destination] = Field(
        description="A list of destination sections with activities."
    )

In [14]:
# Construct a prompt template for planning destinations
destination_prompt = ChatPromptTemplate.from_messages([
    (
        "system",
        "You are an assistant that generates a structured travel plan.\n\n"
        "The user wants to visit the following destinations: {destinations}\n\n"
        "For each destination, return a section with:\n"
        "- the name of the destination\n"
        "- a comma-separated list of suggested activities or attractions\n"
        "- the region or country"
    )
])

In [17]:
# Use LCEL to pipe the prompt to an LLM with structured output of Destinations
planner_pipe = destination_prompt | llm.with_structured_output(Destinations)

In [None]:
class State(TypedDict):
    destinations: str  # User input: list of destinations
    sections: List[Destination] # Sections for each destination
    completed_plan: Annotated[List[str], operator.add]  # Worker-generated travel guides
    final_travel_guide: str  # Fully compiled travel guide

In [None]:
def orchestrator(state: State):
    """Orchestrator agent that generates a structured travel plan"""
    destination_descriptions = planner_pipe.invoke({"destinations": state["destinations"]})
    return {"sections": destination_descriptions.sections}

In [None]:
# Travel Agent prompt
travel_agent_prompt = ChatPromptTemplate.from_messages([
    (
        "system",
        "You are a world-class travel agent specialized in {region}.\n\n"
        "Please introduce yourself briefly and present a detailed itinerary for: {name}.\n"
        "Your response should include:\n"
        "- Greeting with your name and expertise\n"
        "- List of activities with step-by-step recommendations\n"
        "- Any tips for travelers\n\n"
        "Use the following activities: {activities}."
    )
])

In [None]:
travel_agent_pipe = travel_agent_prompt | llm

In [None]:
class WorkerState(TypedDict):
    section: Destination
    completed_plan: Annotated[list, operator.add]

In [None]:
def assign_workers(state: State):
    """Assign a worker to each destination section"""
    return [Send("travel_agent_worker", {"section": s}) for s in state["sections"]]

In [None]:
def travel_agent_worker(state: WorkerState):
    """Worker agent that generates itinerary for one destination"""
    itinerary = travel_agent_pipe.invoke({
        "name": state["section"].name,
        "region": state["section"].region,
        "activities": state["section"].activities
    })
    return {"completed_plan": [itinerary.content]}

In [None]:
def synthesizer(state: State):
    """Synthesizer agent to merge all travel sections"""
    completed_sections = state["completed_plan"]
    full_guide = "\n\n---\n\n".join(completed_sections)
    return {"final_travel_guide": full_guide}

In [None]:
# Build the workflow
orchestrator_worker_builder = StateGraph(State)

# Add nodes
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("travel_agent_worker", travel_agent_worker)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

In [None]:
# Add edges
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator", assign_workers, ["travel_agent_worker"]

In [None]:
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_edge("travel_agent_worker", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)

In [None]:
# Compile workflow
orchestrator_worker = orchestrator_worker_builder.compile()

In [None]:
# Display workflow
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))

In [None]:
# Invoke the workflow with a string of destinations in a dict
state = orchestrator_worker.invoke({
    "destinations": "Paris, Tokyo, and New York"
})

# Print the final travel guide
print(state["final_travel_guide"])