<a href="https://colab.research.google.com/github/dimitarpg13/agentic_architectures_and_design_patterns/blob/main/notebooks/live_web_search/duckduckgo_langgraph_search_simple_ex1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Multi-Agent Workflow with Google ADK and Search Grounding

This notebook demonstrates a multi-agent system using Google's Agent Development Kit (ADK) with Google Search for grounding responses in real-world data. The code below calls Gemini through the `google.generativeai` SDK and orchestrates the agents with LangGraph.

## Architecture Overview

- **Supervisor Agent**: Orchestrates the workflow and routes tasks
- **Research Agent**: Performs web searches and gathers information
- **Analysis Agent**: Analyzes and synthesizes research findings
- **Writer Agent**: Generates final outputs based on analyzed data

## Key Features

- Google Search integration for real-time grounding
- State management with LangGraph
- Error handling and retry logic
- Structured output validation

## 1. Environment Setup

In [None]:
# Install required packages
!pip install -q google-generativeai langgraph langchain langchain-google-genai langchain-google-community python-dotenv

In [None]:
import os
from typing import Annotated, TypedDict, List, Dict, Any
from enum import Enum
import json
from datetime import datetime

# Gemini SDK imports
import google.generativeai as genai
from google.generativeai.types import Tool, FunctionDeclaration

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# LangChain imports for Google Search
from langchain_google_community import GoogleSearchAPIWrapper
from langchain.tools import Tool as LangChainTool

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# Configure Google API
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")  # Custom Search Engine ID

if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable not set")

genai.configure(api_key=GOOGLE_API_KEY)

## 2. Define Agent State

The shared state that all agents can access and modify.

In [None]:
class AgentState(TypedDict):
    """State shared across all agents in the workflow."""
    query: str
    messages: List[Dict[str, Any]]  # log entries mix strings and counts
    search_results: List[Dict[str, Any]]
    analysis: str
    final_output: str
    current_agent: str
    next_agent: str
    iteration: int
    max_iterations: int
    errors: List[str]


class AgentRole(str, Enum):
    """Enumeration of agent roles."""
    SUPERVISOR = "supervisor"
    RESEARCHER = "researcher"
    ANALYST = "analyst"
    WRITER = "writer"
    END = "end"
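
Because `AgentRole` mixes in `str`, its members behave like their plain string values, which is what lets the same values serve as state fields, routing keys, and log entries. A minimal sketch of that behaviour, with the enum repeated so the snippet runs on its own:

```python
import json
from enum import Enum


class AgentRole(str, Enum):
    """Same members as the notebook's enum, repeated so this runs standalone."""
    SUPERVISOR = "supervisor"
    RESEARCHER = "researcher"
    ANALYST = "analyst"
    WRITER = "writer"
    END = "end"


# Members compare equal to their string values.
is_equal = AgentRole.RESEARCHER == "researcher"

# A dict keyed by plain strings can be indexed with a member.
routes = {"researcher": "run the search step"}
looked_up = routes[AgentRole.RESEARCHER]

# json.dumps treats a member as an ordinary string.
encoded = json.dumps({"agent": AgentRole.WRITER})
```

This is also why the execution log, which stores `AgentRole` members under the `agent` key, can be written to JSON without a custom encoder.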

## 3. Google Search Tool Setup

Configure Google Custom Search for grounding.

In [None]:
class GoogleSearchGrounding:
    """Google Search tool for grounding agent responses."""
    
    def __init__(self, api_key: str, cse_id: str, num_results: int = 5):
        self.api_key = api_key
        self.cse_id = cse_id
        self.num_results = num_results
        
        # Initialize Google Search wrapper
        self.search_wrapper = GoogleSearchAPIWrapper(
            google_api_key=api_key,
            google_cse_id=cse_id,
            k=num_results
        )
    
    def search(self, query: str) -> List[Dict[str, Any]]:
        """Execute search and return structured results."""
        try:
            raw_results = self.search_wrapper.results(query, num_results=self.num_results)
            
            structured_results = []
            for result in raw_results:
                structured_results.append({
                    "title": result.get("title", ""),
                    "snippet": result.get("snippet", ""),
                    "link": result.get("link", ""),
                    "timestamp": datetime.now().isoformat()
                })
            
            return structured_results
        
        except Exception as e:
            print(f"Search error: {str(e)}")
            return [{"error": str(e)}]
    
    def format_results_for_context(self, results: List[Dict[str, Any]]) -> str:
        """Format search results for LLM context."""
        # Skip error entries anywhere in the list, not only at index 0;
        # results from several queries may mix failures with successes.
        valid_results = [r for r in results if "error" not in r]
        if not valid_results:
            return "No search results available."
        
        formatted = "Search Results:\n\n"
        for i, result in enumerate(valid_results, 1):
            formatted += f"{i}. {result['title']}\n"
            formatted += f"   {result['snippet']}\n"
            formatted += f"   Source: {result['link']}\n\n"
        
        return formatted


# Initialize search tool
search_tool = GoogleSearchGrounding(
    api_key=GOOGLE_API_KEY,
    cse_id=GOOGLE_CSE_ID or "demo-cse-id",  # Use demo ID if not set
    num_results=5
)

print("✓ Google Search tool initialized")
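
`search` signals a failure by returning a one-element list with an `error` key, and several related queries can return the same page. A small helper could clean a combined result list before it is formatted for the model. This is only a sketch: `dedupe_results` is a hypothetical name, not part of the notebook or of any library, and the sample data is made up.

```python
from typing import Any, Dict, List


def dedupe_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop error entries and repeated links, keeping first-seen order."""
    seen_links = set()
    unique = []
    for item in results:
        if "error" in item:
            continue  # failed searches carry no usable content
        link = item.get("link", "")
        if link and link in seen_links:
            continue  # same page already returned by an earlier query
        seen_links.add(link)
        unique.append(item)
    return unique


sample = [
    {"title": "A", "snippet": "first", "link": "https://example.com/a"},
    {"error": "quota exceeded"},
    {"title": "A again", "snippet": "dup", "link": "https://example.com/a"},
    {"title": "B", "snippet": "second", "link": "https://example.com/b"},
]
cleaned = dedupe_results(sample)
```

Entries without a link are kept as-is, since there is nothing to compare them on.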

## 4. Define Individual Agents

Each agent has a specific role and uses Google's Gemini model.

In [None]:
class BaseAgent:
    """Base class for all agents."""
    
    def __init__(self, role: str, model_name: str = "gemini-1.5-pro"):
        self.role = role
        self.model = genai.GenerativeModel(model_name)
    
    def generate_response(self, prompt: str, temperature: float = 0.7) -> str:
        """Generate response using Gemini."""
        try:
            response = self.model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=2048,
                )
            )
            return response.text
        except Exception as e:
            return f"Error generating response: {str(e)}"


class SupervisorAgent(BaseAgent):
    """Orchestrates the multi-agent workflow."""
    
    def __init__(self):
        super().__init__(AgentRole.SUPERVISOR)
    
    def route(self, state: AgentState) -> AgentState:
        """Determine next agent based on current state."""
        
        # Check iteration limit
        if state["iteration"] >= state["max_iterations"]:
            state["next_agent"] = AgentRole.END
            return state
        
        # Route based on current progress
        if not state["search_results"]:
            state["next_agent"] = AgentRole.RESEARCHER
        elif not state["analysis"]:
            state["next_agent"] = AgentRole.ANALYST
        elif not state["final_output"]:
            state["next_agent"] = AgentRole.WRITER
        else:
            state["next_agent"] = AgentRole.END
        
        state["current_agent"] = AgentRole.SUPERVISOR
        state["iteration"] += 1
        
        return state


class ResearchAgent(BaseAgent):
    """Performs web searches and gathers information."""
    
    def __init__(self, search_tool: GoogleSearchGrounding):
        super().__init__(AgentRole.RESEARCHER)
        self.search_tool = search_tool
    
    def research(self, state: AgentState) -> AgentState:
        """Execute search queries and collect results."""
        
        query = state["query"]
        
        # Generate search query refinement using LLM
        refinement_prompt = f"""
You are a research agent. Given this user query: "{query}"

Generate 2-3 specific search queries that will help gather comprehensive information.
Format your response as a JSON array of strings.

Example: ["query 1", "query 2", "query 3"]
"""
        
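        try:
            refined_queries_text = self.generate_response(refinement_prompt, temperature=0.3)
            # Parse JSON from response
            refined_queries = json.loads(refined_queries_text)
            if not isinstance(refined_queries, list) or not refined_queries:
                raise ValueError("expected a non-empty JSON array")
        except (TypeError, ValueError):
            # Fallback to original query (json.JSONDecodeError is a ValueError)
            refined_queries = [query]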
        
        # Execute searches
        all_results = []
        for search_query in refined_queries:
            results = self.search_tool.search(search_query)
            all_results.extend(results)
        
        state["search_results"] = all_results
        state["current_agent"] = AgentRole.RESEARCHER
        state["messages"].append({
            "agent": AgentRole.RESEARCHER,
            "action": "search_completed",
            "num_results": len(all_results)
        })
        
        return state


class AnalysisAgent(BaseAgent):
    """Analyzes and synthesizes research findings."""
    
    def __init__(self, search_tool: GoogleSearchGrounding):
        super().__init__(AgentRole.ANALYST)
        self.search_tool = search_tool
    
    def analyze(self, state: AgentState) -> AgentState:
        """Analyze search results and extract insights."""
        
        query = state["query"]
        search_context = self.search_tool.format_results_for_context(state["search_results"])
        
        analysis_prompt = f"""
You are an analysis agent. Your task is to analyze search results and extract key insights.

User Query: {query}

{search_context}

Provide a comprehensive analysis that:
1. Identifies key themes and patterns
2. Highlights important facts and statistics
3. Notes any contradictions or uncertainties
4. Synthesizes information from multiple sources

Format your analysis in clear, structured paragraphs.
"""
        
        analysis = self.generate_response(analysis_prompt, temperature=0.5)
        
        state["analysis"] = analysis
        state["current_agent"] = AgentRole.ANALYST
        state["messages"].append({
            "agent": AgentRole.ANALYST,
            "action": "analysis_completed",
            "analysis_length": len(analysis)
        })
        
        return state


class WriterAgent(BaseAgent):
    """Generates final outputs based on analyzed data."""
    
    def __init__(self):
        super().__init__(AgentRole.WRITER)
    
    def write(self, state: AgentState) -> AgentState:
        """Generate final response based on analysis."""
        
        query = state["query"]
        analysis = state["analysis"]
        
        writing_prompt = f"""
You are a writer agent. Create a comprehensive, well-structured response to the user's query.

User Query: {query}

Analysis:
{analysis}

Generate a response that:
1. Directly answers the user's query
2. Is grounded in the analyzed information
3. Is clear, accurate, and well-organized
4. Cites sources where appropriate
5. Acknowledges limitations or uncertainties

Write in a professional yet accessible tone.
"""
        
        final_output = self.generate_response(writing_prompt, temperature=0.6)
        
        state["final_output"] = final_output
        state["current_agent"] = AgentRole.WRITER
        state["messages"].append({
            "agent": AgentRole.WRITER,
            "action": "writing_completed",
            "output_length": len(final_output)
        })
        
        return state


print("✓ All agents defined")
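
The refinement prompt asks for a bare JSON array, but a model may surround it with prose or return an error string instead. One way to validate that output before searching is sketched below. `parse_query_list` is a hypothetical helper, not something the notebook defines, and the sample reply is invented.

```python
import json
import re
from typing import List


def parse_query_list(text: str, fallback: str) -> List[str]:
    """Extract a JSON array of non-empty strings, else return [fallback]."""
    # Take the span from the first '[' to the last ']' so that any
    # surrounding prose or markup is ignored.
    match = re.search(r"\[.*\]", text, flags=re.DOTALL)
    if not match:
        return [fallback]
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return [fallback]
    if not isinstance(parsed, list):
        return [fallback]
    # Keep only usable string entries.
    queries = [q.strip() for q in parsed if isinstance(q, str) and q.strip()]
    return queries or [fallback]


reply = 'Here you go:\n["qubit error rates", "quantum hardware"]\nHope that helps.'
queries = parse_query_list(reply, fallback="quantum computing")
on_error = parse_query_list("Error generating response: timeout", fallback="quantum computing")
```

Falling back to the original query keeps the research step running even when the refinement call fails.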

## 5. Build Multi-Agent Workflow with LangGraph

Orchestrate agents using a state graph.

In [None]:
class MultiAgentWorkflow:
    """Multi-agent system orchestrator."""
    
    def __init__(self, search_tool: GoogleSearchGrounding):
        self.search_tool = search_tool
        
        # Initialize agents
        self.supervisor = SupervisorAgent()
        self.researcher = ResearchAgent(search_tool)
        self.analyst = AnalysisAgent(search_tool)
        self.writer = WriterAgent()
        
        # Build workflow graph
        self.graph = self._build_graph()
    
    def _build_graph(self) -> StateGraph:
        """Construct the agent workflow graph."""
        
        workflow = StateGraph(AgentState)
        
        # Add nodes
        workflow.add_node(AgentRole.SUPERVISOR, self.supervisor.route)
        workflow.add_node(AgentRole.RESEARCHER, self.researcher.research)
        workflow.add_node(AgentRole.ANALYST, self.analyst.analyze)
        workflow.add_node(AgentRole.WRITER, self.writer.write)
        
        # Define edges
        workflow.set_entry_point(AgentRole.SUPERVISOR)
        
        # Conditional routing from supervisor
        def route_from_supervisor(state: AgentState) -> str:
            return state["next_agent"]
        
        workflow.add_conditional_edges(
            AgentRole.SUPERVISOR,
            route_from_supervisor,
            {
                AgentRole.RESEARCHER: AgentRole.RESEARCHER,
                AgentRole.ANALYST: AgentRole.ANALYST,
                AgentRole.WRITER: AgentRole.WRITER,
                AgentRole.END: END
            }
        )
        
        # All agents return to supervisor
        workflow.add_edge(AgentRole.RESEARCHER, AgentRole.SUPERVISOR)
        workflow.add_edge(AgentRole.ANALYST, AgentRole.SUPERVISOR)
        workflow.add_edge(AgentRole.WRITER, AgentRole.SUPERVISOR)
        
        return workflow.compile()
    
    def run(self, query: str, max_iterations: int = 5) -> Dict[str, Any]:
        """Execute the multi-agent workflow."""
        
        # Initialize state
        initial_state: AgentState = {
            "query": query,
            "messages": [],
            "search_results": [],
            "analysis": "",
            "final_output": "",
            "current_agent": "",
            "next_agent": "",
            "iteration": 0,
            "max_iterations": max_iterations,
            "errors": []
        }
        
        # Execute workflow
        try:
            final_state = self.graph.invoke(initial_state)
            return final_state
        except Exception as e:
            print(f"Workflow error: {str(e)}")
            initial_state["errors"].append(str(e))
            return initial_state
    
    def visualize_workflow(self):
        """Generate workflow visualization."""
        try:
            from IPython.display import Image, display
            display(Image(self.graph.get_graph().draw_mermaid_png()))
        except Exception as e:
            print(f"Visualization not available: {str(e)}")


# Initialize workflow
workflow = MultiAgentWorkflow(search_tool)
print("✓ Multi-agent workflow initialized")
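
The supervisor's routing rules can be checked without any API calls by restating them as a pure function and stepping through them with stub agents. The sketch below assumes every agent succeeds on its first attempt. `next_role` and `simulate` are illustrative names, and plain strings stand in for the `AgentRole` members.

```python
from typing import Any, Dict, List


def next_role(state: Dict[str, Any]) -> str:
    """Return the role the supervisor's rules would pick for this state."""
    if state["iteration"] >= state["max_iterations"]:
        return "end"
    if not state["search_results"]:
        return "researcher"
    if not state["analysis"]:
        return "analyst"
    if not state["final_output"]:
        return "writer"
    return "end"


def simulate(max_iterations: int) -> List[str]:
    """Trace routing decisions, with stub agents that always succeed."""
    state: Dict[str, Any] = {
        "iteration": 0,
        "max_iterations": max_iterations,
        "search_results": [],
        "analysis": "",
        "final_output": "",
    }
    trace: List[str] = []
    while True:
        role = next_role(state)
        trace.append(role)
        if role == "end":
            return trace
        state["iteration"] += 1  # the supervisor counts each routed step
        # Each stub fills in the field its real counterpart would produce.
        if role == "researcher":
            state["search_results"] = [{"title": "stub"}]
        elif role == "analyst":
            state["analysis"] = "stub analysis"
        else:
            state["final_output"] = "stub answer"


full_run = simulate(max_iterations=5)
short_run = simulate(max_iterations=2)
```

Under these assumptions a budget of 3 is the smallest that reaches the writer, while `simulate(2)` stops after the analysis step.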

## 6. Visualize Workflow (Optional)

In [None]:
# Visualize the agent workflow graph
workflow.visualize_workflow()

## 7. Execute Multi-Agent Workflow

Run the workflow with a sample query.

In [None]:
# Example query
query = "What are the latest developments in quantum computing in 2025?"

print(f"Query: {query}\n")
print("="*80)
print("Starting multi-agent workflow...\n")

# Run workflow
result = workflow.run(query, max_iterations=5)

# Display results
print("\n" + "="*80)
print("WORKFLOW EXECUTION SUMMARY")
print("="*80)

print(f"\nTotal iterations: {result['iteration']}")
print(f"Number of search results: {len(result['search_results'])}")
print(f"\nAgent execution order:")
for msg in result['messages']:
    print(f"  - {msg['agent']}: {msg['action']}")

if result['errors']:
    print(f"\nErrors encountered: {result['errors']}")

## 8. Display Detailed Results

In [None]:
print("="*80)
print("SEARCH RESULTS (Grounding Data)")
print("="*80)

# Use a separate loop variable so `result` (the workflow state) is not overwritten
for i, item in enumerate(result['search_results'][:5], 1):  # Show first 5
    if 'error' not in item:
        print(f"\n{i}. {item['title']}")
        print(f"   {item['snippet']}")
        print(f"   Source: {item['link']}")

print("\n" + "="*80)
print("ANALYSIS")
print("="*80)
print(f"\n{result['analysis']}")

print("\n" + "="*80)
print("FINAL OUTPUT")
print("="*80)
print(f"\n{result['final_output']}")

## 9. Advanced Features: Custom Tool Integration

In [None]:
# Define custom function for Gemini function calling
search_function = FunctionDeclaration(
    name="google_search",
    description="Search Google for current information to ground responses in real-world data",
    parameters={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query"
            },
            "num_results": {
                "type": "integer",
                # Default stated in the description; some SDK versions may reject a "default" schema key
                "description": "Number of results to return (5 if omitted)"
            }
        },
        "required": ["query"]
    }
)

# Create tool with function
search_tool_for_gemini = Tool(function_declarations=[search_function])

# Example: Using function calling with Gemini
model_with_tools = genai.GenerativeModel(
    "gemini-1.5-pro",
    tools=[search_tool_for_gemini]
)

print("✓ Custom tool integration configured")
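
Declaring a function only tells the model that it may request a call; the application still has to run the function and return the result. The dispatch step can be sketched as below. How the function name and arguments are read from a Gemini response is not shown, and `fake_search`, `HANDLERS`, and `dispatch` are hypothetical names used purely for illustration.

```python
from typing import Any, Callable, Dict, List


def fake_search(query: str, num_results: int = 5) -> List[Dict[str, str]]:
    """Stand-in for a real search call; returns placeholder results."""
    return [{"title": f"Result {i} for {query}"} for i in range(1, num_results + 1)]


# Map declared function names to the Python callables that implement them.
HANDLERS: Dict[str, Callable[..., Any]] = {"google_search": fake_search}


def dispatch(name: str, args: Dict[str, Any]) -> Any:
    """Run the handler registered for `name` with the model-supplied arguments."""
    handler = HANDLERS.get(name)
    if handler is None:
        raise KeyError(f"No handler registered for function '{name}'")
    return handler(**args)


results = dispatch("google_search", {"query": "fusion energy", "num_results": 2})
```

In the notebook, the handler for `google_search` would presumably wrap `search_tool.search` rather than a stub.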

## 10. Interactive Demo

In [None]:
def run_interactive_query(query: str):
    """Run workflow and display results in a formatted way."""
    
    print(f"\n{'='*80}")
    print(f"Processing Query: {query}")
    print(f"{'='*80}\n")
    
    result = workflow.run(query)
    
    # Display in sections
    print("\n📊 WORKFLOW METRICS")
    print(f"   Iterations: {result['iteration']}")
    print(f"   Search Results: {len(result['search_results'])}")
    print(f"   Analysis Length: {len(result['analysis'])} chars")
    print(f"   Output Length: {len(result['final_output'])} chars")
    
    print("\n🎯 FINAL ANSWER")
    print("-" * 80)
    print(result['final_output'])
    print("-" * 80)
    
    return result


# Try different queries
test_queries = [
    "What are the latest AI safety developments?",
    "How is climate change affecting ocean temperatures?",
    "What are the recent breakthroughs in fusion energy?"
]

# Uncomment to run
# for query in test_queries:
#     result = run_interactive_query(query)
#     print("\n" + "="*80 + "\n")

## 11. Performance Monitoring

In [None]:
import time
from typing import List, Tuple

def benchmark_workflow(queries: List[str]) -> List[Tuple[str, float, Dict]]:
    """Benchmark workflow performance across multiple queries."""
    
    results = []
    
    for query in queries:
        start_time = time.time()
        result = workflow.run(query)
        elapsed_time = time.time() - start_time
        
        results.append((query, elapsed_time, result))
        
        print(f"Query: {query[:50]}...")
        print(f"Time: {elapsed_time:.2f}s")
        print(f"Success: {bool(result['final_output'])}\n")
    
    return results


# Example benchmark
# benchmark_queries = [
#     "Latest in quantum computing",
#     "Current state of renewable energy",
#     "Advances in gene therapy"
# ]
# benchmark_results = benchmark_workflow(benchmark_queries)
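
`benchmark_workflow` returns one `(query, elapsed_time, result)` tuple per query, which can be reduced to a few summary figures. The helper below is a sketch with a hypothetical name (`summarize_benchmark`) and made-up sample timings, not measured results.

```python
from statistics import mean
from typing import Any, Dict, List, Tuple


def summarize_benchmark(
    results: List[Tuple[str, float, Dict[str, Any]]]
) -> Dict[str, float]:
    """Aggregate run count, timing, and success rate across benchmark runs."""
    if not results:
        return {"runs": 0, "mean_seconds": 0.0, "max_seconds": 0.0, "success_rate": 0.0}
    times = [elapsed for _, elapsed, _ in results]
    # A run counts as successful when it produced a non-empty final output.
    successes = sum(1 for _, _, state in results if state.get("final_output"))
    return {
        "runs": len(results),
        "mean_seconds": mean(times),
        "max_seconds": max(times),
        "success_rate": successes / len(results),
    }


# Illustrative values only.
sample = [
    ("q1", 2.0, {"final_output": "answer"}),
    ("q2", 4.0, {"final_output": ""}),
]
summary = summarize_benchmark(sample)
```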

## 12. Error Handling & Retry Logic

In [None]:
from functools import wraps
import time

def retry_with_backoff(max_retries: int = 3, backoff_factor: float = 2.0):
    """Decorator for retrying failed operations with exponential backoff."""
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries >= max_retries:
                        raise
                    wait_time = backoff_factor ** retries
                    print(f"Retry {retries}/{max_retries} after {wait_time}s delay: {str(e)}")
                    time.sleep(wait_time)
            return None
        return wrapper
    return decorator


class RobustMultiAgentWorkflow(MultiAgentWorkflow):
    """Enhanced workflow with error handling."""
    
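    @retry_with_backoff(max_retries=3)
    def run(self, query: str, max_iterations: int = 5) -> Dict[str, Any]:
        """Execute workflow with retry logic."""
        result = super().run(query, max_iterations)
        # The base run() catches exceptions and records them in "errors",
        # so raise here; otherwise the decorator never sees a failure.
        if result["errors"]:
            raise RuntimeError(f"Workflow failed: {result['errors'][-1]}")
        return result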


# Create robust workflow
robust_workflow = RobustMultiAgentWorkflow(search_tool)
print("✓ Robust workflow with error handling initialized")
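
To see how long a fully failing call would block, the waits that `retry_with_backoff` sleeps between attempts can be computed directly. This sketch mirrors the decorator's formula (`backoff_factor ** retries`); `backoff_delays` is an illustrative helper, not part of the notebook.

```python
from typing import List


def backoff_delays(max_retries: int = 3, backoff_factor: float = 2.0) -> List[float]:
    """Waits between attempts; no sleep follows the final failed attempt."""
    return [backoff_factor ** attempt for attempt in range(1, max_retries)]


delays = backoff_delays()   # waits after the first and second failures
total_wait = sum(delays)    # total sleep time if every attempt fails
```

With the defaults, three attempts are made and the decorator sleeps twice before re-raising the last exception.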

## 13. Save Results

In [None]:
import json
from datetime import datetime

def save_workflow_results(result: Dict[str, Any], filename: str = None):
    """Save workflow results to JSON file."""
    
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"workflow_results_{timestamp}.json"
    
    # Create exportable version
    export_data = {
        "query": result["query"],
        "timestamp": datetime.now().isoformat(),
        "iterations": result["iteration"],
        "search_results_count": len(result["search_results"]),
        "search_results": result["search_results"],
        "analysis": result["analysis"],
        "final_output": result["final_output"],
        "execution_log": result["messages"],
        "errors": result["errors"]
    }
    
    with open(filename, 'w') as f:
        json.dump(export_data, f, indent=2)
    
    print(f"✓ Results saved to {filename}")
    return filename


# Example usage
# if result and result['final_output']:
#     save_workflow_results(result)

## 14. Summary & Next Steps

### What We've Built

1. **Multi-Agent System**: Supervisor, Researcher, Analyst, and Writer agents
2. **Google Search Grounding**: Real-time search integration for up-to-date information
3. **LangGraph Orchestration**: State management and workflow control
4. **Error Handling**: Retry logic and robust error handling
5. **Production Features**: An execution log kept in the shared state, timing benchmarks, and JSON result persistence

### Key Features

- **Grounding**: Responses are generated from live search results supplied to the model as context
- **Multi-Step Reasoning**: Research → Analysis → Writing pipeline
- **Scalable**: Easy to add new agents or modify workflow
- **Observable**: Comprehensive logging and state tracking

### Customization Options

1. **Add Agents**: Create specialized agents (fact-checker, summarizer, etc.)
2. **Custom Tools**: Integrate additional APIs or data sources
3. **Routing Logic**: Implement more sophisticated agent routing
4. **Memory**: Add conversation memory or long-term storage
5. **Parallel Execution**: Run multiple agents concurrently

### Environment Variables Required

Create a `.env` file with:
```
GOOGLE_API_KEY=your_google_api_key
GOOGLE_CSE_ID=your_custom_search_engine_id
```
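
The setup cell raises only when `GOOGLE_API_KEY` is missing and silently substitutes `"demo-cse-id"` for an unset `GOOGLE_CSE_ID`. A stricter check could report every missing variable at once. The helper name `missing_env_vars` is an assumption for illustration, and the example mapping stands in for `os.environ`.

```python
from typing import Iterable, List, Mapping


def missing_env_vars(required: Iterable[str], env: Mapping[str, str]) -> List[str]:
    """Return the required names that are unset or blank in `env`."""
    return [name for name in required if not env.get(name, "").strip()]


REQUIRED = ("GOOGLE_API_KEY", "GOOGLE_CSE_ID")

# Example mapping; in the notebook this would be os.environ after load_dotenv().
example_env = {"GOOGLE_API_KEY": "your_google_api_key", "GOOGLE_CSE_ID": ""}
missing = missing_env_vars(REQUIRED, example_env)
```

Checking both names up front surfaces a missing search engine ID before the first search request fails.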

### Resources

- [Google ADK Documentation](https://ai.google.dev/)
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- [Google Custom Search API](https://developers.google.com/custom-search/v1/overview)

In [None]:
# Final demo
print("Multi-Agent Workflow with Google ADK and Search Grounding")
print("Ready to process queries!\n")
print("Example usage:")
print("  result = workflow.run('Your query here')")
print("  print(result['final_output'])")