# GenAI Developer Assignment — Travel Assistant (LangGraph + Gemini)

This notebook contains the **complete implementation** with comprehensive logging.

---
## Objective
Build an intelligent **Travel Assistant** using:
- **Gemini API (Flash / Pro)**
- **Tools**: `search_flights`, `get_weather`, `find_attractions`
- **FastAPI endpoint** (`/travel-assistant`)
- **Retry logic with exponential backoff**
- **Streaming responses** for better UX
- **LangGraph framework**
- **Comprehensive logging mechanism**

In [None]:
# ============================================
# TASK 0: Setup and Dependencies
# ============================================
import os
import json
import logging
import asyncio
import time
from typing import TypedDict, Annotated, Sequence
from functools import wraps

# LangChain imports
from langchain.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

# FastAPI imports
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# Other imports
from dotenv import load_dotenv
import google.generativeai as genai

# ============================================
# LOGGING SETUP
# ============================================
# Every record goes to two sinks: the console (so it shows up in the
# notebook output) and a log file in the current working directory.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_log_handlers = [
    logging.StreamHandler(),
    logging.FileHandler('travel_assistant.log'),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)

# Module-level logger used by the cells that follow.
logger = logging.getLogger(__name__)

logger.info("üöÄ Travel Assistant Application Starting...")
logger.info("üì¶ All dependencies imported successfully")
print("‚úÖ Setup complete! All imports successful.")


In [1]:
# Configure Gemini API
# Reads the API key from the environment (optionally populated from a .env
# file), configures the google.generativeai client and builds the shared
# LangChain chat model `llm`.
logger.info("üìù Loading environment variables...")
load_dotenv()

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    # Fail fast: nothing below can work without a key.
    logger.error("‚ùå GOOGLE_API_KEY not found in environment variables")
    raise ValueError("GOOGLE_API_KEY not found. Please set it in .env file")

logger.info("üîë API key found")

# Configure Gemini
genai.configure(api_key=GOOGLE_API_KEY)
logger.info("‚úÖ Gemini API configured")

# Initialize Gemini model
# The model id can be overridden with the GEMINI_MODEL environment variable
# (e.g. to switch between Flash and Pro) without editing the notebook; an
# unset or empty variable falls back to the original default.
MODEL_NAME = os.getenv("GEMINI_MODEL") or "gemini-1.5-flash"
logger.info(f"ü§ñ Initializing model: {MODEL_NAME}")

llm = ChatGoogleGenerativeAI(
    model=MODEL_NAME,
    temperature=0.7,
    google_api_key=GOOGLE_API_KEY
)

logger.info("‚úÖ LLM model initialized successfully")
print(f"‚úÖ Gemini API configured with model: {MODEL_NAME}")

NameError: name 'logger' is not defined

In [None]:
# Implement mock tools with comprehensive logging

@tool
def search_flights(origin: str, destination: str, date: str = "2025-12-01") -> dict:
    """
    Search for flight options between origin and destination.
    
    Args:
        origin: Departure city
        destination: Arrival city
        date: Travel date (YYYY-MM-DD format)
    
    Returns:
        Dictionary containing flight options
    """
    # NOTE(review): the docstring above is kept verbatim because @tool is
    # expected to expose it to the model as the tool description - confirm
    # before rewording it.
    logger.info(f"üõ´ search_flights called: {origin} ‚Üí {destination} on {date}")

    # Mock flight data: one row per flight, columns named by `columns`.
    # The same three flights are returned regardless of the arguments.
    columns = (
        "airline",
        "flight_number",
        "price_usd",
        "departure_time",
        "arrival_time",
        "duration",
        "stops",
    )
    rows = (
        ("Singapore Airlines", "SQ638", 450, "07:00 AM", "02:30 PM", "6h 30m", "Direct"),
        ("ANA", "NH842", 420, "11:30 AM", "07:00 PM", "6h 30m", "Direct"),
        ("JAL", "JL712", 480, "09:15 AM", "04:45 PM", "6h 30m", "Direct"),
    )

    mock_flights = {
        "flights": [dict(zip(columns, row)) for row in rows],
        "origin": origin,
        "destination": destination,
        "date": date,
    }

    logger.info(f"‚úÖ Found {len(mock_flights['flights'])} flights from {origin} to {destination}")
    logger.debug(f"Flight data: {json.dumps(mock_flights, indent=2)}")

    return mock_flights


@tool
def get_weather(location: str, days: int = 3) -> dict:
    """
    Get weather forecast for a location.
    
    Args:
        location: City name
        days: Number of days for forecast (1-7)
    
    Returns:
        Dictionary containing weather forecast
    """
    # NOTE(review): the docstring above is kept verbatim because @tool is
    # expected to expose it to the model as the tool description - confirm
    # before rewording it.
    logger.info(f"üå§Ô∏è  get_weather called: {location} for {days} days")

    # Mock weather conditions, cycled through day by day.
    conditions = ["Sunny", "Partly Cloudy", "Cloudy", "Light Rain", "Clear"]

    # The forecast is capped at 7 days; zero or negative requests yield an
    # empty forecast (same as before). The clamped value is what gets logged
    # below, so the log no longer claims more days than were produced.
    num_days = max(0, min(days, 7))

    forecast = {
        "location": location,
        "forecast": [
            {
                "day": f"Day {i + 1}",
                "condition": conditions[i % len(conditions)],
                "temperature_celsius": 22 + (i % 5),
                "humidity_percent": 55 + (i * 5),
                "precipitation_chance": 10 + (i * 5)
            }
            for i in range(num_days)
        ]
    }

    logger.info(f"‚úÖ Weather forecast retrieved for {location} ({num_days} days)")
    logger.debug(f"Weather data: {json.dumps(forecast, indent=2)}")

    return forecast


@tool
def find_attractions(location: str, limit: int = 5) -> dict:
    """
    Find top tourist attractions in a location.
    
    Args:
        location: City name
        limit: Maximum number of attractions to return
    
    Returns:
        Dictionary containing attraction information
    """
    # NOTE(review): the docstring above is kept verbatim because @tool is
    # expected to expose it to the model as the tool description - confirm
    # before rewording it.
    logger.info(f"üóº find_attractions called: {location} (limit: {limit})")

    # Mock attractions database
    attractions_db = {
        "Tokyo": [
            {"name": "Shibuya Crossing", "type": "Landmark", "rating": 4.8, "description": "Famous scramble crossing"},
            {"name": "Senso-ji Temple", "type": "Temple", "rating": 4.7, "description": "Ancient Buddhist temple in Asakusa"},
            {"name": "Tokyo Skytree", "type": "Observation Tower", "rating": 4.6, "description": "Tallest structure in Japan"},
            {"name": "Meiji Shrine", "type": "Shrine", "rating": 4.7, "description": "Shinto shrine in forest setting"},
            {"name": "Tokyo Tower", "type": "Landmark", "rating": 4.5, "description": "Iconic communications tower"},
            {"name": "Tsukiji Outer Market", "type": "Market", "rating": 4.6, "description": "Fresh seafood and street food"},
        ],
        "default": [
            {"name": "City Center", "type": "District", "rating": 4.5, "description": "Main downtown area"},
            {"name": "Historic Quarter", "type": "District", "rating": 4.6, "description": "Old town with traditional architecture"},
            {"name": "Central Park", "type": "Park", "rating": 4.4, "description": "Large urban park"},
            {"name": "National Museum", "type": "Museum", "rating": 4.7, "description": "Cultural and historical exhibits"},
            {"name": "Waterfront", "type": "Area", "rating": 4.5, "description": "Scenic riverside or harbor area"},
        ]
    }

    # Match the city ignoring case and surrounding whitespace, so "tokyo" or
    # " Tokyo " resolve to the Tokyo entry instead of silently falling back
    # to the generic list.
    cities = {
        city.casefold(): entries
        for city, entries in attractions_db.items()
        if city != "default"
    }
    attractions_list = cities.get(location.strip().casefold(), attractions_db["default"])

    # A negative limit would slice from the end ([:-n]) and return the wrong
    # entries; treat it as "no attractions requested" instead.
    limited_attractions = attractions_list[:max(limit, 0)]

    result = {
        "location": location,
        "attractions": limited_attractions,
        "total_found": len(limited_attractions)
    }

    logger.info(f"‚úÖ Found {len(limited_attractions)} attractions in {location}")
    logger.debug(f"Attractions data: {json.dumps(result, indent=2)}")

    return result


# Register tools
# `tools` is the single list shared by the LLM binding, the ToolNode and the
# /health endpoint.
tools = [search_flights, get_weather, find_attractions]
_tool_names = [t.name for t in tools]
logger.info(f"üìã Registered {len(tools)} tools: {_tool_names}")

print("‚úÖ Tools implemented successfully!")
print(f"Available tools: {_tool_names}")

In [None]:
# Implement retry logic wrapper with comprehensive logging

def retry_with_exponential_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0,
    max_delay: float = 60.0
):
    """
    Decorator factory that retries a sync or async callable on transient errors.

    Transient errors are the Google API ``ResourceExhausted``,
    ``ServiceUnavailable`` and ``DeadlineExceeded`` exceptions plus the
    built-in ``ConnectionError`` and ``TimeoutError``. Any other exception is
    logged and re-raised immediately without retrying.

    After the n-th failure the wrapper waits
    ``min(initial_delay * exponential_base ** (n - 1), max_delay)`` seconds
    before trying again (``asyncio.sleep`` for coroutine functions,
    ``time.sleep`` otherwise).

    Args:
        max_retries: Maximum number of retry attempts (the callable is
            invoked at most ``max_retries + 1`` times)
        initial_delay: Initial delay in seconds
        exponential_base: Base for exponential calculation
        max_delay: Maximum delay between retries

    Returns:
        A decorator producing a wrapper of the same kind (sync or async) as
        the decorated function.

    Raises:
        The last transient exception once retries are exhausted, or the
        original exception for non-retryable errors.
    """
    # Local import: asyncio.iscoroutinefunction is deprecated in recent
    # Python versions in favour of the inspect equivalent.
    import inspect

    def _transient_errors():
        # Resolved lazily (only when an exception is being handled), exactly
        # like the original inline ``except`` tuples, so decorating a function
        # never depends on google_exceptions being imported yet.
        return (
            google_exceptions.ResourceExhausted,
            google_exceptions.ServiceUnavailable,
            google_exceptions.DeadlineExceeded,
            ConnectionError,
            TimeoutError,
        )

    def _plan_retry(name, error, failures):
        """Log the failure; return the delay before the next try, or None to give up."""
        if not isinstance(error, _transient_errors()):
            # Don't retry on non-transient errors
            logger.error(f"‚ùå {name} failed with non-retryable error: {type(error).__name__}: {str(error)}")
            return None

        if failures > max_retries:
            logger.error(f"‚ùå {name} failed after {max_retries} retries: {str(error)}")
            return None

        # Calculate delay with exponential backoff
        delay = min(initial_delay * (exponential_base ** (failures - 1)), max_delay)

        logger.warning(
            f"‚ö†Ô∏è  {name} failed (attempt {failures}/{max_retries + 1}): {type(error).__name__}"
        )
        logger.info(f"üîÑ Retrying in {delay:.2f} seconds...")
        return delay

    def _log_attempt(name, retry_count):
        """Debug-log the attempt that is about to start."""
        logger.debug(f"üîÑ Attempting {name} (attempt {retry_count + 1}/{max_retries + 1})")

    def _log_success(name, retry_count):
        """Report a success only when at least one retry was needed."""
        if retry_count > 0:
            logger.info(f"‚úÖ {name} succeeded after {retry_count} retries")

    def _raise_exhausted(name):
        """Raise the generic failure used when the retry loop never runs."""
        logger.error(f"‚ùå {name} exceeded maximum retries")
        raise Exception(f"Maximum retries ({max_retries}) exceeded for {name}")

    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            retry_count = 0

            while retry_count <= max_retries:
                _log_attempt(func.__name__, retry_count)
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    retry_count += 1
                    delay = _plan_retry(func.__name__, e, retry_count)
                    if delay is None:
                        # Bare raise keeps the original exception and traceback.
                        raise
                    await asyncio.sleep(delay)
                else:
                    _log_success(func.__name__, retry_count)
                    return result

            # Only reachable when max_retries is negative (loop body never runs).
            _raise_exhausted(func.__name__)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            retry_count = 0

            while retry_count <= max_retries:
                _log_attempt(func.__name__, retry_count)
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    retry_count += 1
                    delay = _plan_retry(func.__name__, e, retry_count)
                    if delay is None:
                        # Bare raise keeps the original exception and traceback.
                        raise
                    time.sleep(delay)
                else:
                    _log_success(func.__name__, retry_count)
                    return result

            # Only reachable when max_retries is negative (loop body never runs).
            _raise_exhausted(func.__name__)

        # Return appropriate wrapper based on function type
        if inspect.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator


# Cell status: record (log + notebook output) that the retry decorator is defined.
logger.info("‚úÖ Retry logic with exponential backoff implemented")
print("‚úÖ Retry logic implemented with exponential backoff (1s, 2s, 4s, 8s...)")

In [None]:
# Implement streaming response handler with comprehensive logging

async def stream_llm_response(messages: list, tools_list: list = None):
    """
    Stream responses from Gemini LLM.

    Every chunk produced by ``astream`` is yielded unchanged, including
    chunks without text content (the consumer decides what to do with them).

    Args:
        messages: List of conversation messages
        tools_list: Optional list of tools to bind

    Yields:
        Partial response chunks

    Raises:
        Any exception raised while binding tools or streaming; it is logged
        and re-raised.
    """
    logger.info("üì° Starting streaming LLM response")
    logger.debug(f"Input messages count: {len(messages)}")

    try:
        # Bind tools if provided
        model = llm
        if tools_list:
            model = llm.bind_tools(tools_list)
            logger.debug(f"Tools bound to model: {[t.name for t in tools_list]}")

        # Stream the response
        chunk_count = 0
        # Only the total length is reported, so count characters instead of
        # concatenating every chunk into one growing string.
        total_chars = 0

        logger.info("üöÄ Invoking LLM with streaming...")
        async for chunk in model.astream(messages):
            chunk_count += 1

            content = getattr(chunk, 'content', None)
            # Message content is not guaranteed to be a plain string (it can
            # be a list of parts); only string content is counted, which
            # avoids the TypeError the old `str += content` raised on lists.
            if content and isinstance(content, str):
                total_chars += len(content)
                logger.debug(f"Chunk {chunk_count}: {len(content)} chars")

            yield chunk

        logger.info(f"‚úÖ Streaming completed: {chunk_count} chunks, {total_chars} total chars")

    except Exception as e:
        logger.error(f"‚ùå Streaming failed: {type(e).__name__}: {str(e)}")
        raise


# Cell status: record (log + notebook output) that the streaming helper is defined.
logger.info("‚úÖ Streaming response handler implemented")
print("‚úÖ Streaming response handler implemented!")

In [None]:
# Build LangGraph workflow with comprehensive logging

# Define Agent State
# Imported here as well so this cell works regardless of which import cell ran.
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    # Conversation messages. The second Annotated argument is the LangGraph
    # reducer for this key: with `add_messages`, the {"messages": [...]}
    # returned by a node is appended to the history. Without a reducer each
    # node's return value replaces the whole list, so the agent would lose
    # the user's query and its own tool calls after the first step.
    messages: Annotated[list, add_messages]


# Node 1: LLM Node with retry logic
@retry_with_exponential_backoff(max_retries=3, initial_delay=1.0)
async def call_model(state: AgentState) -> AgentState:
    """
    Agent node: send the conversation to the tool-enabled LLM.

    Returns a partial state update whose "messages" list holds only the
    model's reply; the reply may carry tool calls for the router to act on.
    """
    logger.info("ü§ñ LLM Node: Calling model...")
    logger.debug(f"Current state messages: {len(state['messages'])}")

    conversation = state["messages"]

    # Bind tools to LLM
    tool_enabled_llm = llm.bind_tools(tools)
    logger.debug(f"Tools bound: {[t.name for t in tools]}")

    # Invoke LLM
    logger.info("üìû Invoking LLM...")
    response = await tool_enabled_llm.ainvoke(conversation)

    logger.info(f"‚úÖ LLM responded")

    # Report whether the model asked for tools or answered directly.
    requested_calls = getattr(response, 'tool_calls', None)
    if not requested_calls:
        logger.info("üí¨ LLM provided final response (no tool calls)")
    else:
        logger.info(f"üîß LLM requested {len(response.tool_calls)} tool call(s)")
        for tc in requested_calls:
            logger.debug(f"  Tool: {tc.get('name', 'unknown')} with args: {tc.get('args', {})}")

    return {"messages": [response]}


# Router: Decide whether to continue or end
def should_continue(state: AgentState) -> Literal["tools", "__end__"]:
    """
    Pick the next graph step from the most recent message.

    Returns "tools" when that message carries tool calls, "__end__" otherwise.
    """
    logger.info("üîÄ Router: Determining next step...")

    last_message = state["messages"][-1]

    # Guard clause: nothing to execute means the conversation is finished.
    if not getattr(last_message, 'tool_calls', None):
        logger.info("‚û°Ô∏è  Router decision: END (no tool calls, conversation complete)")
        return "__end__"

    logger.info(f"‚û°Ô∏è  Router decision: TOOLS ({len(last_message.tool_calls)} tool calls pending)")
    return "tools"


# Create the tool node
# ToolNode is built from the same `tools` list that call_model binds to the LLM.
logger.info("Creating ToolNode...")
tool_node = ToolNode(tools)
logger.info(f"ToolNode created with {len(tools)} tools")


# Build the graph
logger.info("üèóÔ∏è  Building LangGraph workflow...")

workflow = StateGraph(AgentState)

# Add nodes
# "agent" runs call_model (LLM with retry); "tools" runs the ToolNode.
logger.info("Adding nodes to graph...")
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
logger.info("Nodes added: agent, tools")

# Set entry point
workflow.set_entry_point("agent")
logger.info("Entry point set: agent")

# Add conditional edges
# should_continue returns the label "tools" or "__end__"; the mapping below
# translates each label into the node (or END) the graph moves to next.
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",
        "__end__": END
    }
)
logger.info("Conditional edges added: agent -> [tools, END]")

# Add edge from tools back to agent
# This closes the loop: after the tools run, control returns to the agent.
workflow.add_edge("tools", "agent")
logger.info("Edge added: tools -> agent")

# Compile the graph
# `graph` is the compiled object used by the endpoint and the test cell.
logger.info("Compiling graph...")
graph = workflow.compile()
logger.info("‚úÖ LangGraph workflow compiled successfully")

print("‚úÖ LangGraph Travel Assistant built successfully!")
print("Graph structure: START -> agent -> [tools -> agent (loop)] -> END")

In [None]:
# Build FastAPI app and endpoint with comprehensive logging

# Create FastAPI app
# `app` is the ASGI application the route decorators below attach to and
# that the server cell passes to uvicorn.run.
app = FastAPI(title="Travel Assistant API", version="1.0.0")
logger.info("FastAPI app created")


# Request model
# Body of POST /travel-assistant.
class TravelRequest(BaseModel):
    # The user's request; it becomes the HumanMessage that seeds the graph.
    query: str
    # True (default): reply as a text/event-stream; False: reply with a
    # single TravelResponse.
    stream: bool = True


# Response model
# Returned by the non-streaming path and by the endpoint's error handler.
class TravelResponse(BaseModel):
    # Final message content, or an "Error: ..." / fallback text.
    response: str
    # "success" or "error", as set by the endpoint.
    status: str


def _format_sse_event(payload: dict) -> str:
    """Serialize *payload* as one Server-Sent Events ``data:`` frame."""
    # An SSE event must end with a blank line, i.e. two real newline
    # characters. The previous "\\n\\n" literal emitted the four characters
    # backslash-n-backslash-n instead, so clients could not split the events.
    return f"data: {json.dumps(payload)}\n\n"


@app.post("/travel-assistant")
async def travel_assistant_endpoint(request: TravelRequest):
    """
    Travel Assistant endpoint - plans trips using LangGraph and Gemini.
    """
    # The docstring above is kept verbatim: FastAPI uses it as the route
    # description in the generated API docs.
    #
    # Streaming mode returns a text/event-stream of JSON events with a
    # 'type' of 'content', 'tool_call', 'done' or 'error'; non-streaming
    # mode returns a TravelResponse.
    request_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    logger.info("="*60)
    logger.info(f"üåç NEW REQUEST [{request_id}]")
    logger.info("="*60)
    logger.info(f"Query: {request.query}")
    logger.info(f"Streaming: {request.stream}")

    try:
        # Initialize state
        initial_state = {
            "messages": [HumanMessage(content=request.query)]
        }
        logger.info("Initial state created")

        if request.stream:
            # Streaming response
            logger.info("üì° Starting streaming response...")

            async def event_generator():
                # Errors raised while the graph runs surface here, after the
                # response has started, so they are reported in-band as an
                # 'error' event rather than through the outer handler.
                try:
                    step_count = 0

                    # NOTE(review): with stream_mode="values" each event is a
                    # full state snapshot; the first one presumably holds only
                    # the user's own message, which would be echoed as a
                    # 'content' event - confirm whether that is intended.
                    async for event in graph.astream(initial_state, stream_mode="values"):
                        step_count += 1
                        logger.debug(f"Stream step {step_count}")

                        messages = event.get("messages", [])
                        if messages:
                            last_message = messages[-1]

                            # Stream content
                            if hasattr(last_message, 'content') and last_message.content:
                                content = last_message.content
                                logger.debug(f"Streaming content: {len(content)} chars")
                                yield _format_sse_event({'type': 'content', 'data': content})

                            # Stream tool calls
                            if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
                                for tc in last_message.tool_calls:
                                    tool_info = {
                                        'type': 'tool_call',
                                        'tool': tc.get('name', 'unknown'),
                                        'args': tc.get('args', {})
                                    }
                                    logger.info(f"Streaming tool call: {tool_info['tool']}")
                                    yield _format_sse_event(tool_info)

                    logger.info(f"‚úÖ Streaming completed: {step_count} steps")
                    yield _format_sse_event({'type': 'done', 'status': 'success'})

                except Exception as e:
                    logger.error(f"‚ùå Streaming error [{request_id}]: {type(e).__name__}: {str(e)}")
                    error_data = {
                        'type': 'error',
                        'error': str(e),
                        'error_type': type(e).__name__
                    }
                    yield _format_sse_event(error_data)

            return StreamingResponse(
                event_generator(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "X-Request-ID": request_id
                }
            )

        else:
            # Non-streaming response
            logger.info("üîÑ Running non-streaming workflow...")

            result = await graph.ainvoke(initial_state)
            messages = result.get("messages", [])

            if messages:
                final_message = messages[-1]
                response_content = final_message.content if hasattr(final_message, 'content') else str(final_message)

                logger.info(f"‚úÖ Request completed [{request_id}]: {len(response_content)} chars")

                return TravelResponse(
                    response=response_content,
                    status="success"
                )
            else:
                logger.warning(f"‚ö†Ô∏è  No messages in result [{request_id}]")
                return TravelResponse(
                    response="No response generated",
                    status="error"
                )

    except Exception as e:
        # Failures are reported in the body with status="error" (HTTP status
        # is left at its default), matching the original contract.
        logger.error(f"‚ùå Request failed [{request_id}]: {type(e).__name__}: {str(e)}")
        logger.exception("Full traceback:")

        return TravelResponse(
            response=f"Error: {str(e)}",
            status="error"
        )


@app.get("/")
async def root():
    """Health check endpoint."""
    logger.info("Health check requested")
    # Static payload: service identity only, no dependency checks.
    payload = {}
    payload["status"] = "healthy"
    payload["service"] = "Travel Assistant API"
    payload["version"] = "1.0.0"
    return payload


@app.get("/health")
async def health():
    """Detailed health check."""
    logger.info("Detailed health check requested")
    # Snapshot of the current local time, the registered tool names and the
    # configured model id.
    checked_at = datetime.now().isoformat()
    tool_names = [t.name for t in tools]
    report = dict(
        status="healthy",
        timestamp=checked_at,
        tools=tool_names,
        model=MODEL_NAME,
    )
    return report


# Cell status: log the configured routes and print a short endpoint summary.
logger.info("‚úÖ FastAPI endpoints configured")
logger.info("Available endpoints: /travel-assistant, /, /health")

print("‚úÖ FastAPI endpoint '/travel-assistant' created successfully!")
# "\n" (a real newline) gives the intended blank line before the list; the
# previous "\\n" printed a literal backslash-n.
print("\nEndpoints:")
print("  POST /travel-assistant - Main travel planning endpoint")
print("  GET  /                 - Health check")
print("  GET  /health           - Detailed health status")

---
## 📊 Rubric (Total 20 Points)
### **1. Tool Implementation (4 pts)**
- ✅ Tools implemented correctly (2 pts)
- ✅ Realistic mock responses (2 pts)

### **2. Retry Logic (4 pts)**
- ✅ Exponential backoff implemented (2 pts)
- ✅ Retries trigger correctly (2 pts)

### **3. Streaming Responses (4 pts)**
- ✅ Streaming implemented (2 pts)
- ✅ Smooth incremental output (2 pts)

### **4. LangGraph Workflow (4 pts)**
- ✅ Graph nodes defined (2 pts)
- ✅ Correct tool routing (2 pts)

### **5. FastAPI Endpoint (4 pts)**
- ✅ Endpoint functional (2 pts)
- ✅ Runs graph + streams output (2 pts)

---

## 📋 Logging Features Implemented

This implementation includes comprehensive logging at every level:

1. **Setup Logging**: Initialization and configuration tracking
2. **Tool Logging**: Every tool call with parameters and results (🛫 🌤️ 🗼)
3. **Retry Logging**: Attempt counters, backoff delays, success/failure tracking
4. **Streaming Logging**: Chunk delivery and progress monitoring
5. **Graph Logging**: Node execution, router decisions, state transitions
6. **API Logging**: Request IDs, queries, responses, errors
7. **Error Logging**: Full exception details with context

**Log Output**: 
- Console (real-time)
- File: `travel_assistant.log`

**Log Levels**:
- INFO: General flow and important events
- DEBUG: Detailed execution information
- WARNING: Retry attempts and recoverable issues
- ERROR: Failures and exceptions

---
## 📝 Sample Input
```
Plan a 3-day trip to Tokyo. I need flight options from Singapore, weather forecast, and top attractions.
```
## ✅ Expected Output (High-Level)
```
Flights Found:
- Singapore → Tokyo, $450, 7 AM

Weather Forecast:
- Day 1: Sunny
- Day 2: Cloudy

Top Attractions:
- Shibuya Crossing
- Senso-ji Temple
- Tokyo Skytree

Suggested Itinerary:
Day 1: Shinjuku, Shibuya
Day 2: Asakusa, Skytree
Day 3: Odaiba
```

In [None]:
# Start the FastAPI server
# Note: This will run the server - use Ctrl+C to stop

import uvicorn

logger.info("="*60)
logger.info("üöÄ STARTING FASTAPI SERVER")
logger.info("="*60)

# Startup banner. "\n" is a real newline; the previous "\\n" printed a
# literal backslash-n instead of the intended blank lines.
print("\n" + "="*60)
print("üöÄ Starting FastAPI Server")
print("="*60)
print("\nServer will start on: http://127.0.0.1:8000")
print("\nEndpoints:")
print("  POST http://127.0.0.1:8000/travel-assistant")
print("  GET  http://127.0.0.1:8000/")
print("  GET  http://127.0.0.1:8000/health")
print("\nPress Ctrl+C to stop the server")
print("="*60 + "\n")

# Run the server
# NOTE(review): the banner advertises 127.0.0.1 but the server binds to
# 0.0.0.0 (all interfaces) - confirm which one is intended.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

---
## 🚀 Start FastAPI Server

In [None]:
# Test the Travel Assistant directly (without FastAPI)

async def test_travel_assistant(query: str):
    """
    Run the compiled graph once for *query*, bypassing FastAPI.

    Prints the content of every message in the final state that has
    non-empty content, and returns the final graph state.
    """
    logger.info("="*60)
    logger.info("üß™ TEST MODE")
    logger.info("="*60)
    logger.info(f"Test query: {query}")

    # "\n" is a real newline; the previous "\\n" printed a literal
    # backslash-n instead of the intended blank lines.
    print("\n" + "="*60)
    print("üß™ Testing Travel Assistant")
    print("="*60)
    print(f"\nQuery: {query}\n")
    print("-"*60)

    # Create initial state
    initial_state = {
        "messages": [HumanMessage(content=query)]
    }

    # Run the graph
    logger.info("Starting graph execution...")
    result = await graph.ainvoke(initial_state)

    # Extract final response
    messages = result.get("messages", [])

    logger.info(f"Graph execution completed: {len(messages)} messages")

    print("\nüìù Response:\n")

    for msg in messages:
        if hasattr(msg, 'content') and msg.content:
            print(msg.content)
            print()

    print("-"*60)
    logger.info("‚úÖ Test completed")

    return result


# Run test with the sample input from the assignment
test_query = "Plan a 3-day trip to Tokyo. I need flight options from Singapore, weather forecast, and top attractions."

logger.info("Initiating test...")
# Top-level `await` only works where the cell runs inside an event loop
# (e.g. a notebook); it is a syntax error in a plain Python module.
test_result = await test_travel_assistant(test_query)

---
## 🧪 Test the Travel Assistant

---
## 🌐 Task 5 — Build FastAPI Endpoint `/travel-assistant`
The endpoint must:
- Accept user input
- Run LangGraph workflow
- Stream output to the client

---
## 🧩 Task 4 — Build LangGraph Travel Assistant
Implement the full graph:
- LLM node
- Tool invocation nodes
- Router logic
- State updates

---
## 🔄 Task 3 — Add Streaming Responses
Use Gemini's streaming capability and return partial responses incrementally.

---
## 🔁 Task 2 — Implement Retry Logic (Exponential Backoff)
Your LLM calls must:
- Retry on transient errors
- Use exponential backoff (1s, 2s, 4s, ...)

---
## 🔧 Task 1 — Implement Tools
Build three tools used by the Travel Assistant:
### 1. `search_flights`
### 2. `get_weather`
### 3. `find_attractions`
Each tool should return **mock responses**.

In [None]:
# Import required libraries
import os
import time
import asyncio
import logging
import json
from datetime import datetime
from typing import TypedDict, Annotated, Literal, List, Dict, Any
from functools import wraps

# Google AI
import google.generativeai as genai
from google.api_core import exceptions as google_exceptions

# LangChain & LangGraph
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode

# FastAPI
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# Environment
from dotenv import load_dotenv
import nest_asyncio

# Enable nested async (for Jupyter notebooks)
nest_asyncio.apply()

# Configure comprehensive logging: each record is written to the log file
# and echoed to the console.
_log_targets = [
    logging.FileHandler('travel_assistant.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_targets,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Startup banner framed by two 60-character rules.
_rule = "="*60
logger.info(_rule)
logger.info("üöÄ Travel Assistant Application Starting")
logger.info(_rule)

print("‚úÖ Libraries imported successfully!")

In [None]:
# Install dependencies
# Notebook shell escape (`!`): runs pip quietly (-q) to install the packages
# this notebook imports. Run this cell before the import cell.
!pip install -q google-generativeai langgraph langchain langchain-google-genai fastapi uvicorn python-dotenv nest-asyncio pydantic tiktoken

## 📦 Setup
Create required imports, install libraries, and configure Gemini API.