# TEORÍA - SESIÓN 4: Testing e Integración con IA Generativa

**Duración:** 180 minutos

**Objetivos:**
- Implementar tests automatizados con pytest y TestClient
- Mockear dependencias y autenticación en tests
- Integrar APIs de IA generativa en FastAPI
- Controlar tokens, costes y rate limiting
- Manejar errores y timeouts en llamadas a IA

## CONFIGURACIÓN DEL ENTORNO

In [None]:
# Instalación de dependencias
!pip install fastapi==0.115.0 uvicorn[standard]==0.32.0 pytest==8.0.0 httpx==0.27.0 google-generativeai openai==1.12.0 python-dotenv==1.0.0 ipytest -q
print(" Dependencias instaladas")

### Sobre las herramientas instaladas

**ipytest:**
- Permite ejecutar tests de pytest directamente en notebooks de Jupyter
- Sin ipytest: Tendrías que crear archivos .py separados y ejecutar `pytest test_api.py` en terminal
- Con ipytest: Puedes escribir y ejecutar tests en la misma celda del notebook
- Útil para aprendizaje y prototipado rápido

**google-generativeai:**
- SDK oficial de Google para usar modelos Gemini
- Alternativa a Google Gemini, con API gratuita para desarrollo
- Usaremos Gemini 2.5 Flash para los ejemplos de integración con IA

**Flujo normal en producción:**
```bash
# 1. Crear archivo de tests
# test_api.py

# 2. Ejecutar tests desde terminal
pytest test_api.py -v

# 3. Ver resultados en consola
```

En este notebook usamos ipytest para ver los resultados inmediatamente.

### Servidor

Una función para ejecutar el servidor sin tener que reiniciar el kernel

In [None]:
import uvicorn
import threading
import time
import logging

# Variable global para guardar la referencia al servidor activo
# (Esto sobrevive entre ejecuciones de celdas)
if "active_server" not in globals():
    active_server = None


def run_api(app_instance, port=8000):
    global active_server

    logging.getLogger("uvicorn").setLevel(logging.CRITICAL)
    logging.getLogger("uvicorn.error").setLevel(logging.CRITICAL)
    logging.getLogger("uvicorn.access").setLevel(logging.CRITICAL)

    # 1. SI YA HAY UN SERVIDOR, LO APAGAMOS
    if active_server:
        active_server.should_exit = True
        active_server.force_exit = True

        # Esperamos a que libere el puerto (max 3 segundos)
        for _ in range(30):
            if not active_server.started:
                break
            time.sleep(0.1)

    # 2. CONFIGURAMOS EL NUEVO
    config = uvicorn.Config(
        app=app_instance, host="127.0.0.1", port=port, log_level="warning"
    )
    server = uvicorn.Server(config)

    # Guardamos la referencia global
    active_server = server

    # 3. ARRANCAMOS EN UN HILO APARTE
    thread = threading.Thread(target=server.run)
    thread.daemon = True
    thread.start()

    time.sleep(1)
    print(f" Servidor iniciado en http://127.0.0.1:{port}")
    print(f" Documentación: http://127.0.0.1:{port}/docs")


print(" Definida correctamente")

---

# BLOQUE 1: TESTING

## 1. INTRODUCCIÓN AL TESTING

### ¿Por qué testear APIs?

Los tests automatizados son esenciales en producción:

1. **Detectar bugs antes de desplegar** → Evitar errores en producción
2. **Documentación viva** → Los tests muestran cómo usar la API
3. **Refactorización segura** → Cambiar código sin miedo a romper funcionalidad
4. **CI/CD** → Integración continua con validación automática

### TestClient de FastAPI

FastAPI incluye `TestClient` basado en `httpx` para hacer requests sin levantar un servidor real.

**Ventajas:**
- No necesitas `uvicorn` corriendo
- Tests rápidos (milisegundos)
- Aislamiento total (cada test limpio)

### Estructura básica con pytest

```python

# test_api.py
from fastapi.testclient import TestClient

def test_nombre_descriptivo():
    # Arrange (preparar)
    client = TestClient(app)
    
    # Act (actuar)
    response = client.get("/ruta")
    
    # Assert (verificar)
    assert response.status_code == 200
    
```

### Ejemplo básico: API simple con test

In [None]:
# API simple
from fastapi import FastAPI
from fastapi import status
from fastapi.testclient import TestClient
app = FastAPI()

@app.get("/")
def root():
    return {"mensaje": "API funcionando"}

@app.get("/saludo/{nombre}")
def saludar(nombre: str):
    return {"saludo": f"Hola {nombre}"}

# Test con TestClient
client = TestClient(app)

# Test 1: Endpoint raíz
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"mensaje": "API funcionando"}
print(" Test 1 pasado: Endpoint raíz funciona")

# Test 2: Endpoint con parámetro
response = client.get("/saludo/Ana")
assert response.status_code == 200
assert response.json()["saludo"] == "Hola Ana"
print(" Test 2 pasado: Saludo personalizado funciona")

# Iniciar servidor
run_api(app, port=8500)

---

## 2. TESTS BÁSICOS

### Status codes

Los tests más comunes verifican los códigos HTTP correctos:

- **200 OK** → GET exitoso
- **201 Created** → POST crea recurso
- **400 Bad Request** → Datos inválidos
- **401 Unauthorized** → Sin autenticación
- **404 Not Found** → Recurso no existe
- **422 Unprocessable Entity** → Validación Pydantic falla

In [None]:
# API con validación Pydantic
from fastapi import FastAPI
from fastapi import status
from fastapi.testclient import TestClient
from pydantic import BaseModel
app_validacion = FastAPI()

class Item(BaseModel):
    nombre: str
    precio: float
    cantidad: int = 1

@app_validacion.post("/items", status_code=201)
def crear_item(item: Item):
    return {"item_creado": item.model_dump()}

# Tests de status codes
client_val = TestClient(app_validacion)

# Test 1: POST exitoso retorna 201
response = client_val.post("/items", json={"nombre": "Laptop", "precio": 999.99})
assert response.status_code == 201
print(" Test: POST exitoso retorna 201")

# Test 2: POST con datos inválidos retorna 422
response = client_val.post("/items", json={"nombre": "Laptop"})  # Falta precio (obligatorio)
assert response.status_code == 422
print(" Test: Validación Pydantic retorna 422")

# Test 3: Verificar estructura de error 422
error_detail = response.json()["detail"]
assert len(error_detail) > 0  # Hay al menos un error
assert error_detail[0]["type"] == "missing"  # Campo faltante
print(" Test: Error 422 tiene estructura correcta")

# Iniciar servidor
run_api(app, port=8501)

### Validación de estructura de respuesta

Además del status code, debemos verificar que el JSON retornado tenga la estructura esperada.

In [None]:
# Test completo de estructura
from fastapi import status
response = client_val.post("/items", json={
    "nombre": "Mouse",
    "precio": 25.50,
    "cantidad": 3
})

assert response.status_code == 201
data = response.json()

# Verificar que existe la clave "item_creado"
assert "item_creado" in data

# Verificar estructura interna
item = data["item_creado"]
assert item["nombre"] == "Mouse"
assert item["precio"] == 25.50
assert item["cantidad"] == 3

print(" Test: Estructura de respuesta correcta")

### MICRO-RETO 1: Test de endpoint con error

Crea un test que verifique que un endpoint retorna 404 cuando se busca un recurso inexistente.

In [None]:
# API con endpoint que puede retornar 404
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi import status
from fastapi.testclient import TestClient
app_404 = FastAPI()

ITEMS_DB = {
    1: {"nombre": "Laptop", "precio": 999.99},
    2: {"nombre": "Mouse", "precio": 25.50}
}

@app_404.get("/items/{item_id}")
def obtener_item(item_id: int):
    item = ITEMS_DB.get(item_id)
    if not item:
        raise HTTPException(status_code=404, detail="Item no encontrado")
    return item

# TODO: Completa el test
client_404 = TestClient(app_404)

# Test 1: Item existente retorna 200
# TODO: Completa aquí

# Test 2: Item inexistente retorna 404
# TODO: Completa aquí

# Test 3: Verificar mensaje de error
# TODO: Completa aquí

# Iniciar servidor
run_api(app, port=8502)


---

## 3. FIXTURES Y MOCKING

### ¿Qué son las fixtures en pytest?

Las **fixtures** son funciones reutilizables que preparan el entorno para tests.

**Ventajas:**
- Evitar código duplicado
- Setup/teardown automático
- Tests más limpios y legibles

**Ejemplo básico:**
```python
import pytest

@pytest.fixture
def datos_usuario():
    return {"nombre": "Ada", "edad": 8}


def test_edad(datos_usuario):
    assert datos_usuario["edad"] == 8
```

In [None]:
import ipytest
ipytest.autoconfig()

In [None]:
%%ipytest
#  IMPORTANTE: Tienes que importar TODO lo que uses despues del %%ipytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
import pytest

# --- DEFINICIÓN DE LA APP ---
app = FastAPI()

@app.get("/items")
def leer_items():
    return ["manzana", "banana", "cereza"]

# --- LA FIXTURE ---
@pytest.fixture
def cliente_test():
    """
    Prepara el entorno y entrega el cliente.
    """
    return TestClient(app)

# --- TEST BUENO ---
def test_leer_items(cliente_test):
    # Pytest inyecta 'cliente_test' automáticamente
    response = cliente_test.get("/items")
    
    assert response.status_code == 200
    assert len(response.json()) == 3
    assert "manzana" in response.json()

# --- TEST QUE FALLA ---
def test_leer_items2(cliente_test):
    response = cliente_test.get("/items")
    assert response.status_code == 200
    # CAMBIO AQUÍ: Esperamos algo que no existe
    assert "piña" in response.json()

### app.dependency_overrides: Mockear dependencias

En tests, a menudo queremos **mockear** (simular) dependencias como autenticación, bases de datos, etc.

FastAPI tiene `app.dependency_overrides` para reemplazar dependencias en tests.

**Caso de uso típico:** Mockear autenticación para no necesitar tokens reales.

In [None]:
# API con autenticación
from fastapi import Depends
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi import status
from fastapi.testclient import TestClient
from pydantic import BaseModel
from typing import Annotated
app_auth = FastAPI()

class User(BaseModel):
    username: str
    email: str

# Dependencia real (simula validar token)
def get_current_user() -> User:
    # En producción: validaría JWT, buscaría en BD, etc.
    raise HTTPException(status_code=401, detail="No autenticado")

CurrentUser = Annotated[User, Depends(get_current_user)]

@app_auth.get("/perfil")
def obtener_perfil(current_user: CurrentUser):
    return current_user

# SIN MOCK: Endpoint retorna 401
client_auth = TestClient(app_auth)
response = client_auth.get("/perfil")
assert response.status_code == 401
print(" Sin mock: Retorna 401 como esperado")

In [None]:
# CON MOCK: Reemplazar dependencia
from fastapi import status
def mock_get_current_user() -> User:
    """Usuario mockeado para tests"""
    return User(username="testuser", email="test@example.com")

# Reemplazar dependencia SOLO en tests
app_auth.dependency_overrides[get_current_user] = mock_get_current_user

# Ahora el endpoint retorna 200 con usuario mockeado
response = client_auth.get("/perfil")
assert response.status_code == 200
assert response.json()["username"] == "testuser"
print(" Con mock: Retorna 200 con usuario mockeado")

# IMPORTANTE: Limpiar después del test
app_auth.dependency_overrides.clear()
print(" Override limpiado")

### Patrón completo: Test con fixture + mock

In [None]:
%%ipytest
from fastapi import Depends
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi import status
from fastapi.testclient import TestClient
from typing import Annotated
import pytest

# Aplicación de ejemplo
app = FastAPI()

# Dependencia real
def obtener_usuario():
    raise HTTPException(status_code=401, detail="No autenticado")

# Annotated reutilizable
UsuarioActual = Annotated[dict, Depends(obtener_usuario)]

@app.get("/protegido")
def ruta_protegida(usuario: UsuarioActual):
    return {"mensaje": f"Hola {usuario['nombre']}"}

# Fixture con mock
@pytest.fixture
def cliente_con_mock():
    """Fixture que retorna cliente con autenticación mockeada"""
    
    # Mock de la dependencia
    def mock_usuario():
        return {"id": 1, "nombre": "usuario_prueba"}
    
    # Reemplazar dependencia
    app.dependency_overrides[obtener_usuario] = mock_usuario
    
    cliente = TestClient(app)
    yield cliente
    
    # Limpieza
    app.dependency_overrides.clear()

# Test usando la fixture
def test_protegido_con_mock(cliente_con_mock):
    respuesta = cliente_con_mock.get("/protegido")
    assert respuesta.status_code == 200
    assert "usuario_prueba" in respuesta.json()["mensaje"]

###  MICRO-RETO 2: Mockear base de datos

Crea un test que mockee una función de "buscar en BD" para retornar datos fake.

In [None]:
from fastapi import Depends
from fastapi import FastAPI
app_db = FastAPI()

# Dependencia que simula consulta a BD
def get_db_connection():
    # En producción: retornaría conexión real a PostgreSQL, etc.
    raise Exception("BD no disponible en tests")

@app_db.get("/productos")
def listar_productos(db=Depends(get_db_connection)):
    # En producción: db.query(...)
    return db

# TODO: Crea una función mock_db que retorne una lista fake de productos
def mock_db():
    # TODO: Retorna [{"id": 1, "nombre": "Laptop"}, {"id": 2, "nombre": "Mouse"}]
    pass

# TODO: Reemplaza la dependencia

# TODO: Haz un test GET /productos y verifica que retorna la lista fake

# TODO: Limpia el override

# Iniciar servidor
run_api(app, port=8507)


---

#  BLOQUE 2: INTEGRACIÓN CON IA GENERATIVA

## 1. INTEGRACIÓN CON APIS DE IA

### Llamada a Google Gemini

La librería `openai` (v1+) es compatible con múltiples proveedores:

- **Google Gemini** (gemini-2.5-pro, Gemini 2.5 Flash)
- **Anthropic** (Claude) → Compatible con Google Gemini SDK
- **Otros** (con base_url personalizada)

### Configuración de API keys

** IMPORTANTE:** NUNCA hardcodear API keys en el código.

In [None]:
import os
import google.generativeai as genai
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# 1. Configuración
# Lo ideal es os.getenv("GEMINI_API_KEY"), pero para el notebook:
GOOGLE_API_KEY = "AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg" 
genai.configure(api_key=GOOGLE_API_KEY)

# 2. Definir el modelo
model_name = 'models/gemini-2.5-flash'
model = genai.GenerativeModel(model_name)

app = FastAPI()

# Modelo de datos (Input)
class PromptRequest(BaseModel):
    prompt: str

# 3. Endpoint
@app.post("/generar-texto")
def generar_texto(request: PromptRequest):
    try:
        # Llamada asíncrona a Google
        response = model.generate_content(request.prompt)
        
        # Extraer el texto limpio
        return {"respuesta": response.text}
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    

#PRUEBA DE CONEXIÓN AL MODELO
# 1. Creamos el cliente conectado a tu app
cliente = TestClient(app)

print("⏳ Enviando petición a Gemini (esto puede tardar unos segundos)...")

# 2. Hacemos la petición POST
response = cliente.post(
    "/generar-texto", 
    json={"prompt": "Explica en 20 palabras qué es la IA generativa"}
)

# 3. Verificamos y mostramos el resultado
if response.status_code == 200:
    data = response.json()
    print("\n RESPUESTA DE GEMINI:\n")
    print(data["respuesta"])  # Imprimimos solo el texto limpio
else:
    print(f"\n Error {response.status_code}:")
    print(response.json())


# servidor
run_api(app, port=8001)

### Parámetros avanzados de generación

Además del prompt básico, Gemini permite controlar el comportamiento de la generación:

**generation_config:**
```python
generation_config = {
    "temperature": 0.7,        # Creatividad (0-1): 0=determinista, 1=creativo
    "max_output_tokens": 100,  # Límite de tokens generados
    "top_p": 0.95,            # Nucleus sampling
    "top_k": 40               # Top-k sampling
}
```

**Parámetros clave:**
- **temperature:** Controla aleatoriedad. Bajo (0.2) = respuestas precisas y predecibles. Alto (0.9) = respuestas creativas y variadas.
- **max_output_tokens:** Límite máximo de tokens en la respuesta (equivalente a max_tokens en otros LLMs)
- **top_p y top_k:** Controlan diversidad en la selección de tokens

**System instructions:**
```python
model = genai.GenerativeModel(
    "gemini-2.5-flash",
    system_instruction="Eres un asistente experto en Python que responde de forma concisa"
)
```

Estas instrucciones guían el comportamiento general del modelo.

In [None]:
import google.generativeai as genai
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.testclient import TestClient

# Configurar Gemini con system instruction
genai.configure(api_key="AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg")

model_avanzado = genai.GenerativeModel(
    "gemini-2.5-flash",
    system_instruction="Eres un asistente técnico que explica conceptos de programación de forma clara y concisa."
)

app_avanzada = FastAPI()

class SolicitudAvanzada(BaseModel):
    prompt: str
    temperatura: float = 0.7
    max_tokens: int = 150

@app_avanzada.post("/completar-avanzado")
async def completar_con_parametros(solicitud: SolicitudAvanzada):
    """Endpoint con control de parámetros de generación"""
    try:
        # Configuración de generación
        config_generacion = genai.types.GenerationConfig(
            temperature=solicitud.temperatura,
            max_output_tokens=solicitud.max_tokens,
            top_p=0.95,
            top_k=40
        )
        
        # Llamada a Gemini con parámetros
        respuesta = await model_avanzado.generate_content_async(
            solicitud.prompt,
            generation_config=config_generacion
        )
        
        return {
            "respuesta": respuesta.text,
            "tokens_usados": respuesta.usage_metadata.total_token_count,
            "temperatura_usada": solicitud.temperatura
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Prueba con temperatura media (equilibrio entre precisión y creatividad)
cliente = TestClient(app_avanzada)

print("=== Ejemplo con parámetros de generación ===")
respuesta = cliente.post("/completar-avanzado", json={
    "prompt": "Explica qué es FastAPI en 2 frases",
    "temperatura": 0.7,
    "max_tokens": 500
})

if respuesta.status_code == 200:
    datos = respuesta.json()
    print(f"Respuesta: {datos['respuesta']}")
    print(f"Tokens usados: {datos['tokens_usados']}")
    print(f"Temperatura: {datos['temperatura_usada']}")
    print("\nPuedes experimentar con diferentes valores de temperatura (0.0-1.0)")
    print("- Temperatura baja (0.2): Respuestas más precisas y deterministas")
    print("- Temperatura alta (0.9): Respuestas más creativas y variadas")
else:
    print(f"Error {respuesta.status_code}:", respuesta.json())

=== Ejemplo con parámetros de generación ===
Respuesta: FastAPI es un framework web de Python moderno y de alto rendimiento diseñado para construir APIs RESTful de forma rápida y eficiente.

A
Tokens usados: 722
Temperatura: 0.7

Puedes experimentar con diferentes valores de temperatura (0.0-1.0)
- Temperatura baja (0.2): Respuestas más precisas y deterministas
- Temperatura alta (0.9): Respuestas más creativas y variadas


###  MICRO-RETO 3: Endpoint con prompt personalizado

Crea un endpoint `/traducir` que reciba texto y lo "traduzca" usando un system prompt específico.

In [None]:
import google.generativeai as genai
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
from fastapi.testclient import TestClient

# Configurar Gemini
genai.configure(api_key="AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg")
model = genai.GenerativeModel("gemini-2.5-flash")

app_traductor = FastAPI()

# TODO: Define TraduccionRequest con texto e idioma_destino
class TraduccionRequest(BaseModel):
    pass  # Completa aquí

# TODO: Crea endpoint POST /traducir
@app_traductor.post("/traducir")
async def traducir(request: TraduccionRequest):
    # TODO: 1. Construye prompt
    
    # TODO: 2. Llama a Gemini
    
    # TODO: 3. Retorna traducción
    pass

# TODO: Test
client = TestClient(app_traductor)
response = client.post("/traducir", json={
    "texto": "Hello world",
    "idioma_destino": "español"
})
print(response.json())


# Levantar servidor
run_api(app_traductor)

{'texto_original': 'Hello world, I am learning how to test FasAPI with AI integration', 'idioma': 'español', 'traduccion': 'Hola mundo, estoy aprendiendo a probar FastAPI con integración de IA.'}
 Servidor iniciado en http://127.0.0.1:8000
 Documentación: http://127.0.0.1:8000/docs


---

## 2. CONTROL DE TOKENS Y COSTES

### Token limits

Los LLMs tienen límites de tokens:

- **gemini-2.5-flash**: 4,096 tokens (input + output)
- **gemini-2.5-pro**: 8,192 tokens
- **gemini-2.5-pro-32k**: 32,768 tokens

**1 token ≈ 0.75 palabras en inglés**

### Estimación de costes

**Precios aproximados (gemini-2.5-flash):**
- Input: $0.50 / 1M tokens
- Output: $2.50 / 1M tokens

**Ejemplo:**
- Request con 100 tokens input + 200 tokens output
- Coste: (100 × $0.50 + 200 × $2.50) / 1,000,000 = $0.00035

In [18]:
# Cálculo de costes con Gemini
PRECIOS_GEMINI = {
    "gemini-2.5-flash": {"input": 0.075 / 1_000_000, "output": 0.30 / 1_000_000},
    "gemini-2.5-pro": {"input": 1.25 / 1_000_000, "output": 5.00 / 1_000_000},
}

def calcular_coste(modelo: str, prompt_tokens: int, output_tokens: int) -> float:
    """Calcula coste en USD para modelos Gemini"""
    precios = PRECIOS_GEMINI.get(modelo, PRECIOS_GEMINI["gemini-2.5-flash"])
    
    coste_input = prompt_tokens * precios["input"]
    coste_output = output_tokens * precios["output"]
    
    return coste_input + coste_output

# Ejemplo
coste = calcular_coste("gemini-2.5-flash", prompt_tokens=100, output_tokens=200)
print(f"Coste: ${coste:.6f} USD")
print(f"Con 1000 requests: ${coste * 1000:.2f} USD")


Coste: $0.000067 USD
Con 1000 requests: $0.07 USD


### Rate limiting básico

Para controlar costes y evitar abusos, implementamos rate limiting (limitación de velocidad).

**¿Qué es rate limiting?**
- Limita cuántas peticiones puede hacer un usuario en un periodo de tiempo
- Ejemplo: Máximo 10 requests por minuto por usuario
- Si se excede → retornar error 429 (Too Many Requests)

**Parámetros clave:**
- `user_id`: Identificador único del usuario (puede ser IP, token, etc.)
- `max_requests`: Número máximo de peticiones permitidas (ej: 10)
- `window_minutes`: Ventana temporal en minutos (ej: 1 minuto)

**Patrón de ventana deslizante (sliding window):**
- Se guarda el timestamp de cada request
- Solo se cuentan los requests dentro de la ventana temporal actual
- Los requests antiguos (fuera de la ventana) se descartan automáticamente

**En producción:**
- Usar Redis o Memcached (no defaultdict en memoria)
- Implementar cleanup periódico de datos antiguos
- Considerar rate limiting por endpoint, no solo global

In [19]:
from collections import defaultdict
from datetime import datetime, timedelta

# Almacén simple en memoria (en producción: usar Redis)
# defaultdict crea automáticamente una lista vacía si la clave no existe
rate_limit_store = defaultdict(list)

def verificar_rate_limit(user_id: str, max_requests: int = 10, window_minutes: int = 1) -> bool:
    """
    Verifica si el usuario excedió el límite de requests
    
    Args:
        user_id: Identificador único del usuario
        max_requests: Máximo de peticiones permitidas
        window_minutes: Ventana temporal en minutos
    
    Returns:
        True si la petición está permitida, False si excede el límite
    """
    ahora = datetime.now()
    
    # Calcular el inicio de la ventana temporal
    # Ejemplo: si son las 10:05 y window=1, inicio es 10:04
    inicio_ventana = ahora - timedelta(minutes=window_minutes)
    
    # Filtrar solo los requests que están dentro de la ventana temporal
    # Descartamos los requests antiguos (fuera de la ventana)
    requests_recientes = [
        timestamp for timestamp in rate_limit_store[user_id]
        if timestamp > inicio_ventana
    ]
    
    # Actualizar el store solo con requests recientes
    rate_limit_store[user_id] = requests_recientes
    
    # Verificar si se excedió el límite
    if len(requests_recientes) >= max_requests:
        return False  # Rate limit excedido
    
    # Registrar este nuevo request
    rate_limit_store[user_id].append(ahora)
    return True  # Request permitido

# Simulación: Usuario hace 12 requests (límite es 10)
print("Simulación de rate limiting (límite: 10 requests/minuto)\n")

for i in range(12):
    permitido = verificar_rate_limit("usuario_123", max_requests=10, window_minutes=1)
    
    if permitido:
        print(f"Request {i+1:2d}: Permitido")
    else:
        print(f"Request {i+1:2d}: BLOQUEADO - Rate limit excedido")

# Estado final del store
print(f"\nTotal requests registrados para usuario_123: {len(rate_limit_store['usuario_123'])}")

Simulación de rate limiting (límite: 10 requests/minuto)

Request  1: Permitido
Request  2: Permitido
Request  3: Permitido
Request  4: Permitido
Request  5: Permitido
Request  6: Permitido
Request  7: Permitido
Request  8: Permitido
Request  9: Permitido
Request 10: Permitido
Request 11: BLOQUEADO - Rate limit excedido
Request 12: BLOQUEADO - Rate limit excedido

Total requests registrados para usuario_123: 10


### Integración en endpoint con rate limit

In [20]:
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from fastapi.testclient import TestClient
from typing import Annotated
from collections import defaultdict
from datetime import datetime, timedelta

# Rate limiting store
rate_limit_store = defaultdict(list)

def verificar_rate_limit(user_id: str, max_requests: int = 3) -> bool:
    """Verifica rate limit (3 requests por minuto)"""
    ahora = datetime.now()
    inicio_ventana = ahora - timedelta(minutes=1)
    
    requests_recientes = [
        t for t in rate_limit_store[user_id] if t > inicio_ventana
    ]
    rate_limit_store[user_id] = requests_recientes
    
    if len(requests_recientes) >= max_requests:
        return False
    
    rate_limit_store[user_id].append(ahora)
    return True

# Mock de Gemini para esta demostración (evita problemas de event loop)
class MockGeminiResponse:
    def __init__(self, texto):
        self.text = texto
        self.usage_metadata = type('obj', (object,), {'total_token_count': 50})

async def mock_gemini(prompt: str):
    """Mock que simula llamada a Gemini (sin event loop issues)"""
    return MockGeminiResponse(f"Respuesta simulada a: {prompt[:20]}...")

# Aplicación
app_con_rate_limit = FastAPI()

class SolicitudIA(BaseModel):
    prompt: str

@app_con_rate_limit.post("/generar")
async def generar_con_rate_limit(
    solicitud: SolicitudIA,
    user_id: Annotated[str, Header()] = "anonimo"
):
    """Endpoint con rate limiting integrado"""
    
    # Verificar rate limit ANTES de llamar a IA
    if not verificar_rate_limit(user_id, max_requests=3):
        raise HTTPException(
            status_code=429,
            detail="Rate limit excedido. Máximo 3 requests por minuto"
        )
    
    # Llamar a IA (mock para evitar problemas de event loop)
    respuesta = await mock_gemini(solicitud.prompt)
    
    return {
        "respuesta": respuesta.text,
        "requests_restantes": 3 - len(rate_limit_store[user_id])
    }

# Demostración: 4 requests (límite es 3)
cliente = TestClient(app_con_rate_limit)

for i in range(1, 5):
    response = cliente.post(
        "/generar",
        json={"prompt": "Hola"},
        headers={"user-id": "usuario_test"}
    )
    
    if response.status_code == 200:
        datos = response.json()
        print(f"Request {i}: OK - Restantes: {datos['requests_restantes']}")
    else:
        print(f"Request {i}: BLOQUEADO - {response.json()['detail']}")

Request 1: OK - Restantes: 2
Request 2: OK - Restantes: 1
Request 3: OK - Restantes: 0
Request 4: BLOQUEADO - Rate limit excedido. Máximo 3 requests por minuto


###  MICRO-RETO 4: Control de max_tokens

Modifica el endpoint para rechazar requests con `max_tokens > 500` (código 400).

In [None]:
import google.generativeai as genai
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.testclient import TestClient

# Configurar Gemini
genai.configure(api_key="AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg")
model = genai.GenerativeModel("gemini-2.5-flash")

app_validado = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    max_tokens: int = 100

@app_validado.post("/completar")
async def completar_con_validacion(request: CompletionRequest):
    # TODO: 1. Valida que max_tokens <= 500
    
    # TODO: 2. Llama a Gemini
    
    # TODO: 3. Retorna datos con tokens usados
    pass

# TODO: Test 1 - Debe fallar (400)
client = TestClient(app_validado)
response = client.post("/completar", json={"prompt": "test", "max_tokens": 600})
assert response.status_code == 400

# TODO: Test 2 - Debe funcionar (200)
response = client.post("/completar", json={"prompt": "Explica FastAPI", "max_tokens": 100})
assert response.status_code == 200
print(response.json())

---

## 3. MANEJO DE ERRORES

### Timeouts largos

Las llamadas a APIs de IA pueden tardar varios segundos (5-15s en prompts complejos). Debemos:

1. **Configurar timeouts** para evitar requests colgados indefinidamente
2. **Manejar TimeoutError** con mensajes claros al usuario
3. **Usar asyncio.wait_for()** para limitar el tiempo de espera

**Estrategia:**
```python
try:
    response = await asyncio.wait_for(
        model.generate_content_async(prompt),
        timeout=10.0  # Máximo 10 segundos
    )
except asyncio.TimeoutError:
    raise HTTPException(status_code=504, detail="La IA tardó demasiado en responder")
```

**Códigos HTTP relevantes:**
- `504 Gateway Timeout`: El servidor upstream (Gemini) no respondió a tiempo
- `408 Request Timeout`: El cliente tardó demasiado en enviar la petición

In [None]:
import google.generativeai as genai
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.testclient import TestClient

# Configurar Gemini
genai.configure(api_key="AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg")
model = genai.GenerativeModel("gemini-2.5-flash")

app_con_timeout = FastAPI()

class SolicitudIA(BaseModel):
    prompt: str
    timeout_segundos: int = 10

@app_con_timeout.post("/generar-con-timeout")
async def generar_con_timeout(solicitud: SolicitudIA):
    """Endpoint con manejo de timeout"""
    try:
        # Configurar timeout con asyncio.wait_for
        respuesta = await asyncio.wait_for(
            model.generate_content_async(solicitud.prompt),
            timeout=solicitud.timeout_segundos
        )
        
        return {
            "respuesta": respuesta.text[:100] + "...",
            "tokens_usados": respuesta.usage_metadata.total_token_count,
            "estado": "completado"
        }
        
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail=f"La IA no respondió en {solicitud.timeout_segundos} segundos. Intenta con un prompt más corto."
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error inesperado: {str(e)}"
        )

# Prueba con timeout generoso (debería funcionar)
cliente = TestClient(app_con_timeout)

print("Prueba 1: Timeout de 30 segundos (debería funcionar)")
respuesta = cliente.post("/generar-con-timeout", json={
    "prompt": "Explica FastAPI",
    "timeout_segundos": 30
})

if respuesta.status_code == 200:
    print(f"  Estado: {respuesta.json()['estado']}")
    print(f"  Respuesta: {respuesta.json()['respuesta'][:50]}...")
else:
    print(f"  Error {respuesta.status_code}: {respuesta.json()['detail']}")

Prueba 1: Timeout de 30 segundos (debería funcionar)
  Estado: completado
  Respuesta: ¡Claro que sí! FastAPI es un framework web de Pyth...


### Retry logic con backoff exponencial

Si la API falla temporalmente (error 500, timeout, rate limit), podemos reintentar automáticamente con esperas crecientes.

**¿Qué es backoff exponencial?**
- Intento 1: inmediato
- Intento 2: espera 1 segundo
- Intento 3: espera 2 segundos  
- Intento 4: espera 4 segundos
- Intento 5: espera 8 segundos

**¿Por qué es útil?**
- Evita saturar la API con reintentos inmediatos
- Da tiempo al servidor para recuperarse
- Mejora la tasa de éxito en errores transitorios

**Errores que vale la pena reintentar:**
- 429 (Rate Limit)
- 500, 502, 503 (Errores del servidor)
- TimeoutError
- Errores de red temporales

**Errores que NO se deben reintentar:**
- 400 (Bad Request) - El request está mal formado
- 401 (Unauthorized) - API key inválida
- 404 (Not Found) - Endpoint no existe

In [None]:
import google.generativeai as genai
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.testclient import TestClient

# Configurar Gemini
genai.configure(api_key="AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg")
model = genai.GenerativeModel("gemini-2.5-flash")

async def llamar_gemini_con_retry(
    prompt: str, 
    max_intentos: int = 3, 
    timeout: float = 10.0
):
    """
    Llama a Gemini con retry logic y backoff exponencial
    
    Args:
        prompt: Texto a enviar a Gemini
        max_intentos: Número máximo de intentos (default: 3)
        timeout: Timeout por intento en segundos
    
    Returns:
        Respuesta de Gemini
    
    Raises:
        Exception: Si todos los intentos fallan
    """
    for intento in range(1, max_intentos + 1):
        try:
            print(f"Intento {intento}/{max_intentos}...")
            
            # Llamada con timeout
            respuesta = await asyncio.wait_for(
                model.generate_content_async(prompt),
                timeout=timeout
            )
            
            print(f"  ✓ Éxito en intento {intento}")
            return respuesta
            
        except asyncio.TimeoutError:
            print(f"  ✗ Timeout en intento {intento}")
            
            if intento == max_intentos:
                raise Exception(f"Falló después de {max_intentos} intentos (timeout)")
            
            # Backoff exponencial: 2^(intento-1) segundos
            espera = 2 ** (intento - 1)
            print(f"  ⏳ Esperando {espera}s antes del siguiente intento...")
            await asyncio.sleep(espera)
            
        except Exception as e:
            print(f"  ✗ Error en intento {intento}: {str(e)[:50]}")
            
            if intento == max_intentos:
                raise Exception(f"Falló después de {max_intentos} intentos: {str(e)}")
            
            # Backoff exponencial
            espera = 2 ** (intento - 1)
            print(f"  ⏳ Esperando {espera}s antes del siguiente intento...")
            await asyncio.sleep(espera)

# Aplicación con retry
app_con_retry = FastAPI()

class SolicitudIA(BaseModel):
    prompt: str
    max_intentos: int = 3

@app_con_retry.post("/generar-con-retry")
async def generar_con_retry(solicitud: SolicitudIA):
    """Endpoint con retry logic integrado"""
    try:
        respuesta = await llamar_gemini_con_retry(
            solicitud.prompt,
            max_intentos=solicitud.max_intentos
        )
        
        return {
            "respuesta": respuesta.text[:100] + "...",
            "tokens_usados": respuesta.usage_metadata.total_token_count
        }
        
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error después de {solicitud.max_intentos} intentos: {str(e)}"
        )

# Prueba
cliente = TestClient(app_con_retry)

print("=== Prueba de retry logic ===\n")
respuesta = cliente.post("/generar-con-retry", json={
    "prompt": "Explica FastAPI en una frase",
    "max_intentos": 3
})

if respuesta.status_code == 200:
    print(f"\n✓ Respuesta final: {respuesta.json()['respuesta'][:80]}...")
    print(f"\n✓ ResTokens usados: {respuesta.json()['tokens_usados']}")
else:
    print(f"\n✗ Error: {respuesta.json()['detail']}")

### Logging de uso

Es crítico registrar cada llamada a IA para:
- **Monitoreo de costes:** Rastrear tokens consumidos y calcular gastos
- **Debugging:** Identificar prompts problemáticos o lentos
- **Auditoría:** Registrar quién usa la API y cuándo
- **Optimización:** Detectar patrones de uso para mejorar prompts

**Información a loggear:**
- Timestamp
- User ID
- Prompt (primeros 50 caracteres por privacidad)
- Tokens de entrada y salida
- Tiempo de respuesta
- Coste estimado
- Éxito/error

**Niveles de logging:**
- `INFO`: Llamadas exitosas
- `WARNING`: Rate limits, timeouts recuperados
- `ERROR`: Fallos permanentes

In [None]:
import google.generativeai as genai
import asyncio
import logging
from datetime import datetime
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from fastapi.testclient import TestClient
from typing import Annotated

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("gemini_api")

# Configurar Gemini
genai.configure(api_key="AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg")
model = genai.GenerativeModel("gemini-2.5-flash")

# Precios de Gemini 2.5 Flash (después de límite gratuito)
PRECIO_INPUT = 0.075 / 1_000_000  # Por token
PRECIO_OUTPUT = 0.30 / 1_000_000

async def llamar_gemini_con_logging(prompt: str, user_id: str):
    """Llamada a Gemini con logging completo"""
    inicio = datetime.now()
    
    try:
        # Log de inicio
        logger.info(
            f"[INICIO] user_id={user_id} | "
            f"prompt='{prompt[:50]}{'...' if len(prompt) > 50 else ''}'"
        )
        
        # Llamada a Gemini
        respuesta = model.generate_content(prompt)
        
        # Calcular métricas
        tiempo_ms = (datetime.now() - inicio).total_seconds() * 1000
        tokens_input = respuesta.usage_metadata.prompt_token_count
        tokens_output = respuesta.usage_metadata.candidates_token_count
        tokens_total = respuesta.usage_metadata.total_token_count
        
        # Calcular coste
        coste = (tokens_input * PRECIO_INPUT) + (tokens_output * PRECIO_OUTPUT)
        
        # Log de éxito
        logger.info(
            f"[ÉXITO] user_id={user_id} | "
            f"tokens={tokens_total} (in:{tokens_input} out:{tokens_output}) | "
            f"tiempo={tiempo_ms:.0f}ms | "
            f"coste=${coste:.6f}"
        )
        
        return respuesta, coste
        
    except Exception as e:
        # Log de error
        tiempo_ms = (datetime.now() - inicio).total_seconds() * 1000
        logger.error(
            f"[ERROR] user_id={user_id} | "
            f"tiempo={tiempo_ms:.0f}ms | "
            f"error={str(e)[:100]}"
        )
        raise

# Aplicación con logging
app_con_logging = FastAPI()

class SolicitudIA(BaseModel):
    prompt: str

@app_con_logging.post("/generar-con-logging")
async def generar_con_logging(
    solicitud: SolicitudIA,
    user_id: Annotated[str, Header()] = "anonimo"
):
    """Endpoint con logging completo de uso"""
    try:
        respuesta, coste = await llamar_gemini_con_logging(
            solicitud.prompt,
            user_id
        )
        
        return {
            "respuesta": respuesta.text[:100] + "...",
            "tokens_usados": respuesta.usage_metadata.total_token_count,
            "coste_usd": round(coste, 6)
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Prueba
cliente = TestClient(app_con_logging)

print("=== Prueba de logging (revisa los logs arriba) ===\n")
respuesta = cliente.post(
    "/generar-con-logging",
    json={"prompt": "Explica FastAPI en 2 frases"},
    headers={"user-id": "usuario_123"}
)

if respuesta.status_code == 200:
    datos = respuesta.json()
    print(f"Tokens: {datos['tokens_usados']}")
    print(f"Coste: ${datos['coste_usd']}")
else:
    print(f"Error: {respuesta.json()['detail']}")

run_api(app_con_logging)

### Endpoint completo con mejores prácticas

Integrando todo lo aprendido: rate limiting, timeout, retry, logging y manejo de errores.

In [None]:
import google.generativeai as genai
import asyncio
import logging
from datetime import datetime, timedelta
from collections import defaultdict
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from fastapi.testclient import TestClient
from typing import Annotated

# Configuración
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("api_completa")

genai.configure(api_key="AIzaSyBS7oO2Bw7GbVOBz1xOh5XHSMaSSXr6xkg")
model = genai.GenerativeModel("gemini-2.5-flash")

# Rate limiting
rate_limit_store = defaultdict(list)

def verificar_rate_limit(user_id: str, max_requests: int = 5) -> bool:
    ahora = datetime.now()
    inicio_ventana = ahora - timedelta(minutes=1)
    requests_recientes = [t for t in rate_limit_store[user_id] if t > inicio_ventana]
    rate_limit_store[user_id] = requests_recientes
    
    if len(requests_recientes) >= max_requests:
        return False
    rate_limit_store[user_id].append(ahora)
    return True

# Aplicación completa
app_produccion = FastAPI()

class SolicitudIA(BaseModel):
    prompt: str
    max_tokens: int = 150

@app_produccion.post("/ia/generar")
async def endpoint_produccion(
    solicitud: SolicitudIA,
    user_id: Annotated[str, Header()] = "anonimo"
):
    """Endpoint de producción con todas las mejores prácticas integradas"""
    inicio = datetime.now()
    
    try:
        # 1. Validación de input
        if solicitud.max_tokens > 500:
            logger.warning(f"user_id={user_id} intentó usar max_tokens={solicitud.max_tokens}")
            raise HTTPException(
                status_code=400,
                detail="max_tokens no puede exceder 500"
            )
        
        # 2. Rate limiting
        if not verificar_rate_limit(user_id, max_requests=5):
            logger.warning(f"Rate limit excedido para user_id={user_id}")
            raise HTTPException(
                status_code=429,
                detail="Límite de 5 requests por minuto excedido"
            )
        
        # 3. Log de inicio
        logger.info(
            f"[INICIO] user_id={user_id} | prompt='{solicitud.prompt[:30]}...'"
        )
        
        # 4. Llamada con timeout
        config = genai.types.GenerationConfig(max_output_tokens=solicitud.max_tokens)
        
        respuesta = await asyncio.wait_for(
            model.generate_content_async(solicitud.prompt, generation_config=config),
            timeout=15.0
        )
        
        # 5. Métricas
        duracion_ms = (datetime.now() - inicio).total_seconds() * 1000
        tokens = respuesta.usage_metadata.total_token_count
        
        # 6. Log de éxito
        logger.info(
            f"[ÉXITO] user_id={user_id} | tokens={tokens} | tiempo={duracion_ms:.0f}ms"
        )
        
        return {
            "respuesta": respuesta.text[:200] + "..." if len(respuesta.text) > 200 else respuesta.text,
            "tokens_usados": tokens,
            "duracion_ms": int(duracion_ms)
        }
        
    except asyncio.TimeoutError:
        logger.error(f"[TIMEOUT] user_id={user_id} después de 15s")
        raise HTTPException(
            status_code=504,
            detail="La IA tardó demasiado en responder"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[ERROR] user_id={user_id} | error={str(e)[:100]}")
        raise HTTPException(
            status_code=500,
            detail="Error interno del servidor"
        )

# Prueba
cliente = TestClient(app_produccion)

print("=== Endpoint de producción ===\n")
respuesta = cliente.post(
    "/ia/generar",
    json={"prompt": "Explica FastAPI", "max_tokens": 100},
    headers={"user-id": "usuario_demo"}
)

if respuesta.status_code == 200:
    datos = respuesta.json()
    print(f"✓ Tokens: {datos['tokens_usados']}")
    print(f"✓ Duración: {datos['duracion_ms']}ms")
    print(f"✓ Respuesta: {datos['respuesta'][:80]}...")
else:
    print(f"✗ Error {respuesta.status_code}: {respuesta.json()['detail']}")

print("\n✓ Incluye: validación, rate limiting, timeout, logging y manejo de errores")

### MICRO-RETO 5: Endpoint con logging completo

Añade logs INFO y WARNING al sistema de rate limiting para monitorear su uso.

**Tareas:**
1. En `verificar_rate_limit()`: Añadir log INFO mostrando requests actuales vs límite
2. Cuando se exceda el límite: Añadir log WARNING antes de retornar False
3. Test: Hacer 6 requests (límite es 5) y verificar que aparecen los logs

**Logs esperados:**
```
INFO - [RATE LIMIT] user=test_user requests=3/5
INFO - [RATE LIMIT] user=test_user requests=4/5
WARNING - [RATE LIMIT EXCEEDED] user=test_user
```

In [None]:
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from fastapi.testclient import TestClient
from typing import Annotated

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rate_limit")

# Store de rate limiting
rate_limit_store = defaultdict(list)

def verificar_rate_limit(user_id: str, max_requests: int = 5) -> bool:
    """Verifica rate limit con logging completo"""
    ahora = datetime.now()
    inicio_ventana = ahora - timedelta(minutes=1)
    
    requests_recientes = [
        t for t in rate_limit_store[user_id] if t > inicio_ventana
    ]
    rate_limit_store[user_id] = requests_recientes
    
    # TODO: Añade log INFO aquí
    # logger.info(f"[RATE LIMIT] user={user_id} requests={len(requests_recientes)}/{max_requests}")
    
    if len(requests_recientes) >= max_requests:
        # TODO: Añade log WARNING aquí ANTES de retornar False
        # logger.warning(f"[RATE LIMIT EXCEEDED] user={user_id}")
        return False
    
    rate_limit_store[user_id].append(ahora)
    return True

# Aplicación
app_rate_limit = FastAPI()

class Solicitud(BaseModel):
    mensaje: str

@app_rate_limit.post("/mensaje")
async def enviar_mensaje(
    solicitud: Solicitud,
    user_id: Annotated[str, Header()] = "anonimo"
):
    """Endpoint con rate limiting"""
    if not verificar_rate_limit(user_id, max_requests=5):
        raise HTTPException(
            status_code=429,
            detail="Rate limit excedido"
        )
    
    return {"respuesta": f"Mensaje recibido: {solicitud.mensaje}"}

# TODO: Test - Descomenta y ejecuta
cliente = TestClient(app_rate_limit)
print("=== Test de logging (límite: 5 requests) ===\n")

for i in range(1, 7):  # 6 requests, límite es 5
    respuesta = cliente.post(
        "/mensaje",
        json={"mensaje": f"Test {i}"},
        headers={"user-id": "test_user"}
    )
    
    if respuesta.status_code == 200:
        print(f"Request {i}: OK")
    else:
        print(f"Request {i}: BLOQUEADO")

print("\n¡Revisa los logs arriba! Deberías ver logs INFO y WARNING")