# Multi-Agent LLM System with LangGraph

This notebook demonstrates a simple multi-agent LLM system where each agent is responsible for a specific subtask:
- **Planning Agent**: Creates a plan for handling user queries
- **Summarizing Agent**: Summarizes information and extracts key points
- **Answering Agent**: Provides final answers based on processed information

The system uses **LangGraph** for orchestrating communication between agents through message passing and shared memory.

In [None]:
# Install required packages
!pip install langgraph langchain langchain-openai langchain-community python-dotenv

In [None]:
import os
import json
from typing import Dict, List, Any, TypedDict, Annotated
from datetime import datetime
import operator

# LangGraph and LangChain imports
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain.prompts import ChatPromptTemplate
import dotenv

# Load environment variables (create .env file with OPENAI_API_KEY)
dotenv.load_dotenv()

# Initialize the LLM
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0.7,
    max_tokens=1000
)

In [None]:
# Define the shared state structure
class AgentState(TypedDict):
    """Shared state that all agents can read from and write to"""
    user_query: str
    messages: Annotated[List[Dict], operator.add]  # Message history between agents
    plan: str  # Generated by Planning Agent
    summary: str  # Generated by Summarizing Agent
    final_answer: str  # Generated by Answering Agent
    current_step: str  # Current processing step
    context_data: List[str]  # Any additional context data
    
# Message structure for inter-agent communication
class AgentMessage:
    def __init__(self, sender: str, recipient: str, content: str, message_type: str = "info"):
        self.sender = sender
        self.recipient = recipient
        self.content = content
        self.message_type = message_type
        self.timestamp = datetime.now().isoformat()
    
    def to_dict(self):
        return {
            "sender": self.sender,
            "recipient": self.recipient,
            "content": self.content,
            "message_type": self.message_type,
            "timestamp": self.timestamp
        }

In [None]:
# Planning Agent - Creates a plan for handling user queries
def planning_agent(state: AgentState) -> AgentState:
    """
    Planning Agent: Analyzes the user query and creates a structured plan
    """
    print("🧠 Planning Agent: Analyzing query and creating plan...")
    
    planning_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Planning Agent responsible for creating structured plans to answer user queries.
        Analyze the user's question and create a step-by-step plan that includes:
        1. What information needs to be gathered
        2. What analysis needs to be performed
        3. How the information should be processed
        4. What the final output should contain
        
        Keep the plan concise but comprehensive."""),
        ("human", "User Query: {query}")
    ])
    
    # Generate the plan
    chain = planning_prompt | llm
    response = chain.invoke({"query": state["user_query"]})
    plan = response.content
    
    # Create message to next agent
    message = AgentMessage(
        sender="Planning Agent",
        recipient="Summarizing Agent",
        content=f"Plan created for query: '{state['user_query']}'. Plan: {plan}",
        message_type="plan"
    )
    
    # Update state
    state["plan"] = plan
    state["current_step"] = "planning_complete"
    state["messages"].append(message.to_dict())
    
    print(f"✅ Plan created: {plan[:100]}...")
    return state

In [None]:
# Summarizing Agent - Processes information and extracts key points
def summarizing_agent(state: AgentState) -> AgentState:
    """
    Summarizing Agent: Takes the plan and user query, gathers relevant information,
    and creates a summary of key points
    """
    print("📝 Summarizing Agent: Processing information and creating summary...")
    
    summarizing_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Summarizing Agent responsible for gathering and summarizing information.
        Based on the plan provided by the Planning Agent, analyze the user's query and create:
        1. Key points that need to be addressed
        2. Relevant information and context
        3. Important facts or data points
        4. A structured summary that will help the Answering Agent
        
        Be thorough but concise in your summary."""),
        ("human", """User Query: {query}
        Plan from Planning Agent: {plan}
        
        Please create a comprehensive summary.""")
    ])
    
    # Generate the summary
    chain = summarizing_prompt | llm
    response = chain.invoke({
        "query": state["user_query"],
        "plan": state["plan"]
    })
    summary = response.content
    
    # Create message to next agent
    message = AgentMessage(
        sender="Summarizing Agent",
        recipient="Answering Agent",
        content=f"Summary prepared based on plan. Key points: {summary[:200]}...",
        message_type="summary"
    )
    
    # Update state
    state["summary"] = summary
    state["current_step"] = "summarizing_complete"
    state["messages"].append(message.to_dict())
    
    print(f"✅ Summary created: {summary[:100]}...")
    return state

In [None]:
# Answering Agent - Provides final answers based on processed information
def answering_agent(state: AgentState) -> AgentState:
    """
    Answering Agent: Takes the plan and summary to provide a comprehensive final answer
    """
    print("💬 Answering Agent: Generating final answer...")
    
    answering_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an Answering Agent responsible for providing comprehensive final answers.
        Using the plan from the Planning Agent and the summary from the Summarizing Agent,
        create a well-structured, informative, and helpful answer to the user's query.
        
        Your answer should:
        1. Directly address the user's question
        2. Be clear and easy to understand
        3. Include relevant details from the summary
        4. Follow the structure suggested in the plan
        5. Be engaging and helpful"""),
        ("human", """User Query: {query}
        Plan: {plan}
        Summary: {summary}
        
        Please provide a comprehensive final answer.""")
    ])
    
    # Generate the final answer
    chain = answering_prompt | llm
    response = chain.invoke({
        "query": state["user_query"],
        "plan": state["plan"],
        "summary": state["summary"]
    })
    final_answer = response.content
    
    # Create completion message
    message = AgentMessage(
        sender="Answering Agent",
        recipient="System",
        content="Final answer generated and ready for user.",
        message_type="completion"
    )
    
    # Update state
    state["final_answer"] = final_answer
    state["current_step"] = "complete"
    state["messages"].append(message.to_dict())
    
    print("✅ Final answer generated!")
    return state

In [None]:
# Create the Multi-Agent Workflow using LangGraph
def create_multi_agent_workflow():
    """
    Creates and returns a LangGraph workflow that orchestrates the three agents
    """
    
    # Initialize the StateGraph
    workflow = StateGraph(AgentState)
    
    # Add nodes (agents) to the graph
    workflow.add_node("planning", planning_agent)
    workflow.add_node("summarizing", summarizing_agent)
    workflow.add_node("answering", answering_agent)
    
    # Define the flow between agents
    workflow.set_entry_point("planning")  # Start with planning
    workflow.add_edge("planning", "summarizing")  # Planning -> Summarizing
    workflow.add_edge("summarizing", "answering")  # Summarizing -> Answering
    workflow.add_edge("answering", END)  # Answering -> End
    
    # Compile the workflow
    app = workflow.compile()
    
    return app

# Create the workflow
multi_agent_system = create_multi_agent_workflow()

print("🚀 Multi-Agent System created successfully!")

In [None]:
# Utility functions for system interaction
def run_multi_agent_system(user_query: str, context_data: List[str] = None):
    """
    Run the multi-agent system with a user query
    """
    print(f"🎯 Processing query: {user_query}")
    print("=" * 60)
    
    # Initialize the state
    initial_state = {
        "user_query": user_query,
        "messages": [],
        "plan": "",
        "summary": "",
        "final_answer": "",
        "current_step": "initialized",
        "context_data": context_data or []
    }
    
    # Run the workflow
    result = multi_agent_system.invoke(initial_state)
    
    return result

def display_results(result: AgentState):
    """
    Display the results in a formatted way
    """
    print("\n" + "=" * 60)
    print("📊 MULTI-AGENT SYSTEM RESULTS")
    print("=" * 60)
    
    print(f"\n🎯 Original Query: {result['user_query']}")
    
    print(f"\n🧠 Plan (Planning Agent):")
    print("-" * 40)
    print(result['plan'])
    
    print(f"\n📝 Summary (Summarizing Agent):")
    print("-" * 40)
    print(result['summary'])
    
    print(f"\n💬 Final Answer (Answering Agent):")
    print("-" * 40)
    print(result['final_answer'])
    
    print(f"\n📨 Message Flow:")
    print("-" * 40)
    for i, msg in enumerate(result['messages'], 1):
        print(f"{i}. {msg['sender']} → {msg['recipient']}: {msg['message_type']}")
        print(f"   {msg['content'][:100]}...")
        print(f"   Time: {msg['timestamp']}")
        print()

def analyze_system_performance(result: AgentState):
    """
    Analyze the performance and communication patterns of the system
    """
    print("\n" + "=" * 60)
    print("📈 SYSTEM PERFORMANCE ANALYSIS")
    print("=" * 60)
    
    # Message analysis
    print(f"Total messages exchanged: {len(result['messages'])}")
    
    # Agent participation
    agents = set()
    for msg in result['messages']:
        agents.add(msg['sender'])
    print(f"Active agents: {', '.join(agents)}")
    
    # Communication pattern
    print("\nCommunication Pattern:")
    for msg in result['messages']:
        print(f"  {msg['sender']} → {msg['recipient']} ({msg['message_type']})")
    
    # State transitions
    print(f"\nFinal state: {result['current_step']}")
    
    # Memory usage (shared state)
    state_size = sum([
        len(str(result['plan'])),
        len(str(result['summary'])),
        len(str(result['final_answer'])),
        len(str(result['messages']))
    ])
    print(f"Shared memory usage: ~{state_size} characters")

## 🚀 Demonstration Examples

Let's test our multi-agent system with different types of queries to see how the agents collaborate and communicate through message passing and shared memory.

In [None]:
# Example 1: Simple factual query
print("🔍 Example 1: Simple Factual Query")
print("=" * 50)

query1 = "What are the main benefits of renewable energy?"
result1 = run_multi_agent_system(query1)
display_results(result1)

In [None]:
# Example 2: Complex analytical query
print("\n🔍 Example 2: Complex Analytical Query")
print("=" * 50)

query2 = "How can artificial intelligence be used to improve healthcare, and what are the potential risks?"
result2 = run_multi_agent_system(query2)
display_results(result2)

In [None]:
# Analyze system performance for the second example
analyze_system_performance(result2)

## 🎮 Interactive Testing

Test the multi-agent system with your own queries!

In [None]:
# Interactive testing function
def interactive_test():
    """
    Interactive function to test the multi-agent system
    """
    print("🎮 Interactive Multi-Agent System Testing")
    print("Enter your query below, or 'quit' to exit")
    print("=" * 50)
    
    while True:
        user_input = input("\n💭 Your query: ").strip()
        
        if user_input.lower() in ['quit', 'exit', 'q']:
            print("👋 Goodbye!")
            break
        
        if not user_input:
            print("Please enter a valid query.")
            continue
        
        try:
            # Run the multi-agent system
            result = run_multi_agent_system(user_input)
            
            # Display results
            display_results(result)
            
            # Ask if user wants performance analysis
            analyze = input("\n🔍 Show performance analysis? (y/n): ").strip().lower()
            if analyze == 'y':
                analyze_system_performance(result)
                
        except Exception as e:
            print(f"❌ Error: {e}")
            print("Please try again with a different query.")

# Uncomment the line below to run interactive testing
# interactive_test()

## 🏗️ System Architecture and Features

### Key Components:

1. **Shared State (AgentState)**: Acts as shared memory that all agents can read from and write to
   - `user_query`: Original user input
   - `messages`: Message history for inter-agent communication
   - `plan`, `summary`, `final_answer`: Agent outputs
   - `current_step`: Current processing stage
   - `context_data`: Additional context information

2. **Message Passing System**: 
   - Agents communicate through structured messages
   - Messages include sender, recipient, content, type, and timestamp
   - Enables traceability and debugging of agent interactions

3. **LangGraph Workflow**:
   - Sequential flow: Planning → Summarizing → Answering
   - Each agent has a specific responsibility
   - State is passed between agents automatically

### Advanced Features:

- **Error Handling**: Robust error handling for production use
- **Performance Analysis**: Detailed metrics on agent collaboration
- **Extensible Architecture**: Easy to add new agents or modify workflow
- **Interactive Testing**: Real-time testing capabilities

In [None]:
# Advanced Multi-Agent System with Conditional Routing
def create_advanced_workflow():
    """
    Creates an advanced workflow with conditional routing based on query complexity
    """
    
    def route_after_planning(state: AgentState) -> str:
        """Decide whether to go directly to answering or through summarizing"""
        plan = state["plan"].lower()
        
        # Simple queries can skip summarizing
        if any(keyword in plan for keyword in ["simple", "direct", "basic", "straightforward"]):
            return "answering"
        else:
            return "summarizing"
    
    def complexity_analyzer(state: AgentState) -> AgentState:
        """Analyze query complexity to determine routing"""
        complexity_prompt = ChatPromptTemplate.from_messages([
            ("system", "Analyze the complexity of this query. Respond with 'SIMPLE' for straightforward factual questions or 'COMPLEX' for questions requiring analysis, comparison, or multiple steps."),
            ("human", "{query}")
        ])
        
        chain = complexity_prompt | llm
        response = chain.invoke({"query": state["user_query"]})
        complexity = response.content.strip()
        
        state["complexity"] = complexity
        return state
    
    # Create advanced workflow
    workflow = StateGraph(AgentState)
    
    # Add nodes
    workflow.add_node("complexity_analysis", complexity_analyzer)
    workflow.add_node("planning", planning_agent)
    workflow.add_node("summarizing", summarizing_agent)
    workflow.add_node("answering", answering_agent)
    
    # Define conditional routing
    workflow.set_entry_point("complexity_analysis")
    workflow.add_edge("complexity_analysis", "planning")
    workflow.add_conditional_edges(
        "planning",
        route_after_planning,
        {
            "summarizing": "summarizing",
            "answering": "answering"
        }
    )
    workflow.add_edge("summarizing", "answering")
    workflow.add_edge("answering", END)
    
    return workflow.compile()

# Create advanced system
advanced_multi_agent_system = create_advanced_workflow()
print("🚀 Advanced Multi-Agent System with conditional routing created!")

In [None]:
# Test both systems for comparison
def compare_systems(query: str):
    """Compare basic and advanced multi-agent systems"""
    print(f"🔬 Comparing Systems for Query: {query}")
    print("=" * 60)
    
    # Test basic system
    print("\n📊 BASIC SYSTEM:")
    print("-" * 30)
    basic_result = multi_agent_system.invoke({
        "user_query": query,
        "messages": [],
        "plan": "",
        "summary": "",
        "final_answer": "",
        "current_step": "initialized",
        "context_data": []
    })
    
    # Test advanced system
    print("\n🔬 ADVANCED SYSTEM:")
    print("-" * 30)
    advanced_result = advanced_multi_agent_system.invoke({
        "user_query": query,
        "messages": [],
        "plan": "",
        "summary": "",
        "final_answer": "",
        "current_step": "initialized",
        "context_data": [],
        "complexity": ""
    })
    
    # Compare results
    print(f"\n📈 COMPARISON:")
    print("-" * 30)
    print(f"Basic system messages: {len(basic_result['messages'])}")
    print(f"Advanced system messages: {len(advanced_result['messages'])}")
    print(f"Advanced system detected complexity: {advanced_result.get('complexity', 'N/A')}")
    
    return basic_result, advanced_result

# Example comparison
query_simple = "What is the capital of France?"
basic_result, advanced_result = compare_systems(query_simple)

## 🎯 Conclusion and Key Learnings

### What We Built:
✅ **Multi-Agent LLM System** with three specialized agents:
- **Planning Agent**: Creates structured plans for query handling
- **Summarizing Agent**: Processes information and extracts key points  
- **Answering Agent**: Generates comprehensive final answers

✅ **Communication Mechanisms**:
- **Message Passing**: Structured messages between agents with timestamps
- **Shared Memory**: AgentState that all agents can read from and write to

✅ **Advanced Features**:
- Conditional routing based on query complexity
- Performance analysis and metrics
- Interactive testing capabilities
- Error handling and robustness

### Key Benefits:
1. **Modularity**: Each agent has a clear, specific responsibility
2. **Scalability**: Easy to add new agents or modify workflows
3. **Traceability**: Complete message history for debugging and analysis
4. **Flexibility**: Conditional routing based on query characteristics

### Next Steps:
- Add more specialized agents (Research Agent, Validation Agent, etc.)
- Implement parallel processing for independent tasks
- Add external tool integration (web search, database queries)
- Implement memory persistence across sessions
- Add agent learning and adaptation capabilities

### Requirements Fulfilled:
✅ Multi-agent system with specialized subtasks  
✅ Message passing communication  
✅ Shared memory implementation  
✅ LangGraph orchestration  
✅ Comprehensive testing and analysis