# Instalar dependencias

In [None]:
#Instalar dependencias - ejecutar si es la primera vez
#%pip install openai python-dotenv gradio PyMuPDF docx2txt
#%pip install --upgrade diffusers transformers accelerate scipy safetensors
#%pip install --upgrade huggingface_hub[hf_xet]  # Incluye hf_xet para mejor rendimiento
#%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121  # Para GPU CUDA 12.1
#%pip install Pillow  # Para manipulación de imágenes
#%pip install deep-translator  # Para traducir prompts automáticamente

# Si hay errores de compatibilidad, intenta:
#pip install diffusers transformers --force-reinstall
#pip install diffusers==0.21.4 transformers==4.35.2 --force-reinstall

# Importar librerías

In [None]:
import gradio as gr
import os
import io
import time
import uuid
import base64
from datetime import datetime
from PIL import Image, ImageDraw
from requests.exceptions import Timeout, ConnectionError
from dotenv import load_dotenv
from openai import OpenAI
from huggingface_hub import InferenceClient
import torch 
from diffusers import StableDiffusionPipeline
import fitz # PyMuPDF
import docx2txt
import hashlib

# Definir las variables de las API KEYS

In [49]:
# Load variables from the local .env file; override=True lets values from .env
# take precedence over variables already set in the process environment.
load_dotenv(override=True)

# Provider credentials. Each one is None when its variable is not defined.
hf_token = os.environ.get("HF_TOKEN")
groq_api_key = os.environ.get("GROQ_API_KEY")
gemini_api_key = os.environ.get("GEMINI_API_KEY")
open_router_api_key = os.environ.get("OPEN_ROUTER_API_KEY")

### Modelos no disponibles (verificación de HF_TOKEN)

In [50]:
# Report up front whether the Hugging Face token was found in the environment.
if hf_token:
    print("HF_TOKEN configurado correctamente.")
else:
    print("⚠️Advertencia: HF_TOKEN no encontrado. Helsinki no funcionará.")

HF_TOKEN configurado correctamente.


# Conectar los distintos modelos

## Google

In [51]:
# Gemini is reached through Google's OpenAI-compatible endpoint, so the stock
# OpenAI client is reused with a different base URL.
gemini_model = {
    "model": OpenAI(
        api_key=gemini_api_key,
        base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
    ),
    "model_name": "gemini-2.5-flash",
}

## Helsinki

In [52]:
# Hugging Face inference client authenticated with HF_TOKEN.
helsinki_client = InferenceClient(api_key=hf_token)

class HelsinkiModel:
    """Adapter that exposes a Hugging Face translation model through an
    OpenAI-like ``chat_completions_create`` interface.

    Args:
        client: Object providing ``translation(text, model=...)``.
    """

    def __init__(self, client):
        self.client = client
        self.model_name = "Helsinki-NLP/opus-mt-en-es"

    @staticmethod
    def _extract_translation(result):
        """Return the translated text from any of the result shapes handled here.

        Accepted shapes: a plain string, a dict with a ``translation_text`` key,
        an object with a ``translation_text`` attribute, or a non-empty list
        whose first element is any of the above. Anything else is stringified.
        """
        # Only the first entry of a list result is used.
        first = result[0] if isinstance(result, list) and len(result) > 0 else result

        if isinstance(first, dict):
            text = first.get("translation_text", first)
        else:
            # Covers plain strings (returned as-is) and attribute-style results.
            text = getattr(first, "translation_text", first)

        return text if isinstance(text, str) else str(text)

    @staticmethod
    def _build_response(content):
        """Wrap ``content`` so it is reachable as ``resp.choices[0].message.content``."""
        message = type('obj', (object,), {'content': content})()
        choice = type('Choice', (object,), {'message': message})()
        return type('Response', (object,), {'choices': [choice]})()

    def translate(self, text):
        """Translate ``text`` with the configured model.

        Returns:
            The translated string, or a string starting with
            ``"Error en traducción: "`` when the client call fails.
        """
        try:
            result = self.client.translation(text, model=self.model_name)
            return self._extract_translation(result)
        except Exception as e:
            return f"Error en traducción: {str(e)}"

    def chat_completions_create(self, messages):
        """Translate the most recent user message of a chat history.

        Args:
            messages: Sequence of dicts with ``"role"`` and ``"content"`` keys.

        Returns:
            A response-like object; the translation (or an error text prefixed
            with ``"Error Helsinki: "``) is at ``choices[0].message.content``.
        """
        try:
            user_message = ""
            # Walk the history backwards so the latest user turn wins.
            for msg in reversed(messages):
                if msg["role"] == "user":
                    user_message = msg["content"]
                    break

            content = self.translate(user_message)
        except Exception as e:
            # Reached only for malformed histories; translate() handles its own errors.
            content = f"Error Helsinki: {str(e)}"

        return self._build_response(content)

# Single shared adapter instance plus its registry entry.
helsinki_model_instance = HelsinkiModel(helsinki_client)

helsinki_model = {
    "model": helsinki_model_instance,
    "model_name": "Helsinki-NLP/opus-mt-en-es",
}

## Runwayml

In [None]:
class RunwaymlModel:
    """Local Stable Diffusion v1.5 generator with an OpenAI-like chat interface.

    The diffusion pipeline is loaded lazily on the first generation request.
    When loading or generating fails, a coloured placeholder image carrying the
    first words of the prompt is produced instead.
    """

    def __init__(self):
        self.model_id = "runwayml/stable-diffusion-v1-5"
        # Holds the pipeline object, or the string "placeholder" after a failed load.
        self.pipe = None
        self.model_name = "runwayml/stable-diffusion-v1-5"
        self.initialized = False
        self.use_placeholder = False

    @staticmethod
    def _to_base64_png(image):
        """Serialize ``image`` as PNG and return it base64-encoded (str)."""
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        return base64.b64encode(buffer.getvalue()).decode()

    @staticmethod
    def _build_response(content):
        """Wrap ``content`` so it is reachable as ``resp.choices[0].message.content``."""
        message = type('obj', (object,), {'content': content})()
        choice = type('Choice', (object,), {'message': message})()
        return type('Response', (object,), {'choices': [choice]})()

    def _placeholder_active(self):
        """True when the instance is in placeholder mode."""
        return self.use_placeholder or self.pipe == "placeholder"

    def initialize_model(self):
        """Load the Stable Diffusion pipeline once.

        On any failure the instance switches to placeholder mode and is still
        marked as initialized, so the load is not retried on every request.
        The failure reason is printed instead of being discarded.
        """
        if self.initialized:
            return

        print("Intentando inicializar Stable Diffusion...")

        try:
            use_cuda = torch.cuda.is_available()
            self.pipe = StableDiffusionPipeline.from_pretrained(
                self.model_id,
                torch_dtype=torch.float16 if use_cuda else torch.float32,
                safety_checker=None,
                requires_safety_checker=False,
                use_safetensors=True
            )

            device = "cuda" if use_cuda else "cpu"
            print(f"Usando dispositivo: {device}")

            self.pipe = self.pipe.to(device)

            if use_cuda:
                self.pipe.enable_attention_slicing()
                try:
                    self.pipe.enable_model_cpu_offload()
                except Exception as offload_error:
                    # Offload is optional, so keep going without it.
                    print(f"⚠️CPU offload no disponible: {offload_error}")

            print("Stable Diffusion inicializado correctamente")
            self.use_placeholder = False
            self.initialized = True

        except Exception as e:
            # Also covers ImportError; both cases fall back the same way.
            print(f"No se pudo inicializar Stable Diffusion, usando placeholder: {e}")
            self.use_placeholder = True
            self.pipe = "placeholder"
            self.initialized = True

    def generate_image(self, prompt, negative_prompt="blurry, low quality, distorted", num_inference_steps=20, guidance_scale=7.5):
        """Generate an image for ``prompt``.

        Returns:
            Tuple ``(image, base64_png, saved_path)``. A placeholder image is
            returned when the pipeline is unavailable or generation fails.
        """
        try:
            if not self.initialized:
                self.initialize_model()

            if self._placeholder_active():
                return self._generate_placeholder(prompt)
            return self._generate_with_diffusion(prompt, negative_prompt, num_inference_steps, guidance_scale)

        except Exception as e:
            print(f"Error en generación, usando placeholder: {e}")
            return self._generate_placeholder(prompt)

    def _generate_with_diffusion(self, prompt, negative_prompt, num_inference_steps, guidance_scale):
        """Run the loaded pipeline at 512x512 and save the result under ``images/``.

        Falls back to a placeholder image if the pipeline call or saving fails.
        """
        try:
            generation_kwargs = {
                "prompt": prompt,
                "negative_prompt": negative_prompt,
                "num_inference_steps": num_inference_steps,
                "guidance_scale": guidance_scale,
                "height": 512,
                "width": 512,
            }

            # Generate the image
            with torch.autocast("cuda" if torch.cuda.is_available() else "cpu"):
                result = self.pipe(**generation_kwargs)
                image = result.images[0]

            img_base64 = self._to_base64_png(image)

            # Persist a copy on disk; the name is based on the current epoch second.
            timestamp = int(time.time())
            os.makedirs("images", exist_ok=True)
            image_filename = f"images/stable_diffusion_{timestamp}.png"
            image.save(image_filename)

            print(f"Imagen generada con Stable Diffusion: {image_filename}")
            return image, img_base64, image_filename

        except Exception as e:
            print(f"Error con Stable Diffusion, usando placeholder: {e}")
            return self._generate_placeholder(prompt)

    def _generate_placeholder(self, prompt):
        """Build a 512x512 placeholder whose colour is derived from the prompt.

        Returns:
            Tuple ``(image, base64_png, saved_path)``.

        Raises:
            Exception: re-raised unchanged when the placeholder cannot be built
            or saved.
        """
        try:
            timestamp = int(time.time())
            # First 6 hex digits of the MD5 give a stable 24-bit colour per prompt.
            hash_value = int(hashlib.md5(prompt.encode()).hexdigest()[:6], 16)

            r = (hash_value >> 16) & 255
            g = (hash_value >> 8) & 255
            b = hash_value & 255

            # Clamp each channel to 50..200 so the overlaid text stays visible.
            r = max(50, min(200, r))
            g = max(50, min(200, g))
            b = max(50, min(200, b))

            img = Image.new('RGB', (512, 512), color=(r, g, b))
            draw = ImageDraw.Draw(img)

            try:
                words = prompt.split()[:4]
                text = " ".join(words)

                bbox = draw.textbbox((0, 0), text)
                text_width = bbox[2] - bbox[0]
                text_height = bbox[3] - bbox[1]

                x = (512 - text_width) // 2
                y = (512 - text_height) // 2

                # Black outline: draw the text at the 8 neighbouring offsets.
                for dx in [-1, 0, 1]:
                    for dy in [-1, 0, 1]:
                        if dx != 0 or dy != 0:
                            draw.text((x + dx, y + dy), text, fill=(0, 0, 0))

                draw.text((x, y), text, fill=(255, 255, 255))

                placeholder_text = "PLACEHOLDER"
                bbox2 = draw.textbbox((0, 0), placeholder_text)
                text_width2 = bbox2[2] - bbox2[0]
                x2 = (512 - text_width2) // 2
                y2 = 450

                draw.text((x2, y2), placeholder_text, fill=(200, 200, 200))

            except Exception as text_error:
                # Text rendering is best-effort; draw a plain marker instead.
                print(f"Error con texto: {text_error}")
                draw.ellipse([200, 200, 312, 312], fill=(255, 255, 255))

            img_base64 = self._to_base64_png(img)

            os.makedirs("images", exist_ok=True)
            image_filename = f"images/placeholder_image_{timestamp}.png"
            img.save(image_filename)

            return img, img_base64, image_filename

        except Exception as e:
            print(f"Error generando imagen placeholder: {e}")
            raise

    def chat_completions_create(self, messages):
        """OpenAI-compatible entry point for image generation.

        Uses the most recent ``"user"`` message as the prompt.

        Returns:
            A response-like object whose ``choices[0].message.content`` is a
            markdown image with an inline base64 PNG, or an error text starting
            with ``"**Error generando imagen:**"``.
        """
        try:
            user_message = ""
            for msg in reversed(messages):
                if msg["role"] == "user":
                    user_message = msg["content"]
                    break

            if not user_message:
                raise Exception("No se encontró prompt para generar imagen")

            _image, img_base64, _filename = self.generate_image(user_message)

            # The markdown is the same for real and placeholder images.
            response_content = f"![Imagen generada](data:image/png;base64,{img_base64})\n"

        except Exception as e:
            response_content = f"**Error generando imagen:** {str(e)}"
            print(response_content)

        return self._build_response(response_content)

# Creating the instance is cheap: the pipeline itself is loaded on first use.
runwayml_model_instance = RunwaymlModel()

runwayml_model = {
    "model": runwayml_model_instance,
    "model_name": "runwayml/stable-diffusion-v1-5",
}

## Stabilityai

In [54]:
# Free Hugging Face hosted model for image generation
class HuggingFaceImageModel:
    """Stable Diffusion XL through a Hugging Face client, with an OpenAI-like
    chat interface and a placeholder fallback when the remote call fails.

    Args:
        client: Object providing ``text_to_image(prompt=..., model=...)`` that
            returns an image with a PIL-style ``save`` method.
    """

    def __init__(self, client):
        self.client = client
        self.model_name = "stabilityai/stable-diffusion-xl-base-1.0"

    @staticmethod
    def _to_base64_png(image):
        """Serialize ``image`` as PNG and return it base64-encoded (str)."""
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        return base64.b64encode(buffer.getvalue()).decode()

    @staticmethod
    def _build_response(content):
        """Wrap ``content`` so it is reachable as ``resp.choices[0].message.content``."""
        message = type('obj', (object,), {'content': content})()
        choice = type('Choice', (object,), {'message': message})()
        return type('Response', (object,), {'choices': [choice]})()

    def _generate_with_fallback(self, prompt):
        """Generate remotely, falling back to a local placeholder on failure.

        Returns:
            ``(image, base64_png, saved_path, used_placeholder)`` where the last
            item tells the caller whether the fallback was taken.
        """
        try:
            image = self.client.text_to_image(
                prompt=prompt,
                model=self.model_name
            )

            img_base64 = self._to_base64_png(image)

            # Persist a copy on disk; the name is based on the current epoch second.
            timestamp = int(time.time())
            os.makedirs("images", exist_ok=True)
            image_filename = f"images/hf_sdxl_{timestamp}.png"
            image.save(image_filename)

            print(f"Imagen generada con Hugging Face SDXL: {image_filename}")
            return image, img_base64, image_filename, False

        except Exception as e:
            print(f"Error con Hugging Face, generando placeholder: {e}")
            image, img_base64, image_filename = self._generate_placeholder(prompt)
            return image, img_base64, image_filename, True

    def generate_image_hf(self, prompt):
        """Generate an image for ``prompt``.

        Returns:
            Tuple ``(image, base64_png, saved_path)``; a placeholder image when
            the Hugging Face call fails.
        """
        image, img_base64, image_filename, _used_placeholder = self._generate_with_fallback(prompt)
        return image, img_base64, image_filename

    def _generate_placeholder(self, prompt):
        """Build a 512x512 placeholder whose colour is derived from the prompt.

        Returns:
            Tuple ``(image, base64_png, saved_path)``.

        Raises:
            Exception: re-raised unchanged when the placeholder cannot be built
            or saved.
        """
        try:
            timestamp = int(time.time())
            # First 6 hex digits of the MD5 give a stable 24-bit colour per prompt.
            hash_value = int(hashlib.md5(prompt.encode()).hexdigest()[:6], 16)

            r = (hash_value >> 16) & 255
            g = (hash_value >> 8) & 255
            b = hash_value & 255

            # Clamp each channel to 50..200 so the overlaid text stays visible.
            r = max(50, min(200, r))
            g = max(50, min(200, g))
            b = max(50, min(200, b))

            img = Image.new('RGB', (512, 512), color=(r, g, b))
            draw = ImageDraw.Draw(img)

            try:
                words = prompt.split()[:4]
                text = " ".join(words)

                bbox = draw.textbbox((0, 0), text)
                text_width = bbox[2] - bbox[0]
                text_height = bbox[3] - bbox[1]

                x = (512 - text_width) // 2
                y = (512 - text_height) // 2

                # Black outline: draw the text at the 8 neighbouring offsets.
                for dx in [-1, 0, 1]:
                    for dy in [-1, 0, 1]:
                        if dx != 0 or dy != 0:
                            draw.text((x + dx, y + dy), text, fill=(0, 0, 0))

                draw.text((x, y), text, fill=(255, 255, 255))

                placeholder_text = "HF PLACEHOLDER"
                bbox2 = draw.textbbox((0, 0), placeholder_text)
                text_width2 = bbox2[2] - bbox2[0]
                x2 = (512 - text_width2) // 2
                y2 = 450

                draw.text((x2, y2), placeholder_text, fill=(200, 200, 200))

            except Exception as text_error:
                # Text rendering is best-effort; draw a plain marker instead.
                print(f"Error con texto: {text_error}")
                draw.ellipse([200, 200, 312, 312], fill=(255, 255, 255))

            img_base64 = self._to_base64_png(img)

            os.makedirs("images", exist_ok=True)
            image_filename = f"images/placeholder_hf_{timestamp}.png"
            img.save(image_filename)

            return img, img_base64, image_filename

        except Exception as e:
            print(f"Error generando placeholder: {e}")
            raise

    def chat_completions_create(self, messages):
        """OpenAI-compatible entry point for image generation.

        Uses the most recent ``"user"`` message as the prompt. The caption only
        credits Stable Diffusion XL when the remote generation succeeded; a
        placeholder gets its own caption.

        Returns:
            A response-like object whose ``choices[0].message.content`` is a
            markdown image plus caption, or an error text starting with
            ``"**Error generando imagen:**"``.
        """
        try:
            user_message = ""
            for msg in reversed(messages):
                if msg["role"] == "user":
                    user_message = msg["content"]
                    break

            if not user_message:
                raise Exception("No se encontró prompt para generar imagen")

            _image, img_base64, _filename, used_placeholder = self._generate_with_fallback(user_message)

            if used_placeholder:
                caption = "**Imagen de reemplazo: no se pudo generar con Hugging Face**"
            else:
                caption = "**Generado con Stable Diffusion XL (Hugging Face)**"

            response_content = f"![Imagen generada](data:image/png;base64,{img_base64})\n{caption}"

        except Exception as e:
            response_content = f"**Error generando imagen:** {str(e)}"
            print(response_content)

        return self._build_response(response_content)

# The image model reuses the Hugging Face client created for Helsinki.
hf_image_model_instance = HuggingFaceImageModel(helsinki_client)

stability_model = {
    "model": hf_image_model_instance,
    "model_name": "stabilityai/stable-diffusion-xl-base-1.0",
}

## OpenRouterAI

In [55]:
# OpenRouter is reached with the OpenAI client pointed at its base URL.
open_router_model = {
    "model": OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=open_router_api_key,
    ),
    "model_name": "deepseek/deepseek-r1-0528",
}

## Groq

In [56]:
# Groq is reached with the OpenAI client pointed at its base URL.
groq_model = {
    "model": OpenAI(
        api_key=groq_api_key,
        base_url="https://api.groq.com/openai/v1",
    ),
    "model_name": "llama-3.3-70b-versatile",
}

## Función de chat

In [57]:
# In-memory usage metrics for the current session, updated by log_metric().
metrics_data = dict(
    # Request counters
    total_requests=0,
    successful_requests=0,
    failed_requests=0,
    # Per-model and per-task request counts
    requests_by_model={},
    requests_by_task={},
    # Response times of successful requests and recent error records
    response_times=[],
    errors=[],
    # ISO-8601 timestamps
    session_start=datetime.now().isoformat(),
    last_request=None,
    # Task-specific success counters
    images_generated=0,
    translations_made=0,
)

def log_metric(metric_type, **kwargs):
    """Record one event in the module-level ``metrics_data``.

    Args:
        metric_type: ``"request_start"``, ``"request_success"`` or
            ``"request_error"``; any other value is ignored.
        **kwargs: Event details. Keys read here: ``model``, ``task``,
            ``response_time``, ``error`` and ``user_message``.
            ``user_message`` may be a string or a dict with a ``"text"`` key;
            only its first 100 characters are stored.

    Failures while recording are printed and never propagated, so metrics
    cannot break the request being measured.
    """
    global metrics_data

    try:
        current_time = datetime.now().isoformat()

        if metric_type == "request_start":
            metrics_data["total_requests"] += 1
            metrics_data["last_request"] = current_time

            # Count per model and per task
            for bucket, label in (
                ("requests_by_model", kwargs.get("model", "unknown")),
                ("requests_by_task", kwargs.get("task", "unknown")),
            ):
                counts = metrics_data[bucket]
                counts[label] = counts.get(label, 0) + 1

        elif metric_type == "request_success":
            metrics_data["successful_requests"] += 1
            metrics_data["response_times"].append(kwargs.get("response_time", 0))

            # Task-specific counters
            task = kwargs.get("task", "")
            if task == "imagenes":
                metrics_data["images_generated"] += 1
            elif task == "traduccion":
                metrics_data["translations_made"] += 1

        elif metric_type == "request_error":
            metrics_data["failed_requests"] += 1

            # Callers pass the whole message dict; slicing a dict raises, which
            # used to abort here and drop the error record. Reduce it to text first.
            raw_message = kwargs.get("user_message", "")
            if isinstance(raw_message, dict):
                raw_message = raw_message.get("text", "")
            message_preview = str(raw_message or "")[:100]

            metrics_data["errors"].append({
                "timestamp": current_time,
                "model": kwargs.get("model", "unknown"),
                "task": kwargs.get("task", "unknown"),
                "error": kwargs.get("error", "Unknown error"),
                "user_message": message_preview,
            })

            # Keep only the 50 most recent errors to bound memory use
            if len(metrics_data["errors"]) > 50:
                metrics_data["errors"] = metrics_data["errors"][-50:]

    except Exception as e:
        print(f"Error registrando métrica: {e}")

def get_metrics_summary():
    """Render the current ``metrics_data`` as a Markdown report.

    Returns:
        A Markdown string with general statistics, response times, usage per
        model and per task, session information, the most used model/task and
        up to the 5 most recent errors. If building the report fails, a string
        starting with ``"**Error generando métricas:**"`` is returned instead.
    """
    try:
        response_times = metrics_data["response_times"]
        if response_times:
            avg_response_time = sum(response_times) / len(response_times)
            min_response_time = min(response_times)
            max_response_time = max(response_times)
        else:
            avg_response_time = min_response_time = max_response_time = 0

        # Success rate
        total = metrics_data["total_requests"]
        success_rate = (metrics_data["successful_requests"] / total * 100) if total > 0 else 0

        # Session time
        session_start = datetime.fromisoformat(metrics_data["session_start"])
        session_duration = datetime.now() - session_start

        # Most used model and task
        model_usage = metrics_data["requests_by_model"]
        most_used_model = max(model_usage.items(), key=lambda x: x[1]) if model_usage else ("Ninguno", 0)

        task_usage = metrics_data["requests_by_task"]
        most_used_task = max(task_usage.items(), key=lambda x: x[1]) if task_usage else ("Ninguna", 0)

        def _usage_lines(usage):
            """One bullet per entry, most used first, with its share of all requests."""
            lines = ""
            for name, count in sorted(usage.items(), key=lambda x: x[1], reverse=True):
                percentage = (count / total * 100) if total > 0 else 0
                # str() keeps a non-string key (e.g. None) from aborting the report.
                lines += f"- **{str(name).capitalize()}:** {count} ({percentage:.1f}%)\n"
            return lines

        summary = f"""# **Métricas del ChatBot**

## **Estadísticas Generales**
- **Total de solicitudes:** {total}
- **Solicitudes exitosas:** {metrics_data["successful_requests"]} ({success_rate:.1f}%)
- **Solicitudes fallidas:** {metrics_data["failed_requests"]}
- **Imágenes generadas:** {metrics_data["images_generated"]}
- **Traducciones realizadas:** {metrics_data["translations_made"]}

## **Tiempos de Respuesta**
- **Promedio:** {avg_response_time:.2f}s
- **Mínimo:** {min_response_time:.2f}s
- **Máximo:** {max_response_time:.2f}s
- **Total de mediciones:** {len(response_times)}

## **Uso por Modelo**
"""
        summary += _usage_lines(model_usage)

        summary += f"\n## **Uso por Tarea**\n"

        summary += _usage_lines(task_usage)

        session_start_text = session_start.strftime("%Y-%m-%d %H:%M:%S")
        # Drop the fractional seconds from the timedelta's string form
        duration_text = str(session_duration).split('.')[0]
        last_request_text = metrics_data["last_request"] or "Ninguna"

        summary += f"""
## **Información de Sesión**
- **Inicio de sesión:** {session_start_text}
- **Duración:** {duration_text}
- **Última solicitud:** {last_request_text}

## **Más Utilizados**
- **Modelo más usado:** {most_used_model[0]} ({most_used_model[1]} usos)
- **Tarea más usada:** {most_used_task[0]} ({most_used_task[1]} usos)
"""
        # Append recent errors, if any
        recent_errors = metrics_data["errors"][-5:] if metrics_data["errors"] else []
        if recent_errors:
            summary += "\n## **Errores Recientes (Últimos 5)**\n"
            for i, error in enumerate(recent_errors, 1):
                # "YYYY-MM-DDTHH:MM:SS..." -> "YYYY-MM-DD HH:MM:SS"
                timestamp = error["timestamp"][:19].replace("T", " ")
                detail = str(error['error'])
                # Add the ellipsis only when the text was actually cut
                if len(detail) > 100:
                    detail = detail[:100] + "..."
                summary += f"{i}. **[{timestamp}]** {error['model']} - {error['task']}: {detail}\n"

        return summary

    except Exception as e:
        return f"**Error generando métricas:** {str(e)}"

def reset_metrics():
    """Replace the module-level ``metrics_data`` with a fresh, zeroed structure.

    The session start is set to the current time.

    Returns:
        A Markdown confirmation message.
    """
    global metrics_data

    fresh_metrics = dict(
        total_requests=0,
        successful_requests=0,
        failed_requests=0,
        requests_by_model={},
        requests_by_task={},
        response_times=[],
        errors=[],
        session_start=datetime.now().isoformat(),
        last_request=None,
        images_generated=0,
        translations_made=0,
    )
    metrics_data = fresh_metrics

    return "**Métricas reiniciadas correctamente**"

def obtener_valor(dict: dict, key: str):
    """Look up ``key`` in ``dict``.

    Returns:
        The stored value, or the sentinel string ``"Clave no encontrada"``
        when the key is absent.
    """
    if key in dict:
        return dict[key]
    return "Clave no encontrada"

# Registry of configured backends, keyed by the short names used in tasks_config.
models = {
    "gemini": gemini_model,
    "open_router": open_router_model,
    "groq": groq_model,
    "helsinki": helsinki_model,
    "runwayml": runwayml_model,
    "stabilityai": stability_model,
}

# For each task: the model keys allowed to serve it and a description.
tasks_config = dict(
    traduccion=dict(
        models=["gemini", "helsinki"],
        description="Traducir texto entre idiomas (Helsinki: solo inglés → español)",
    ),
    resumen=dict(
        models=["groq", "open_router"],
        description="Resumir texto o documentos",
    ),
    imagenes=dict(
        models=["runwayml", "stabilityai"],
        description="Generar imágenes usando Stable Diffusion local o Hugging Face SDXL (gratis)",
    ),
)

# Files received through the chat; chat() appends the first file of each upload.
sent_files = list()

def get_available_models(task):
    """Return the model keys configured for ``task``.

    An unknown task, or a task entry without a ``"models"`` key, yields an
    empty list.
    """
    if task not in tasks_config:
        return []
    task_entry = tasks_config[task]
    return task_entry.get("models", [])

def validate_input(message):
    """Check that a chat message can be sent to a model.

    Returns:
        ``(True, None)`` when the message is acceptable, otherwise
        ``(False, reason)`` where ``reason`` is a user-facing Markdown string.
        Rejected: empty/None messages, whitespace-only messages and messages
        longer than 8000 characters.
    """
    problem = None

    if not message:
        problem = "**Error**: No puedes enviar un mensaje vacío. Por favor, escribe algo."
    elif not message.strip():
        problem = "**Error**: El mensaje solo contiene espacios en blanco. Por favor, escribe un mensaje válido."
    elif len(message) > 8000:
        problem = "**Error**: El mensaje es demasiado largo (máximo 8000 caracteres). Por favor, acórtalo."

    return problem is None, problem

def format_inference_time(seconds):
    """Format a duration in seconds as a short human-readable string.

    Returns:
        ``"<n>ms"`` below one second, ``"<s.ss>s"`` below one minute and
        ``"<m>m <s.s>s"`` otherwise.

    The unit is chosen from the rounded value, so durations just under a
    boundary roll over to the next unit instead of printing "1000ms",
    "60.00s" or "1m 60.0s".
    """
    # Sub-second: only if the rounded millisecond count still fits below 1000
    if seconds < 1 and round(seconds * 1000) < 1000:
        return f"{seconds*1000:.0f}ms"

    # Seconds: only if the two-decimal rounding stays below a full minute
    if round(seconds, 2) < 60:
        return f"{seconds:.2f}s"

    # Round first, then split, so the seconds part can never show as 60.0
    minutes, remaining_seconds = divmod(round(seconds, 1), 60)
    return f"{int(minutes)}m {remaining_seconds:.1f}s"

def create_initial_chat():
    """Create a new chat.

    Returns:
        ``(chat_id, history)`` where ``chat_id`` is ``"Chat-"`` followed by the
        first 8 hex characters of a random UUID and ``history`` is an empty list.
    """
    short_id = uuid.uuid4().hex[:8]
    return "Chat-" + short_id, []

def chat(message: dict, history, model, task):   
    total_start_time = time.time()
    log_metric("request_start", model=model, task=task, user_message=message)
    is_valid, error_msg = validate_input(message["text"])
    if not is_valid:
        log_metric("request_error", model=model, task=task, error="Invalid input", user_message=message)
        return error_msg
    try:
        print(f"Tarea seleccionada: {task}")
        print(f"Modelo seleccionado: {model}")
        if task not in tasks_config:
            error = f"Tarea '{task}' no reconocida"
            log_metric("request_error", model=model, task=task, error=error, user_message=message)
            return f"❌ **Error**: {error}. Tareas disponibles: {', '.join(tasks_config.keys())}"
        
        available_models = get_available_models(task)
        if not available_models:
            error = f"No hay modelos disponibles para la tarea '{task}'"
            log_metric("request_error", model=model, task=task, error=error, user_message=message)
            return f"❌ **Error**: {error}."
        
        if model not in available_models:
            error = f"El modelo '{model}' no está disponible para la tarea '{task}'"
            log_metric("request_error", model=model, task=task, error=error, user_message=message)
            return f"❌ **Error**: {error}. Modelos disponibles: {', '.join(available_models)}"
        
        if model not in models:
            error = f"El modelo '{model}' no está configurado en el sistema"
            log_metric("request_error", model=model, task=task, error=error, user_message=message)
            return f"❌ **Error**: {error}."
        
        model_config = models[model]
        if not model_config or obtener_valor(model_config, "model") == "Clave no encontrada":
            error = f"El modelo '{model}' no está disponible. Verifica la configuración de API keys"
            log_metric("request_error", model=model, task=task, error=error, user_message=message)
            return f"❌ **Error**: {error}."

        print(f"modelo: {obtener_valor(model_config, 'model_name')}")

        system_prompt = "Eres un asistente de IA que responde preguntas y ayuda con tareas."
        
        if task == "traduccion":
            if model == "helsinki":
                system_prompt = "Eres un traductor especializado que traduce ÚNICAMENTE de inglés a español usando modelos Helsinki-NLP. Solo acepta texto en inglés y lo traduce al español."
            else:
                system_prompt = "Eres un traductor experto. Tu tarea es traducir texto entre diferentes idiomas de manera precisa y natural."
        elif task == "resumen":
            system_prompt = "Eres un asistente especializado en resumir texto. Tu tarea es crear resúmenes claros y concisos del contenido proporcionado."
        elif task == "imagenes":
            if model == "runwayml":
                system_prompt = "Eres un generador de imágenes que usa Stable Diffusion cuando está disponible, o genera placeholders visuales cuando hay problemas técnicos. Conviertes descripciones de texto en imágenes."

        messages = [
            {"role": "system", "content": f"{system_prompt}"}
        ]

        if message["files"]:
            sent_files.append(message["files"][0])
            for file in message["files"]:
                if os.path.basename(file).endswith('.pdf'):
                    file_content = read_file_pdf(file)
                elif os.path.basename(file).endswith('.txt'):
                    file_content = read_file_txt(file)
                elif os.path.basename(file).endswith('.docx'):
                    file_content = read_file_docx(file)
                else:
                    file_content = "Tipo de archivo no soportado. Solo se permiten archivos .txt, .pdf y .docx"
                
                # Añadir el contenido del archivo al historial como un mensaje del sistema
                history.append({"role": "system", "content": f"Contenido del archivo \"{os.path.basename(file)}\": {file_content}"})

        for msg in history:
            if valid_history(msg, sent_files[0] if sent_files else ""):
                messages.append({"role": msg["role"], "content": msg["content"]})
        
        messages.append({"role": "user", "content": message["text"]})
        model_instance = obtener_valor(model_config, "model")
        response_content = None
        
        inference_start_time = None
        inference_end_time = None
        total_attempts = 0
        
        max_retries = 2
        for attempt in range(max_retries + 1):
            try:
                total_attempts += 1
                
                inference_start_time = time.time()
                if model in ["helsinki", "runwayml", "stabilityai"]:
                    resp = model_instance.chat_completions_create(
                        messages=messages
                    )
                else:
                    resp = model_instance.chat.completions.create(
                        model=obtener_valor(model_config, "model_name"),
                        messages=messages
                    )
                    
                inference_end_time = time.time()

                if not resp or not hasattr(resp, 'choices') or len(resp.choices) == 0:
                    raise Exception("Respuesta vacía del modelo")
                
                if not hasattr(resp.choices[0], 'message') or not hasattr(resp.choices[0].message, 'content'):
                    raise Exception("Formato de respuesta inválido")
                
                response_content = resp.choices[0].message.content
                
                if not response_content or len(response_content.strip()) == 0:
                    raise Exception("El modelo devolvió una respuesta vacía")
                
                break 
                
            except Timeout:
                inference_end_time = time.time()  
                if attempt < max_retries:
                    print(f"Timeout en intento {attempt + 1}, reintentando...")
                    time.sleep(2)  
                    continue
                else:
                    error = f"El modelo '{model}' tardó demasiado en responder"
                    log_metric("request_error", model=model, task=task, error=error, user_message=message)
                    return f"⏱**Error de Timeout**: {error}. Intenta de nuevo en unos momentos."
            
            except ConnectionError:
                inference_end_time = time.time()  
                if attempt < max_retries:
                    print(f"Error de conexión en intento {attempt + 1}, reintentando...")
                    time.sleep(2)
                    continue
                else:
                    error = f"No se pudo conectar con el modelo '{model}'"
                    log_metric("request_error", model=model, task=task, error=error, user_message=message)
                    return f"**Error de Conexión**: {error}. Verifica tu conexión a internet."
            
            except Exception as api_error:
                inference_end_time = time.time()
                error_msg = str(api_error).lower()
                
                if "rate limit" in error_msg or "quota" in error_msg:
                    error = f"Límite de uso alcanzado para el modelo '{model}'"
                    log_metric("request_error", model=model, task=task, error=error, user_message=message)
                    return f"**Límite de Uso**: {error}. Intenta más tarde o cambia de modelo."
                
                elif "authentication" in error_msg or "unauthorized" in error_msg or "api key" in error_msg:
                    error = f"Error de autenticación del modelo '{model}'"
                    log_metric("request_error", model=model, task=task, error=error, user_message=message)
                    return f"**Error de Autenticación**: La API key del modelo '{model}' es inválida o ha expirado. Verifica tu configuración."
                
                elif "not found" in error_msg or "404" in error_msg:
                    error = f"El modelo '{obtener_valor(model_config, 'model_name')}' no está disponible"
                    log_metric("request_error", model=model, task=task, error=error, user_message=message)
                    return f"**Modelo No Encontrado**: {error} en este momento."
                
                elif "server error" in error_msg or "500" in error_msg or "502" in error_msg or "503" in error_msg:
                    if attempt < max_retries:
                        print(f"Error del servidor en intento {attempt + 1}, reintentando...")
                        time.sleep(3)
                        continue
                    else:
                        error = f"El servicio del modelo '{model}' está temporalmente no disponible"
                        log_metric("request_error", model=model, task=task, error=error, user_message=message)
                        return f"🔧 **Error del Servidor**: {error}. Intenta más tarde."
                
                elif "content policy" in error_msg or "safety" in error_msg:
                    error = f"Contenido bloqueado por las políticas de seguridad del modelo '{model}'"
                    log_metric("request_error", model=model, task=task, error=error, user_message=message)
                    return f"⚠️**Contenido Bloqueado**: {error}. Reformula tu pregunta."
                
                else:
                    if attempt < max_retries:
                        print(f"Error genérico en intento {attempt + 1}: {api_error}")
                        time.sleep(2)
                        continue
                    else:
                        error = f"Problema con el modelo '{model}': {str(api_error)[:100]}"
                        log_metric("request_error", model=model, task=task, error=error, user_message=message)
                        return f"**Error**: {error}..."

        if not response_content:
            error = f"No se pudo obtener una respuesta válida del modelo '{model}' después de {max_retries + 1} intentos"
            log_metric("request_error", model=model, task=task, error=error, user_message=message)
            return f"**Error**: {error}."
        
        total_end_time = time.time()
        total_time = total_end_time - total_start_time
        
        if inference_start_time and inference_end_time:
            inference_time = inference_end_time - inference_start_time
        else:
            inference_time = total_time
        
        log_metric("request_success", model=model, task=task, response_time=inference_time)
        
        model_name = obtener_valor(model_config, "model_name")
        model_display = f"{model.capitalize()}"
        if model_name and model_name != "Clave no encontrada":
            model_display += f" ({model_name})"
        
        time_metadata = f"<small style='color: #888; font-size: 0.85em;'>⚡ {format_inference_time(inference_time)}"
        
        if total_attempts > 1:
            time_metadata += f" ({total_attempts}º intento)"
        
        time_metadata += f" • {model_display}</small>"
        
        final_response = response_content + "\n\n" + time_metadata
        
    except Exception as e:
        total_end_time = time.time()
        total_time = total_end_time - total_start_time
        error = f"Error inesperado: {str(e)[:200]}"
        log_metric("request_error", model=model, task=task, error=error, user_message=message)
        print(f"Error inesperado: {e}")
        return f"❌ **Error Inesperado**: Ocurrió un problema técnico: {str(e)[:200]}... \n\n<small style='color: #888;'>⏱️ {format_inference_time(total_time)}</small>"
        
    return final_response    
    
def valid_history(history, file):
    """Decide whether a history entry should be forwarded to the model.

    An entry is rejected only when its ``content`` is a non-empty tuple whose
    first element equals ``file``. Every other entry is accepted.

    Args:
        history: A single history message; must support ``history["content"]``.
        file: Value compared against the first element of a tuple content.
            The visible caller passes the first entry of its ``sent_files``
            list, or ``""`` when that list is empty.

    Returns:
        bool: ``False`` when the entry is a tuple whose first element equals
        ``file``; ``True`` otherwise.

    Raises:
        KeyError: If ``history`` has no ``"content"`` key.
    """
    content = history["content"]
    # Test the type before calling len(): content without a length (e.g. None)
    # would otherwise raise TypeError instead of simply being accepted.
    if isinstance(content, tuple) and len(content) > 0 and content[0] == file:
        return False
    return True
    
def read_file_txt(file):
    """Return the whole contents of a UTF-8 encoded text file.

    Args:
        file: Path of the file to read, or ``None``.

    Returns:
        str: The text of the file; ``""`` when ``file`` is ``None``; or a
        message starting with ``"Error al leer el archivo: "`` when opening or
        reading fails (the exception is reported, not raised).
    """
    # Nothing to read: an absent file is treated as empty text.
    if file is None:
        return ""

    try:
        with open(file, mode="r", encoding="utf-8") as handle:
            contents = handle.read()
    except Exception as exc:
        return f"Error al leer el archivo: {exc}"
    return contents
    
def read_file_pdf(file):
    """Extract the text of a PDF, concatenating its pages in iteration order.

    Args:
        file: Value handed to ``fitz.open`` (PyMuPDF) to open the document.

    Returns:
        str: The text of all pages joined with no separator, or a message
        starting with ``"Error al leer el archivo PDF: "`` when opening or
        reading fails (the exception is reported, not raised).
    """
    try:
        with fitz.open(file) as document:
            # Gather each page's text while the document is still open.
            page_texts = [page.get_text() for page in document]
        return "".join(page_texts)
    except Exception as exc:
        return f"Error al leer el archivo PDF: {exc}"
    
def read_file_docx(file):
    """Extract the text of a DOCX document via ``docx2txt.process``.

    Args:
        file: Value handed to ``docx2txt.process``.

    Returns:
        Whatever ``docx2txt.process`` returns for ``file``, or a message of the
        form ``"Error al leer DOCX <file>: <exception>"`` when it raises.
    """
    try:
        return docx2txt.process(file)
    except Exception as exc:
        return f"Error al leer DOCX {file}: {exc}"
    
def update_models_for_task(task):
    """Build the model dropdown update that corresponds to ``task``.

    Args:
        task: Task identifier forwarded to ``get_available_models``.

    Returns:
        A ``gr.Dropdown`` whose choices are the models returned for ``task``
        with the first one selected, or an empty dropdown (no choices, no
        value) when no models are returned or any step raises. Failures are
        printed, not raised.
    """
    try:
        models = get_available_models(task)
        if models:
            dropdown_kwargs = {"choices": models, "value": models[0]}
        else:
            dropdown_kwargs = {"choices": [], "value": None}
        return gr.Dropdown(**dropdown_kwargs)
    except Exception as exc:
        print(f"Error al actualizar modelos: {exc}")
        return gr.Dropdown(choices=[], value=None)


## Creación de la interfaz del Chat con Gradio

In [None]:
# Build the Gradio UI: a narrow settings column (task/model selectors and
# metrics controls) next to a wider column that hosts the chat, then launch it.
with gr.Blocks() as app:
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Configuración del Chat")
            
            # Selectable tasks are the keys of tasks_config (defined elsewhere in the notebook).
            task_selector = gr.Dropdown(
                choices=list(tasks_config.keys()),
                value="traduccion",
                label="Tipo de tarea"
            )
            
            # Initial choices are the models for the default task; "gemini" is
            # preselected whenever that list is non-empty.
            # NOTE(review): assumes "gemini" is among those choices — confirm.
            model_selector = gr.Dropdown(
                choices=get_available_models("traduccion"),
                value="gemini" if get_available_models("traduccion") else None,
                label="Modelo de IA"
            )
            
            task_info = gr.Markdown(f"**Descripción:** {tasks_config['traduccion']['description']}")
            
            with gr.Row():
                metrics_btn = gr.Button("📊 Ver Métricas", size="sm", variant="secondary")
                reset_metrics_btn = gr.Button("🔄 Reiniciar Métricas", size="sm", variant="secondary")
            
            # Metrics panel starts hidden; the buttons above toggle/refresh it.
            metrics_display = gr.Markdown(
                value="Haz clic en **Ver Métricas** para mostrar las estadísticas de uso.",
                visible=False,
                label="Métricas del Sistema"
            )
            
            # Tracks whether the metrics panel is currently shown.
            metrics_visible = gr.State(False)

        with gr.Column(scale=3):
            # Current selection, passed to the chat function as additional inputs below.
            current_model = gr.State("gemini")
            current_task = gr.State("traduccion")

            # On task change, return four values in the same order as `outputs`:
            # dropdown update, description text, new task, first available model (or None).
            task_selector.change(
                fn=lambda task: [
                    update_models_for_task(task),
                    f"**Descripción:** {tasks_config.get(task, {}).get('description', '')}", 
                    task,
                    get_available_models(task)[0] if get_available_models(task) else None
                ],
                inputs=task_selector,
                outputs=[model_selector, task_info, current_task, current_model]
            )

            # Mirror the dropdown's selected model into the current_model state.
            model_selector.change(
                fn=lambda model: model,
                inputs=model_selector,
                outputs=current_model
            )

            chatbot = gr.Chatbot(type="messages", height=500)

            # `chat` is the handler defined elsewhere in the notebook; it receives
            # current_model and current_task as additional inputs.
            chat_interface = gr.ChatInterface(chat, 
                             chatbot=chatbot, 
                             additional_inputs=[current_model, current_task],
                             type="messages", 
                             title="Chat con Modelos de IA", 
                             multimodal=True,
                             save_history=True,
                             autoscroll=True,
                             stop_btn=True
                            )
            # Toggle handler for the metrics panel
            def toggle_metrics(is_visible):
                """Flip the metrics panel's visibility.

                Args:
                    is_visible: Current value of the ``metrics_visible`` state.

                Returns:
                    tuple: ``(new_visibility, markdown_update)``. When showing
                    the panel, its value is set to ``get_metrics_summary()``.
                """
                if is_visible:
                    return False, gr.Markdown(visible=False)
                else:
                    metrics_content = get_metrics_summary()
                    return True, gr.Markdown(value=metrics_content, visible=True)

            # Handler that resets the metrics
            def reset_metrics_display(is_visible):
                """Call ``reset_metrics()`` and refresh the panel if it is shown.

                Args:
                    is_visible: Current value of the ``metrics_visible`` state.

                Returns:
                    tuple: ``(is_visible, markdown_update)``. Visibility is
                    returned unchanged; when the panel is visible its value
                    becomes the return value of ``reset_metrics()``.
                """
                result = reset_metrics()
                if is_visible:
                    return is_visible, gr.Markdown(value=result, visible=True)
                else:
                    return is_visible, gr.Markdown(visible=False)

            metrics_btn.click(
                fn=toggle_metrics,
                inputs=metrics_visible,
                outputs=[metrics_visible, metrics_display]
            )

            reset_metrics_btn.click(
                fn=reset_metrics_display,
                inputs=metrics_visible,
                outputs=[metrics_visible, metrics_display]
            )

# Start the app with Gradio's default launch options.
app.launch()

* Running on local URL:  http://127.0.0.1:7869
* Running on public URL: https://c01e8425928ad383bd.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




Tarea seleccionada: imagenes
Modelo seleccionado: stabilityai
modelo: stabilityai/stable-diffusion-xl-base-1.0
Imagen generada con Hugging Face SDXL: images/hf_sdxl_1759654294.png
