### Libraries

In [1]:
import os
import warnings

os.chdir("..")
warnings.filterwarnings("ignore")

In [2]:
import asyncio

from datetime import datetime
from typing import List, Dict, Optional
import random

from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types

### Define tools

In [None]:
class Flight:
    """Represents a flight with all relevant details"""
    
    def __init__(self, flight_id: str, airline: str, source: str, destination: str, 
                 departure_time: datetime, arrival_time: datetime, price: float):
        self.flight_id = flight_id
        self.airline = airline
        self.source = source
        self.destination = destination
        self.departure_time = departure_time
        self.arrival_time = arrival_time
        self.price = price
    
    def __str__(self) -> str:
        return (f"Flight {self.flight_id}: {self.airline} - {self.source} to {self.destination} - "
                f"Departure: {self.departure_time.strftime('%Y-%m-%d %H:%M')} - "
                f"Arrival: {self.arrival_time.strftime('%Y-%m-%d %H:%M')} - "
                f"Price: ${self.price:.2f}")


class FlightSearchEngine:
    """Search engine to find available flights based on source, destination and date"""
    
    def __init__(self):
        # In a real application, this would connect to a database or API
        self.flights_database = []
        self._populate_dummy_data()
    
    def _populate_dummy_data(self):
        """Populate with some dummy flight data for demonstration"""
        airlines = ["Delta", "United", "American", "Southwest", "JetBlue"]
        cities = ["New York", "Los Angeles", "Chicago", "Miami", "Seattle", "Boston", "Dallas"]
        
        # Generate 100 random flights between cities
        for i in range(1, 101):
            source = random.choice(cities)
            # Make sure destination is different from source
            destination_options = [city for city in cities if city != source]
            destination = random.choice(destination_options)
            
            # Create random dates within the next 30 days
            day = random.randint(1, 30)
            hour_depart = random.randint(6, 20)
            flight_duration = random.randint(1, 8)  # Flight duration in hours
            
            departure_time = datetime(2025, 5, day, hour_depart, random.choice([0, 15, 30, 45]))
            arrival_time = datetime(2025, 5, day, hour_depart + flight_duration, 
                                    random.choice([0, 15, 30, 45]))
            
            # Adjust arrival time if it goes past midnight
            if arrival_time.hour >= 24:
                arrival_time = datetime(2025, 5, day + 1, arrival_time.hour - 24, 
                                        arrival_time.minute)
            
            price = round(random.uniform(150, 1500), 2)
            
            flight = Flight(
                flight_id=f"FL{i:03d}",
                airline=random.choice(airlines),
                source=source,
                destination=destination,
                departure_time=departure_time,
                arrival_time=arrival_time,
                price=price
            )
            
            self.flights_database.append(flight)
    
    def search_flights(self, source: str, destination: str, date: datetime) -> List[Flight]:
        """
        Search for flights based on source, destination and date
        
        Args:
            source: Departure city
            destination: Arrival city
            date: Date of travel (year-month-day)
            
        Returns:
            List of Flight objects matching the criteria
        """
        results = []
        
        for flight in self.flights_database:
            if (flight.source.lower() == source.lower() and 
                flight.destination.lower() == destination.lower() and
                flight.departure_time.date() == date.date()):
                results.append(flight)
        
        # Sort results by price (lowest first)
        results.sort(key=lambda x: x.price)
        return results


class Hotel:
    """Represents a hotel with all relevant details"""
    
    def __init__(self, hotel_id: str, name: str, city: str, address: str, 
                 rating: float, amenities: List[str], price_per_night: float):
        self.hotel_id = hotel_id
        self.name = name
        self.city = city
        self.address = address
        self.rating = rating  # Scale of 1-5
        self.amenities = amenities
        self.price_per_night = price_per_night
    
    def __str__(self) -> str:
        return (f"{self.name} ({self.rating}★) - {self.city} - "
                f"${self.price_per_night:.2f} per night - "
                f"Amenities: {', '.join(self.amenities)}")


In [None]:
class HotelSearchEngine:
    """Search engine to find available hotels based on city"""
    
    def __init__(self):
        # In a real application, this would connect to a database or API
        self.hotels_database = []
        self._populate_dummy_data()
    
    def _populate_dummy_data(self):
        """Populate with some dummy hotel data for demonstration"""
        cities = ["New York", "Los Angeles", "Chicago", "Miami", "Seattle", "Boston", "Dallas"]
        hotel_chains = ["Marriott", "Hilton", "Hyatt", "Sheraton", "Holiday Inn", "Ritz-Carlton", "Four Seasons"]
        hotel_types = ["Resort", "Suites", "Inn", "Grand Hotel", "Plaza", "Boutique"]
        possible_amenities = [
            "Free WiFi", "Swimming Pool", "Gym", "Restaurant", "Room Service", 
            "Spa", "Business Center", "Free Breakfast", "Pet Friendly", "Airport Shuttle"
        ]
        
        # Generate 70 random hotels across cities
        for i in range(1, 71):
            city = random.choice(cities)
            chain = random.choice(hotel_chains)
            hotel_type = random.choice(hotel_types)
            
            # Select 3-7 random amenities
            num_amenities = random.randint(3, 7)
            amenities = random.sample(possible_amenities, num_amenities)
            
            # Generate rating between 2.0 and 5.0
            rating = round(random.uniform(2.0, 5.0), 1)
            
            # Price based partially on rating
            base_price = 70 + (rating * 40)  # Higher rated hotels cost more
            price_variation = random.uniform(0.8, 1.2)  # +/- 20% random variation
            price = round(base_price * price_variation, 2)
            
            hotel = Hotel(
                hotel_id=f"HT{i:03d}",
                name=f"{chain} {hotel_type} {city}",
                city=city,
                address=f"{random.randint(100, 999)} Main Street, {city}",
                rating=rating,
                amenities=amenities,
                price_per_night=price
            )
            
            self.hotels_database.append(hotel)
    
    def search_hotels(self, city: str, min_rating: Optional[float] = None, 
                      max_price: Optional[float] = None) -> List[Hotel]:
        """
        Search for hotels based on city with optional filters
        
        Args:
            city: City where the hotel is located
            min_rating: Minimum rating (1-5 scale)
            max_price: Maximum price per night
            
        Returns:
            List of Hotel objects matching the criteria
        """
        results = []
        
        for hotel in self.hotels_database:
            if hotel.city.lower() != city.lower():
                continue
                
            if min_rating is not None and hotel.rating < min_rating:
                continue
                
            if max_price is not None and hotel.price_per_night > max_price:
                continue
                
            results.append(hotel)
        
        # Sort results by rating (highest first)
        results.sort(key=lambda x: x.rating, reverse=True)
        return results


### Define Agent

### Setup Runner and Session Service

In [5]:
APP_NAME = "weather_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

In [6]:
session_service = InMemorySessionService()
session = session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)

runner = Runner(
    agent=weather_agent,
    app_name=APP_NAME,
    session_service=session_service
)

### Interact with the Agent

In [7]:
async def call_agent_async(query: str, runner: type[Runner], user_id: str, session_id: str):
    print(f"\n>>> User Query: {query}")

    content = types.Content(role="user", parts=[types.Part(text=query)])

    final_response_text = "Agent did not provide a final response"

    async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
        print(event)
        print("---" * 10)
        if event.is_final_response():
            if event.content and event.content.parts:
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:
                final_response_text = f"Agent escalated: {event.error_message or 'No specific message'}"

            break

    print(f"<<< Agent Response: {final_response_text}")

In [8]:
async def run_conversation():
    await call_agent_async("What is the weather like in London?",
                                       runner=runner,
                                       user_id=USER_ID,
                                       session_id=SESSION_ID)

    await call_agent_async("How about Paris?",
                                       runner=runner,
                                       user_id=USER_ID,
                                       session_id=SESSION_ID) # Expecting the tool's error message

    await call_agent_async("Tell me the weather in New York",
                                       runner=runner,
                                       user_id=USER_ID,
                                       session_id=SESSION_ID)