# 09 - Production Deployment

**Deploy AI applications to production.**

## Learning Objectives

By the end of this notebook, you will be able to:
- Design production-ready APIs
- Implement caching and optimization
- Add monitoring and observability
- Manage costs effectively

## Table of Contents

1. [API Design](#api)
2. [Caching](#caching)
3. [Rate Limiting](#ratelimit)
4. [Monitoring](#monitoring)
5. [Cost Management](#costs)
6. [Exercises](#exercises)
7. [Checkpoint](#checkpoint)

In [None]:
# GUIDED: Setup
import os
import sys
import time
import json
from pathlib import Path
from datetime import datetime

sys.path.append(str(Path.cwd().parent))

from dotenv import load_dotenv
load_dotenv(Path.cwd().parent / ".env")

print("Setup complete!")

---
## 1. API Design <a id='api'></a>

### Best Practices for AI APIs
- **Streaming**: Return partial results for better UX
- **Async**: Don't block on long operations
- **Versioning**: Support multiple API versions
- **Error handling**: Clear, actionable error messages

In [None]:
# GUIDED: FastAPI example for AI endpoint
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum

# Request/Response models
class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    system_prompt: Optional[str] = None
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: int = Field(default=1000, ge=1, le=4000)

class ChatResponse(BaseModel):
    message: str
    tokens_used: int
    processing_time_ms: float
    request_id: str

class ErrorResponse(BaseModel):
    error: str
    error_code: str
    request_id: str

# Example validation
request = ChatRequest(
    message="Hello, how are you?",
    temperature=0.8
)
print(f"Valid request: {request.model_dump()}")

In [None]:
# GUIDED: FastAPI application structure
api_code = '''
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
import uuid
import time

app = FastAPI(title="AI Chat API", version="1.0.0")

@app.post("/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Send a message and get a response."""
    request_id = str(uuid.uuid4())
    start_time = time.time()
    
    try:
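        # Call LLM. Note: client.chat() is synchronous, so inside an async handler
        # it blocks the event loop; run it in a thread pool or use an async client.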
        from src.llm_utils import LLMClient
        client = LLMClient(provider="openai", model="gpt-4o-mini")
        
        response = client.chat(
            message=request.message,
            system=request.system_prompt
        )
        
        processing_time = (time.time() - start_time) * 1000
        
        return ChatResponse(
            message=response,
            tokens_used=client.get_stats().total_input_tokens + client.get_stats().total_output_tokens,
            processing_time_ms=processing_time,
            request_id=request_id
        )
        
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail={"error": str(e), "error_code": "LLM_ERROR", "request_id": request_id}
        )

@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream a response token by token."""
    async def generate():
        from src.llm_utils import LLMClient
        client = LLMClient(provider="openai", model="gpt-4o-mini")
        
        for chunk in client.stream(request.message, system=request.system_prompt):
            yield f"data: {chunk}\\n\\n"
        yield "data: [DONE]\\n\\n"
    
    return StreamingResponse(generate(), media_type="text/event-stream")
'''

print("FastAPI application structure:")
print(api_code)
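
The `/v1/chat` handler above is declared `async` but calls a synchronous client, so the event loop would stall while the model responds. One possible way around this, sketched here on the assumption that the client call is blocking, is to hand it to a worker thread with `asyncio.to_thread`. `slow_llm_call` is a hypothetical stand-in for `client.chat`, not part of `src.llm_utils`.

```python
import asyncio
import time


def slow_llm_call(message: str) -> str:
    """Hypothetical stand-in for a blocking client.chat() call."""
    time.sleep(0.2)  # simulate network latency
    return f"echo: {message}"


async def chat_nonblocking(message: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_llm_call, message)


async def handle_many(messages: list[str]) -> list[str]:
    # The calls overlap instead of running back to back.
    return await asyncio.gather(*(chat_nonblocking(m) for m in messages))


start = time.perf_counter()
replies = asyncio.run(handle_many(["a", "b", "c"]))
elapsed = time.perf_counter() - start
print(replies, f"{elapsed:.2f}s")
```

Inside a notebook, where an event loop is already running, use `await handle_many([...])` instead of `asyncio.run(...)`.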

---
## 2. Caching <a id='caching'></a>

In [None]:
# GUIDED: Implement exact-match caching with TTL
# (keys are hashes of the exact prompt text; this is not semantic matching)
import hashlib
from dataclasses import dataclass, field
from typing import Optional
import time

@dataclass
class CacheEntry:
    key: str
    value: str
    created_at: float
    ttl: float  # Time to live in seconds
    
    def is_expired(self) -> bool:
        return time.time() > self.created_at + self.ttl

class SimpleCache:
    """Simple in-memory cache with TTL."""
    
    def __init__(self, default_ttl: float = 3600):
        self.cache: dict[str, CacheEntry] = {}
        self.default_ttl = default_ttl
        self.hits = 0
        self.misses = 0
    
    def _hash_key(self, text: str) -> str:
        """Create a hash key from text."""
        return hashlib.sha256(text.encode()).hexdigest()[:16]
    
    def get(self, key: str) -> Optional[str]:
        """Get value from cache."""
        hash_key = self._hash_key(key)
        
        if hash_key in self.cache:
            entry = self.cache[hash_key]
            if not entry.is_expired():
                self.hits += 1
                return entry.value
            else:
                del self.cache[hash_key]
        
        self.misses += 1
        return None
    
    def set(self, key: str, value: str, ttl: Optional[float] = None):
        """Set value in cache."""
        hash_key = self._hash_key(key)
        self.cache[hash_key] = CacheEntry(
            key=hash_key,
            value=value,
            created_at=time.time(),
            ttl=ttl if ttl is not None else self.default_ttl  # allow an explicit ttl of 0
        )
    
    def stats(self) -> dict:
        """Get cache statistics."""
        total = self.hits + self.misses
        hit_rate = self.hits / total if total > 0 else 0
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
            "size": len(self.cache)
        }

# Test the cache
cache = SimpleCache(default_ttl=60)

cache.set("What is Python?", "Python is a programming language.")
print(f"Cached value: {cache.get('What is Python?')}")
print(f"Cache miss: {cache.get('What is Java?')}")
print(f"Stats: {cache.stats()}")
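
Because `SimpleCache` hashes the exact prompt text, prompts that differ only in spacing or capitalization produce different keys and miss the cache. A light normalization step before hashing can recover some of those misses. The sketch below is one way to do it; `normalize_prompt` and `normalized_key` are hypothetical helpers, and lowercasing is only safe when prompts are not case-sensitive (it would be wrong for code snippets, for example).

```python
import hashlib


def normalize_prompt(text: str) -> str:
    """Collapse runs of whitespace and lowercase the text."""
    return " ".join(text.split()).lower()


def normalized_key(text: str) -> str:
    """Hash the normalized prompt, mirroring SimpleCache._hash_key."""
    return hashlib.sha256(normalize_prompt(text).encode()).hexdigest()[:16]


same = normalized_key("What is Python?") == normalized_key("  what is   python? ")
print(same)
```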

In [None]:
# GUIDED: Cached LLM client
from src.llm_utils import LLMClient

class CachedLLMClient:
    """LLM client with caching."""
    
    def __init__(self, client: LLMClient, cache: SimpleCache):
        self.client = client
        self.cache = cache
    
    def chat(self, message: str, system: Optional[str] = None) -> str:
        # Create cache key from message + system
        cache_key = f"{system or ''}::{message}"
        
        # Check cache
        cached = self.cache.get(cache_key)
        if cached is not None:  # an empty-string response is still a valid hit
            print("[CACHE HIT]")
            return cached
        
        # Call LLM
        print("[CACHE MISS - calling LLM]")
        response = self.client.chat(message, system=system)
        
        # Cache result
        self.cache.set(cache_key, response)
        
        return response

# Test cached client
base_client = LLMClient(provider="openai", model="gpt-4o-mini")
cached_client = CachedLLMClient(base_client, cache)

# First call - cache miss
response1 = cached_client.chat("What is 2+2?")
print(f"Response: {response1}\n")

# Second call - cache hit
response2 = cached_client.chat("What is 2+2?")
print(f"Response: {response2}")
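
The key above covers only the system prompt and the message. If one cache is shared across models or sampling settings, a response generated under one configuration could be served for another. A minimal sketch of a key that includes every parameter affecting the output follows; `make_cache_key` and its parameters are illustrative and not part of `LLMClient`.

```python
import hashlib
import json
from typing import Optional


def make_cache_key(
    message: str,
    system: Optional[str] = None,
    model: str = "gpt-4o-mini",
    temperature: float = 0.7,
) -> str:
    """Build a deterministic key from everything that influences the response."""
    payload = json.dumps(
        {"message": message, "system": system, "model": model, "temperature": temperature},
        sort_keys=True,  # stable ordering so equal inputs give equal keys
    )
    return hashlib.sha256(payload.encode()).hexdigest()


key_a = make_cache_key("What is 2+2?", model="gpt-4o-mini")
key_b = make_cache_key("What is 2+2?", model="gpt-4o")
print(key_a == key_b)
```

Serializing to JSON also avoids ambiguity in the `"::"` separator: a system prompt that itself contains `::` cannot collide with a different system/message split.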

---
## 3. Rate Limiting <a id='ratelimit'></a>

In [None]:
# GUIDED: Token bucket rate limiter
import time
from dataclasses import dataclass

@dataclass
class RateLimiter:
    """Token bucket rate limiter."""
    
    requests_per_minute: int
    tokens: float = 0
    last_update: float = 0
    
    def __post_init__(self):
        self.tokens = float(self.requests_per_minute)
        self.last_update = time.time()
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(
            self.requests_per_minute,
            self.tokens + (elapsed * self.requests_per_minute / 60)
        )
        self.last_update = now
    
    def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire tokens. Returns True if allowed."""
        self._refill()
        
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
    
    def wait_and_acquire(self, tokens: int = 1, timeout: float = 30) -> bool:
        """Wait for tokens to become available."""
        start = time.time()
        
        while time.time() - start < timeout:
            if self.acquire(tokens):
                return True
            time.sleep(0.1)
        
        return False

# Test rate limiter
limiter = RateLimiter(requests_per_minute=10)

for i in range(12):
    if limiter.acquire():
        print(f"Request {i+1}: Allowed")
    else:
        print(f"Request {i+1}: Rate limited!")
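
A single `RateLimiter` as above throttles all traffic together. In an API it is common to give each caller its own bucket so one heavy user cannot starve the rest. The sketch below keeps one token bucket per client ID. `PerClientLimiter`, `client_id`, and the injectable `clock` are assumptions made for illustration; `time.monotonic` is used as the default clock so wall-clock adjustments do not affect refill.

```python
import time
from typing import Callable


class PerClientLimiter:
    """One token bucket per client ID (illustrative sketch)."""

    def __init__(self, requests_per_minute: int, clock: Callable[[], float] = time.monotonic):
        self.capacity = float(requests_per_minute)
        self.rate = requests_per_minute / 60.0  # tokens added per second
        self.clock = clock
        # client_id -> (tokens remaining, time of last refill)
        self.buckets: dict[str, tuple[float, float]] = {}

    def acquire(self, client_id: str) -> bool:
        now = self.clock()
        # Unknown clients start with a full bucket.
        tokens, last = self.buckets.get(client_id, (self.capacity, now))
        # Refill in proportion to elapsed time, capped at capacity.
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        allowed = tokens >= 1
        if allowed:
            tokens -= 1
        self.buckets[client_id] = (tokens, now)
        return allowed


per_client = PerClientLimiter(requests_per_minute=2)
print([per_client.acquire("alice") for _ in range(3)])
print(per_client.acquire("bob"))
```

In a long-running service the `buckets` dict would also need pruning of idle clients, which this sketch leaves out.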

---
## 4. Monitoring <a id='monitoring'></a>

In [None]:
# GUIDED: Simple metrics tracking
from dataclasses import dataclass, field
from collections import defaultdict
from datetime import datetime
import statistics

@dataclass
class MetricsCollector:
    """Collect and report metrics."""
    
    counters: dict = field(default_factory=lambda: defaultdict(int))
    gauges: dict = field(default_factory=dict)
    histograms: dict = field(default_factory=lambda: defaultdict(list))
    
    def increment(self, name: str, value: int = 1):
        """Increment a counter."""
        self.counters[name] += value
    
    def gauge(self, name: str, value: float):
        """Set a gauge value."""
        self.gauges[name] = value
    
    def histogram(self, name: str, value: float):
        """Record a histogram value."""
        self.histograms[name].append(value)
    
    def report(self) -> dict:
        """Generate metrics report."""
        report = {
            "timestamp": datetime.now().isoformat(),
            "counters": dict(self.counters),
            "gauges": dict(self.gauges),
            "histograms": {}
        }
        
        for name, values in self.histograms.items():
            if values:
                report["histograms"][name] = {
                    "count": len(values),
                    "min": min(values),
                    "max": max(values),
                    "mean": statistics.mean(values),
                    "p50": statistics.median(values),
                    "p95": statistics.quantiles(values, n=20)[18] if len(values) >= 20 else max(values),
                }
        
        return report

# Test metrics
metrics = MetricsCollector()

# Simulate some requests
import random
for i in range(100):
    metrics.increment("requests_total")
    metrics.histogram("response_time_ms", random.gauss(200, 50))
    if random.random() < 0.05:
        metrics.increment("errors_total")

metrics.gauge("cache_size", 150)
metrics.gauge("active_connections", 10)

report = metrics.report()
print(json.dumps(report, indent=2))
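
Recording latency by hand around every call gets repetitive. One option is a small context manager that times a block and feeds the result into a histogram. The sketch assumes only that the collector exposes a `histogram(name, value)` method, as `MetricsCollector` does above. `ListCollector` is a hypothetical stand-in so the example runs on its own, and `timed` is not part of the notebook's code.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class ListCollector:
    """Minimal stand-in exposing the same histogram() method as MetricsCollector."""

    def __init__(self):
        self.histograms = defaultdict(list)

    def histogram(self, name: str, value: float):
        self.histograms[name].append(value)


@contextmanager
def timed(collector, name: str):
    """Record the duration of the with-block in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed calls are timed too.
        collector.histogram(name, (time.perf_counter() - start) * 1000)


collector = ListCollector()
with timed(collector, "work_ms"):
    time.sleep(0.05)
print(collector.histograms["work_ms"])
```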

In [None]:
# GUIDED: Instrumented LLM client
class InstrumentedLLMClient:
    """LLM client with metrics."""
    
    def __init__(self, client: LLMClient, metrics: MetricsCollector):
        self.client = client
        self.metrics = metrics
    
    def chat(self, message: str, system: Optional[str] = None) -> str:
        self.metrics.increment("llm_requests_total")
        start = time.time()
        
        try:
            response = self.client.chat(message, system=system)
            
            # Record metrics
            latency = (time.time() - start) * 1000
            self.metrics.histogram("llm_latency_ms", latency)
            
            stats = self.client.get_stats()
            self.metrics.gauge("llm_total_tokens", stats.total_input_tokens + stats.total_output_tokens)
            self.metrics.gauge("llm_total_cost", stats.total_cost)
            
            return response
            
        except Exception:
            self.metrics.increment("llm_errors_total")
            raise

# Test instrumented client
metrics = MetricsCollector()
base_client = LLMClient(provider="openai", model="gpt-4o-mini")
instrumented = InstrumentedLLMClient(base_client, metrics)

# Make some calls
for q in ["What is 1+1?", "What is 2+2?", "What is 3+3?"]:
    instrumented.chat(q)

print("Metrics after 3 calls:")
print(json.dumps(metrics.report(), indent=2))

---
## 5. Cost Management <a id='costs'></a>

In [None]:
# GUIDED: Cost tracking and budgets
from dataclasses import dataclass, field  # field is needed for default_factory below
from datetime import datetime, timedelta

@dataclass
class Budget:
    """Track spending against a budget."""
    
    daily_limit: float
    monthly_limit: float
    daily_spent: float = 0
    monthly_spent: float = 0
    last_daily_reset: datetime = field(default_factory=datetime.now)
    last_monthly_reset: datetime = field(default_factory=datetime.now)
    
    def _maybe_reset(self):
        """Reset counters if needed."""
        now = datetime.now()
        
        # Reset daily
        if now.date() > self.last_daily_reset.date():
            self.daily_spent = 0
            self.last_daily_reset = now
        
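        # Reset monthly (compare the year too, so the same month one year later still resets)
        if (now.year, now.month) != (self.last_monthly_reset.year, self.last_monthly_reset.month):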
            self.monthly_spent = 0
            self.last_monthly_reset = now
    
    def can_spend(self, amount: float) -> bool:
        """Check if spending is within budget."""
        self._maybe_reset()
        
        if self.daily_spent + amount > self.daily_limit:
            return False
        if self.monthly_spent + amount > self.monthly_limit:
            return False
        return True
    
    def record_spend(self, amount: float):
        """Record spending."""
        self._maybe_reset()
        self.daily_spent += amount
        self.monthly_spent += amount
    
    def status(self) -> dict:
        """Get budget status."""
        self._maybe_reset()
        return {
            "daily": {
                "spent": self.daily_spent,
                "limit": self.daily_limit,
                "remaining": self.daily_limit - self.daily_spent,
                "percent_used": (self.daily_spent / self.daily_limit) * 100
            },
            "monthly": {
                "spent": self.monthly_spent,
                "limit": self.monthly_limit,
                "remaining": self.monthly_limit - self.monthly_spent,
                "percent_used": (self.monthly_spent / self.monthly_limit) * 100
            }
        }

# Test budget
budget = Budget(daily_limit=10.0, monthly_limit=100.0)

# Simulate spending
costs = [0.01, 0.02, 0.015, 0.03]
for cost in costs:
    if budget.can_spend(cost):
        budget.record_spend(cost)
        print(f"Spent ${cost:.3f}")
    else:
        print(f"Budget exceeded for ${cost:.3f}")

print(f"\nBudget status: {json.dumps(budget.status(), indent=2)}")
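
The `costs` list above is hard-coded. In practice, each request's cost is usually derived from its token counts and the provider's per-token prices. The arithmetic can be sketched as follows. The prices in `PRICES_PER_MILLION` are placeholder values for illustration only, not real provider pricing, and `request_cost` is a hypothetical helper; the `estimate_cost` function in `src.llm_utils` may work differently.

```python
# Placeholder prices in USD per 1M tokens; NOT real provider pricing.
PRICES_PER_MILLION = {
    "small-model": {"input": 1.0, "output": 4.0},
    "large-model": {"input": 10.0, "output": 40.0},
}


def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost = input tokens * input price + output tokens * output price."""
    price = PRICES_PER_MILLION[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


example_cost = request_cost("small-model", input_tokens=500, output_tokens=200)
print(f"${example_cost:.6f}")
```

A value computed this way can be passed to `budget.can_spend(...)` before the call and to `budget.record_spend(...)` afterwards.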

In [None]:
# GUIDED: Cost-aware routing
from src.llm_utils import estimate_cost

class CostAwareRouter:
    """Route requests to different models based on complexity and budget."""
    
    def __init__(self, budget: Budget):
        self.budget = budget
        self.models = {
            "cheap": "gpt-4o-mini",
            "expensive": "gpt-4o"
        }
    
    def route(self, message: str, prefer_quality: bool = False) -> str:
        """Select appropriate model."""
        # Estimate tokens (rough: ~1.5 tokens per word).
        # Currently unused by the decision logic below.
        estimated_tokens = len(message.split()) * 1.5
        
        # Check budget
        status = self.budget.status()
        budget_remaining_pct = 100 - status["daily"]["percent_used"]
        
        # Decision logic
        if budget_remaining_pct < 20:
            return self.models["cheap"]
        
        if prefer_quality and budget_remaining_pct > 50:
            return self.models["expensive"]
        
        return self.models["cheap"]

# Test router
router = CostAwareRouter(budget)

print(f"Normal request: {router.route('Simple question')}")
print(f"Quality request: {router.route('Complex question', prefer_quality=True)}")

---
## 6. Exercises <a id='exercises'></a>

### Exercise 1: Build Complete API

Create a production-ready API with all components.

In [None]:
# TODO: Combine caching, rate limiting, metrics, and budget into one API

# Your code here:


### Exercise 2: Alerting System

Build an alerting system for errors and budget thresholds.

In [None]:
# TODO: Create alerts for high error rates, budget warnings, latency spikes

# Your code here:


---
## 7. Checkpoint <a id='checkpoint'></a>

Before moving on, verify:

- [ ] You can design production APIs
- [ ] You implemented caching
- [ ] You understand rate limiting
- [ ] You can track metrics and costs

### Next Steps

In the final notebook, we'll build a **Complete AI Application** as a capstone project!