# 📊 commonBench v2 - Professional LLM Benchmarking Tool

## Objective
A professional tool for evaluating LLMs with a full set of quantitative metrics:

### Implemented Metrics

| Tier | Metrics | Description |
|------|---------|-------------|
| **Tier 1 - Performance** | `latency_sec`, `ttft_sec`, `tokens_per_sec` | Speed and throughput |
| **Tier 2 - Cost** | `cost_usd`, `cost_per_word`, `projected_monthly` | Actual and projected costs |
| **Tier 3 - Quality** | `lexical_diversity`, `readability_flesch` | Text quality |
| **Tier 4 - Operational** | `error_rate`, `availability`, `retry_count` | Service reliability |
| **Tier 5 - Optional** | `f1_score` | Requires a reference answer |


In [1]:
# Install required packages
#!pip install gradio openai python-dotenv textstat sentence-transformers scikit-learn pandas matplotlib seaborn requests tenacity --quiet

#print("✅ Dependencies installed successfully")

---
## 📋 STEP 2: Imports and configuration

In [2]:
# Standard library
import os
import time
import json
import random
import logging
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from pathlib import Path

# Third party - API
import gradio as gr
from openai import OpenAI
from dotenv import load_dotenv
import requests

# Third party - Metrics
import textstat
import pandas as pd
import numpy as np

# Third party - Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)

# Load environment
load_dotenv(override=True)
API_KEY = os.getenv('OPENROUTER_API_KEY')

if not API_KEY:
    print("⚠️ OPENROUTER_API_KEY not found in .env")
    print("   Create a .env file containing: OPENROUTER_API_KEY=your-api-key")
else:
    print(f"✅ API key loaded: {API_KEY[:8]}...{API_KEY[-4:]}")

print("✅ Imports loaded successfully")


---
## 📋 STEP 3: Error classification system
We distinguish between transient errors (retry) and permanent errors (skip).

In [None]:
class ErrorType(Enum):
    """
    Error classification for evaluating operational quality.

    - TRANSIENT: A retry may succeed (timeouts, network errors)
    - PERMANENT: Do not retry (model does not exist, invalid API key)
    - RATE_LIMIT: API limits reached (wait and retry)
    - MODEL_ERROR: The model rejected the request (content policy)
    - SUCCESS: Not an error
    """
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    RATE_LIMIT = "rate_limit"
    MODEL_ERROR = "model_error"
    SUCCESS = "success"


@dataclass
class ModelCallResult:
    """
    Structured result of a model call.
    Includes metrics and metadata for full analysis.
    """
    success: bool
    text: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    latency_sec: float = 0.0
    ttft_sec: Optional[float] = None  # Time to First Token
    error_type: ErrorType = ErrorType.SUCCESS
    error_message: str = ""
    retry_count: int = 0
    model_id: str = ""


def classify_error(exception: Exception) -> Tuple[ErrorType, str]:
    """
    Classify an exception to decide whether to retry.
    """
    error_str = str(exception).lower()
    error_class = type(exception).__name__
    detail = str(exception)[:100]  # Truncated message for logs and reports

    # Rate limits
    if any(kw in error_str for kw in ['rate limit', 'rate_limit', '429', 'too many requests']):
        return ErrorType.RATE_LIMIT, f"Rate limit: {detail}"

    # Timeouts
    if any(kw in error_str for kw in ['timeout', 'timed out', 'deadline']):
        return ErrorType.TRANSIENT, f"Timeout: {detail}"

    # Connection
    if any(kw in error_str for kw in ['connection', 'network', 'socket', 'ssl']):
        return ErrorType.TRANSIENT, f"Connection: {detail}"

    # Server errors (5xx)
    if any(kw in error_str for kw in ['500', '502', '503', '504', 'server error']):
        return ErrorType.TRANSIENT, f"Server error: {detail}"

    # Model not found
    if any(kw in error_str for kw in ['not found', '404', 'model not available', 'invalid model']):
        return ErrorType.PERMANENT, f"Model unavailable: {detail}"

    # Auth errors
    if any(kw in error_str for kw in ['unauthorized', '401', 'invalid api key', 'authentication']):
        return ErrorType.PERMANENT, f"Auth error: {detail}"

    # Content policy
    if any(kw in error_str for kw in ['content policy', 'safety', 'refused']):
        return ErrorType.MODEL_ERROR, f"Rejected: {detail}"

    # Default: assume transient
    return ErrorType.TRANSIENT, f"Error ({error_class}): {detail}"


print("✅ Error classification system defined")
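
The keyword-based classification above can be tried in isolation. The following is a minimal sketch, not the notebook's implementation: `Kind`, `RULES`, `classify`, and `should_retry` are hypothetical names, the rule table is a reduced subset, and the sample messages are made up for illustration.

```python
from enum import Enum


class Kind(Enum):
    """Hypothetical, reduced version of the notebook's ErrorType."""
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    RATE_LIMIT = "rate_limit"


# Keyword table checked in order; the first matching rule wins.
RULES = [
    (("rate limit", "429", "too many requests"), Kind.RATE_LIMIT),
    (("timeout", "timed out"), Kind.TRANSIENT),
    (("not found", "404", "unauthorized", "401"), Kind.PERMANENT),
]


def classify(message: str) -> Kind:
    """Return the first matching kind, defaulting to TRANSIENT."""
    lowered = message.lower()
    for keywords, kind in RULES:
        if any(kw in lowered for kw in keywords):
            return kind
    return Kind.TRANSIENT


def should_retry(kind: Kind) -> bool:
    """Only transient and rate-limit failures are worth retrying."""
    return kind in (Kind.TRANSIENT, Kind.RATE_LIMIT)


for msg in ["Error code: 429 - Too Many Requests",
            "Request timed out",
            "Error code: 404 - model not found"]:
    kind = classify(msg)
    print(f"{msg!r:45} -> {kind.value:10} retry={should_retry(kind)}")
```

Because matching is by substring, rule order matters: a message containing both "429" and "timeout" is reported as a rate limit here, since that rule is checked first.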

---
## 📋 STEP 4: Retry with exponential backoff

In [None]:
def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    exponential_base: float = 2.0,
    retryable_errors: Tuple[ErrorType, ...] = (ErrorType.TRANSIENT, ErrorType.RATE_LIMIT)
):
    """
    Decorator for retrying with exponential backoff.

    - Waits longer between successive retries
    - Adds jitter to avoid a thundering herd
    - Retries only the error types listed in retryable_errors
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs) -> ModelCallResult:
            last_result = None
            
            for attempt in range(max_retries + 1):
                result = func(*args, **kwargs)
                result.retry_count = attempt
                
                # Success or non-retryable error: return immediately
                if result.success or result.error_type not in retryable_errors:
                    return result

                last_result = result

                # Compute delay with backoff + jitter (±10% of the nominal delay)
                if attempt < max_retries:
                    delay = min(base_delay * (exponential_base ** attempt), max_delay)
                    jitter = delay * 0.2 * (random.random() - 0.5)
                    actual_delay = delay + jitter

                    logger.warning(
                        f"Retry {attempt + 1}/{max_retries} - "
                        f"{result.error_type.value}: waiting {actual_delay:.1f}s"
                    )
                    time.sleep(actual_delay)
            
            return last_result
        return wrapper
    return decorator


print("✅ Retry system defined")
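
To see how the delays grow, the backoff formula used by the decorator can be evaluated on its own. This is a small sketch under the decorator's default parameters; `backoff_schedule` and `jitter_bounds` are hypothetical helper names introduced only for this illustration.

```python
def backoff_schedule(max_retries=3, base_delay=1.0, max_delay=30.0, exponential_base=2.0):
    """Nominal delay (before jitter) that precedes each retry attempt."""
    return [min(base_delay * exponential_base ** attempt, max_delay)
            for attempt in range(max_retries)]


def jitter_bounds(delay, spread=0.2):
    """Range covered by delay + delay * spread * (u - 0.5) for u in [0, 1)."""
    return (delay * (1 - spread / 2), delay * (1 + spread / 2))


print(backoff_schedule())               # [1.0, 2.0, 4.0]
print(backoff_schedule(max_retries=6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
print(jitter_bounds(4.0))               # roughly 3.6 to 4.4 seconds
```

With the defaults, a call that fails every attempt waits about 7 seconds in total before giving up; the `max_delay` cap only takes effect from the sixth retry onward.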

---
## 📋 STEP 5: Error statistics collector

In [None]:
@dataclass
class ModelErrorStats:
    """
    Per-model error statistics.
    error_rate is an operational quality metric.
    """
    total_calls: int = 0
    successful_calls: int = 0
    failed_calls: int = 0
    transient_errors: int = 0
    permanent_errors: int = 0
    rate_limit_errors: int = 0
    model_errors: int = 0
    total_retries: int = 0
    
    @property
    def error_rate(self) -> float:
        if self.total_calls == 0:
            return 0.0
        return round(self.failed_calls / self.total_calls * 100, 2)
    
    @property
    def availability(self) -> float:
        return round(100 - self.error_rate, 2)


class ErrorStatsCollector:
    """Error statistics collector for multiple models."""
    
    def __init__(self):
        self._stats: Dict[str, ModelErrorStats] = {}
    
    def record(self, result: ModelCallResult) -> None:
        model_id = result.model_id
        
        if model_id not in self._stats:
            self._stats[model_id] = ModelErrorStats()
        
        stats = self._stats[model_id]
        stats.total_calls += 1
        stats.total_retries += result.retry_count
        
        if result.success:
            stats.successful_calls += 1
        else:
            stats.failed_calls += 1
            if result.error_type == ErrorType.TRANSIENT:
                stats.transient_errors += 1
            elif result.error_type == ErrorType.PERMANENT:
                stats.permanent_errors += 1
            elif result.error_type == ErrorType.RATE_LIMIT:
                stats.rate_limit_errors += 1
            elif result.error_type == ErrorType.MODEL_ERROR:
                stats.model_errors += 1
    
    def get_stats(self, model_id: str) -> Optional[ModelErrorStats]:
        return self._stats.get(model_id)
    
    def to_dataframe(self) -> pd.DataFrame:
        rows = []
        for model_id, stats in self._stats.items():
            rows.append({
                "model": model_id,
                "total_calls": stats.total_calls,
                "successful": stats.successful_calls,
                "failed": stats.failed_calls,
                "error_rate_%": stats.error_rate,
                "availability_%": stats.availability,
                "total_retries": stats.total_retries,
            })
        return pd.DataFrame(rows)
    
    def reset(self):
        self._stats = {}


print("✅ Error statistics collector defined")
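
As a quick check of the arithmetic behind `error_rate` and `availability`, here is a standalone sketch using plain functions instead of the dataclass properties. The function names mirror the properties above, but the call counts are invented for illustration.

```python
def error_rate(failed_calls: int, total_calls: int) -> float:
    """Percentage of failed calls; 0.0 when nothing has been recorded."""
    if total_calls == 0:
        return 0.0
    return round(failed_calls / total_calls * 100, 2)


def availability(failed_calls: int, total_calls: int) -> float:
    """Complement of the error rate, as a percentage."""
    return round(100 - error_rate(failed_calls, total_calls), 2)


print(error_rate(2, 20))    # 10.0
print(availability(2, 20))  # 90.0
print(availability(0, 0))   # 100.0
```

The last line shows an edge case worth keeping in mind: a model with no recorded calls reports 100% availability, so the figure is only meaningful alongside `total_calls`.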

---
## 📋 STEP 6: Dynamic pricing system

In [None]:
@dataclass
class ModelPricing:
    """
    Pricing structure with metadata for auditing.
    """
    model_id: str
    name: str
    input_price_per_million: float
    output_price_per_million: float
    context_length: int = 0
    is_free: bool = False
    fetched_at: datetime = field(default_factory=datetime.now)
    source: str = "unknown"  # "api", "cache", "fallback"
    
    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        input_cost = (input_tokens * self.input_price_per_million) / 1_000_000
        output_cost = (output_tokens * self.output_price_per_million) / 1_000_000
        return round(input_cost + output_cost, 6)


# Verified fallback prices (December 2024)
FALLBACK_PRICES = {
    # Anthropic
    "anthropic/claude-3.5-haiku": {"input": 0.80, "output": 4.0, "name": "Claude 3.5 Haiku"},
    "anthropic/claude-3.5-haiku-20241022": {"input": 0.80, "output": 4.0, "name": "Claude 3.5 Haiku"},
    "anthropic/claude-3.5-sonnet": {"input": 3.0, "output": 15.0, "name": "Claude 3.5 Sonnet"},
    "anthropic/claude-sonnet-4": {"input": 3.0, "output": 15.0, "name": "Claude Sonnet 4"},
    "anthropic/claude-haiku-4.5": {"input": 1.0, "output": 5.0, "name": "Claude Haiku 4.5"},
    
    # OpenAI
    "openai/gpt-4o-mini": {"input": 0.15, "output": 0.60, "name": "GPT-4o Mini"},
    "openai/gpt-4o": {"input": 2.50, "output": 10.0, "name": "GPT-4o"},
    
    # Google Free
    "google/gemini-2.0-flash-exp:free": {"input": 0, "output": 0, "name": "Gemini 2.0 Flash (Free)"},
    "google/gemma-3-27b-it:free": {"input": 0, "output": 0, "name": "Gemma 3 27B (Free)"},
    
    # Meta Free
    "meta-llama/llama-3.3-70b-instruct:free": {"input": 0, "output": 0, "name": "Llama 3.3 70B (Free)"},
    "meta-llama/llama-4-maverick:free": {"input": 0, "output": 0, "name": "Llama 4 Maverick (Free)"},
    
    # DeepSeek
    "deepseek/deepseek-r1:free": {"input": 0, "output": 0, "name": "DeepSeek R1 (Free)"},
    "deepseek/deepseek-chat-v3-0324:free": {"input": 0, "output": 0, "name": "DeepSeek V3 (Free)"},
    "deepseek/deepseek-chat": {"input": 0.14, "output": 0.28, "name": "DeepSeek Chat"},
    
    # Qwen
    "qwen/qwq-32b:free": {"input": 0, "output": 0, "name": "Qwen QwQ 32B (Free)"},
}


class DynamicPricingClient:
    """
    Client for fetching dynamic prices from OpenRouter.

    Features:
    - Cache with configurable TTL
    - Fallback to known prices
    - Timestamp for auditing
    """
    
    OPENROUTER_MODELS_URL = "https://openrouter.ai/api/v1/models"
    
    def __init__(self, api_key: Optional[str] = None, cache_ttl_hours: float = 1.0):
        self.api_key = api_key
        self.cache_ttl = timedelta(hours=cache_ttl_hours)
        self._cache: Dict[str, ModelPricing] = {}
        self._last_fetch: Optional[datetime] = None
        self._all_models: List[Dict] = []
    
    def _is_cache_valid(self) -> bool:
        if not self._last_fetch:
            return False
        return datetime.now() - self._last_fetch < self.cache_ttl
    
    def _fetch_models_from_api(self) -> List[Dict]:
        headers = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        
        try:
            response = requests.get(
                self.OPENROUTER_MODELS_URL,
                headers=headers,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
            models = data.get("data", [])
            logger.info(f"Fetched {len(models)} models from the OpenRouter API")
            return models
        except Exception as e:
            logger.warning(f"Error fetching models from the API: {e}")
            return []
    
    def _parse_model_pricing(self, model_data: Dict) -> ModelPricing:
        model_id = model_data.get("id", "")
        name = model_data.get("name", model_id)
        pricing = model_data.get("pricing", {})
        
        try:
            input_per_token = float(pricing.get("prompt", "0") or "0")
            output_per_token = float(pricing.get("completion", "0") or "0")
        except (ValueError, TypeError):
            input_per_token = 0
            output_per_token = 0
        
        input_per_million = input_per_token * 1_000_000
        output_per_million = output_per_token * 1_000_000
        
        return ModelPricing(
            model_id=model_id,
            name=name,
            input_price_per_million=round(input_per_million, 4),
            output_price_per_million=round(output_per_million, 4),
            context_length=model_data.get("context_length", 0),
            is_free=(input_per_million == 0 and output_per_million == 0),
            fetched_at=datetime.now(),
            source="api"
        )
    
    def refresh_prices(self, force: bool = False) -> bool:
        if not force and self._is_cache_valid():
            return False
        
        models = self._fetch_models_from_api()
        if not models:
            return False
        
        self._all_models = models
        self._last_fetch = datetime.now()
        
        for model_data in models:
            pricing = self._parse_model_pricing(model_data)
            self._cache[pricing.model_id] = pricing
        
        return True
    
    def get_model_pricing(self, model_id: str) -> ModelPricing:
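        # Refresh if the cache has expired
        if not self._is_cache_valid():
            self.refresh_prices()

        # Exact match in the cache
        if model_id in self._cache:
            return self._cache[model_id]

        # Look for variants that share the same base ID (e.g. with or without a ":free" suffix).
        # Note: the first match is returned, so its price may differ from the requested variant.
        base_id = model_id.split(":")[0]
        for cached_id, pricing in self._cache.items():
            if cached_id.split(":")[0] == base_id:
                return pricing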
        
        # Fallback
        if model_id in FALLBACK_PRICES:
            fb = FALLBACK_PRICES[model_id]
            return ModelPricing(
                model_id=model_id,
                name=fb.get("name", model_id),
                input_price_per_million=fb["input"],
                output_price_per_million=fb["output"],
                is_free=(fb["input"] == 0 and fb["output"] == 0),
                source="fallback"
            )
        
        # Unknown model: reported as free, so its cost will show as 0
        return ModelPricing(
            model_id=model_id,
            name=model_id,
            input_price_per_million=0,
            output_price_per_million=0,
            is_free=True,
            source="unknown"
        )
    
    def list_available_models(self, free_only: bool = False) -> List[ModelPricing]:
        if not self._cache:
            self.refresh_prices()
        
        models = list(self._cache.values())
        if free_only:
            models = [m for m in models if m.is_free]
        
        models.sort(key=lambda m: m.input_price_per_million + m.output_price_per_million)
        return models
    
    def get_pricing_summary(self) -> Dict[str, Any]:
        return {
            "fetched_at": self._last_fetch.isoformat() if self._last_fetch else None,
            "total_models": len(self._cache),
            "free_models": len([m for m in self._cache.values() if m.is_free]),
        }


print("✅ Dynamic pricing system defined")
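
The price conversion and cost formula can be worked through by hand. The sketch below assumes, as the parsing code above does, that prices arrive as per-token strings; the two sample strings are illustrative values chosen to match the GPT-4o Mini fallback entry (0.15 and 0.60 USD per million tokens), and both function names are hypothetical.

```python
def per_token_to_per_million(price_per_token: str) -> float:
    """Convert a per-token price string to USD per million tokens."""
    return round(float(price_per_token or "0") * 1_000_000, 4)


def call_cost(input_tokens: int, output_tokens: int,
              input_per_million: float, output_per_million: float) -> float:
    """Cost of one call in USD, rounded to 6 decimals like calculate_cost."""
    input_cost = input_tokens * input_per_million / 1_000_000
    output_cost = output_tokens * output_per_million / 1_000_000
    return round(input_cost + output_cost, 6)


inp = per_token_to_per_million("0.00000015")  # 0.15 USD per million input tokens
out = per_token_to_per_million("0.0000006")   # 0.6 USD per million output tokens

# 100 input tokens cost 0.000015 USD and 400 output tokens cost 0.00024 USD.
print(call_cost(100, 400, inp, out))  # 0.000255
```

At these prices the output side dominates: 400 output tokens cost sixteen times as much as 100 input tokens.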

---
## 📋 STEP 7: Main model-call function

In [None]:
def create_model_caller(client: OpenAI):
    """
    Factory that creates the model-call function bound to the configured client.
    """
    
    @retry_with_backoff(max_retries=3, base_delay=1.0)
    def call_model_with_metrics(
        model: str,
        question: str,
        temperature: float = 0.7,
        top_p: float = 0.9,
        max_tokens: int = 500,
        stream: bool = True
    ) -> ModelCallResult:
        """
        Call a model and collect full metrics, including TTFT.
        """
        start_time = time.time()
        ttft = None
        
        try:
            if stream:
                # Streaming mode, to capture TTFT
                response_chunks = []
                first_token_received = False
                
                stream_response = client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": question}],
                    temperature=min(temperature, 1.0),
                    top_p=top_p,
                    max_tokens=max_tokens,
                    stream=True
                )
                
                for chunk in stream_response:
                    if not first_token_received:
                        if chunk.choices and chunk.choices[0].delta.content:
                            ttft = time.time() - start_time
                            first_token_received = True
                    
                    if chunk.choices and chunk.choices[0].delta.content:
                        response_chunks.append(chunk.choices[0].delta.content)
                
                text = "".join(response_chunks)
                latency = time.time() - start_time
                
                # Estimate tokens (~4 characters per token; streaming does not always return usage)
                input_tokens = len(question) // 4 + 10
                output_tokens = len(text) // 4
            
            else:
                # Non-streaming mode
                response = client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": question}],
                    temperature=min(temperature, 1.0),
                    top_p=top_p,
                    max_tokens=max_tokens
                )

                latency = time.time() - start_time
                # content can be None (e.g. an empty completion), so guard before strip()
                text = (response.choices[0].message.content or "").strip()
                
                usage = response.usage
                input_tokens = usage.prompt_tokens if usage else len(question) // 4
                output_tokens = usage.completion_tokens if usage else len(text) // 4
            
            return ModelCallResult(
                success=True,
                text=text,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                total_tokens=input_tokens + output_tokens,
                latency_sec=round(latency, 3),
                ttft_sec=round(ttft, 3) if ttft is not None else None,
                error_type=ErrorType.SUCCESS,
                model_id=model
            )
            
        except Exception as e:
            latency = time.time() - start_time
            error_type, error_message = classify_error(e)
            
            return ModelCallResult(
                success=False,
                latency_sec=round(latency, 3),
                error_type=error_type,
                error_message=error_message,
                model_id=model
            )
    
    return call_model_with_metrics


print("✅ Model-call function defined")
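
The TTFT measurement can be exercised without calling any API. The sketch below replaces the streaming response with a hypothetical generator, `fake_stream`, that yields text chunks after short sleeps; `measure_stream` is likewise an illustrative name. It uses `time.perf_counter()` rather than `time.time()`, which is a substitution on my part, since a monotonic clock is usually the safer choice for measuring intervals.

```python
import time
from typing import Iterable, Iterator, Optional, Tuple


def fake_stream(chunks: Iterable[str], first_delay: float = 0.05,
                gap: float = 0.01) -> Iterator[str]:
    """Hypothetical stand-in for a streaming response: yields chunks with delays."""
    for i, chunk in enumerate(chunks):
        time.sleep(first_delay if i == 0 else gap)
        yield chunk


def measure_stream(stream: Iterator[str]) -> Tuple[str, Optional[float], float]:
    """Consume a stream and return (text, ttft_sec, latency_sec)."""
    start = time.perf_counter()
    ttft = None
    parts = []
    for chunk in stream:
        if not chunk:
            continue  # Empty chunks carry no text and do not count as a first token
        if ttft is None:
            ttft = time.perf_counter() - start
        parts.append(chunk)
    latency = time.perf_counter() - start
    return "".join(parts), ttft, latency


text, ttft, latency = measure_stream(fake_stream(["", "Hello", ", ", "world"]))
print(text, f"ttft={ttft:.3f}s", f"latency={latency:.3f}s")
```

The leading empty chunk mirrors what the loop above guards against: TTFT is recorded at the first chunk that actually contains text, not at the first chunk received.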

---
## 📋 STEP 8: Text quality metric functions

In [None]:
def calculate_lexical_diversity(text: str) -> float:
    """Type-Token Ratio (unique words / total words)."""
    words = text.lower().split()
    if len(words) == 0:
        return 0
    return round(len(set(words)) / len(words), 3)


def calculate_readability(text: str) -> float:
    """Flesch Reading Ease (0-100, higher = easier)."""
    try:
        return round(textstat.flesch_reading_ease(text), 1)
    except Exception:
        return 0


def count_words(text: str) -> int:
    """Count the words in the text."""
    return len(text.split())


def count_sentences(text: str) -> int:
    """Count the sentences in the text."""
    try:
        return textstat.sentence_count(text)
    except Exception:
        return 0


def calculate_f1_score(response: str, reference: str) -> float:
    """Word-level F1 score."""
    if not reference or not response:
        return 0
    
    response_words = set(response.lower().split())
    reference_words = set(reference.lower().split())
    
    if len(response_words) == 0 or len(reference_words) == 0:
        return 0
    
    true_positives = len(response_words & reference_words)
    precision = true_positives / len(response_words)
    recall = true_positives / len(reference_words)
    
    if precision + recall == 0:
        return 0
    
    return round(2 * precision * recall / (precision + recall), 3)


def calculate_tokens_per_second(tokens: int, latency: float) -> float:
    """Tokens generated per second."""
    if latency <= 0:
        return 0
    return round(tokens / latency, 1)


print("✅ Metric functions defined")
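
A short worked example may help in reading the lexical diversity and F1 values. The functions below are compact restatements of the same set-based formulas under hypothetical names (`lexical_diversity`, `word_f1`); the sample sentences are invented.

```python
def lexical_diversity(text: str) -> float:
    """Type-token ratio: unique words divided by total words."""
    words = text.lower().split()
    return round(len(set(words)) / len(words), 3) if words else 0.0


def word_f1(response: str, reference: str) -> float:
    """F1 over the sets of whitespace-separated, lowercased words."""
    resp = set(response.lower().split())
    ref = set(reference.lower().split())
    overlap = len(resp & ref)
    if overlap == 0:
        return 0.0
    precision = overlap / len(resp)
    recall = overlap / len(ref)
    return round(2 * precision * recall / (precision + recall), 3)


print(lexical_diversity("the cat sat on the mat"))  # 0.833 (5 unique of 6 words)
print(word_f1("the cat sat", "the cat slept"))      # 0.667 (2 shared words of 3 each)
print(word_f1("Paris.", "paris"))                   # 0.0 (punctuation is not stripped)
```

The last case shows a limitation of whitespace tokenization: "paris." and "paris" count as different words, so responses that differ from the reference only in punctuation are penalized.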

---
## 📋 STEP 9: Main benchmark class

In [None]:
class CommonBenchV2:
    """
    Professional LLM benchmark.

    Features:
    - Dynamic prices from the API
    - TTFT (Time to First Token)
    - Error rate as a metric
    - Automatic retry
    - Full metadata
    """
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv('OPENROUTER_API_KEY')
        
        # OpenRouter client
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=self.api_key
        )

        # Model-call function with retry
        self.call_model = create_model_caller(self.client)

        # Pricing client
        self.pricing_client = DynamicPricingClient(api_key=self.api_key)

        # Error collector
        self.error_collector = ErrorStatsCollector()

        # State
        self.last_results: List[Dict] = []
    
    def estimate_cost(
        self,
        models: List[str],
        iterations: int,
        avg_input_tokens: int = 100,
        avg_output_tokens: int = 400
    ) -> Dict[str, Any]:
        """
        Estimate the cost BEFORE running the benchmark.
        """
        total_cost = 0.0
        model_costs = {}
        
        for model_id in models:
            pricing = self.pricing_client.get_model_pricing(model_id)
            cost_per_call = pricing.calculate_cost(avg_input_tokens, avg_output_tokens)
            model_total = cost_per_call * iterations
            
            model_costs[model_id] = {
                "name": pricing.name,
                "cost_per_call": cost_per_call,
                "total_cost": model_total,
                "is_free": pricing.is_free,
                "source": pricing.source
            }
            total_cost += model_total
        
        return {
            "total_estimated_cost_usd": round(total_cost, 4),
            "total_calls": len(models) * iterations,
            "models": model_costs
        }
    
    def run_benchmark(
        self,
        question: str,
        models: List[str],
        iterations: int = 5,
        temperature: float = 0.7,
        top_p: float = 0.9,
        max_tokens: int = 500,
        reference_text: Optional[str] = None,
        stream_for_ttft: bool = True,
        progress_callback=None
    ) -> pd.DataFrame:
        """
        Run the full benchmark.
        """
        # Reset state
        self.last_results = []
        self.error_collector.reset()

        # Refresh prices
        self.pricing_client.refresh_prices()
        
        total_calls = len(models) * iterations
        current_call = 0
        
        for iteration in range(iterations):
            for model_id in models:
                current_call += 1
                
                # Progress
                if progress_callback:
                    progress_callback(
                        current_call / total_calls,
                        f"📊 Iter {iteration + 1}/{iterations} - {model_id.split('/')[-1]}"
                    )

                # Call the model
                result = self.call_model(
                    model=model_id,
                    question=question,
                    temperature=temperature,
                    top_p=top_p,
                    max_tokens=max_tokens,
                    stream=stream_for_ttft
                )
                
                # Record the outcome (successes and failures)
                self.error_collector.record(result)

                # Get pricing
                pricing = self.pricing_client.get_model_pricing(model_id)
                cost = pricing.calculate_cost(result.input_tokens, result.output_tokens)

                # Build the record
                record = self._build_record(
                    iteration=iteration + 1,
                    result=result,
                    pricing=pricing,
                    cost=cost,
                    question=question,
                    reference_text=reference_text
                )
                
                self.last_results.append(record)
        
        return pd.DataFrame(self.last_results)
    
    def _build_record(
        self,
        iteration: int,
        result: ModelCallResult,
        pricing: ModelPricing,
        cost: float,
        question: str,
        reference_text: Optional[str]
    ) -> Dict[str, Any]:
        """
        Build a complete record with all metrics.
        """
        record = {
            # Identification
            "iteration": iteration,
            "model_id": result.model_id,
            "model_name": pricing.name,

            # Status
            "success": result.success,
            "error_type": result.error_type.value if not result.success else None,
            "retry_count": result.retry_count,

            # Tokens
            "input_tokens": result.input_tokens,
            "output_tokens": result.output_tokens,
            "total_tokens": result.total_tokens,

            # Latency
            "latency_sec": result.latency_sec,
            "ttft_sec": result.ttft_sec,
            "tokens_per_sec": calculate_tokens_per_second(
                result.output_tokens, result.latency_sec
            ),

            # Cost
            "cost_usd": cost,
            "is_free": pricing.is_free,
            "pricing_source": pricing.source,
        }

        # Quality metrics (only on success)
        if result.success and result.text:
            text = result.text
            record.update({
                "word_count": count_words(text),
                "sentence_count": count_sentences(text),
                "lexical_diversity": calculate_lexical_diversity(text),
                "readability_flesch": calculate_readability(text),
                "response_preview": (text[:150] + "...") if len(text) > 150 else text,
            })

            # F1 if a reference is provided
            if reference_text:
                record["f1_score"] = calculate_f1_score(text, reference_text)
        
        return record
    
    def get_aggregated_results(self) -> pd.DataFrame:
        """
        Aggregate results by model.
        """
        if not self.last_results:
            return pd.DataFrame()
        
        df = pd.DataFrame(self.last_results)
        df_success = df[df['success'] == True].copy()
        
        if df_success.empty:
            return pd.DataFrame()
        
        # Metrics to aggregate
        numeric_cols = [
            'latency_sec', 'ttft_sec', 'tokens_per_sec',
            'input_tokens', 'output_tokens', 'cost_usd',
            'word_count', 'lexical_diversity', 'readability_flesch'
        ]

        # Keep only columns that exist
        numeric_cols = [c for c in numeric_cols if c in df_success.columns]

        # Aggregate
        agg_df = df_success.groupby('model_name')[numeric_cols].agg(
            ['mean', 'std', 'min', 'max']
        ).round(3)
        
        agg_df.columns = ['_'.join(col) for col in agg_df.columns]
        agg_df = agg_df.reset_index()
        
        return agg_df
    
    def get_error_stats(self) -> pd.DataFrame:
        """
        Per-model error statistics.
        """
        return self.error_collector.to_dataframe()


print("✅ CommonBenchV2 class defined")
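
The per-model aggregation in `get_aggregated_results` can be illustrated without pandas. The sketch below uses only the standard library and a handful of hypothetical records shaped like the benchmark rows; it reproduces the idea (filter to successful calls, group by model, compute mean, std, min, and max) rather than the exact DataFrame output.

```python
from collections import defaultdict
from statistics import mean, stdev

# Hypothetical per-call records, shaped like the rows built by the benchmark
records = [
    {"model_name": "model-a", "success": True, "latency_sec": 1.0},
    {"model_name": "model-a", "success": True, "latency_sec": 3.0},
    {"model_name": "model-a", "success": False, "latency_sec": 9.0},
    {"model_name": "model-b", "success": True, "latency_sec": 2.0},
]

by_model = defaultdict(list)
for row in records:
    if row["success"]:  # Failed calls are excluded from the aggregates
        by_model[row["model_name"]].append(row["latency_sec"])

summary = {
    name: {
        "latency_sec_mean": round(mean(values), 3),
        # The sample standard deviation needs at least two data points
        "latency_sec_std": round(stdev(values), 3) if len(values) > 1 else None,
        "latency_sec_min": min(values),
        "latency_sec_max": max(values),
    }
    for name, values in by_model.items()
}

print(summary["model-a"])
print(summary["model-b"])
```

Two things are worth noticing: the failed 9.0-second call does not inflate the mean for `model-a`, and a model with a single successful call has no meaningful standard deviation, which is one reason to run several iterations per model.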

---
## 📋 STEP 10: Visualization functions

In [None]:
def create_comparison_chart(df: pd.DataFrame) -> Optional[plt.Figure]:
    """
    Create the comparison chart: cost vs. latency.
    """
    if df.empty:
        return None

    # Keep successful calls only
    df_success = df[df['success'] == True].copy()
    if df_success.empty:
        return None

    # Aggregate by model
    model_stats = df_success.groupby('model_name').agg({
        'cost_usd': 'mean',
        'latency_sec': 'mean',
        'tokens_per_sec': 'mean',
        'readability_flesch': 'mean'
    }).reset_index()
    
    # Create figure
    fig, ax = plt.subplots(figsize=(10, 6))

    # Scatter plot
    scatter = ax.scatter(
        model_stats['latency_sec'],
        model_stats['cost_usd'] * 1000,  # Show in thousandths of a USD
        s=model_stats['tokens_per_sec'] * 3,  # Marker size = throughput
        c=model_stats['readability_flesch'],
        cmap='RdYlGn',
        alpha=0.7,
        edgecolors='black'
    )
    
    # Labels
    for idx, row in model_stats.iterrows():
        ax.annotate(
            row['model_name'],
            (row['latency_sec'], row['cost_usd'] * 1000),
            xytext=(5, 5),
            textcoords='offset points',
            fontsize=9,
            fontweight='bold'
        )
    
    ax.set_xlabel('Latency (seconds)', fontsize=12)
    ax.set_ylabel('Cost (thousandths of a USD)', fontsize=12)
    ax.set_title(
        'Model Comparison: Cost vs. Speed\n'
        '(Size = tokens/sec, Color = readability)',
        fontsize=14
    )
    ax.grid(True, alpha=0.3)
    
    # Colorbar
    cbar = plt.colorbar(scatter, ax=ax)
    cbar.set_label('Readability Score', fontsize=10)
    
    plt.tight_layout()
    return fig


def create_ttft_chart(df: pd.DataFrame) -> Optional[plt.Figure]:
    """
    Create the TTFT chart by model.
    """
    if df.empty:
        return None
    
    df_success = df[(df['success'] == True) & (df['ttft_sec'].notna())].copy()
    if df_success.empty:
        return None
    
    fig, ax = plt.subplots(figsize=(10, 5))
    
    # Box plot de TTFT
    models = df_success['model_name'].unique()
    data = [df_success[df_success['model_name'] == m]['ttft_sec'].values for m in models]
    
    bp = ax.boxplot(data, labels=models, patch_artist=True)
    
    colors = plt.cm.Set3(np.linspace(0, 1, len(models)))
    for patch, color in zip(bp['boxes'], colors):
        patch.set_facecolor(color)
    
    ax.set_ylabel('Time to First Token (seconds)', fontsize=12)
    ax.set_title('TTFT by Model (UX Metric)', fontsize=14)
    ax.grid(True, alpha=0.3, axis='y')
    
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    
    return fig


print("✅ Visualization functions defined")

---
## 📋 STEP 11: Gradio Interface

In [None]:
# Initialize the global benchmark instance
benchmark = CommonBenchV2()

# Fetch the available models and their prices
print("Loading available models...")
benchmark.pricing_client.refresh_prices()

# Recommended models for the UI
RECOMMENDED_MODELS = [
    # Free tier
    "google/gemini-2.0-flash-exp:free",
    "meta-llama/llama-3.3-70b-instruct:free",
    "deepseek/deepseek-r1:free",
    "deepseek/deepseek-chat-v3-0324:free",
    "qwen/qwq-32b:free",
    # Budget tier
    "openai/gpt-4o-mini",
    "anthropic/claude-3.5-haiku",
    "deepseek/deepseek-chat",
    # Mid tier
    "anthropic/claude-3.5-sonnet",
    "openai/gpt-4o",
]

EXAMPLE_QUESTIONS = [
    "Explain quantum entanglement in simple terms",
    "What are the key differences between supervised and unsupervised learning?",
    "How does photosynthesis work?",
    "Explain the concept of blockchain technology",
    "What is recursion in programming? Give a simple example.",
]

print(f"✅ {len(RECOMMENDED_MODELS)} recommended models available")

In [None]:
def run_benchmark_ui(
    question: str,
    model1: str, model2: str, model3: str, model4: str,
    iterations: int,
    temperature: float,
    top_p: float,
    max_tokens: int,
    reference_file,
    progress=gr.Progress()
):
    """
    Wrapper for the Gradio UI.
    """
    # Validate input
    if not question.strip():
        return None, None, None, None, None, "❌ Please enter a question"
    
    # Collect the selected models
    models = [m for m in [model1, model2, model3, model4] if m]
    if not models:
        return None, None, None, None, None, "❌ Select at least one model"
    
    # Read the reference answer, if one was provided
    reference_text = None
    if reference_file is not None:
        try:
            if isinstance(reference_file, bytes):
                reference_text = reference_file.decode('utf-8')
            else:
                with open(reference_file.name, 'r', encoding='utf-8') as f:
                    reference_text = f.read()
        except Exception as e:
            logger.warning(f"Error reading reference file: {e}")
    
    # Estimate the cost up front so it can be compared with the actual cost
    estimate = benchmark.estimate_cost(models, iterations)
    
    # Run the benchmark
    df = benchmark.run_benchmark(
        question=question,
        models=models,
        iterations=iterations,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        reference_text=reference_text,
        stream_for_ttft=True,
        progress_callback=lambda p, m: progress(p, desc=m)
    )
    
    if df.empty:
        return None, None, None, None, None, "❌ All calls failed"
    
    # Aggregated results
    agg_df = benchmark.get_aggregated_results()
    
    # Error statistics
    error_df = benchmark.get_error_stats()
    
    # Charts
    chart1 = create_comparison_chart(df)
    chart2 = create_ttft_chart(df)
    
    # Status summary
    successful = len(df[df['success'] == True])
    total = len(df)
    cost_actual = df[df['success'] == True]['cost_usd'].sum()
    
    # Blank lines between entries so Markdown renders them as separate paragraphs
    status = (
        f"✅ Benchmark completed\n\n"
        f"📊 {successful}/{total} successful calls\n\n"
        f"💰 Actual cost: ${cost_actual:.4f} (estimated: ${estimate['total_estimated_cost_usd']:.4f})"
    )
    
    return df, agg_df, error_df, chart1, chart2, status


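def export_to_csv(df):
    """Export results to a CSV file and return its path."""
    import tempfile  # Local import: not among the notebook's top-level imports
    
    if df is None or (isinstance(df, pd.DataFrame) and df.empty):
        return None
    
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    # Use the platform's temp directory instead of hard-coding /tmp
    csv_path = os.path.join(tempfile.gettempdir(), f"commonbench_results_{timestamp}.csv")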
    df.to_csv(csv_path, index=False)
    return csv_path


def get_cost_estimate(model1, model2, model3, model4, iterations):
    """Build the cost estimate shown in the UI."""
    models = [m for m in [model1, model2, model3, model4] if m]
    if not models:
        return "Select at least one model"
    
    estimate = benchmark.estimate_cost(models, iterations)
    
    lines = ["**Cost estimate:**"]
    lines.append(f"- Total: **${estimate['total_estimated_cost_usd']:.4f}**")
    lines.append(f"- Calls: {estimate['total_calls']}")
    lines.append("")
    
    for model_id, info in estimate['models'].items():
        if info['is_free']:
            lines.append(f"- {info['name']}: 🆓 FREE")
        else:
            lines.append(f"- {info['name']}: ${info['total_cost']:.4f}")
    
    return "\n".join(lines)


print("✅ UI functions defined")

In [None]:
# Build the Gradio interface
with gr.Blocks(
    title="commonBench v2 - Professional LLM Benchmarking",
    theme=gr.themes.Soft()
) as demo:
    
    gr.Markdown("""
    # 📊 commonBench v2
    ### Professional LLM Benchmarking Tool
    
    Evaluate multiple LLMs with a full set of metrics:
    - ⏱️ **Latency and TTFT** (Time to First Token)
    - 💰 **Actual costs** (dynamic prices fetched from the API)
    - 📝 **Text quality** (readability, lexical diversity)
    - ⚠️ **Reliability** (error rate, availability)
    """)
    
    # === INPUTS ===
    with gr.Row():
        with gr.Column(scale=2):
            question_input = gr.Textbox(
                label="❓ Question to evaluate",
                placeholder="Type your question here...",
                lines=3,
                value=EXAMPLE_QUESTIONS[0]
            )
            
            gr.Markdown("### 🤖 Select Models (1-4)")
            
            with gr.Row():
                model1 = gr.Dropdown(
                    choices=RECOMMENDED_MODELS,
                    value=RECOMMENDED_MODELS[0],
                    label="Model 1"
                )
                model2 = gr.Dropdown(
                    choices=RECOMMENDED_MODELS,
                    value=RECOMMENDED_MODELS[5],
                    label="Model 2"
                )
            
            with gr.Row():
                model3 = gr.Dropdown(
                    choices=[None] + RECOMMENDED_MODELS,
                    value=None,
                    label="Model 3 (optional)"
                )
                model4 = gr.Dropdown(
                    choices=[None] + RECOMMENDED_MODELS,
                    value=None,
                    label="Model 4 (optional)"
                )
        
        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ Settings")
            
            iterations_slider = gr.Slider(
                minimum=1, maximum=30, value=5, step=1,
                label="🔄 Iterations",
                info="More iterations = more reliable but slower"
            )
            
            temperature = gr.Slider(
                minimum=0.0, maximum=1.0, value=0.7, step=0.1,
                label="🌡️ Temperature"
            )
            
            top_p = gr.Slider(
                minimum=0.0, maximum=1.0, value=0.9, step=0.05,
                label="🎲 Top P"
            )
            
            max_tokens = gr.Slider(
                minimum=100, maximum=2000, value=500, step=100,
                label="📏 Max Tokens"
            )
            
            reference_file = gr.File(
                label="📄 Reference answer (optional)",
                file_types=[".txt"],
                type="binary"
            )
            
            # Cost estimate
            cost_estimate = gr.Markdown("*Select models to see the cost estimate*")
    
    # Refresh the estimate whenever the models or iteration count change
    for component in [model1, model2, model3, model4, iterations_slider]:
        component.change(
            fn=get_cost_estimate,
            inputs=[model1, model2, model3, model4, iterations_slider],
            outputs=[cost_estimate]
        )
    
    # === RUN BUTTON ===
    run_btn = gr.Button(
        "🚀 Run Benchmark",
        variant="primary",
        size="lg"
    )
    
    status_msg = gr.Markdown(
        "💡 **Configure and run the benchmark to see results**"
    )
    
    # === OUTPUTS ===
    gr.Markdown("---")
    gr.Markdown("## 📊 Results")
    
    with gr.Tabs():
        with gr.Tab("📋 Raw Data"):
            raw_results = gr.Dataframe(
                label="All measurements",
                wrap=True
            )
            export_btn = gr.Button("💾 Export to CSV")
            export_file = gr.File(label="Download CSV")
        
        with gr.Tab("📈 Aggregated Statistics"):
            gr.Markdown("""
            **Statistics per model:**
            - `mean`: Average
            - `std`: Standard deviation (consistency)
            - `min/max`: Extreme values
            """)
            agg_results = gr.Dataframe(
                label="Aggregated by model",
                wrap=True
            )
        
        with gr.Tab("⚠️ Error Rate"):
            gr.Markdown("""
            **Operational quality metrics:**
            - `error_rate_%`: Percentage of failed calls
            - `availability_%`: Percentage of successful calls
            - `total_retries`: Retries needed
            """)
            error_results = gr.Dataframe(
                label="Error statistics",
                wrap=True
            )
        
        with gr.Tab("📊 Cost vs Latency"):
            comparison_chart = gr.Plot(
                label="Cost vs Latency (Size = tokens/sec, Color = readability)"
            )
        
        with gr.Tab("⏱️ TTFT"):
            ttft_chart = gr.Plot(
                label="Time to First Token by model"
            )
    
    # === STATE ===
    results_state = gr.State()
    
    # === CONNECTIONS ===
    run_btn.click(
        fn=run_benchmark_ui,
        inputs=[
            question_input,
            model1, model2, model3, model4,
            iterations_slider,
            temperature,
            top_p,
            max_tokens,
            reference_file
        ],
        outputs=[
            raw_results,
            agg_results,
            error_results,
            comparison_chart,
            ttft_chart,
            status_msg
        ]
    )
    
    raw_results.change(
        fn=lambda df: df,
        inputs=[raw_results],
        outputs=[results_state]
    )
    
    export_btn.click(
        fn=export_to_csv,
        inputs=[results_state],
        outputs=[export_file]
    )
    
    # === DOCUMENTATION ===
    gr.Markdown("""
    ---
    ### 📖 Metrics Guide
    
    | Metric | What it measures | Good is... |
    |---------|----------|-------------|
    | `latency_sec` | Total response time | Low |
    | `ttft_sec` | Time to First Token (UX) | Low (<1s ideal) |
    | `tokens_per_sec` | Generation speed | High |
    | `cost_usd` | Cost in US dollars | Low |
    | `lexical_diversity` | Vocabulary variety | 0.6-0.8 |
    | `readability_flesch` | Reading ease | 60-70 |
    | `error_rate_%` | Percentage of failures | Low (<5%) |
    | `f1_score` | Similarity to the reference | High |
    
    ### 💡 Tips
    - Use **5-10 iterations** for reliable results
    - Compare `mean` for typical performance and `std` for consistency
    - **Low TTFT** = better user experience
    - **Error rate** indicates the operational reliability of the model/provider
    
    ### 🎯 Use cases
    1. **Model selection**: Find the best cost/quality balance
    2. **Prompt optimization**: Measure the impact of changes
    3. **Monitoring**: Detect regressions in models
    4. **Budgeting**: Estimate costs before running
    """)

print("✅ Gradio interface created")

---
## 📋 STEP 12: Launch the application

In [None]:
# Launch the application
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7868,
        share=True  # Automatically generates a public link
    )

print("\n" + "="*60)
print("✅ commonBench v2 is running on port 7868")
print("✅ Shareable link generated automatically")
print("="*60)

---
## 📋 APPENDIX: Metrics and Competencies

### Implemented metrics vs. professional frameworks

| Category | commonBench v2 | HELM (Stanford) | Notes |
|-----------|---------------|-----------------|-------|
| Latency | ✅ `latency_sec` | ✅ | Standard |
| TTFT | ✅ `ttft_sec` | ⚠️ Partial | Critical for UX |
| Throughput | ✅ `tokens_per_sec` | ✅ | Standard |
| Cost | ✅ Dynamic | ❌ | Advantage |
| Error rate | ✅ `error_rate_%` | ✅ | Operational quality |
| Readability | ✅ Flesch | ✅ | Standard |
| Diversity | ✅ TTR | ✅ | Standard |
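
The TTR entry in the table refers to the type-token ratio: unique words divided by total words. As a minimal sketch, assuming simple word tokenization with a regular expression, it could be computed as below. The function name `type_token_ratio` is hypothetical and is not necessarily how commonBench computes `lexical_diversity`.

```python
import re


def type_token_ratio(text: str) -> float:
    """Return unique words / total words, or 0.0 for empty input.

    Hypothetical helper for illustration; tokenization is an assumption.
    """
    # Lowercase so "The" and "the" count as the same word type
    tokens = re.findall(r"\w+", text.lower())
    if not tokens:
        return 0.0
    return len(set(tokens)) / len(tokens)


sample = "the cat and the dog"
print(type_token_ratio(sample))  # 4 unique words / 5 total = 0.8
```

Because TTR tends to fall as texts get longer, it is most meaningful when comparing responses of similar length, such as those produced under the same `max_tokens` setting.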

### Competencies demonstrated

**"Familiarity with evaluation metrics and quality assessment frameworks":**

1. **Operational vs Model Quality**: Distinguishing response quality from service reliability
2. **TTFT**: Understanding UX metrics specific to LLMs
3. **Error rate as a metric**: Measuring failures rather than ignoring them
4. **Dynamic pricing**: Keeping cost metrics accurate
5. **Reproducibility**: Complete metadata for auditing
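
To illustrate treating error rate as a first-class metric, here is a minimal sketch that derives `error_rate_%`, `availability_%`, and `total_retries` per model from raw call records. The record layout (dicts with `model_name`, `success`, and `retry_count`) and the function name `summarize_reliability` are assumptions for illustration; the notebook's own `get_error_stats` may work differently.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping


def summarize_reliability(records: Iterable[Mapping]) -> Dict[str, Dict[str, float]]:
    """Aggregate per-model reliability stats from individual call records."""
    totals = defaultdict(lambda: {"calls": 0, "failures": 0, "retries": 0})
    for rec in records:
        bucket = totals[rec["model_name"]]
        bucket["calls"] += 1
        bucket["failures"] += 0 if rec["success"] else 1
        bucket["retries"] += rec.get("retry_count", 0)

    summary = {}
    for model, t in totals.items():
        error_rate = 100.0 * t["failures"] / t["calls"]
        summary[model] = {
            "error_rate_%": error_rate,
            "availability_%": 100.0 - error_rate,
            "total_retries": t["retries"],
        }
    return summary


# Made-up records, for illustration only
records = [
    {"model_name": "model-a", "success": True, "retry_count": 0},
    {"model_name": "model-a", "success": False, "retry_count": 2},
    {"model_name": "model-b", "success": True, "retry_count": 1},
]
stats = summarize_reliability(records)
print(stats["model-a"]["error_rate_%"])  # 1 failure out of 2 calls = 50.0
```

Failed calls stay in the data set rather than being dropped, which is what lets operational quality be reported separately from the quality of successful responses.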

### Interview vocabulary

- "Operational Quality" vs "Model Quality"
- "Time to First Token (TTFT)"
- "Error rate as a quality metric"
- "Temporal reproducibility"
- "Exponential backoff with jitter"
- "TTL cache"
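
The last two terms can be made concrete with a short sketch. Below, `backoff_delay` uses the "full jitter" variant of exponential backoff (a random delay between zero and an exponentially growing, capped ceiling), and `TTLCache` is a tiny cache whose entries expire after a fixed time. Both names, their parameters, and the default values are hypothetical illustrations, not the notebook's actual retry or pricing-cache code.

```python
import random
import time
from typing import Any, Callable, Dict, Tuple


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, ceiling)


class TTLCache:
    """Minimal cache whose entries expire ttl_seconds after being set."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # Injectable clock makes expiry easy to test
        self._store: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self._clock() + self._ttl, value)

    def get(self, key: str, default: Any = None) -> Any:
        entry = self._store.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Entry is stale: drop it and report a miss
            del self._store[key]
            return default
        return value


# Usage with a fake clock so expiry can be shown without sleeping
now = [0.0]
cache = TTLCache(ttl_seconds=10, clock=lambda: now[0])
cache.set("prices", {"example-model": 0.001})
fresh = cache.get("prices")    # Still within the TTL
now[0] = 10.0
expired = cache.get("prices")  # TTL elapsed, so this is a miss
```

Randomizing the delay keeps many clients that failed at the same moment from retrying in lockstep, and a TTL bounds how stale cached data such as model prices can become.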