In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    full_paths = (os.path.join(dirname, filename) for filename in filenames)
    for full_path in full_paths:
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
pip install google-adk

Note: you may need to restart the kernel to use updated packages.


In [3]:
# Fetch the secret named "GOOGLE_API_KEY" through kaggle_secrets.UserSecretsClient
# instead of hard-coding the key in the notebook.
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
# NOTE(review): secret_value_0 is not referenced by any other visible cell —
# presumably it should be handed to the google-adk client or exported as an
# environment variable; confirm, and never print it.
secret_value_0 = user_secrets.get_secret("GOOGLE_API_KEY")


In [4]:

import json
import uuid
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, asdict, field
from enum import Enum
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from collections import defaultdict
import logging


In [5]:
# Configure logging once for the notebook: INFO and above, formatted as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Logger used by the classes and functions defined in the cells below.
logger = logging.getLogger(__name__)


In [6]:
class AgentType(Enum):
    """Kinds of agent in the system.

    Each member's value is the lowercase label that Agent writes to its
    initialization log line and to the "type" field of get_status().
    """
    CONCIERGE = "concierge"      # used by ConciergeAgent
    SUSTAINABLE = "sustainable"  # used by SustainableAgent
    CORPORATE = "corporate"      # used by CorporateAgent

class TaskStatus(Enum):
    """Lifecycle states stored in Task.status."""
    PENDING = "pending"      # initial state given to tasks created by WorkflowEngine
    RUNNING = "running"      # set by Agent.execute_task before processing starts
    COMPLETED = "completed"  # set by Agent.execute_task when processing returns
    FAILED = "failed"        # set by Agent.execute_task when processing raises

class ToolCategory(Enum):
    """Named tool categories.

    NOTE(review): nothing in the visible code references this enum —
    Tool.category is annotated as ``str`` and the agents pass plain strings
    ("booking", "analysis", "optimization", "reporting") that match these
    values. Confirm whether Tool should take a ToolCategory instead.
    """
    BOOKING = "booking"
    ANALYSIS = "analysis"
    OPTIMIZATION = "optimization"
    REPORTING = "reporting"

In [7]:
@dataclass
class Tool:
    """A callable capability that can be registered in a ToolRegistry."""
    # Unique identifier; ToolRegistry uses it as the dictionary key.
    tool_id: str
    # Human-readable name, written to the registry's log lines and stats.
    name: str
    # Free-text summary of what the tool does.
    description: str
    # Category label as a plain string (e.g. "booking", "analysis").
    category: str
    # Invoked by ToolRegistry.execute_tool as handler(**kwargs); when None the
    # registry returns a placeholder {"status": "executed", ...} result.
    handler: Optional[Callable] = None

@dataclass
class Task:
    """A unit of work handed to an Agent."""
    # Identifier of the task; WorkflowEngine generates it with uuid4.
    task_id: str
    # Name of the operation; agents lowercase it and branch on it in process_task.
    task_type: str
    # Current lifecycle state; updated in place by Agent.execute_task.
    status: TaskStatus
    # Arbitrary inputs for the operation; a fresh dict per task by default.
    parameters: Dict[str, Any] = field(default_factory=dict)
    # Payload returned by the agent's process_task; None until the task completes.
    result: Optional[Dict] = None
    # ISO-8601 timestamp taken from datetime.now() when the task is created.
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

@dataclass
class ExecutionResult:
    """Outcome returned by ToolRegistry.execute_tool and Agent.execute_task."""
    # True when the tool/task ran without raising.
    success: bool
    # Payload produced by the handler; the failure paths pass an empty dict.
    result: Dict[str, Any]
    # Error message on failure, otherwise None.
    error: Optional[str] = None
    # Elapsed wall-clock seconds measured with time.time(); 0.0 when not supplied.
    execution_time: float = 0.0

@dataclass
class AgentMetrics:
    """Running performance counters for one agent, maintained by Agent.execute_task."""
    # Identifier of the agent these counters belong to.
    agent_id: str
    # Number of tasks that finished without raising.
    tasks_completed: int = 0
    # Number of tasks whose processing raised an exception.
    tasks_failed: int = 0
    # Sum of elapsed seconds over completed tasks.
    total_execution_time: float = 0.0
    # Percentage (0-100) of completed tasks out of completed + failed. It is a
    # stored value, not a property: whoever changes the counters must update it.
    success_rate: float = 0.0

In [8]:
class ToolRegistry:
    """Registry that stores Tool objects by id and executes them on request.

    Labelled "MCP Server Pattern" by the original author. Every handler
    invocation, successful or not, is appended to ``execution_history``.
    """

    def __init__(self, server_id: str):
        """Create an empty registry.

        Args:
            server_id: Identifier reported in log lines and in get_stats().
        """
        self.server_id = server_id
        self.tools: Dict[str, Tool] = {}
        self.execution_history: List[Dict] = []
        logger.info(f"Tool Registry initialized: {server_id}")

    def register_tool(self, tool: Tool) -> bool:
        """Register a tool under its ``tool_id``.

        Returns:
            True if the tool was added, False (with a warning logged) if a
            tool with the same id is already registered; the existing tool is
            left untouched in that case.
        """
        if tool.tool_id in self.tools:
            logger.warning(f"Tool {tool.tool_id} already exists")
            return False

        self.tools[tool.tool_id] = tool
        logger.info(f"Tool registered: {tool.name}")
        return True

    def get_tool(self, tool_id: str) -> Optional[Tool]:
        """Return the tool registered under ``tool_id``, or None if unknown."""
        return self.tools.get(tool_id)

    def get_all_tools(self) -> List[Tool]:
        """Return a new list containing every registered tool."""
        return list(self.tools.values())

    def _record_execution(self, tool_id: str, success: bool,
                          execution_time: float,
                          error: Optional[str] = None) -> None:
        """Append one entry describing a handler invocation to the history."""
        entry = {
            "tool_id": tool_id,
            "timestamp": datetime.now().isoformat(),
            "success": success,
            "execution_time": execution_time
        }
        if error is not None:
            entry["error"] = error
        self.execution_history.append(entry)

    def execute_tool(self, tool_id: str, **kwargs) -> ExecutionResult:
        """Run the handler of a registered tool.

        Args:
            tool_id: Id of the tool to execute.
            **kwargs: Forwarded unchanged to the tool's handler.

        Returns:
            ExecutionResult with ``success=True`` and the handler's return
            value, or ``success=False`` and an error message when the tool is
            unknown or the handler raises. Exceptions from the handler are
            caught, never propagated.
        """
        start_time = time.time()
        tool = self.get_tool(tool_id)

        if tool is None:
            error = f"Tool {tool_id} not found"
            # Nothing was invoked, so no history entry; but do not fail silently.
            logger.warning(error)
            return ExecutionResult(
                success=False,
                result={},
                error=error
            )

        try:
            if tool.handler:
                result = tool.handler(**kwargs)
            else:
                # Tools without a handler succeed with a placeholder payload.
                result = {"status": "executed", "tool": tool.name}
        except Exception as e:
            execution_time = time.time() - start_time
            # Failed invocations are recorded too; the "success" flag in each
            # history entry would be meaningless if only successes were kept.
            self._record_execution(tool_id, False, execution_time, str(e))
            logger.error(f"Tool execution failed: {str(e)}")
            return ExecutionResult(
                success=False,
                result={},
                error=str(e),
                execution_time=execution_time
            )

        execution_time = time.time() - start_time
        self._record_execution(tool_id, True, execution_time)
        logger.info(f"Tool executed: {tool.name} ({execution_time:.3f}s)")

        return ExecutionResult(
            success=True,
            result=result,
            execution_time=execution_time
        )

    def get_stats(self) -> Dict:
        """Return a summary: server id, tool count, tool names, and the
        number of recorded handler invocations."""
        return {
            "server_id": self.server_id,
            "total_tools": len(self.tools),
            "tools": [t.name for t in self.tools.values()],
            "executions": len(self.execution_history)
        }


In [9]:
class Agent(ABC):
    """Abstract base class for agents.

    Owns a private ToolRegistry, tracks tasks and their outcomes, and keeps
    AgentMetrics up to date. Subclasses implement ``process_task``.
    """

    def __init__(self, agent_id: str, name: str, agent_type: AgentType):
        """Create the agent with an empty tool registry and zeroed metrics.

        Args:
            agent_id: Unique id; also used to name the registry
                ("registry_<agent_id>") and the metrics record.
            name: Display name used in logs and status reports.
            agent_type: Kind of agent; its value is logged and reported.
        """
        self.agent_id = agent_id
        self.name = name
        self.agent_type = agent_type

        # Initialize components
        self.tool_registry = ToolRegistry(f"registry_{agent_id}")
        self.tasks: Dict[str, Task] = {}
        self.completed_tasks: List[str] = []
        self.failed_tasks: List[str] = []

        # Metrics
        self.metrics = AgentMetrics(agent_id)
        self.activity_log: List[str] = []

        logger.info(f"Agent initialized: {name} ({agent_type.value})")

    @abstractmethod
    def process_task(self, task: Task) -> Dict[str, Any]:
        """Process a task and return its result payload.

        Implemented by subclasses. A returned dict containing
        ``"success": False`` is treated by ``execute_task`` as a failure.
        """
        pass

    def add_task(self, task: Task) -> bool:
        """Store the task under its id and note it in the activity log.

        Returns:
            Always True.
        """
        self.tasks[task.task_id] = task
        self.activity_log.append(f"Task added: {task.task_id}")
        return True

    def _refresh_success_rate(self) -> None:
        """Recompute success_rate (0-100) from the completed/failed counters."""
        finished = self.metrics.tasks_completed + self.metrics.tasks_failed
        if finished > 0:
            self.metrics.success_rate = (
                self.metrics.tasks_completed / finished * 100
            )

    def _record_failure(self, task: Task, error: str, execution_time: float,
                        result: Optional[Dict[str, Any]] = None) -> ExecutionResult:
        """Mark the task failed, update metrics and logs, and build the result."""
        task.status = TaskStatus.FAILED
        self.metrics.tasks_failed += 1
        self.failed_tasks.append(task.task_id)
        # The rate must be refreshed on failures as well; otherwise it keeps
        # the value from the last success (e.g. stays at 100% forever).
        self._refresh_success_rate()

        self.activity_log.append(f"Task failed: {task.task_id} - {error}")
        logger.error(f"Task failed: {error}")

        return ExecutionResult(
            success=False,
            result=result if result is not None else {},
            error=error,
            execution_time=execution_time
        )

    def execute_task(self, task: Task) -> ExecutionResult:
        """Run ``process_task`` on the task and record the outcome.

        The task's status is moved to RUNNING, then to COMPLETED or FAILED.
        A task fails when ``process_task`` raises, or when it returns a dict
        whose "success" entry is False (this is how the concrete agents
        report an unknown task type).

        Returns:
            ExecutionResult describing the outcome. Exceptions raised by
            ``process_task`` are caught and reported through ``error``.
        """
        start_time = time.time()
        task.status = TaskStatus.RUNNING

        try:
            result = self.process_task(task)
        except Exception as e:
            return self._record_failure(task, str(e), time.time() - start_time)

        execution_time = time.time() - start_time
        task.result = result

        # A handler can signal failure without raising; do not count that
        # as a completed task.
        if isinstance(result, dict) and result.get("success") is False:
            error = str(result.get("error", "Task reported failure"))
            return self._record_failure(task, error, execution_time, result)

        task.status = TaskStatus.COMPLETED

        # Update metrics
        self.metrics.tasks_completed += 1
        self.metrics.total_execution_time += execution_time
        self.completed_tasks.append(task.task_id)
        self._refresh_success_rate()

        self.activity_log.append(f"Task completed: {task.task_id}")
        logger.info(f"Task completed: {task.task_id} by {self.name}")

        return ExecutionResult(
            success=True,
            result=result,
            execution_time=execution_time
        )

    def get_status(self) -> Dict:
        """Return a JSON-serializable snapshot of the agent: identity,
        counters, formatted success rate, tool count and the five most recent
        activity-log entries."""
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "type": self.agent_type.value,
            "tasks_completed": self.metrics.tasks_completed,
            "tasks_failed": self.metrics.tasks_failed,
            "success_rate": f"{self.metrics.success_rate:.2f}%",
            "total_tools": len(self.tool_registry.tools),
            "activity_log": self.activity_log[-5:]  # Last 5 activities
        }



In [10]:
class ConciergeAgent(Agent):
    """Agent for personal travel: flight search, flight/hotel booking, trip planning."""

    def __init__(self, agent_id: str = "agent_001"):
        super().__init__(agent_id, "ConciergeAgent", AgentType.CONCIERGE)
        self._setup_tools()

    def _setup_tools(self):
        """Register the four booking tools in this agent's registry."""
        catalog = (
            ("search_flights", "Search Flights", "Search available flights", self._search_flights),
            ("book_flight", "Book Flight", "Book a flight", self._book_flight),
            ("book_hotel", "Book Hotel", "Book accommodation", self._book_hotel),
            ("create_itinerary", "Create Itinerary", "Plan travel itinerary", self._create_itinerary),
        )
        booking_tools = [
            Tool(tool_id, label, summary, "booking", handler)
            for tool_id, label, summary, handler in catalog
        ]
        for booking_tool in booking_tools:
            self.tool_registry.register_tool(booking_tool)

    def process_task(self, task: Task) -> Dict[str, Any]:
        """Return the canned response for the task's type (case-insensitive).

        Unknown task types yield {"success": False, "error": ...}.
        """
        kind = task.task_type.lower()

        # Each entry builds its payload lazily so ids are only generated for
        # the response that is actually returned.
        responders = {
            "search_flights": lambda: {
                "success": True,
                "flights_found": 12,
                "airlines": ["Air India", "Emirates", "Lufthansa"],
                "price_range": "$300-$2000",
            },
            "book_flight": lambda: {
                "success": True,
                "booking_id": f"FLY{uuid.uuid4().hex[:8].upper()}",
                "status": "confirmed",
                "confirmation_sent": True,
            },
            "book_hotel": lambda: {
                "success": True,
                "booking_id": f"HTL{uuid.uuid4().hex[:8].upper()}",
                "hotel_name": "Grand Hotel",
                "status": "confirmed",
            },
            "plan_trip": lambda: {
                "success": True,
                "itinerary_id": f"IT{uuid.uuid4().hex[:8].upper()}",
                "days": 7,
                "activities": 8,
            },
        }

        responder = responders.get(kind)
        if responder is None:
            return {"success": False, "error": f"Unknown task: {kind}"}
        return responder()

    # Tool handlers: fixed demo payloads, keyword arguments are ignored.
    def _search_flights(self, **kwargs) -> Dict:
        return dict(flights=10, best_price="$450")

    def _book_flight(self, **kwargs) -> Dict:
        return dict(booking="success")

    def _book_hotel(self, **kwargs) -> Dict:
        return dict(hotel="booked")

    def _create_itinerary(self, **kwargs) -> Dict:
        return dict(itinerary="created")

class SustainableAgent(Agent):
    """Agent for sustainability tasks: carbon estimates, eco options, offsets, impact."""

    def __init__(self, agent_id: str = "agent_002"):
        super().__init__(agent_id, "SustainableAgent", AgentType.SUSTAINABLE)
        self._setup_tools()

    def _setup_tools(self):
        """Register the three analysis tools and the reporting tool."""
        catalog = (
            ("calc_carbon", "Calculate Carbon", "Calculate carbon footprint", "analysis", self._calculate_carbon),
            ("eco_options", "Eco Options", "Find eco-friendly options", "analysis", self._eco_options),
            ("carbon_offset", "Carbon Offset", "Purchase carbon offsets", "analysis", self._carbon_offset),
            ("impact_report", "Impact Report", "Generate impact report", "reporting", self._impact_report),
        )
        sustainability_tools = [Tool(*spec) for spec in catalog]
        for sustainability_tool in sustainability_tools:
            self.tool_registry.register_tool(sustainability_tool)

    def _estimate_carbon(self, task: Task) -> Dict[str, Any]:
        """Build the "calculate_carbon" response from the task's "distance" parameter."""
        travelled = task.parameters.get("distance", 1000)
        footprint = travelled * 0.1
        # NOTE(review): dividing the kg figure by 0.025 yields 40 "trees" per
        # kilogram — confirm the intended unit of this constant.
        return {
            "success": True,
            "carbon_kg": round(footprint, 2),
            "trees_equivalent": round(footprint / 0.025),
        }

    def process_task(self, task: Task) -> Dict[str, Any]:
        """Return the response for the task's type (case-insensitive).

        Unknown task types yield {"success": False, "error": ...}.
        """
        kind = task.task_type.lower()

        # Every responder takes the task; only the carbon estimate reads it.
        responders = {
            "calculate_carbon": self._estimate_carbon,
            "find_eco_options": lambda _task: {
                "success": True,
                "eco_options": 5,
                "savings_percent": 35,
                "recommendation": "Use train or bus",
            },
            "offset_carbon": lambda _task: {
                "success": True,
                "offset_id": f"OFF{uuid.uuid4().hex[:8].upper()}",
                "trees_planted": 15,
            },
            "community_impact": lambda _task: {
                "success": True,
                "travelers_helped": 250,
                "carbon_saved_kg": 1500,
            },
        }

        responder = responders.get(kind)
        if responder is None:
            return {"success": False, "error": f"Unknown task: {kind}"}
        return responder(task)

    # Tool handlers: fixed demo payloads, keyword arguments are ignored.
    def _calculate_carbon(self, **kwargs) -> Dict:
        return dict(carbon=150.5)

    def _eco_options(self, **kwargs) -> Dict:
        return dict(options=3)

    def _carbon_offset(self, **kwargs) -> Dict:
        return dict(offset="purchased")

    def _impact_report(self, **kwargs) -> Dict:
        return dict(report="generated")

class CorporateAgent(Agent):
    """Agent for corporate travel: employee bookings, cost optimization, policy, reports."""

    def __init__(self, agent_id: str = "agent_003"):
        super().__init__(agent_id, "CorporateAgent", AgentType.CORPORATE)
        self._setup_tools()

    def _setup_tools(self):
        """Register the corporate booking, optimization, compliance and reporting tools."""
        catalog = (
            ("book_employee", "Book Employee", "Book employee travel", "booking", self._book_employee),
            ("optimize_costs", "Optimize Costs", "Reduce travel costs", "optimization", self._optimize_costs),
            ("compliance_check", "Compliance", "Check policy compliance", "analysis", self._compliance_check),
            ("travel_report", "Report", "Generate analytics", "reporting", self._travel_report),
        )
        corporate_tools = [Tool(*spec) for spec in catalog]
        for corporate_tool in corporate_tools:
            self.tool_registry.register_tool(corporate_tool)

    def process_task(self, task: Task) -> Dict[str, Any]:
        """Return the canned response for the task's type (case-insensitive).

        Unknown task types yield {"success": False, "error": ...}.
        """
        kind = task.task_type.lower()

        # Payloads are built lazily so a booking id is only generated when
        # the booking response is the one returned.
        responders = {
            "book_employee": lambda: {
                "success": True,
                "booking_id": f"CORP{uuid.uuid4().hex[:8].upper()}",
                "cost": 1250.50,
                "policy_compliant": True,
            },
            "optimize_costs": lambda: {
                "success": True,
                "savings_percent": 18,
                "amount_saved": 5000,
            },
            "verify_policy": lambda: {
                "success": True,
                "compliant": True,
                "score": 92,
            },
            "generate_report": lambda: {
                "success": True,
                "total_bookings": 150,
                "total_cost": 185000,
                "cost_per_employee": 1233.33,
            },
        }

        responder = responders.get(kind)
        if responder is None:
            return {"success": False, "error": f"Unknown task: {kind}"}
        return responder()

    # Tool handlers: fixed demo payloads, keyword arguments are ignored.
    def _book_employee(self, **kwargs) -> Dict:
        return dict(booking="confirmed")

    def _optimize_costs(self, **kwargs) -> Dict:
        return dict(savings="20%")

    def _compliance_check(self, **kwargs) -> Dict:
        return dict(compliant=True)

    def _travel_report(self, **kwargs) -> Dict:
        return dict(report="ready")


In [11]:
class WorkflowEngine:
    """Stores named workflows and runs their steps sequentially across agents."""

    def __init__(self):
        """Create an engine with no workflows and an empty execution history."""
        self.workflows: Dict[str, Dict] = {}
        self.execution_history: List[Dict] = []
        logger.info("Workflow Engine initialized")

    def define_workflow(self, workflow_id: str, name: str, steps: List[Dict]) -> Dict:
        """Store a workflow definition, replacing any previous one with the same id.

        Args:
            workflow_id: Key under which the workflow is stored.
            name: Display name, written to the log.
            steps: Step dicts; execute_workflow reads the keys "agent_id",
                "task_type" and the optional "parameters" from each.

        Returns:
            The stored workflow dict (id, name, steps, creation timestamp).
        """
        workflow = {
            "workflow_id": workflow_id,
            "name": name,
            "steps": steps,
            "created_at": datetime.now().isoformat()
        }
        self.workflows[workflow_id] = workflow
        logger.info(f"Workflow defined: {name}")
        return workflow

    def execute_workflow(self, workflow_id: str, agents: Dict[str, Agent]) -> Dict:
        """Run every step of a workflow in order.

        A failing step does not stop the run; later steps still execute.

        Args:
            workflow_id: Id of a workflow previously passed to define_workflow.
            agents: Mapping from agent id to the Agent that executes its steps.

        Returns:
            For an unknown workflow: {"success": False, "error": ...}.
            Otherwise an execution record with "workflow_id", "started_at",
            "steps", "completed_at" and "success" (True only when every step
            completed). Each step record holds "agent_id", "task_type",
            "status", and for executed steps "result" and "execution_time";
            failed steps additionally carry "error".
        """
        if workflow_id not in self.workflows:
            return {"success": False, "error": "Workflow not found"}

        workflow = self.workflows[workflow_id]
        execution = {
            "workflow_id": workflow_id,
            "started_at": datetime.now().isoformat(),
            "steps": []
        }

        # Execute each step
        for step in workflow["steps"]:
            agent_id = step.get("agent_id")
            task_type = step.get("task_type")
            # Copy so an agent mutating task.parameters cannot alter the stored
            # workflow definition; also tolerates an explicit None.
            parameters = dict(step.get("parameters") or {})

            if agent_id not in agents:
                execution["steps"].append({
                    "agent_id": agent_id,
                    "task_type": task_type,
                    "status": "failed",
                    "error": "Agent not found"
                })
                continue

            agent = agents[agent_id]
            task = Task(
                task_id=str(uuid.uuid4()),
                task_type=task_type,
                status=TaskStatus.PENDING,
                parameters=parameters
            )

            result = agent.execute_task(task)

            step_record = {
                "agent_id": agent_id,
                "task_type": task_type,
                "status": "completed" if result.success else "failed",
                "result": result.result,
                "execution_time": result.execution_time
            }
            if not result.success:
                # Keep the reason; otherwise a failed step shows only an empty result.
                step_record["error"] = result.error
            execution["steps"].append(step_record)

        execution["completed_at"] = datetime.now().isoformat()
        # Mirrors the "success" key of the not-found return so callers can
        # check a single field in both cases.
        execution["success"] = all(
            recorded["status"] == "completed" for recorded in execution["steps"]
        )
        self.execution_history.append(execution)

        return execution

    def get_stats(self) -> Dict:
        """Return the number of defined workflows, the number of executions
        run so far, and the ids of the defined workflows."""
        return {
            "workflows_defined": len(self.workflows),
            "workflows_executed": len(self.execution_history),
            "workflows": list(self.workflows.keys())
        }


In [12]:
class SystemOrchestrator:
    """Top-level container holding the registered agents and the workflow engine."""

    def __init__(self):
        """Start with no agents and a fresh WorkflowEngine."""
        self.agents: Dict[str, Agent] = {}
        self.workflow_engine = WorkflowEngine()
        logger.info("System Orchestrator initialized")

    def register_agent(self, agent: Agent) -> bool:
        """Store the agent under its ``agent_id`` (replacing any existing entry).

        Returns:
            Always True.
        """
        key = agent.agent_id
        self.agents[key] = agent
        logger.info(f"Agent registered: {agent.name}")
        return True

    def get_system_report(self) -> Dict:
        """Return a snapshot with a timestamp, the agent count, each agent's
        status and the workflow engine statistics."""
        agent_snapshots = []
        for registered in self.agents.values():
            agent_snapshots.append(registered.get_status())

        report = {
            "timestamp": datetime.now().isoformat(),
            "total_agents": len(self.agents),
            "agents": agent_snapshots,
            "workflows": self.workflow_engine.get_stats(),
        }
        return report

In [13]:
def main():
    """Run the end-to-end demo.

    Creates the three agents, registers them with a SystemOrchestrator,
    defines a four-step workflow, executes it, and prints the execution
    record and the system report as JSON. The closing banner reflects
    whether every step actually completed.
    """
    banner = "=" * 80

    print("\n" + banner)
    print("ADK - Agent Development Kit")
    print("TravelAI Multi-Agent System - FULLY WORKING")
    print(banner + "\n")

    # Initialize system
    orchestrator = SystemOrchestrator()

    # Create agents
    print("📌 Step 1: Creating Agents\n")
    concierge = ConciergeAgent("agent_001")
    sustainable = SustainableAgent("agent_002")
    corporate = CorporateAgent("agent_003")

    orchestrator.register_agent(concierge)
    orchestrator.register_agent(sustainable)
    orchestrator.register_agent(corporate)

    # Report the real count instead of a hard-coded number.
    print(f"✓ {len(orchestrator.agents)} Agents created and registered\n")

    # Define workflow
    print("📌 Step 2: Defining Workflows\n")

    workflow_steps = [
        {"agent_id": "agent_001", "task_type": "search_flights", "parameters": {"from": "NYC", "to": "LON"}},
        {"agent_id": "agent_001", "task_type": "book_flight", "parameters": {"flight_id": "AF101"}},
        {"agent_id": "agent_002", "task_type": "calculate_carbon", "parameters": {"distance": 5500}},
        {"agent_id": "agent_003", "task_type": "book_employee", "parameters": {"employee_id": "EMP001"}},
    ]

    orchestrator.workflow_engine.define_workflow("workflow_001", "Complete Travel Booking", workflow_steps)
    print(f"✓ Workflow defined with {len(workflow_steps)} steps\n")

    # Execute workflow
    print("📌 Step 3: Executing Workflow\n")
    result = orchestrator.workflow_engine.execute_workflow("workflow_001", orchestrator.agents)

    print("\n" + banner)
    print("WORKFLOW EXECUTION RESULTS")
    print(banner + "\n")
    print(json.dumps(result, indent=2))

    # System report
    print("\n" + banner)
    print("SYSTEM REPORT")
    print(banner + "\n")
    report = orchestrator.get_system_report()
    print(json.dumps(report, indent=2))

    # Only claim success when the workflow was found and every step completed.
    all_steps_completed = "error" not in result and all(
        step.get("status") == "completed" for step in result.get("steps", [])
    )

    print("\n" + banner)
    if all_steps_completed:
        print("✓ ADK SYSTEM EXECUTION COMPLETED SUCCESSFULLY!")
    else:
        print("✗ ADK SYSTEM EXECUTION COMPLETED WITH FAILURES")
    print(banner + "\n")

# Run the demo when this code executes as the main module (as it does in a
# notebook cell or when run as a script), but not when it is imported.
if __name__ == "__main__":
    main()

2025-11-16 06:34:52,580 - INFO - Workflow Engine initialized
2025-11-16 06:34:52,581 - INFO - System Orchestrator initialized
2025-11-16 06:34:52,583 - INFO - Tool Registry initialized: registry_agent_001
2025-11-16 06:34:52,584 - INFO - Agent initialized: ConciergeAgent (concierge)
2025-11-16 06:34:52,587 - INFO - Tool registered: Search Flights
2025-11-16 06:34:52,588 - INFO - Tool registered: Book Flight
2025-11-16 06:34:52,588 - INFO - Tool registered: Book Hotel
2025-11-16 06:34:52,590 - INFO - Tool registered: Create Itinerary
2025-11-16 06:34:52,591 - INFO - Tool Registry initialized: registry_agent_002
2025-11-16 06:34:52,593 - INFO - Agent initialized: SustainableAgent (sustainable)
2025-11-16 06:34:52,594 - INFO - Tool registered: Calculate Carbon
2025-11-16 06:34:52,595 - INFO - Tool registered: Eco Options
2025-11-16 06:34:52,596 - INFO - Tool registered: Carbon Offset
2025-11-16 06:34:52,598 - INFO - Tool registered: Impact Report
2025-11-16 06:34:52,598 - INFO - Tool Regi


ADK - Agent Development Kit
TravelAI Multi-Agent System - FULLY WORKING

📌 Step 1: Creating Agents

✓ 3 Agents created and registered

📌 Step 2: Defining Workflows

✓ Workflow defined with 4 steps

📌 Step 3: Executing Workflow


WORKFLOW EXECUTION RESULTS

{
  "workflow_id": "workflow_001",
  "started_at": "2025-11-16T06:34:52.608741",
  "steps": [
    {
      "agent_id": "agent_001",
      "task_type": "search_flights",
      "status": "completed",
      "result": {
        "success": true,
        "flights_found": 12,
        "airlines": [
          "Air India",
          "Emirates",
          "Lufthansa"
        ],
        "price_range": "$300-$2000"
      },
      "execution_time": 4.76837158203125e-06
    },
    {
      "agent_id": "agent_001",
      "task_type": "book_flight",
      "status": "completed",
      "result": {
        "success": true,
        "booking_id": "FLY8A5FD13C",
        "status": "confirmed",
        "confirmation_sent": true
      },
      "ex