# Error Handling for API Calls

This notebook covers strategies for handling errors when working with LLM APIs. Topics include:

- Rate limit handling and backoff strategies
- Timeout management
- Fallback options between models
- Logging and monitoring API usage

## 1. Setup and Imports

First, let's import the necessary libraries and set up our environment:

In [None]:
import os
import sys
import time
import json
import random
import logging
import requests
import pandas as pd
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from concurrent.futures import ThreadPoolExecutor

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('llm_api')

# Load environment variables
load_dotenv()

# Import our custom utilities
sys.path.append('.')
from api_utils import (
    call_openrouter,
    extract_text_response
)

Let's verify that our API key is loaded correctly:

In [None]:
# Verify that the API key is loaded
api_key = os.getenv("OPENROUTER_API_KEY")
if api_key:
    print("✅ API key loaded successfully!")
    # Show first and last three characters for verification
    masked_key = f"{api_key[:3]}...{api_key[-3:]}" if len(api_key) > 6 else "[key too short]"
    print(f"API key: {masked_key}")
else:
    print("❌ API key not found! Make sure you've created a .env file with your OPENROUTER_API_KEY.")

## 2. Understanding API Errors

Before we implement error handling strategies, let's understand the types of errors we might encounter when working with LLM APIs:

### 2.1 Common Error Types

| Error Type | HTTP Status | Description | Handling Strategy |
|------------|-------------|-------------|-------------------|
| Authentication | 401 | Invalid API key or authentication error | Check API key configuration |
| Authorization | 403 | Permission denied | Verify account permissions |
| Rate Limit | 429 | Too many requests | Implement backoff and retry |
| Server Error | 500-599 | Server-side error | Retry with backoff or fallback to another provider |
| Invalid Request | 400 | Malformed request | Fix request parameters |
| Timeout | N/A | Request took too long | Implement timeouts and retry logic |
| Network Error | N/A | Connection problems | Retry with backoff |
| Content Filtering | 400/403 | Content violates policy | Handle content filtering edge cases |
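
The handling strategies in the table can be condensed into a small decision helper. The sketch below is illustrative only: `classify_status` and its three labels (`ok`, `retry`, `fail`) are hypothetical names, not part of `api_utils`, and it assumes the caller already has the HTTP status code in hand.

```python
def classify_status(status_code):
    """Map an HTTP status code to a coarse handling strategy.

    Hypothetical helper that mirrors the table above:
    429 and 5xx are treated as transient, other 4xx as permanent.
    """
    if 200 <= status_code < 300:
        return "ok"
    if status_code == 429 or 500 <= status_code < 600:
        return "retry"  # rate limits and server errors are usually transient
    return "fail"       # 400, 401, 403: resending the same request won't help


for code in (200, 401, 429, 503):
    print(code, classify_status(code))
```

Keeping this decision in one place means the retry and fallback code later in the notebook does not need to repeat status-code checks.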

Let's simulate some of these errors to understand how they look:

In [None]:
def simulate_error(error_type):
    """Simulate different API error responses."""
    if error_type == "authentication":
        return {
            "error": {
                "message": "Authentication error: invalid API key provided",
                "type": "authentication_error",
                "code": 401
            }
        }
    elif error_type == "rate_limit":
        return {
            "error": {
                "message": "Rate limit exceeded. Please try again in 20s",
                "type": "rate_limit_error",
                "code": 429
            }
        }
    elif error_type == "server":
        return {
            "error": {
                "message": "Server error: The service is currently unavailable",
                "type": "server_error",
                "code": 503
            }
        }
    elif error_type == "timeout":
        # Timeouts surface client-side as exceptions rather than JSON payloads.
        # Raise the built-in TimeoutError instead of shadowing it with a local class.
        raise TimeoutError("Request timed out after 60 seconds")
    else:
        return {"error": {"message": "Unknown error", "type": "unknown", "code": 400}}

# Show example errors
for error_type in ["authentication", "rate_limit", "server"]:
    print(f"Example {error_type} error response:")
    print(json.dumps(simulate_error(error_type), indent=2))
    print()

# Timeout errors are typically raised as exceptions
try:
    simulate_error("timeout")
except Exception as e:
    print(f"Example timeout error: {e}")

## 3. Rate Limit Handling

Rate limiting is one of the most common issues when working with LLM APIs. Most providers implement rate limits to ensure fair usage and system stability.
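
Some APIs attach a `Retry-After` header to a 429 response, given either as a number of seconds or as an HTTP date. Whether a particular provider sends it is an assumption here, so the sketch below falls back to a default when the header is missing or unparseable. `parse_retry_after` is a hypothetical helper name.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(header_value, default=1.0, now=None):
    """Convert a Retry-After header value into a wait time in seconds."""
    if header_value is None:
        return default
    value = header_value.strip()

    # Form 1: delay in seconds, e.g. "20"
    try:
        return max(0.0, float(value))
    except ValueError:
        pass

    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to the default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())


print(parse_retry_after("20"))  # 20.0
print(parse_retry_after(None))  # 1.0
```

When the server states how long to wait, using that value is generally preferable to a guessed backoff delay.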

### 3.1 Exponential Backoff

A key strategy for handling rate limits is exponential backoff, where each retry waits longer than the previous one (typically twice as long). This helps prevent overwhelming the API with retries.
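
To make the growth concrete, the sketch below computes the un-jittered wait before each retry. The `backoff_schedule` helper and its `max_delay` cap are assumptions added for illustration.

```python
def backoff_schedule(initial_delay=1.0, factor=2.0, retries=5, max_delay=60.0):
    """Return the wait (in seconds) before each retry, capped at max_delay."""
    return [min(max_delay, initial_delay * factor ** n) for n in range(retries)]


print(backoff_schedule())               # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_schedule(retries=8)[-1])  # 60.0 (1 * 2**7 = 128, capped at 60)
```

Without a cap the delay doubles indefinitely, which is why retry helpers commonly bound it.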

In [None]:
def simple_backoff_retry(func, max_retries=5, initial_delay=1):
    """Implements a simple exponential backoff retry mechanism.
    
    Args:
        func: The function to retry
        max_retries: Maximum number of retry attempts
        initial_delay: Initial delay in seconds
        
    Returns:
        The function result or raises the last exception
    """
    retries = 0
    delay = initial_delay
    
    while retries < max_retries:
        try:
            return func()
        except Exception as e:
            retries += 1
            if retries >= max_retries:
                logger.error(f"Maximum retries ({max_retries}) exceeded. Last error: {e}")
                raise
            
            # Calculate delay with jitter for distributed load
            jitter = random.uniform(0.8, 1.2)
            wait_time = delay * jitter
            
            logger.info(f"Retry {retries}/{max_retries} after error: {e}. Waiting {wait_time:.2f}s")
            time.sleep(wait_time)
            
            # Exponential increase for next retry
            delay *= 2

# Simulate a function that occasionally fails with rate limit errors
def simulate_api_call(fail_probability=0.6):
    """Simulate an API call that might fail with rate limits."""
    if random.random() < fail_probability:
        # Simulate a rate limit error
        logger.warning("API call hit rate limit!")
        raise Exception("Rate limit exceeded")
    
    logger.info("API call succeeded")
    return {"success": True, "data": "This is a simulated response"}

# Test our backoff strategy
try:
    result = simple_backoff_retry(simulate_api_call, max_retries=5, initial_delay=0.5)
    print(f"Final result: {result}")
except Exception as e:
    print(f"All retries failed: {e}")

### 3.2 Using Tenacity for Robust Retries

Instead of implementing our own retry logic, we can use the `tenacity` library, which lets us declare the stop condition, wait strategy, and retryable exception types on a decorator:

In [None]:
# Define a custom retry strategy using tenacity
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError, TimeoutError)),
    reraise=True
)
def api_call_with_retry(prompt, model="openai/gpt-4o-mini-2024-07-18"):
    """Make an API call with automatic retries."""
    # In a real application, we might want to inspect the exception type
    # to determine if it's a retryable error
    response = call_openrouter(
        prompt=prompt,
        model=model,
        temperature=0.7,
        max_tokens=100
    )
    
    # Check if the API returned an error instead of raising an exception
    if not response.get("success", False):
        error = response.get("error", "Unknown error")
        # Simulate raising an exception for certain error types
        if "rate limit" in str(error).lower():
            raise requests.exceptions.RequestException(f"Rate limit error: {error}")
        elif "timeout" in str(error).lower():
            raise TimeoutError(f"Timeout error: {error}")
        elif "server" in str(error).lower():
            raise ConnectionError(f"Server error: {error}")
    
    return response

# Let's test the retry-enabled API call
# Note: This won't actually fail in normal use, as we're using a valid API key
try:
    response = api_call_with_retry("What is the capital of France?")
    if response.get("success", False):
        print("API call succeeded!")
        print(f"Response: {extract_text_response(response)}")
    else:
        print(f"API call failed: {response.get('error', 'Unknown error')}")
except Exception as e:
    print(f"API call failed after all retries: {e}")

### 3.3 Request Rate Limiting

In addition to handling rate limit errors, we can proactively prevent them by implementing client-side rate limiting. This ensures we don't exceed the API's rate limits in the first place.
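
One common way to do this is a token bucket, which permits short bursts while holding the long-run average to the configured rate. The sketch below is a different technique from a fixed minimum interval between requests. `TokenBucket` and its parameters are hypothetical, and the injectable `clock` exists only to make the class easy to test.

```python
import time


class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at a steady rate."""

    def __init__(self, rate_per_minute, capacity, clock=time.monotonic):
        self.refill_rate = rate_per_minute / 60.0  # tokens added per second
        self.capacity = float(capacity)
        self.tokens = float(capacity)              # start with a full bucket
        self.clock = clock
        self.last_refill = clock()

    def _refill(self):
        now = self.clock()
        gained = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.last_refill = now

    def try_acquire(self):
        """Consume one token if available; return whether the request may go."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    def time_until_available(self):
        """Seconds to wait until the next token is available (0.0 if ready)."""
        self._refill()
        if self.tokens >= 1.0:
            return 0.0
        return (1.0 - self.tokens) / self.refill_rate


bucket = TokenBucket(rate_per_minute=60, capacity=2)
print([bucket.try_acquire() for _ in range(3)])  # burst of 2, then refused
```

A caller would sleep for `time_until_available()` seconds before trying again.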

In [None]:
class RateLimiter:
    """A simple rate limiter that enforces a maximum number of requests per minute."""
    
    def __init__(self, requests_per_minute):
        """Initialize the rate limiter.
        
        Args:
            requests_per_minute: Maximum requests per minute
        """
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute  # Minimum interval in seconds
        self.last_request_time = 0
    
    def wait_if_needed(self):
        """Wait if necessary to comply with the rate limit."""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        
        if elapsed < self.min_interval:
            wait_time = self.min_interval - elapsed
            logger.info(f"Rate limiting: Waiting {wait_time:.2f}s before next request")
            time.sleep(wait_time)
        
        self.last_request_time = time.time()

# Create a rate limiter that allows 6 requests per minute
limiter = RateLimiter(requests_per_minute=6)

def rate_limited_api_call(prompt, model="openai/gpt-4o-mini-2024-07-18"):
    """Make an API call with rate limiting."""
    # Wait if needed to respect rate limits
    limiter.wait_if_needed()
    
    # Make the API call
    return call_openrouter(
        prompt=prompt,
        model=model,
        temperature=0.7,
        max_tokens=100
    )

# Test the rate limiter with multiple requests
test_prompts = [
    "What is artificial intelligence?",
    "Explain machine learning in simple terms",
    "What is deep learning?"
]

for prompt in test_prompts:
    start_time = time.time()
    response = rate_limited_api_call(prompt)
    end_time = time.time()
    
    if response.get("success", False):
        print(f"Request for '{prompt}' took {end_time - start_time:.2f}s")
    else:
        print(f"Request for '{prompt}' failed: {response.get('error', 'Unknown error')}")

## 4. Timeout Management

API calls can sometimes take longer than expected, especially for complex prompts or during high traffic periods. Let's implement proper timeout handling:
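
When a helper function does not expose a timeout parameter, one option is to run it in a worker thread and stop waiting after a deadline. The sketch below assumes that approach; `run_with_timeout` is a hypothetical helper. The worker thread keeps running after the deadline passes, so this bounds how long the caller waits rather than cancelling the underlying request.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_timeout(func, timeout, *args, **kwargs):
    """Run func in a worker thread and give up waiting after `timeout` seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return {"success": True, "result": future.result(timeout=timeout)}
    except FutureTimeout:
        return {"success": False, "error": f"Timeout error: no result after {timeout}s"}
    finally:
        # Don't block on the worker; it finishes (or fails) in the background
        executor.shutdown(wait=False)


def slow_call():
    time.sleep(0.3)
    return "done"


print(run_with_timeout(slow_call, timeout=0.05))
print(run_with_timeout(lambda: "fast", timeout=1))
```

For real HTTP calls, passing `timeout=` to `requests.post()` is preferable, since it closes the connection instead of leaving a thread behind.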

In [None]:
def call_with_timeout(prompt, model="openai/gpt-4o-mini-2024-07-18", timeout=10):
    """Make an API call with a specified timeout.
    
    Args:
        prompt: The prompt to send
        model: The model to use
        timeout: Timeout in seconds
        
    Returns:
        The API response or an error
    """
    try:
        # Here we'd typically use requests.post() with a timeout parameter
        # For our example, we'll use our utility function call_openrouter
        # Since we don't have direct access to set the timeout, we'll simulate it
        
        # In a real implementation, you'd do something like this:
        # response = requests.post(
        #    "https://openrouter.ai/api/v1/chat/completions", 
        #    json=payload, 
        #    headers=headers, 
        #    timeout=timeout
        # )
        
        # Simulate the call duration. It is fixed at 80% of the timeout, so the
        # check below never fires; it only marks where a real timeout would surface.
        process_time = timeout * 0.8
        print(f"API call will take {process_time:.2f}s (timeout is {timeout}s)")
        
        if process_time > timeout:
            raise requests.exceptions.Timeout(f"Request timed out after {timeout}s")
        
        return call_openrouter(
            prompt=prompt,
            model=model,
            temperature=0.7,
            max_tokens=100
        )
        
    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out: {e}")
        return {"success": False, "error": f"Timeout error: {e}"}
    except Exception as e:
        logger.error(f"Error during API call: {e}")
        return {"success": False, "error": f"Error: {e}"}

# Test with different timeout values
timeouts = [5, 15, 30]  # seconds
for timeout in timeouts:
    print(f"\nTrying with {timeout}s timeout:")
    response = call_with_timeout("Explain quantum computing briefly", timeout=timeout)
    
    if response.get("success", False):
        print(f"Success! Response: {extract_text_response(response)[:100]}...")
    else:
        print(f"Failed: {response.get('error', 'Unknown error')}")

### 4.1 Adaptive Timeouts

Instead of using fixed timeouts, we can implement adaptive timeouts based on the complexity of the request:

In [None]:
def calculate_adaptive_timeout(prompt, model="openai/gpt-4o-mini-2024-07-18", base_timeout=10):
    """Calculate an adaptive timeout based on prompt complexity.
    
    Args:
        prompt: The prompt to send
        model: The model to use
        base_timeout: Base timeout in seconds
        
    Returns:
        Calculated timeout in seconds
    """
    # Start with the base timeout
    timeout = base_timeout
    
    # Adjust based on prompt length (token count would be better)
    prompt_length = len(prompt) if isinstance(prompt, str) else len(json.dumps(prompt))
    
    # Add 1 second for every 100 characters in the prompt
    timeout += (prompt_length / 100)
    
    # Adjust based on model (larger models might take longer).
    # Check the small-model names first: "gpt-4" is a substring of "gpt-4o-mini".
    if "mini" in model or "haiku" in model:
        timeout *= 0.8  # 20% less time for smaller models
    elif "opus" in model or "gpt-4" in model:
        timeout *= 1.5  # 50% more time for larger models
    
    # Add a small random factor (±10%) to prevent thundering herd problem
    jitter = random.uniform(0.9, 1.1)
    timeout *= jitter
    
    # Ensure a minimum timeout
    return max(5, timeout)

# Test the adaptive timeout calculation
test_cases = [
    {"prompt": "Hello", "model": "openai/gpt-4o-mini-2024-07-18"},
    {"prompt": "Explain the theory of relativity in detail", "model": "openai/gpt-4o-mini-2024-07-18"},
    {"prompt": "Write a 500 word essay on climate change" * 5, "model": "openai/gpt-4o-2024-08-06"},
    {"prompt": "Summarize this book chapter" + "A" * 2000, "model": "anthropic/claude-3-opus-20240229"}
]

for case in test_cases:
    timeout = calculate_adaptive_timeout(case["prompt"], case["model"])
    print(f"Prompt: '{case['prompt'][:30]}...' ({len(case['prompt'])} chars)")
    print(f"Model: {case['model']}")
    print(f"Calculated timeout: {timeout:.2f}s\n")

## 5. Fallback Options

When one model or provider fails, having fallback options can ensure your application remains operational.

### 5.1 Model Fallback

Let's implement a fallback chain that tries multiple models in sequence:

In [None]:
def call_with_model_fallback(prompt, models=None, max_attempts=3):
    """Try multiple models in sequence until one succeeds.
    
    Args:
        prompt: The prompt to send
        models: List of models to try in order
        max_attempts: Maximum attempts per model
        
    Returns:
        The successful response or the last error
    """
    if models is None:
        # Default fallback chain from most capable to least capable
        models = [
            "openai/gpt-4o-2024-08-06",  # Primary model
            "openai/gpt-4o-mini-2024-07-18",  # Fallback 1
            "anthropic/claude-3-haiku-20240307",  # Fallback 2
            "google/gemini-2.5-flash"  # Fallback 3
        ]
    
    last_error = None
    for model in models:
        logger.info(f"Trying model: {model}")
        
        for attempt in range(max_attempts):
            try:
                response = call_openrouter(
                    prompt=prompt,
                    model=model,
                    temperature=0.7,
                    max_tokens=100
                )
                
                if response.get("success", False):
                    logger.info(f"Successfully used model {model} on attempt {attempt + 1}")
                    return response
                else:
                    error = response.get("error", "Unknown error")
                    last_error = error
                    logger.warning(f"Model {model} failed (attempt {attempt + 1}): {error}")
                    
                    # For certain errors, immediately try the next model
                    if any(err in str(error).lower() for err in [
                        "model not found", "not available", "not supported", "invalid model"
                    ]):
                        logger.info(f"Model {model} is not available. Trying next model.")
                        break
                    
                    # For rate limits, wait before retrying
                    if "rate limit" in str(error).lower():
                        wait_time = (attempt + 1) * 2  # Linear backoff: 2s, 4s, 6s, ...
                        logger.info(f"Rate limited. Waiting {wait_time}s before retry.")
                        time.sleep(wait_time)
            
            except Exception as e:
                last_error = str(e)
                logger.error(f"Error with model {model} (attempt {attempt + 1}): {e}")
    
    # If we've exhausted all models, return the last error
    logger.error(f"All models failed. Last error: {last_error}")
    return {"success": False, "error": f"All models failed. Last error: {last_error}"}

# Test the fallback chain
# For this test, let's include a non-existent model at the start to force fallback
test_fallback_chain = [
    "non-existent-model/gpt-99",  # This will fail, forcing fallback
    "openai/gpt-4o-mini-2024-07-18",  # This should work if available
    "anthropic/claude-3-haiku-20240307"
]

response = call_with_model_fallback(
    "What's the best way to learn programming?", 
    models=test_fallback_chain
)

if response.get("success", False):
    print(f"Successfully got response using fallback: {extract_text_response(response)[:100]}...")
    print(f"Model used: {response.get('model', 'Unknown')}")
else:
    print(f"All fallbacks failed: {response.get('error', 'Unknown error')}")

### 5.2 Parameter Fallbacks

In addition to model fallbacks, we can also implement parameter fallbacks for cases where a request fails due to parameter issues:

In [None]:
def call_with_parameter_fallback(prompt, model="openai/gpt-4o-mini-2024-07-18"):
    """Try different parameter configurations if the initial call fails.
    
    Args:
        prompt: The prompt to send
        model: The model to use
        
    Returns:
        The successful response or the last error
    """
    # Initial parameters
    initial_params = {
        "temperature": 0.7,
        "max_tokens": 500,
        "top_p": 1.0
    }
    
    # Fallback parameter sets to try if the initial one fails
    fallback_params = [
        # Conservative fallback - reduce output size, lower temperature
        {"temperature": 0.3, "max_tokens": 200, "top_p": 0.9},
        # Minimal fallback - highly constrained
        {"temperature": 0.0, "max_tokens": 100, "top_p": 0.5}
    ]
    
    # Try the initial parameters
    logger.info(f"Trying initial parameters: {initial_params}")
    response = call_openrouter(
        prompt=prompt,
        model=model,
        **initial_params
    )
    
    if response.get("success", False):
        return response
    
    # If initial call failed, try fallbacks
    last_error = response.get("error", "Unknown error")
    for i, params in enumerate(fallback_params):
        logger.info(f"Trying fallback parameters {i+1}: {params}")
        
        response = call_openrouter(
            prompt=prompt,
            model=model,
            **params
        )
        
        if response.get("success", False):
            logger.info(f"Succeeded with fallback parameters {i+1}")
            return response
        
        last_error = response.get("error", "Unknown error")
    
    # If all parameter sets failed, return the last error
    logger.error(f"All parameter sets failed. Last error: {last_error}")
    return {"success": False, "error": f"All parameter sets failed. Last error: {last_error}"}

# Test the parameter fallback
# For a real test, we'd need to simulate parameter failures,
# but we'll just demonstrate the concept
response = call_with_parameter_fallback(
    "Explain the concept of neural networks in simple terms"
)

if response.get("success", False):
    print(f"Successfully got response: {extract_text_response(response)[:100]}...")
else:
    print(f"All parameter sets failed: {response.get('error', 'Unknown error')}")

## 6. Logging and Monitoring

Proper logging and monitoring are critical for tracking API usage, errors, and performance.
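
For performance monitoring, an average duration can hide slow outliers, so percentiles are often reported alongside it. The sketch below uses the nearest-rank method on a list of call durations in seconds. The `percentile` helper and the sample data are made up for illustration.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list, for pct in (0, 100]."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))  # 1-based rank
    return ordered[rank - 1]


# Made-up durations: nine ordinary calls and one slow outlier
durations = [0.8, 1.1, 0.9, 4.2, 1.0, 0.7, 1.3, 0.9, 1.2, 1.0]
print(f"p50: {percentile(durations, 50):.2f}s")  # p50: 1.00s
print(f"p95: {percentile(durations, 95):.2f}s")  # p95: 4.20s
```

Here the median stays at 1.0s while the 95th percentile exposes the 4.2s outlier.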

### 6.1 Comprehensive Logging

In [None]:
class APILogger:
    """A class for comprehensive API call logging."""
    
    def __init__(self, log_file=None):
        """Initialize the logger.
        
        Args:
            log_file: Path to the log file. If None, logs to console only.
        """
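        self.logger = logging.getLogger('api_logger')
        self.logger.setLevel(logging.INFO)
        # The root logger already prints via basicConfig; avoid printing twice
        self.logger.propagate = False
        
        # getLogger returns the same object on every call, so attach each
        # handler only once to avoid duplicate lines across APILogger instances
        has_console = any(type(h) is logging.StreamHandler for h in self.logger.handlers)
        if not has_console:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            console_handler.setFormatter(console_format)
            self.logger.addHandler(console_handler)
        
        # Add a file handler if a log file is specified and not attached yet
        if log_file:
            log_path = os.path.abspath(log_file)
            if not any(getattr(h, "baseFilename", None) == log_path for h in self.logger.handlers):
                file_handler = logging.FileHandler(log_file)
                file_handler.setLevel(logging.DEBUG)  # Accept everything the logger passes on
                file_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                file_handler.setFormatter(file_format)
                self.logger.addHandler(file_handler)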
        
        # Keep track of API call statistics
        self.stats = {
            "total_calls": 0,
            "successful_calls": 0,
            "failed_calls": 0,
            "total_tokens": 0,
            "call_durations": [],
            "errors": {}
        }
    
    def log_api_call(self, model, prompt, response, start_time, end_time):
        """Log an API call with detailed information.
        
        Args:
            model: The model used
            prompt: The prompt sent
            response: The API response
            start_time: Start time of the call
            end_time: End time of the call
        """
        duration = end_time - start_time
        success = response.get("success", False)
        
        # Basic log entry
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "success": success,
            "duration": duration,
            "prompt_length": len(str(prompt))
        }
        
        # Update statistics
        self.stats["total_calls"] += 1
        self.stats["call_durations"].append(duration)
        
        if success:
            self.stats["successful_calls"] += 1
            response_text = extract_text_response(response)
            log_entry["response_length"] = len(response_text)
            
            # Use abbreviated response to avoid huge log entries
            abbreviated_response = response_text[:100] + "..." if len(response_text) > 100 else response_text
            self.logger.info(
                f"API call successful | Model: {model} | Duration: {duration:.2f}s | "
                f"Response: {abbreviated_response}"
            )
        else:
            self.stats["failed_calls"] += 1
            error = response.get("error", "Unknown error")
            log_entry["error"] = error
            
            # Track error types
            error_type = "unknown"
            if "rate limit" in str(error).lower():
                error_type = "rate_limit"
            elif "timeout" in str(error).lower():
                error_type = "timeout"
            elif "auth" in str(error).lower():
                error_type = "authentication"
            
            self.stats["errors"][error_type] = self.stats["errors"].get(error_type, 0) + 1
            self.logger.warning(
                f"API call failed | Model: {model} | Duration: {duration:.2f}s | "
                f"Error: {error}"
            )
        
        return log_entry
    
    def get_stats_summary(self):
        """Get a summary of API call statistics."""
        if not self.stats["total_calls"]:
            return "No API calls logged yet."
        
        success_rate = (self.stats["successful_calls"] / self.stats["total_calls"]) * 100
        avg_duration = sum(self.stats["call_durations"]) / len(self.stats["call_durations"])
        
        summary = f"""API Call Statistics:  
        Total Calls: {self.stats['total_calls']}  
        Success Rate: {success_rate:.1f}%  
        Average Duration: {avg_duration:.2f}s  
        Failed Calls: {self.stats['failed_calls']}  
        Error Types: {self.stats['errors']}  
        """
        
        return summary

# Create a logger
api_logger = APILogger(log_file="api_calls.log")

def logged_api_call(prompt, model="openai/gpt-4o-mini-2024-07-18"):
    """Make an API call with comprehensive logging."""
    start_time = time.time()
    
    response = call_openrouter(
        prompt=prompt,
        model=model,
        temperature=0.7,
        max_tokens=100
    )
    
    end_time = time.time()
    
    # Log the API call
    api_logger.log_api_call(model, prompt, response, start_time, end_time)
    
    return response

# Test the logged API call
for _ in range(3):
    logged_api_call("What's your favorite color?")

# Print the stats summary
print("\n" + "=" * 50 + "\n")
print(api_logger.get_stats_summary())

### 6.2 Cost and Usage Tracking
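
Cost tracking comes down to multiplying token counts by per-token prices. The sketch below shows that arithmetic under stated assumptions: the model name and prices in `EXAMPLE_PRICES` are invented placeholders, not real provider pricing, and `sketch_estimate_cost` is a hypothetical helper rather than the `estimate_cost` function from `api_utils`.

```python
# Hypothetical prices in USD per one million tokens (placeholders, not real pricing)
EXAMPLE_PRICES = {
    "example/small-model": {"input": 0.50, "output": 1.50},
}


def sketch_estimate_cost(model, input_tokens, output_tokens, prices=EXAMPLE_PRICES):
    """Estimate the cost of one call in USD from token counts."""
    price = prices[model]
    total = input_tokens * price["input"] + output_tokens * price["output"]
    return total / 1_000_000


cost = sketch_estimate_cost("example/small-model", input_tokens=2000, output_tokens=1000)
print(f"Estimated cost: ${cost:.6f}")  # Estimated cost: $0.002500
```

Input and output tokens are priced separately because many providers charge more for generated tokens than for prompt tokens.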

In [None]:
class UsageTracker:
    """Track API usage, costs, and quotas."""
    
    def __init__(self, daily_budget=10.0):
        """Initialize the usage tracker.
        
        Args:
            daily_budget: Daily budget in USD
        """
        self.daily_budget = daily_budget
        self.usage = {
            "total_cost": 0.0,
            "total_tokens": 0,
            "calls_by_model": {},
            "costs_by_model": {},
            "start_time": datetime.now().isoformat()
        }
    
    def track_usage(self, model, input_tokens, output_tokens, success=True):
        """Track usage for an API call.
        
        Args:
            model: The model used
            input_tokens: Number of input tokens
            output_tokens: Number of output tokens
            success: Whether the call was successful
            
        Returns:
            A dictionary with usage information
        """
        # Initialize model tracking if not exists
        if model not in self.usage["calls_by_model"]:
            self.usage["calls_by_model"][model] = 0
            self.usage["costs_by_model"][model] = 0.0
        
        # Update call count
        self.usage["calls_by_model"][model] += 1
        
        # Only track tokens and costs for successful calls
        if success:
            # Import the cost estimation function
            from api_utils import estimate_cost
            
            # Calculate cost
            cost = estimate_cost(model, input_tokens, output_tokens)
            
            # Update cost tracking
            self.usage["total_cost"] += cost
            self.usage["costs_by_model"][model] += cost
            
            # Update token tracking
            self.usage["total_tokens"] += input_tokens + output_tokens
        
        # Check if we're approaching the budget
        budget_percent = (self.usage["total_cost"] / self.daily_budget) * 100
        if budget_percent >= 80:
            logger.warning(f"{budget_percent:.1f}% of daily budget used (${self.usage['total_cost']:.2f}/${self.daily_budget:.2f})")
        
        return {
            "cost": cost if success else 0.0,
            "total_cost": self.usage["total_cost"],
            "budget_percent": budget_percent,
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens
        }
    
    def get_usage_report(self):
        """Get a detailed usage report."""
        # Calculate time period
        start_time = datetime.fromisoformat(self.usage["start_time"])
        current_time = datetime.now()
        duration = current_time - start_time
        
        # Create the report
        report = f"""\nUsage Report ({start_time.strftime('%Y-%m-%d %H:%M')} to {current_time.strftime('%Y-%m-%d %H:%M')}, {duration.total_seconds()/3600:.1f} hours):
        
        Total Cost: ${self.usage['total_cost']:.4f}
        Budget Used: {(self.usage['total_cost'] / self.daily_budget) * 100:.1f}% of ${self.daily_budget:.2f}
        Total Tokens: {self.usage['total_tokens']:,}
        
        Usage by Model:
        """
        
        for model in self.usage["calls_by_model"]:
            calls = self.usage["calls_by_model"][model]
            cost = self.usage["costs_by_model"][model]
            report += f"        {model}: {calls} calls, ${cost:.4f}\n"
        
        return report

# Create a usage tracker
usage_tracker = UsageTracker(daily_budget=5.0)

def tracked_api_call(prompt, model="openai/gpt-4o-mini-2024-07-18"):
    """Make an API call with usage tracking."""
    # Count input tokens
    from token_counter import count_tokens
    input_tokens = count_tokens(prompt if isinstance(prompt, str) else json.dumps(prompt))
    
    # Make the API call
    response = call_openrouter(
        prompt=prompt,
        model=model,
        temperature=0.7,
        max_tokens=100
    )
    
    # Track usage
    if response.get("success", False):
        output_text = extract_text_response(response)
        output_tokens = count_tokens(output_text)
        
        usage_info = usage_tracker.track_usage(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            success=True
        )
        
        # Add usage info to the response
        response["usage_info"] = usage_info
    else:
        # Track failed call
        usage_tracker.track_usage(
            model=model,
            input_tokens=input_tokens,
            output_tokens=0,
            success=False
        )
    
    return response

# Test with multiple models to see usage tracking
test_models = [
    "openai/gpt-4o-mini-2024-07-18",
    "openai/gpt-4o-mini-2024-07-18",
    "anthropic/claude-3-haiku-20240307"
]

for model in test_models:
    response = tracked_api_call(f"Tell me a fun fact about {random.choice(['cats', 'dogs', 'birds', 'space'])}", model=model)
    
    if response.get("success", False):
        usage_info = response.get("usage_info", {})
        print(f"API call to {model} cost: ${usage_info.get('cost', 0):.6f}")
        print(f"Response: {extract_text_response(response)[:100]}...\n")

# Print the usage report
print(usage_tracker.get_usage_report())

## 7. Creating a Resilient API Client

Now, let's combine everything we've learned to create a comprehensive, resilient API client:

In [None]:
class ResilientLLMClient:
    """A comprehensive, resilient client for LLM API calls."""
    
    def __init__(self, 
                 primary_model="openai/gpt-4o-mini-2024-07-18",
                 fallback_models=None,
                 base_timeout=15,
                 max_retries=3,
                 log_file="llm_api.log",
                 daily_budget=10.0,
                 requests_per_minute=10):
        """Initialize the resilient client.
        
        Args:
            primary_model: The primary model to use
            fallback_models: List of fallback models
            base_timeout: Base timeout in seconds
            max_retries: Maximum retry attempts
            log_file: Path to the log file
            daily_budget: Daily budget in USD
            requests_per_minute: Maximum requests per minute
        """
        # Model configuration
        self.primary_model = primary_model
        self.fallback_models = fallback_models or [
            "openai/gpt-4o-mini-2024-07-18",
            "anthropic/claude-3-haiku-20240307",
            "google/gemini-2.5-flash"
        ]
        
        # Timeouts and retries
        self.base_timeout = base_timeout
        self.max_retries = max_retries
        
        # Set up logging
        self.logger = APILogger(log_file=log_file)
        
        # Set up usage tracking
        self.usage_tracker = UsageTracker(daily_budget=daily_budget)
        
        # Set up rate limiting
        self.rate_limiter = RateLimiter(requests_per_minute=requests_per_minute)
        
        # Cache for responses (simple in-memory cache)
        self.cache = {}
        
        # Call statistics
        self.call_count = 0
        self.error_count = 0
        self.cache_hits = 0
        
        logging.info(f"Initialized ResilientLLMClient with primary model: {primary_model}")
    
    def generate_cache_key(self, prompt, model, temperature, max_tokens):
        """Generate a cache key for a request."""
        # Convert prompt to string if it's not already
        prompt_str = json.dumps(prompt) if not isinstance(prompt, str) else prompt
        
        # Create a unique key based on the request parameters
        key_parts = [
            prompt_str,
            model,
            str(temperature),
            str(max_tokens)
        ]
        
        # Use a hash for the key
        import hashlib
        return hashlib.md5("|".join(key_parts).encode()).hexdigest()
    
    def call(self, prompt, model=None, temperature=0.7, max_tokens=300, 
             system_prompt=None, use_cache=True, timeout=None):
        """Make a resilient API call with comprehensive error handling.
        
        Args:
            prompt: The prompt to send
            model: The model to use (defaults to primary_model)
            temperature: Temperature parameter
            max_tokens: Maximum tokens to generate
            system_prompt: Optional system prompt
            use_cache: Whether to use caching
            timeout: Custom timeout in seconds
            
        Returns:
            The API response
        """
        self.call_count += 1
        model = model or self.primary_model
        
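        # Calculate adaptive timeout if not specified.
        # NOTE: this value is not forwarded to call_openrouter() below; pass it along
        # if your call_openrouter() accepts a timeout argument.
        if timeout is None:
            timeout = calculate_adaptive_timeout(prompt, model, self.base_timeout)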
        
        # Check if we have a cached response
        if use_cache:
            cache_key = self.generate_cache_key(prompt, model, temperature, max_tokens)
            if cache_key in self.cache:
                self.cache_hits += 1
                # str() keeps the preview readable when prompt is a message list
                logging.info(f"Cache hit for prompt: {str(prompt)[:30]}...")
                return self.cache[cache_key]
        
        # Apply rate limiting
        self.rate_limiter.wait_if_needed()
        
        # Count input tokens for tracking
        from token_counter import count_tokens, count_message_tokens
        if isinstance(prompt, list):
            # It's a message list
            input_tokens = count_message_tokens(prompt)
        else:
            # It's a string prompt
            if system_prompt:
                # Account for system prompt
                input_tokens = count_tokens(prompt) + count_tokens(system_prompt) + 10  # 10 extra for message format
            else:
                input_tokens = count_tokens(prompt)
        
        # Start timing
        start_time = time.time()
        
        # Try the primary model first
        try:
            response = call_openrouter(
                prompt=prompt,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                system_prompt=system_prompt
            )
            
            # Process and log the response
            end_time = time.time()
            self.logger.log_api_call(model, prompt, response, start_time, end_time)
            
            # If successful, track usage and cache the response
            if response.get("success", False):
                output_text = extract_text_response(response)
                output_tokens = count_tokens(output_text)
                
                # Track usage
                usage_info = self.usage_tracker.track_usage(
                    model=model,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    success=True
                )
                
                response["usage_info"] = usage_info
                
                # Cache the response if requested
                if use_cache:
                    cache_key = self.generate_cache_key(prompt, model, temperature, max_tokens)
                    self.cache[cache_key] = response
                
                return response
            else:
                # If the requested model failed, try fallbacks
                self.error_count += 1
                return self._try_fallbacks(
                    prompt=prompt, 
                    temperature=temperature, 
                    max_tokens=max_tokens,
                    system_prompt=system_prompt,
                    input_tokens=input_tokens,
                    failed_model=model
                )
        
        except Exception as e:
            # Handle exceptions and try fallbacks
            self.error_count += 1
            logging.error(f"Error with primary model {model}: {e}")
            
            return self._try_fallbacks(
                prompt=prompt, 
                temperature=temperature, 
                max_tokens=max_tokens,
                system_prompt=system_prompt,
                input_tokens=input_tokens,
                failed_model=model
            )
    
    def _try_fallbacks(self, prompt, temperature, max_tokens, system_prompt, input_tokens, failed_model=None):
        """Try fallback models if the requested model fails."""
        for model in self.fallback_models:
            # Skip the model that just failed; the default fallback list
            # includes the default primary model, which would otherwise be retried
            if model == failed_model:
                continue
            
            logging.info(f"Trying fallback model: {model}")
            
            # Apply rate limiting
            self.rate_limiter.wait_if_needed()
            
            # Start timing
            start_time = time.time()
            
            try:
                response = call_openrouter(
                    prompt=prompt,
                    model=model,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    system_prompt=system_prompt
                )
                
                # Process and log the response
                end_time = time.time()
                self.logger.log_api_call(model, prompt, response, start_time, end_time)
                
                if response.get("success", False):
                    logging.info(f"Fallback to {model} successful")
                    
                    # Track usage
                    output_text = extract_text_response(response)
                    from token_counter import count_tokens
                    output_tokens = count_tokens(output_text)
                    
                    usage_info = self.usage_tracker.track_usage(
                        model=model,
                        input_tokens=input_tokens,
                        output_tokens=output_tokens,
                        success=True
                    )
                    
                    response["usage_info"] = usage_info
                    response["used_fallback"] = True
                    
                    return response
            
            except Exception as e:
                logging.error(f"Error with fallback model {model}: {e}")
        
        # If all fallbacks failed, return an error
        logging.error("All models failed")
        return {
            "success": False, 
            "error": "All models failed", 
            "model": "none"
        }
    
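    def get_client_stats(self):
        """Get statistics about client usage."""
        # error_count is incremented whenever the first-choice model fails, even if a
        # fallback later succeeds, so this rate reflects first-attempt successes.
        success_rate = 0 if self.call_count == 0 else ((self.call_count - self.error_count) / self.call_count) * 100
        cache_hit_rate = 0 if self.call_count == 0 else (self.cache_hits / self.call_count) * 100
        
        # Build the report line by line so the output carries no stray indentation
        stats = (
            "Client Statistics:\n"
            f"Total Calls: {self.call_count}\n"
            f"Success Rate (first attempt): {success_rate:.1f}%\n"
            f"Error Count: {self.error_count}\n"
            f"Cache Hit Rate: {cache_hit_rate:.1f}%\n"
        )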
        
        # Add the usage report
        stats += "\n" + self.usage_tracker.get_usage_report()
        
        return stats

# Create a resilient client
resilient_client = ResilientLLMClient(
    primary_model="openai/gpt-4o-mini-2024-07-18",
    daily_budget=5.0,
    requests_per_minute=10
)

# Test the client with a few requests
test_prompts = [
    "What is artificial intelligence?",
    "Explain machine learning in simple terms",
    "What is artificial intelligence?"  # Repeat to test caching
]

for prompt in test_prompts:
    print(f"\nSending prompt: '{prompt}'")
    # Remember the hit counter so we can tell whether this call was served from cache
    hits_before = resilient_client.cache_hits
    response = resilient_client.call(prompt, max_tokens=100)
    
    if response.get("success", False):
        print(f"Got response using model: {response.get('model', 'Unknown')}")
        print(f"Response: {extract_text_response(response)[:100]}...")
        
        # Show if it was a fallback
        if response.get("used_fallback", False):
            print("(Used fallback model)")
        
        # Show if it was from cache (the hit counter advanced during this call)
        if resilient_client.cache_hits > hits_before:
            print("(From cache)")
    else:
        print(f"Failed: {response.get('error', 'Unknown error')}")

# Show client statistics
print("\n" + "=" * 50 + "\n")
print(resilient_client.get_client_stats())
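
Note that `ResilientLLMClient` stores `max_retries`, but the `call` method above goes straight to the fallback models without retrying. One way to put that setting to use is a small retry wrapper with exponential backoff and jitter. The sketch below is an illustration only: `call_with_retries` and its parameters are hypothetical names, and it assumes the wrapped function returns a dict with a `"success"` key like the responses used in this notebook.

```python
import random
import time


def call_with_retries(func, max_retries=3, base_delay=1.0, max_delay=30.0,
                      sleep=time.sleep):
    """Call func() until it succeeds or the retries run out.

    Hypothetical helper: func is assumed to return a dict with a
    "success" key. Exceptions raised by func count as failed attempts.
    """
    last_error = "no attempts made"
    for attempt in range(max_retries + 1):
        try:
            response = func()
            if response.get("success", False):
                return response
            last_error = response.get("error", "unknown error")
        except Exception as exc:
            last_error = str(exc)

        if attempt < max_retries:
            # Exponential backoff with full jitter: wait a random time
            # between 0 and min(max_delay, base_delay * 2**attempt)
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))

    return {"success": False, "error": f"Retries exhausted: {last_error}"}
```

Inside `call`, the primary request could be wrapped as `call_with_retries(lambda: call_openrouter(...), max_retries=self.max_retries)` before falling back to other models. The `sleep` parameter is injectable so the wrapper can be tested without real delays.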

## 8. Best Practices for Production Systems

For production systems, there are additional considerations for error handling:

### 8.1 Error Handling Best Practices

1. **Health Checks and Circuit Breakers**
   - Implement health checks to proactively detect API issues
   - Use circuit breakers to prevent cascading failures

2. **Graceful Degradation**
   - Design your application to function with reduced capabilities when APIs are unavailable
   - Provide clear error messages to users

3. **Comprehensive Monitoring**
   - Set up alerting for error rates and latency spikes
   - Monitor quotas and resource usage

4. **Structured Error Handling**
   - Create specific exception types for different error categories
   - Standardize error reporting across your application

5. **Request Idempotency**
   - Add request IDs to ensure retries don't result in duplicate operations
   - Implement idempotency keys for critical operations

6. **Dedicated Reliability Service**
   - Consider building a dedicated service for handling API reliability
   - Centralize error handling logic

7. **Documentation and Runbooks**
   - Document common failure modes and their solutions
   - Create runbooks for handling different types of API outages
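
To make the circuit breaker idea from item 1 concrete, here is a minimal sketch. The class name, thresholds, and the `guarded_call` helper are all hypothetical and not part of the client built earlier; the sketch assumes responses are dicts with a `"success"` key, as elsewhere in this notebook. After `failure_threshold` consecutive failures the breaker opens and requests are skipped; once `reset_timeout` seconds have passed, trial requests are allowed through again, and a success closes the breaker.

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker sketch (hypothetical, not production-ready)."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock          # injectable so tests can control time
        self.failure_count = 0
        self.opened_at = None       # None means the breaker is closed

    def allow_request(self):
        """Return True if a request may be sent right now."""
        if self.opened_at is None:
            return True
        # Open: allow trial requests only after the reset timeout has elapsed
        return self.clock() - self.opened_at >= self.reset_timeout

    def record_success(self):
        """Close the breaker and reset the failure counter."""
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        """Count a failure; open (or re-open) the breaker at the threshold."""
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = self.clock()


def guarded_call(breaker, func):
    """Run func() through the breaker; func should return a response dict."""
    if not breaker.allow_request():
        return {"success": False, "error": "Circuit open, request skipped"}
    try:
        response = func()
    except Exception as exc:
        response = {"success": False, "error": str(exc)}

    if response.get("success", False):
        breaker.record_success()
    else:
        breaker.record_failure()
    return response
```

A typical use would be one breaker per model, for example `guarded_call(breaker, lambda: resilient_client.call(prompt))`. This simplified version lets every request through once the timeout has elapsed; a stricter implementation would admit a single trial request at a time.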

## 9. Exercises

Here are some exercises to practice error handling techniques:

1. **Enhanced Rate Limiter**: Implement a more sophisticated rate limiter that adjusts based on observed error rates

2. **Circuit Breaker**: Implement a circuit breaker pattern for the API client

3. **Error Categorization**: Create a system that categorizes errors into actionable categories with specific handling strategies

4. **Cost Optimization**: Enhance the usage tracker to automatically select the most cost-effective model for different types of prompts

5. **Distributed Rate Limiting**: Implement a distributed rate limiter that works across multiple application instances
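
As a possible starting point for exercise 3, the sketch below maps HTTP status codes to coarse categories and marks which ones are usually worth retrying. The category names, the `RETRYABLE` set, and the function names are hypothetical choices for illustration; adjust them to the errors your provider actually returns.

```python
from enum import Enum


class ErrorCategory(Enum):
    AUTHENTICATION = "authentication"   # 401: check the API key
    AUTHORIZATION = "authorization"     # 403: check account permissions
    RATE_LIMIT = "rate_limit"           # 429: back off and retry
    TIMEOUT = "timeout"                 # 408: retry, possibly with a longer timeout
    SERVER = "server"                   # 5xx: retry or fall back to another model
    CLIENT = "client"                   # other 4xx: fix the request, do not retry
    UNKNOWN = "unknown"


# Assumption: only these categories are worth retrying with the same request
RETRYABLE = {ErrorCategory.RATE_LIMIT, ErrorCategory.TIMEOUT, ErrorCategory.SERVER}


def categorize_status(status_code):
    """Map an HTTP status code to an ErrorCategory."""
    if status_code == 401:
        return ErrorCategory.AUTHENTICATION
    if status_code == 403:
        return ErrorCategory.AUTHORIZATION
    if status_code == 408:
        return ErrorCategory.TIMEOUT
    if status_code == 429:
        return ErrorCategory.RATE_LIMIT
    if 500 <= status_code < 600:
        return ErrorCategory.SERVER
    if 400 <= status_code < 500:
        return ErrorCategory.CLIENT
    return ErrorCategory.UNKNOWN


def should_retry(status_code):
    """Return True if retrying the same request may plausibly help."""
    return categorize_status(status_code) in RETRYABLE
```

From here, the exercise is to attach a concrete handling strategy to each category, for example backoff for `RATE_LIMIT` and an immediate, user-visible error for `AUTHENTICATION`.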

## 10. Summary

In this notebook, we've explored comprehensive error handling strategies for LLM APIs, including:

- Understanding different types of API errors
- Implementing exponential backoff and retry logic
- Managing rate limits through client-side throttling
- Setting up appropriate timeouts and timeout handling
- Creating fallback chains across models and parameters
- Logging API calls and monitoring usage
- Building a resilient API client that incorporates all these techniques

These strategies will help you build more reliable applications on top of LLM APIs, so that your systems can keep operating, possibly with reduced capabilities, when an API is slow, rate-limited, or unavailable.