<a href="https://colab.research.google.com/github/galenzo17/AI-personal-test/blob/main/agus_bench.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Instalar las bibliotecas necesarias
!pip install openai huggingface_hub requests

import json
import os
import re
import subprocess
import time
from dataclasses import asdict, dataclass, field
from difflib import SequenceMatcher
from typing import List, Dict, Any, Optional

import openai
import requests

# -------------------- Authentication setup -------------------- #

# API keys are read from environment variables so they never have to be
# written into the notebook. In Colab they can be set beforehand with:
# import os
# os.environ['OPENAI_API_KEY'] = 'your_api_key'
# os.environ['HUGGINGFACE_API_KEY'] = 'your_hf_api_key'

OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')  # OpenAI API key
HUGGINGFACE_API_KEY = os.environ.get('HUGGINGFACE_API_KEY')  # Hugging Face API key

# Hand the key to the openai module, or warn that it is missing.
if not OPENAI_API_KEY:
    print("⚠️ OpenAI API key not found. OpenAI models will be skipped.")
else:
    openai.api_key = OPENAI_API_KEY

# -------------------- Definición de Clases para Modelos -------------------- #

@dataclass
class BenchmarkQuestion:
    """One benchmark item: a prompt paired with its reference answer."""

    # Prompt text that is sent to each model.
    question: str
    # Reference answer that model replies are compared against.
    expected_answer: str

@dataclass
class ModelResult:
    """Outcome of asking one model the same benchmark question several times."""

    # The prompt that was asked.
    question: str
    # The reference answer for that prompt.
    expected_answer: str
    # Number of runs whose answer was judged correct.
    correct_responses: int
    # Fraction of runs judged correct, as computed by run_benchmark.
    consistency: float
    # One record per run; run_benchmark stores the keys "run", "answer"
    # and "is_correct" in each record.
    answers: List[Dict[str, Any]] = field(default_factory=list)

class BaseModel:
    """Common interface for every model backend used by the benchmark.

    Subclasses override ``query``; the base implementation always raises.
    """

    def __init__(self, name: str):
        # Display name; used in log messages and as the key of the results.
        self.name = name

    def query(self, prompt: str) -> Optional[str]:
        """Return the model's answer to ``prompt``, or ``None`` on failure.

        Raises:
            NotImplementedError: always, in this base class.
        """
        message = "Esta método debe ser implementado por subclases."
        raise NotImplementedError(message)

class OpenAIModel(BaseModel):
    """Benchmark model backed by the OpenAI chat completions API.

    Works with both generations of the ``openai`` package: the 1.x client
    (``openai.OpenAI().chat.completions.create``) and the legacy 0.x
    module-level ``openai.ChatCompletion.create`` call, which was removed in
    openai 1.0.

    Args:
        name: Display name used in log messages.
        model_name: OpenAI model identifier, e.g. ``"gpt-3.5-turbo"``.
        temperature: Sampling temperature sent with every request.
    """

    def __init__(self, name: str, model_name: str, temperature: float = 0):
        super().__init__(name)
        self.model_name = model_name
        self.temperature = temperature
        # openai>=1.0 client. Created on first use so that constructing the
        # model never fails when the API key is missing.
        self._client = None

    def query(self, prompt: str) -> Optional[str]:
        """Send ``prompt`` as a single user message and return the reply.

        Args:
            prompt: Text of the user message.

        Returns:
            The stripped reply text, or ``None`` if the request failed or
            the API returned no text content.
        """
        request = {
            "model": self.model_name,
            "messages": [
                {"role": "system", "content": "Eres un asistente útil."},
                {"role": "user", "content": prompt},
            ],
            "temperature": self.temperature,
            "max_tokens": 150,
        }
        try:
            if hasattr(openai, "OpenAI"):
                # openai>=1.0: ChatCompletion.create is gone and response
                # messages are objects rather than dicts.
                if self._client is None:
                    self._client = openai.OpenAI(api_key=openai.api_key)
                response = self._client.chat.completions.create(**request)
                content = response.choices[0].message.content
            else:
                # Legacy openai<1.0 module-level API.
                response = openai.ChatCompletion.create(**request)
                content = response.choices[0].message['content']
        except Exception as e:
            # Best effort: one failed request must not abort the benchmark.
            print(f"Error al consultar OpenAI model '{self.name}': {e}")
            return None
        return content.strip() if content is not None else None

class HuggingFaceModel(BaseModel):
    """Benchmark model served by the Hugging Face Inference API.

    Args:
        name: Display name used in log messages.
        model_id: Model identifier appended to the inference endpoint URL.
        huggingface_api_key: Optional token, sent as a Bearer header.
        timeout: Seconds to wait for the HTTP response before giving up.
    """

    def __init__(
        self,
        name: str,
        model_id: str,
        huggingface_api_key: Optional[str] = None,
        timeout: float = 60,
    ):
        super().__init__(name)
        self.model_id = model_id
        self.hf_api_key = huggingface_api_key
        self.timeout = timeout

    @staticmethod
    def _extract_text(data: Any) -> str:
        """Return the generated text from a decoded JSON response.

        Accepts either a dict or a non-empty list whose first element is a
        dict; in both cases the ``generated_text`` entry is returned
        stripped. Any other shape (including an empty list) is returned as
        its JSON dump so the caller still sees what the API sent.
        """
        first = data[0] if isinstance(data, list) and data else data
        if isinstance(first, dict) and isinstance(first.get('generated_text'), str):
            return first['generated_text'].strip()
        return json.dumps(data)

    def query(self, prompt: str) -> Optional[str]:
        """POST ``prompt`` to the inference endpoint and return the reply.

        Args:
            prompt: Text sent as the ``inputs`` field of the request.

        Returns:
            The generated text (or the raw JSON for unrecognized response
            shapes), or ``None`` on a non-200 status or any exception.
        """
        headers = {}
        if self.hf_api_key:
            headers["Authorization"] = f"Bearer {self.hf_api_key}"
        payload = {
            "inputs": prompt,
            "options": {"use_cache": False},
        }
        try:
            # Without a timeout a stalled connection would block the whole
            # benchmark indefinitely.
            response = requests.post(
                f"https://api-inference.huggingface.co/models/{self.model_id}",
                headers=headers,
                json=payload,
                timeout=self.timeout,
            )
            if response.status_code != 200:
                print(f"Error en Hugging Face model '{self.name}': {response.status_code} - {response.text}")
                return None
            return self._extract_text(response.json())
        except Exception as e:
            # Best effort: one failed request must not abort the benchmark.
            print(f"Exception al consultar Hugging Face model '{self.name}': {e}")
            return None

class OllamaModel(BaseModel):
    """Benchmark model run as a local command that reads the prompt on stdin.

    Args:
        name: Display name used in log messages.
        model_command: Complete shell command that runs the model.
        timeout: Seconds to wait for the command before killing it.
    """

    def __init__(self, name: str, model_command: str, timeout: float = 60):
        super().__init__(name)
        self.model_command = model_command  # Full command used to run the model
        self.timeout = timeout

    def query(self, prompt: str) -> Optional[str]:
        """Run the command with ``prompt`` on stdin and return its stdout.

        Args:
            prompt: Text written to the command's standard input.

        Returns:
            The stripped standard output when the command exits with
            status 0; ``None`` on a non-zero exit, a timeout, or any other
            exception.
        """
        try:
            # subprocess.run kills the child AND waits for it when the
            # timeout expires, so no unreaped process or open pipe is left
            # behind.
            # NOTE(review): the command goes through the shell; it must come
            # from trusted configuration, never from untrusted input.
            completed = subprocess.run(
                self.model_command,
                shell=True,
                input=prompt,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired:
            print(f"Timeout al consultar Ollama model '{self.name}'.")
            return None
        except Exception as e:
            # Best effort: one failed run must not abort the benchmark.
            print(f"Exception al consultar Ollama model '{self.name}': {e}")
            return None

        if completed.returncode == 0:
            return completed.stdout.strip()
        print(f"Error en Ollama model '{self.name}': {completed.stderr}")
        return None

# -------------------- Definición de Funciones de Evaluación -------------------- #

def evaluate_answer(model_answer: str, expected_answer: str, similarity_threshold: float = 0.8) -> bool:
    """Decide whether a model's answer matches the expected answer.

    Both strings are lower-cased and stripped, then the answer is accepted
    if any of these holds:

    1. it equals the expected answer;
    2. it contains the expected answer as a whole word or phrase, so a
       sentence-style reply such as "La capital de Francia es París." is
       accepted for "París";
    3. its ``SequenceMatcher`` ratio against the expected answer reaches
       ``similarity_threshold``.

    Args:
        model_answer: Text returned by the model.
        expected_answer: Reference answer.
        similarity_threshold: Minimum similarity ratio for rule 3. It does
            not affect rules 1 and 2.

    Returns:
        ``True`` if the answer is accepted; ``False`` otherwise, including
        when either argument is empty or ``None``.
    """
    if not model_answer or not expected_answer:
        return False

    # Normalize both answers.
    model_answer_norm = model_answer.lower().strip()
    expected_answer_norm = expected_answer.lower().strip()

    # Exact match.
    if model_answer_norm == expected_answer_norm:
        return True

    # Whole-word containment. The lookarounds stop "180" from matching
    # inside "1800". Skipped for a blank expected answer, whose empty
    # pattern would match every string.
    if expected_answer_norm:
        whole_phrase = r"(?<!\w)" + re.escape(expected_answer_norm) + r"(?!\w)"
        if re.search(whole_phrase, model_answer_norm):
            return True

    # Fuzzy match on the full strings.
    similarity = SequenceMatcher(None, model_answer_norm, expected_answer_norm).ratio()
    return similarity >= similarity_threshold

# -------------------- Definición de Preguntas para el Benchmark -------------------- #

# Each pair is (prompt, reference answer); the list is built from the pairs
# in order.
benchmark_questions = [
    BenchmarkQuestion(question=prompt_text, expected_answer=reference)
    for prompt_text, reference in (
        ("¿Cuál es la capital de Francia?", "París"),
        ("Resuelve la siguiente operación matemática: 15 * 12.", "180"),
        ("¿Quién escribió 'Cien Años de Soledad'?", "Gabriel García Márquez"),
    )
]

# -------------------- Definición de Modelos a Evaluar -------------------- #

# Models to evaluate. Add or remove entries as needed.
models_to_evaluate: List[BaseModel] = []

# Example: an OpenAI model, registered only when an OpenAI key is available.
if OPENAI_API_KEY:
    models_to_evaluate += [
        OpenAIModel(name="OpenAI-GPT-3.5-Turbo", model_name="gpt-3.5-turbo", temperature=0),
    ]

# Example: a Hugging Face model, registered only when a Hugging Face key is
# available.
if HUGGINGFACE_API_KEY:
    models_to_evaluate += [
        HuggingFaceModel(
            name="HuggingFace-GPT-2",
            model_id="gpt2",
            huggingface_api_key=HUGGINGFACE_API_KEY,
        ),
    ]

# Example: an Ollama model (requires running this script locally).
# Uncomment and adjust the lines below when running locally.
# models_to_evaluate.append(
#     OllamaModel(
#         name="Ollama-Llama-3.2-1B-Instruct",
#         model_command="ollama run hf.co/bartowski/Llama-3.2-1B-Instruct-GGUF"
#     )
# )

# -------------------- Función para Ejecutar el Benchmark -------------------- #

def _run_question(model: BaseModel, q: BenchmarkQuestion, num_runs: int, delay_seconds: float) -> ModelResult:
    """Ask one model one question ``num_runs`` times and score the answers."""
    print(f"\nPregunta: {q.question}")
    print(f"Respuesta Esperada: {q.expected_answer}")
    answers = []
    for run in range(1, num_runs + 1):
        print(f"  Consulta {run}/{num_runs}...")
        answer = model.query(q.question)
        if answer is None:
            # A failed query is recorded as an incorrect run.
            is_correct = False
            print("    No se pudo obtener una respuesta.")
        else:
            is_correct = evaluate_answer(answer, q.expected_answer)
            status = "✅ Correcta" if is_correct else "❌ Incorrecta"
            print(f"    Respuesta: {answer} | {status}")
        answers.append({
            "run": run,
            "answer": answer,
            "is_correct": is_correct
        })
        # Pause after every query, failed ones included, so that a
        # rate-limited API is not retried immediately.
        if delay_seconds > 0:
            time.sleep(delay_seconds)

    correct_count = sum(1 for record in answers if record["is_correct"])
    # With no runs there is nothing to average; report 0.0 rather than
    # dividing by zero.
    consistency = correct_count / num_runs if num_runs > 0 else 0.0
    return ModelResult(
        question=q.question,
        expected_answer=q.expected_answer,
        correct_responses=correct_count,
        consistency=consistency,
        answers=answers
    )


def run_benchmark(
    models: List[BaseModel],
    questions: List[BenchmarkQuestion],
    num_runs: int = 5,
    delay_seconds: float = 1.0,
) -> Dict[str, List[ModelResult]]:
    """Query every model with every question and collect scored results.

    Progress is printed as the benchmark runs.

    Args:
        models: Models to evaluate; each must provide ``name`` and
            ``query(prompt)``.
        questions: Questions to ask each model.
        num_runs: How many times each question is asked per model. Zero or
            a negative value asks nothing and yields a consistency of 0.0.
        delay_seconds: Pause after each query, to stay under API rate
            limits. Zero or a negative value disables the pause.

    Returns:
        A dict mapping each model's name to a list with one ``ModelResult``
        per question, in the order the questions were given.
    """
    results = {}
    for model in models:
        print(f"\n===== Evaluando el modelo: {model.name} =====")
        results[model.name] = [
            _run_question(model, q, num_runs, delay_seconds) for q in questions
        ]
    return results

# -------------------- Función para Presentar los Resultados -------------------- #

def display_results(results: Dict[str, List[ModelResult]]):
    """Print a report of the benchmark, grouped by model and by question.

    Args:
        results: Mapping from model name to that model's per-question
            results, as returned by ``run_benchmark``.
    """
    verdict_labels = {True: "✅ Correcta", False: "❌ Incorrecta"}
    for model_name in results:
        print(f"\n\n===== Resultados para el modelo: {model_name} =====\n")
        for entry in results[model_name]:
            total_runs = len(entry.answers)
            print(f"Pregunta: {entry.question}")
            print(f"Respuesta Esperada: {entry.expected_answer}")
            print(f"Respuestas Correctas: {entry.correct_responses} de {total_runs}")
            print(f"Consistencia: {entry.consistency * 100:.2f}%")
            print("Detalles de las respuestas:")
            for attempt in entry.answers:
                label = verdict_labels[bool(attempt['is_correct'])]
                print(f"  - Consulta {attempt['run']}: {attempt['answer']} | {label}")
            print("\n-----------------------------------\n")

# -------------------- Ejecución Principal -------------------- #

if __name__ == "__main__":
    # Run every configured model against every benchmark question.
    benchmark_results = run_benchmark(models_to_evaluate, benchmark_questions, num_runs=5)

    # Print the human-readable report.
    display_results(benchmark_results)

    # json cannot encode dataclass instances, so convert each ModelResult to
    # a plain dict first. This happens before the file is opened so that a
    # conversion error cannot leave a truncated file behind.
    serializable_results = {
        model_name: [asdict(result) for result in model_results]
        for model_name, model_results in benchmark_results.items()
    }

    # Optional: save the results to a JSON file.
    with open("benchmark_results.json", "w", encoding="utf-8") as f:
        json.dump(serializable_results, f, ensure_ascii=False, indent=4)
    print("\n✅ Resultados guardados en 'benchmark_results.json'")
