<a href="https://colab.research.google.com/github/CamiloVga/Curso-IA-Para-Ciencia-de-Datos/blob/main/GenBI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 0. Instalaciones básicas
!pip install -q gradio pandas plotly sqlalchemy openpyxl xlrd seaborn matplotlib wordcloud

# 1. Importaciones y configuración
import subprocess
import json
import pandas as pd
import numpy as np
import sqlite3
import gradio as gr
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from io import StringIO
import time
import os
import re
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any

# Configuración de modelos
MODELO_SQL = "llama3.1:8b"
MODELO_ANALISIS = "llama3.1:8b"
TEMPERATURA_SQL = 0.1
TEMPERATURA_ANALISIS = 0.3

# 2. Instalador de Ollama mejorado
def instalar_ollama():
    """Instala Ollama si no está disponible"""
    try:
        subprocess.run(['ollama', '--version'], capture_output=True, check=True)
        print("✅ Ollama ya instalado")
    except:
        print("📦 Instalando Ollama...")
        os.system('curl -fsSL https://ollama.com/install.sh | sh')
        os.system('pkill ollama || true')
        os.system('nohup /usr/local/bin/ollama serve > /dev/null 2>&1 &')
        time.sleep(10)

        # Descargar modelos necesarios
        print(f"📥 Descargando modelo {MODELO_SQL}...")
        os.system(f'ollama pull {MODELO_SQL}')
        if MODELO_ANALISIS != MODELO_SQL:
            print(f"📥 Descargando modelo {MODELO_ANALISIS}...")
            os.system(f'ollama pull {MODELO_ANALISIS}')
        print("✅ Instalación completada")

# 3. Clase principal GenBI mejorada
class GenBIMejorado:
    def __init__(self):
        self.db = sqlite3.connect(":memory:", check_same_thread=False)
        self.tablas = {}
        self.preguntas_sugeridas = []
        self.datos_cargados = False
        self.historial_consultas = []
        self.cache_respuestas = {}

        # Patrones de interpretación de consultas
        self.patrones_consultas = {
            'agregacion': {
                'total': ['total', 'suma', 'sumar', 'cuánto', 'cuanto'],
                'promedio': ['promedio', 'media', 'average', 'avg'],
                'maximo': ['máximo', 'maximo', 'max', 'mayor', 'más alto'],
                'minimo': ['mínimo', 'minimo', 'min', 'menor', 'más bajo'],
                'conteo': ['cuántos', 'cuantos', 'contar', 'cantidad', 'número']
            },
            'temporal': {
                'tendencia': ['evolución', 'evolucion', 'tendencia', 'histórico', 'historico', 'tiempo'],
                'periodo': ['mes', 'año', 'semana', 'día', 'trimestre', 'fecha']
            },
            'comparacion': {
                'versus': ['vs', 'versus', 'comparar', 'comparación', 'diferencia'],
                'ranking': ['top', 'mejores', 'peores', 'ranking', 'primeros', 'últimos']
            },
            'distribucion': {
                'porcentaje': ['porcentaje', '%', 'proporción', 'proporcion', 'distribución'],
                'agrupacion': ['por', 'según', 'segun', 'agrupar', 'categoría']
            }
        }

    def leer_archivo(self, archivo_path: str) -> Tuple[Optional[pd.DataFrame], str]:
        """Lee múltiples formatos de archivo con mejor manejo de errores"""
        try:
            extension = os.path.splitext(archivo_path)[1].lower()

            # Configuraciones específicas por formato
            if extension == '.csv':
                # Intentar diferentes encodings y separadores
                for encoding in ['utf-8', 'latin-1', 'iso-8859-1']:
                    for sep in [',', ';', '\t', '|']:
                        try:
                            df = pd.read_csv(archivo_path, encoding=encoding, sep=sep)
                            if len(df.columns) > 1:  # Verificar que se separó correctamente
                                break
                        except:
                            continue
                    else:
                        continue
                    break

            elif extension in ['.xlsx', '.xls']:
                # Leer Excel con soporte para múltiples hojas
                excel_file = pd.ExcelFile(archivo_path)
                if len(excel_file.sheet_names) > 1:
                    # Si hay múltiples hojas, usar la primera o preguntar
                    df = pd.read_excel(archivo_path, sheet_name=0)
                else:
                    df = pd.read_excel(archivo_path)

            elif extension == '.json':
                # Intentar diferentes orientaciones de JSON
                try:
                    df = pd.read_json(archivo_path)
                except:
                    df = pd.read_json(archivo_path, orient='records')

            else:
                return None, f"❌ Formato no soportado: {extension}"

            # Limpiar nombres de columnas
            df.columns = df.columns.str.strip().str.replace(' ', '_')

            # Verificar que el DataFrame no esté vacío
            if df.empty:
                return None, "❌ El archivo está vacío"

            return df, "✅ Archivo leído correctamente"

        except Exception as e:
            return None, f"❌ Error leyendo archivo: {str(e)}"

    def detectar_tipos_inteligente(self, df: pd.DataFrame) -> Dict[str, List[str]]:
        """Detección inteligente de tipos de datos con inferencia mejorada"""
        tipos = {
            'numericas': [],
            'categoricas': [],
            'fechas': [],
            'booleanas': [],
            'ids': [],
            'textos_largos': []
        }

        for col in df.columns:
            col_data = df[col].dropna()

            if len(col_data) == 0:
                continue

            # Detectar IDs (columnas con valores únicos o casi únicos)
            if len(col_data.unique()) / len(col_data) > 0.95:
                if col_data.dtype in ['int64', 'object']:
                    tipos['ids'].append(col)
                    continue

            # Detectar fechas
            if self._es_fecha(col_data):
                try:
                    df[col] = pd.to_datetime(df[col])
                    tipos['fechas'].append(col)
                    continue
                except:
                    pass

            # Detectar booleanas
            if col_data.dtype == 'bool' or set(col_data.unique()) <= {True, False, 0, 1, 'SI', 'NO', 'Yes', 'No'}:
                tipos['booleanas'].append(col)
                continue

            # Detectar numéricas
            if pd.api.types.is_numeric_dtype(col_data):
                tipos['numericas'].append(col)
            # Detectar textos largos vs categóricas
            elif col_data.dtype == 'object':
                avg_length = col_data.astype(str).str.len().mean()
                unique_ratio = len(col_data.unique()) / len(col_data)

                if avg_length > 50 or unique_ratio > 0.8:
                    tipos['textos_largos'].append(col)
                else:
                    tipos['categoricas'].append(col)

        return tipos

    def _es_fecha(self, serie: pd.Series) -> bool:
        """Detecta si una serie contiene fechas"""
        if serie.dtype == 'datetime64[ns]':
            return True

        # Patrones comunes de fecha
        patrones_fecha = [
            r'\d{4}-\d{2}-\d{2}',  # YYYY-MM-DD
            r'\d{2}/\d{2}/\d{4}',  # DD/MM/YYYY o MM/DD/YYYY
            r'\d{2}-\d{2}-\d{4}',  # DD-MM-YYYY
        ]

        if serie.dtype == 'object':
            muestra = serie.head(10).astype(str)
            for patron in patrones_fecha:
                if muestra.str.match(patron).any():
                    return True

        return False

    def generar_preguntas_inteligentes(self, info_tabla: Dict, nombre_tabla: str) -> List[str]:
        """Genera preguntas más inteligentes y contextualizadas"""
        preguntas = []

        # Preguntas básicas siempre útiles
        preguntas.append(f"¿Cuántos registros hay en total?")
        preguntas.append(f"Muéstrame un resumen general de los datos")

        # Preguntas para numéricas
        if info_tabla['numericas']:
            col_principal = info_tabla['numericas'][0]
            preguntas.append(f"¿Cuál es el total y promedio de {col_principal}?")

            if len(info_tabla['numericas']) >= 2:
                col2 = info_tabla['numericas'][1]
                preguntas.append(f"¿Existe correlación entre {col_principal} y {col2}?")

        # Preguntas para categóricas
        if info_tabla['categoricas']:
            cat_principal = info_tabla['categoricas'][0]
            if info_tabla['numericas']:
                num_principal = info_tabla['numericas'][0]
                preguntas.append(f"¿Cómo se distribuye {num_principal} por {cat_principal}?")
                preguntas.append(f"¿Cuáles son los top 5 {cat_principal} con mayor {num_principal}?")
            else:
                preguntas.append(f"¿Cuál es la distribución de {cat_principal}?")

        # Preguntas temporales
        if info_tabla['fechas']:
            fecha_col = info_tabla['fechas'][0]
            if info_tabla['numericas']:
                preguntas.append(f"¿Cómo ha evolucionado {info_tabla['numericas'][0]} en el tiempo?")
            preguntas.append(f"¿Cuál es el rango de fechas en los datos?")

        # Preguntas de análisis avanzado
        if len(info_tabla['numericas']) >= 2:
            preguntas.append("¿Cuáles son las principales correlaciones entre variables numéricas?")

        if info_tabla['categoricas'] and info_tabla['numericas']:
            preguntas.append("¿Qué categorías tienen valores atípicos o extremos?")

        return preguntas[:8]  # Retornar hasta 8 preguntas

    def interpretar_consulta(self, pregunta: str) -> Dict[str, Any]:
        """Interpreta la consulta del usuario para entender mejor qué busca"""
        pregunta_lower = pregunta.lower()
        interpretacion = {
            'tipo_consulta': 'general',
            'agregaciones': [],
            'columnas_mencionadas': [],
            'filtros': [],
            'agrupaciones': [],
            'ordenamiento': None,
            'limite': None,
            'tipo_visualizacion': 'tabla'
        }

        # Detectar tipo de consulta principal
        for tipo, palabras in self.patrones_consultas['agregacion'].items():
            if any(palabra in pregunta_lower for palabra in palabras):
                interpretacion['agregaciones'].append(tipo)
                interpretacion['tipo_consulta'] = 'agregacion'

        # Detectar consultas temporales
        if any(palabra in pregunta_lower for palabra in self.patrones_consultas['temporal']['tendencia']):
            interpretacion['tipo_consulta'] = 'temporal'
            interpretacion['tipo_visualizacion'] = 'linea'

        # Detectar comparaciones
        if any(palabra in pregunta_lower for palabra in self.patrones_consultas['comparacion']['versus']):
            interpretacion['tipo_consulta'] = 'comparacion'
            interpretacion['tipo_visualizacion'] = 'barra_agrupada'

        # Detectar rankings
        if any(palabra in pregunta_lower for palabra in self.patrones_consultas['comparacion']['ranking']):
            interpretacion['tipo_consulta'] = 'ranking'
            # Extraer número del top
            match = re.search(r'top\s*(\d+)', pregunta_lower)
            if match:
                interpretacion['limite'] = int(match.group(1))
            else:
                interpretacion['limite'] = 10

        # Detectar distribuciones
        if any(palabra in pregunta_lower for palabra in self.patrones_consultas['distribucion']['porcentaje']):
            interpretacion['tipo_consulta'] = 'distribucion'
            interpretacion['tipo_visualizacion'] = 'pie'

        # Detectar columnas mencionadas
        for tabla, info in self.tablas.items():
            for col in info['columnas']:
                if col.lower() in pregunta_lower or col.lower().replace('_', ' ') in pregunta_lower:
                    interpretacion['columnas_mencionadas'].append(col)

        return interpretacion

    def generar_sql_inteligente(self, pregunta: str, interpretacion: Dict[str, Any]) -> str:
        """Genera SQL más inteligente basado en la interpretación de la consulta"""
        if not self.tablas:
            return "SELECT 'No hay datos cargados' as mensaje;"

        tabla_principal = list(self.tablas.keys())[0]
        info_tabla = self.tablas[tabla_principal]

        # SQL según tipo de consulta
        if interpretacion['tipo_consulta'] == 'general':
            # Consulta general - mostrar resumen
            return f"SELECT * FROM {tabla_principal} LIMIT 10;"

        elif interpretacion['tipo_consulta'] == 'agregacion':
            return self._generar_sql_agregacion(tabla_principal, info_tabla, interpretacion)

        elif interpretacion['tipo_consulta'] == 'temporal':
            return self._generar_sql_temporal(tabla_principal, info_tabla, interpretacion)

        elif interpretacion['tipo_consulta'] == 'ranking':
            return self._generar_sql_ranking(tabla_principal, info_tabla, interpretacion)

        elif interpretacion['tipo_consulta'] == 'distribucion':
            return self._generar_sql_distribucion(tabla_principal, info_tabla, interpretacion)

        elif interpretacion['tipo_consulta'] == 'comparacion':
            return self._generar_sql_comparacion(tabla_principal, info_tabla, interpretacion)

        else:
            # Fallback a generación con Ollama
            return self.generar_sql_con_llm(pregunta)

    def _generar_sql_agregacion(self, tabla: str, info: Dict, interp: Dict) -> str:
        """Genera SQL para consultas de agregación"""
        cols_num = info['numericas']
        cols_cat = info['categoricas']

        if not cols_num:
            return f"SELECT COUNT(*) as total_registros FROM {tabla};"

        # Determinar columna numérica a usar
        col_num = cols_num[0]
        if interp['columnas_mencionadas']:
            for col in interp['columnas_mencionadas']:
                if col in cols_num:
                    col_num = col
                    break

        # Construir agregaciones
        agregaciones = []
        if 'total' in interp['agregaciones']:
            agregaciones.append(f"SUM({col_num}) as total_{col_num}")
        if 'promedio' in interp['agregaciones']:
            agregaciones.append(f"AVG({col_num}) as promedio_{col_num}")
        if 'maximo' in interp['agregaciones']:
            agregaciones.append(f"MAX({col_num}) as maximo_{col_num}")
        if 'minimo' in interp['agregaciones']:
            agregaciones.append(f"MIN({col_num}) as minimo_{col_num}")
        if 'conteo' in interp['agregaciones']:
            agregaciones.append(f"COUNT(*) as cantidad")

        if not agregaciones:
            agregaciones = [f"SUM({col_num}) as total", f"AVG({col_num}) as promedio", "COUNT(*) as cantidad"]

        # Determinar si hay agrupación
        group_by = ""
        if cols_cat and any(cat in interp['columnas_mencionadas'] for cat in cols_cat):
            col_cat = next((cat for cat in cols_cat if cat in interp['columnas_mencionadas']), cols_cat[0])
            group_by = f" GROUP BY {col_cat}"

        sql = f"SELECT {', '.join(agregaciones)} FROM {tabla}{group_by};"
        return sql

    def _generar_sql_temporal(self, tabla: str, info: Dict, interp: Dict) -> str:
        """Genera SQL para análisis temporal"""
        if not info['fechas'] or not info['numericas']:
            return f"SELECT * FROM {tabla} LIMIT 20;"

        fecha_col = info['fechas'][0]
        num_col = info['numericas'][0]

        # Buscar columna numérica mencionada
        for col in interp['columnas_mencionadas']:
            if col in info['numericas']:
                num_col = col
                break

        sql = f"""
        SELECT
            DATE({fecha_col}) as fecha,
            SUM({num_col}) as total_{num_col},
            AVG({num_col}) as promedio_{num_col},
            COUNT(*) as registros
        FROM {tabla}
        WHERE {fecha_col} IS NOT NULL
        GROUP BY DATE({fecha_col})
        ORDER BY fecha;
        """

        return sql.strip()

    def _generar_sql_ranking(self, tabla: str, info: Dict, interp: Dict) -> str:
        """Genera SQL para rankings"""
        limite = interp['limite'] or 10

        if not info['numericas']:
            # Ranking por conteo
            if info['categoricas']:
                col_cat = info['categoricas'][0]
                return f"""
                SELECT {col_cat}, COUNT(*) as cantidad
                FROM {tabla}
                GROUP BY {col_cat}
                ORDER BY cantidad DESC
                LIMIT {limite};
                """
        else:
            num_col = info['numericas'][0]
            if info['categoricas']:
                cat_col = info['categoricas'][0]
                return f"""
                SELECT {cat_col}, SUM({num_col}) as total_{num_col}
                FROM {tabla}
                GROUP BY {cat_col}
                ORDER BY total_{num_col} DESC
                LIMIT {limite};
                """
            else:
                return f"""
                SELECT *
                FROM {tabla}
                ORDER BY {num_col} DESC
                LIMIT {limite};
                """

    def _generar_sql_distribucion(self, tabla: str, info: Dict, interp: Dict) -> str:
        """Genera SQL para distribuciones y porcentajes"""
        if not info['categoricas']:
            return f"SELECT COUNT(*) as total FROM {tabla};"

        col_cat = info['categoricas'][0]
        # Buscar columna categórica mencionada
        for col in interp['columnas_mencionadas']:
            if col in info['categoricas']:
                col_cat = col
                break

        sql = f"""
        SELECT
            {col_cat},
            COUNT(*) as cantidad,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM {tabla}), 2) as porcentaje
        FROM {tabla}
        GROUP BY {col_cat}
        ORDER BY cantidad DESC;
        """

        return sql.strip()

    def _generar_sql_comparacion(self, tabla: str, info: Dict, interp: Dict) -> str:
        """Genera SQL para comparaciones"""
        if len(info['numericas']) >= 2:
            # Comparar dos variables numéricas
            return f"""
            SELECT {info['numericas'][0]}, {info['numericas'][1]}
            FROM {tabla}
            WHERE {info['numericas'][0]} IS NOT NULL
            AND {info['numericas'][1]} IS NOT NULL;
            """
        elif info['categoricas'] and info['numericas']:
            # Comparar categorías
            return f"""
            SELECT
                {info['categoricas'][0]},
                AVG({info['numericas'][0]}) as promedio,
                MIN({info['numericas'][0]}) as minimo,
                MAX({info['numericas'][0]}) as maximo
            FROM {tabla}
            GROUP BY {info['categoricas'][0]};
            """
        else:
            return f"SELECT * FROM {tabla} LIMIT 20;"

    def generar_sql_con_llm(self, pregunta: str) -> str:
        """Genera SQL usando el modelo de lenguaje cuando las reglas no son suficientes"""
        contexto = self.obtener_contexto_detallado()

        prompt = f"""Eres un experto en SQL para SQLite. Genera SQL preciso basado en la pregunta del usuario.

ESQUEMA DE LA BASE DE DATOS:
{contexto}

PREGUNTA DEL USUARIO: {pregunta}

REGLAS IMPORTANTES:
1. Usa SOLO los nombres exactos de tablas y columnas del esquema
2. Genera SQL válido para SQLite
3. Si la pregunta pide distribución o porcentajes, usa COUNT(*) y GROUP BY
4. Para porcentajes usa: COUNT(*) * 100.0 / (SELECT COUNT(*) FROM tabla)
5. NO incluyas comentarios en el SQL
6. Termina con punto y coma

Responde ÚNICAMENTE con el SQL:"""

        respuesta = self.consultar_ollama(prompt, MODELO_SQL, TEMPERATURA_SQL)

        # Extraer solo el SQL de la respuesta
        sql = self._extraer_sql_de_respuesta(respuesta)
        return sql

    def _extraer_sql_de_respuesta(self, respuesta: str) -> str:
        """Extrae SQL limpio de la respuesta del modelo"""
        # Buscar el primer SELECT, WITH, o INSERT
        lineas = respuesta.strip().split('\n')
        sql_keywords = ['SELECT', 'WITH', 'INSERT', 'UPDATE', 'DELETE']

        sql_encontrado = []
        en_sql = False

        for linea in lineas:
            linea_upper = linea.strip().upper()

            # Iniciar captura si encontramos palabra clave SQL
            if any(linea_upper.startswith(kw) for kw in sql_keywords):
                en_sql = True

            if en_sql:
                # Dejar de capturar si encontramos algo que no es SQL
                if linea.strip() and not any(char in linea for char in [';', ',', '(', ')', 'SELECT', 'FROM', 'WHERE', 'GROUP', 'ORDER']):
                    if not linea.strip().endswith(';'):
                        break

                sql_encontrado.append(linea)

                # Si termina con punto y coma, terminar
                if linea.strip().endswith(';'):
                    break

        sql = '\n'.join(sql_encontrado).strip()

        # Si no se encontró SQL válido, intentar extraer de otra forma
        if not sql:
            # Buscar entre comillas o código
            match = re.search(r'```sql?\s*(.*?)```', respuesta, re.DOTALL)
            if match:
                sql = match.group(1).strip()
            else:
                # Último intento: tomar la primera línea que parezca SQL
                for linea in lineas:
                    if any(kw in linea.upper() for kw in sql_keywords):
                        sql = linea.strip()
                        break

        # Asegurar que termine con punto y coma
        if sql and not sql.endswith(';'):
            sql += ';'

        return sql or "SELECT 'No se pudo generar SQL' as error;"

    def consultar_ollama(self, prompt: str, modelo: str, temperatura: float) -> str:
        """Consulta el modelo Ollama con mejor manejo de errores"""
        # Escapar caracteres especiales en el prompt
        prompt_escapado = json.dumps(prompt)

        comando = f'curl -s http://localhost:11434/api/generate -d \'{{"model": "{modelo}", "prompt": {prompt_escapado}, "stream": false, "temperature": {temperatura}}}\''

        try:
            resultado = subprocess.run(comando, shell=True, capture_output=True, text=True, timeout=30)

            if resultado.returncode == 0 and resultado.stdout:
                data = json.loads(resultado.stdout)
                return data.get('response', 'Sin respuesta del modelo')
            else:
                return f"Error en la consulta: {resultado.stderr}"

        except subprocess.TimeoutExpired:
            return "Timeout: La consulta tardó demasiado tiempo"
        except json.JSONDecodeError:
            return "Error decodificando la respuesta del modelo"
        except Exception as e:
            return f"Error inesperado: {str(e)}"

    def ejecutar_sql(self, sql: str) -> Tuple[pd.DataFrame, str]:
        """Ejecuta SQL con mejor manejo de errores y validación"""
        try:
            # Validación básica del SQL
            sql_upper = sql.upper()
            if any(palabra in sql_upper for palabra in ['DROP', 'DELETE', 'TRUNCATE', 'ALTER']):
                return pd.DataFrame(), "❌ Operación no permitida por seguridad"

            # Ejecutar consulta
            df = pd.read_sql_query(sql, self.db)

            # Verificar resultados
            if df.empty:
                return df, "⚠️ La consulta no devolvió resultados"
            else:
                return df, f"✅ {len(df)} filas obtenidas"

        except sqlite3.OperationalError as e:
            return pd.DataFrame(), f"❌ Error SQL: {str(e)}"
        except Exception as e:
            return pd.DataFrame(), f"❌ Error inesperado: {str(e)}"

    def crear_visualizacion_inteligente(self, datos: pd.DataFrame, interpretacion: Dict[str, Any]) -> Optional[go.Figure]:
        """Crea visualizaciones más inteligentes según el tipo de datos y consulta"""
        if datos.empty:
            return None

        try:
            # Determinar tipo de visualización según la interpretación
            tipo_viz = interpretacion.get('tipo_visualizacion', 'auto')

            if tipo_viz == 'auto':
                # Determinar automáticamente el mejor tipo
                tipo_viz = self._determinar_tipo_visualizacion(datos)

            # Crear visualización según el tipo
            if tipo_viz == 'barra':
                return self._crear_grafico_barras(datos)
            elif tipo_viz == 'linea':
                return self._crear_grafico_lineas(datos)
            elif tipo_viz == 'pie':
                return self._crear_grafico_pie(datos)
            elif tipo_viz == 'scatter':
                return self._crear_grafico_dispersion(datos)
            elif tipo_viz == 'heatmap':
                return self._crear_heatmap(datos)
            elif tipo_viz == 'barra_agrupada':
                return self._crear_barras_agrupadas(datos)
            else:
                # Visualización por defecto
                return self._crear_visualizacion_automatica(datos)

        except Exception as e:
            print(f"Error creando visualización: {e}")
            return None

    def _determinar_tipo_visualizacion(self, datos: pd.DataFrame) -> str:
        """Determina el mejor tipo de visualización para los datos"""
        num_cols = datos.select_dtypes(include=['number']).columns
        cat_cols = datos.select_dtypes(include=['object']).columns
        date_cols = datos.select_dtypes(include=['datetime']).columns

        # Reglas para determinar visualización
        if len(datos) == 1:
            return 'indicador'  # Una sola fila, mostrar como indicador
        elif len(date_cols) > 0 and len(num_cols) > 0:
            return 'linea'  # Serie temporal
        elif len(cat_cols) == 1 and len(num_cols) == 1 and len(datos) <= 20:
            return 'barra'  # Categorías con valores
        elif len(cat_cols) == 1 and 'porcentaje' in datos.columns:
            return 'pie'  # Distribución porcentual
        elif len(num_cols) >= 2 and len(datos) > 10:
            return 'scatter'  # Relación entre variables
        elif len(num_cols) > 3:
            return 'heatmap'  # Múltiples correlaciones
        else:
            return 'barra'  # Por defecto

    def _crear_grafico_barras(self, datos: pd.DataFrame) -> go.Figure:
        """Crea un gráfico de barras mejorado"""
        # Identificar columnas
        num_cols = datos.select_dtypes(include=['number']).columns
        cat_cols = datos.select_dtypes(include=['object']).columns

        if len(cat_cols) > 0 and len(num_cols) > 0:
            x_col = cat_cols[0]
            y_col = num_cols[0]

            # Ordenar por valor si hay muchas categorías
            if len(datos) > 15:
                datos = datos.nlargest(15, y_col)

            fig = go.Figure(data=[
                go.Bar(
                    x=datos[x_col],
                    y=datos[y_col],
                    text=datos[y_col].round(2),
                    textposition='auto',
                    marker_color='rgba(55, 128, 191, 0.7)',
                    marker_line_color='rgba(55, 128, 191, 1.0)',
                    marker_line_width=1.5
                )
            ])

            fig.update_layout(
                title=f'{y_col} por {x_col}',
                xaxis_title=x_col,
                yaxis_title=y_col,
                template='plotly_white',
                showlegend=False,
                height=500
            )

            # Rotar etiquetas si son largas
            if datos[x_col].astype(str).str.len().mean() > 10:
                fig.update_xaxes(tickangle=-45)

            return fig

        return None

    def _crear_grafico_lineas(self, datos: pd.DataFrame) -> go.Figure:
        """Crea un gráfico de líneas para series temporales"""
        date_cols = datos.select_dtypes(include=['datetime']).columns
        num_cols = datos.select_dtypes(include=['number']).columns

        if len(date_cols) > 0 and len(num_cols) > 0:
            x_col = date_cols[0]

            fig = go.Figure()

            # Agregar una línea por cada columna numérica
            colors = px.colors.qualitative.Set3
            for i, y_col in enumerate(num_cols[:5]):  # Máximo 5 líneas
                fig.add_trace(go.Scatter(
                    x=datos[x_col],
                    y=datos[y_col],
                    mode='lines+markers',
                    name=y_col,
                    line=dict(color=colors[i % len(colors)], width=2),
                    marker=dict(size=6)
                ))

            fig.update_layout(
                title='Evolución temporal',
                xaxis_title='Fecha',
                yaxis_title='Valores',
                template='plotly_white',
                height=500,
                hovermode='x unified'
            )

            return fig

        return None

    def _crear_grafico_pie(self, datos: pd.DataFrame) -> go.Figure:
        """Crea un gráfico circular para distribuciones"""
        # Buscar columnas de categoría y valor
        cat_cols = datos.select_dtypes(include=['object']).columns
        num_cols = datos.select_dtypes(include=['number']).columns

        if len(cat_cols) > 0 and len(num_cols) > 0:
            labels_col = cat_cols[0]
            values_col = num_cols[0]

            # Limitar a top 10 categorías si hay muchas
            if len(datos) > 10:
                otros = datos.nsmallest(len(datos) - 10, values_col)[values_col].sum()
                datos = datos.nlargest(10, values_col)
                if otros > 0:
                    datos = pd.concat([
                        datos,
                        pd.DataFrame({labels_col: ['Otros'], values_col: [otros]})
                    ])

            fig = go.Figure(data=[go.Pie(
                labels=datos[labels_col],
                values=datos[values_col],
                hole=0.3,
                textinfo='label+percent',
                textposition='auto'
            )])

            fig.update_layout(
                title=f'Distribución de {values_col} por {labels_col}',
                template='plotly_white',
                height=500
            )

            return fig

        return None

    def _crear_grafico_dispersion(self, datos: pd.DataFrame) -> go.Figure:
        """Crea un gráfico de dispersión para relaciones entre variables"""
        num_cols = list(datos.select_dtypes(include=['number']).columns)
        cat_cols = list(datos.select_dtypes(include=['object']).columns)

        if len(num_cols) >= 2:
            x_col = num_cols[0]
            y_col = num_cols[1]

            # Color por categoría si existe
            color_col = cat_cols[0] if cat_cols else None
            size_col = num_cols[2] if len(num_cols) > 2 else None

            fig = px.scatter(
                datos,
                x=x_col,
                y=y_col,
                color=color_col,
                size=size_col,
                title=f'Relación entre {x_col} y {y_col}',
                template='plotly_white',
                height=500
            )

            # Agregar línea de tendencia si no hay categorías
            if not color_col and len(datos) > 10:
                fig.add_trace(go.Scatter(
                    x=datos[x_col],
                    y=datos[y_col],
                    mode='lines',
                    name='Tendencia',
                    line=dict(dash='dash', color='red'),
                    showlegend=False
                ))

            return fig

        return None

    def _crear_heatmap(self, datos: pd.DataFrame) -> go.Figure:
        """Crea un mapa de calor para correlaciones"""
        num_cols = datos.select_dtypes(include=['number']).columns

        if len(num_cols) >= 3:
            # Calcular matriz de correlación
            corr_matrix = datos[num_cols].corr()

            fig = go.Figure(data=go.Heatmap(
                z=corr_matrix.values,
                x=corr_matrix.columns,
                y=corr_matrix.columns,
                colorscale='RdBu',
                zmid=0,
                text=corr_matrix.values.round(2),
                texttemplate='%{text}',
                textfont={"size": 10}
            ))

            fig.update_layout(
                title='Matriz de Correlación',
                template='plotly_white',
                height=600,
                width=800
            )

            return fig

        return None

    def _crear_barras_agrupadas(self, datos: pd.DataFrame) -> go.Figure:
        """Crea un gráfico de barras agrupadas para comparaciones"""
        cat_cols = list(datos.select_dtypes(include=['object']).columns)
        num_cols = list(datos.select_dtypes(include=['number']).columns)

        if len(cat_cols) >= 1 and len(num_cols) >= 2:
            x_col = cat_cols[0]

            fig = go.Figure()

            colors = px.colors.qualitative.Set3
            for i, y_col in enumerate(num_cols[:4]):  # Máximo 4 series
                fig.add_trace(go.Bar(
                    name=y_col,
                    x=datos[x_col],
                    y=datos[y_col],
                    marker_color=colors[i % len(colors)]
                ))

            fig.update_layout(
                title=f'Comparación de métricas por {x_col}',
                xaxis_title=x_col,
                yaxis_title='Valores',
                barmode='group',
                template='plotly_white',
                height=500
            )

            return fig

        return None

    def _crear_visualizacion_automatica(self, datos: pd.DataFrame) -> go.Figure:
        """Crea una visualización automática como fallback"""
        # Intentar diferentes tipos de visualización en orden
        visualizaciones = [
            self._crear_grafico_barras,
            self._crear_grafico_lineas,
            self._crear_grafico_pie,
            self._crear_grafico_dispersion,
            self._crear_heatmap
        ]

        for crear_viz in visualizaciones:
            viz = crear_viz(datos)
            if viz is not None:
                return viz

        # Si nada funciona, crear una tabla
        return self._crear_tabla_visual(datos)

    def _crear_tabla_visual(self, datos: pd.DataFrame) -> go.Figure:
        """Crea una tabla visual cuando otros gráficos no son apropiados"""
        # Limitar filas para visualización
        datos_viz = datos.head(20)

        fig = go.Figure(data=[go.Table(
            header=dict(
                values=list(datos_viz.columns),
                fill_color='paleturquoise',
                align='left'
            ),
            cells=dict(
                values=[datos_viz[col] for col in datos_viz.columns],
                fill_color='lavender',
                align='left'
            )
        )])

        fig.update_layout(
            title='Tabla de Resultados',
            height=500
        )

        return fig

    def analizar_datos_inteligente(self, pregunta: str, datos: pd.DataFrame, interpretacion: Dict[str, Any]) -> str:
        """Análisis más inteligente y contextualizado de los datos"""
        if datos.empty:
            return "No hay datos para analizar. Verifica tu consulta."

        # Crear contexto rico para el análisis
        contexto_analisis = {
            'pregunta': pregunta,
            'tipo_consulta': interpretacion['tipo_consulta'],
            'num_filas': len(datos),
            'columnas': list(datos.columns),
            'tipos_datos': datos.dtypes.to_dict()
        }

        # Agregar estadísticas relevantes
        estadisticas = {}
        for col in datos.select_dtypes(include=['number']).columns:
            estadisticas[col] = {
                'media': datos[col].mean(),
                'mediana': datos[col].median(),
                'min': datos[col].min(),
                'max': datos[col].max(),
                'std': datos[col].std()
            }

        contexto_analisis['estadisticas'] = estadisticas

        # Preparar datos para el modelo
        datos_str = datos.head(50).to_string()  # Limitar para no sobrecargar

        prompt = f"""Analiza estos datos y proporciona insights valiosos.

CONTEXTO:
- Pregunta del usuario: {pregunta}
- Tipo de consulta: {interpretacion['tipo_consulta']}
- Número de resultados: {len(datos)}

DATOS:
{datos_str}

ESTADÍSTICAS:
{json.dumps(estadisticas, indent=2, default=str)}

INSTRUCCIONES:
1. Responde directamente la pregunta del usuario
2. Proporciona 2-3 insights clave basados en los datos
3. Si hay patrones o anomalías, menciónalos
4. Sugiere análisis adicionales si son relevantes
5. Sé conciso pero informativo

FORMATO DE RESPUESTA:
📊 **Respuesta directa:**
[Respuesta a la pregunta]

🔍 **Insights clave:**
• [Insight 1]
• [Insight 2]
• [Insight 3 si aplica]

💡 **Recomendaciones:**
[Sugerencias de análisis adicionales o acciones]
"""

        return self.consultar_ollama(prompt, MODELO_ANALISIS, TEMPERATURA_ANALISIS)

    def cargar_datos(self, archivo) -> Tuple[str, List[str], str]:
        """Carga datos con análisis mejorado"""
        if not archivo:
            return "❌ No se seleccionó archivo", [], ""

        # Leer archivo
        df, mensaje = self.leer_archivo(archivo.name)
        if df is None:
            return mensaje, [], ""

        # Procesar nombre de tabla
        nombre_tabla = os.path.splitext(os.path.basename(archivo.name))[0]
        nombre_tabla = re.sub(r'[^a-zA-Z0-9_]', '_', nombre_tabla)

        try:
            # Guardar en base de datos
            df.to_sql(nombre_tabla, self.db, if_exists='replace', index=False)

            # Detectar tipos de datos inteligentemente
            tipos = self.detectar_tipos_inteligente(df)

            # Crear información de tabla
            info = {
                'filas': len(df),
                'columnas': list(df.columns),
                'tipos': df.dtypes.to_dict(),
                **tipos,
                'muestra': df.head(5).to_dict()
            }

            self.tablas[nombre_tabla] = info

            # Generar preguntas sugeridas
            self.preguntas_sugeridas = self.generar_preguntas_inteligentes(info, nombre_tabla)
            self.datos_cargados = True

            # Crear resumen mejorado
            resumen = self._crear_resumen_datos(nombre_tabla, df, info)

            return f"✅ Datos cargados: {nombre_tabla} ({info['filas']:,} filas)", self.preguntas_sugeridas, resumen

        except Exception as e:
            return f"❌ Error: {str(e)}", [], ""

    def _crear_resumen_datos(self, nombre_tabla: str, df: pd.DataFrame, info: Dict) -> str:
        """Crea un resumen visual y informativo de los datos"""
        resumen = f"""# 📊 Resumen de Datos: {nombre_tabla}

## 📈 Información General
- **Total de registros:** {info['filas']:,}
- **Total de columnas:** {len(info['columnas'])}
- **Tamaño en memoria:** {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB

## 📋 Tipos de Columnas
"""

        if info['numericas']:
            resumen += f"### 🔢 Numéricas ({len(info['numericas'])})\n"
            for col in info['numericas'][:5]:
                resumen += f"- **{col}**: {df[col].dtype}\n"
            if len(info['numericas']) > 5:
                resumen += f"- *... y {len(info['numericas']) - 5} más*\n"

        if info['categoricas']:
            resumen += f"\n### 📝 Categóricas ({len(info['categoricas'])})\n"
            for col in info['categoricas'][:5]:
                n_unique = df[col].nunique()
                resumen += f"- **{col}**: {n_unique} valores únicos\n"
            if len(info['categoricas']) > 5:
                resumen += f"- *... y {len(info['categoricas']) - 5} más*\n"

        if info['fechas']:
            resumen += f"\n### 📅 Fechas ({len(info['fechas'])})\n"
            for col in info['fechas']:
                min_fecha = df[col].min()
                max_fecha = df[col].max()
                resumen += f"- **{col}**: desde {min_fecha} hasta {max_fecha}\n"

        # Vista previa mejorada
        resumen += f"\n## 👀 Vista Previa\n```\n{df.head(5).to_string()}\n```"

        # Estadísticas rápidas
        if info['numericas']:
            resumen += f"\n## 📊 Estadísticas Rápidas\n"
            stats_df = df[info['numericas']].describe().round(2)
            resumen += f"```\n{stats_df.to_string()}\n```"

        return resumen

    def obtener_contexto_detallado(self) -> str:
        """Genera contexto detallado para el LLM"""
        if not self.tablas:
            return "No hay tablas cargadas en la base de datos."

        contexto = "ESQUEMA DE BASE DE DATOS:\n\n"

        for tabla, info in self.tablas.items():
            contexto += f"TABLA: {tabla}\n"
            contexto += f"Total filas: {info['filas']}\n\n"
            contexto += "COLUMNAS:\n"

            # Organizar columnas por tipo
            for col in info['columnas']:
                tipo = str(info['tipos'].get(col, 'unknown'))
                categoria = "otros"

                if col in info['numericas']:
                    categoria = "numérica"
                elif col in info['categoricas']:
                    categoria = "categórica"
                elif col in info['fechas']:
                    categoria = "fecha"
                elif col in info['booleanas']:
                    categoria = "booleana"
                elif col in info['ids']:
                    categoria = "identificador"

                contexto += f"  - {col} ({categoria}, {tipo})\n"

            contexto += "\n"

        return contexto

    def procesar_consulta(self, pregunta: str) -> Tuple[str, str, Optional[go.Figure]]:
        """Procesa una consulta con interpretación inteligente"""
        if not self.datos_cargados:
            return "⚠️ Por favor carga datos primero", "", None

        # Verificar caché
        if pregunta in self.cache_respuestas:
            cached = self.cache_respuestas[pregunta]
            return cached['respuesta'], cached['datos_html'], cached['grafico']

        try:
            # Interpretar la consulta
            interpretacion = self.interpretar_consulta(pregunta)

            # Generar SQL inteligente
            sql = self.generar_sql_inteligente(pregunta, interpretacion)

            # Ejecutar SQL
            datos, estado = self.ejecutar_sql(sql)

            # Si falla, intentar con LLM
            if datos.empty and "Error" in estado:
                sql = self.generar_sql_con_llm(pregunta)
                datos, estado = self.ejecutar_sql(sql)

            # Analizar resultados
            analisis = self.analizar_datos_inteligente(pregunta, datos, interpretacion)

            # Crear visualización
            grafico = self.crear_visualizacion_inteligente(datos, interpretacion)

            # Formatear respuesta
            respuesta = f"""### 🤖 Consulta SQL Generada:
```sql
{sql}
```

### 📊 Estado: {estado}

### 🔍 Análisis:
{analisis}
"""

            # Crear HTML de datos
            if not datos.empty:
                datos_html = f"""
<div style="max-height: 400px; overflow-y: auto;">
{datos.to_html(index=False, classes='dataframe', max_rows=100)}
</div>
"""
            else:
                datos_html = "<p>No se obtuvieron datos</p>"

            # Guardar en caché
            self.cache_respuestas[pregunta] = {
                'respuesta': respuesta,
                'datos_html': datos_html,
                'grafico': grafico
            }

            # Agregar al historial
            self.historial_consultas.append({
                'pregunta': pregunta,
                'sql': sql,
                'resultado_filas': len(datos),
                'timestamp': datetime.now()
            })

            return respuesta, datos_html, grafico

        except Exception as e:
            error_msg = f"❌ Error procesando consulta: {str(e)}"
            return error_msg, "", None


# 4. Inicializar sistema
print("🚀 Instalando componentes necesarios...")
instalar_ollama()
genbi = GenBIMejorado()

# 5. Funciones para Gradio mejoradas
def chat_respuesta(pregunta: str, historial: List) -> Tuple[List, str, Any, str]:
    """Maneja las respuestas del chat"""
    if not pregunta.strip():
        return historial, "", None, ""

    respuesta, datos_html, grafico = genbi.procesar_consulta(pregunta)
    historial.append([pregunta, respuesta])

    return historial, datos_html, grafico, ""

def cargar_archivo(archivo) -> Tuple[str, str, gr.Dropdown, gr.Group]:
    """Carga el archivo y actualiza la interfaz"""
    if archivo is None:
        return "⚠️ Selecciona un archivo", "", gr.update(), gr.update(visible=False)

    resultado, preguntas, resumen = genbi.cargar_datos(archivo)

    return (
        resultado,
        resumen,
        gr.update(choices=preguntas, value=None if not preguntas else preguntas[0]),
        gr.update(visible=genbi.datos_cargados)
    )

def usar_pregunta_sugerida(pregunta_seleccionada: str) -> str:
    """Usa una pregunta sugerida"""
    return pregunta_seleccionada if pregunta_seleccionada else ""

def exportar_resultados(datos_html: str) -> str:
    """Exporta los resultados a CSV"""
    if not datos_html or "No se obtuvieron datos" in datos_html:
        return None

    # Extraer tabla del HTML
    try:
        from io import StringIO
        df = pd.read_html(StringIO(datos_html))[0]
        csv = df.to_csv(index=False)
        return csv
    except:
        return None

# 6. Interfaz Gradio mejorada
with gr.Blocks(
    title="GenBI Inteligente",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        font-family: 'Inter', sans-serif;
    }
    .dataframe {
        font-size: 12px;
        width: 100%;
    }
    .gr-button-primary {
        background-color: #2563eb !important;
    }
    .gr-button-primary:hover {
        background-color: #1d4ed8 !important;
    }
    """
) as app:
    gr.Markdown(
        """
        # 🧠 GenBI Inteligente - Análisis de Datos con IA
        ### Carga tus datos y hazle preguntas en lenguaje natural
        """
    )

    with gr.Tab("📁 1. Cargar Datos", elem_id="datos-tab"):
        gr.Markdown(
            """
            ### Paso 1: Selecciona tu archivo de datos
            Formatos soportados: **CSV**, **Excel** (.xlsx, .xls), **JSON**
            """
        )

        with gr.Row():
            with gr.Column(scale=1):
                archivo_input = gr.File(
                    label="Arrastra tu archivo aquí o haz clic para seleccionar",
                    file_types=[".csv", ".xlsx", ".xls", ".json"],
                    elem_id="file-upload"
                )
                cargar_btn = gr.Button(
                    "🚀 Cargar y Analizar",
                    variant="primary",
                    size="lg",
                    elem_id="load-btn"
                )
                resultado_carga = gr.Textbox(
                    label="Estado",
                    interactive=False,
                    elem_id="status"
                )

            with gr.Column(scale=2):
                resumen_datos = gr.Markdown(
                    "### 📊 Resumen de Datos\n*Carga un archivo para ver el análisis*",
                    elem_id="data-summary"
                )

    with gr.Tab("💬 2. Hacer Preguntas", elem_id="chat-tab"):
        chat_disponible = gr.Markdown(
            "⚠️ **Primero carga datos en la pestaña anterior**",
            visible=True,
            elem_id="chat-warning"
        )

        with gr.Group(visible=False, elem_id="chat-interface") as chat_group:
            gr.Markdown("### 💡 Preguntas Sugeridas")

            with gr.Row():
                preguntas_dropdown = gr.Dropdown(
                    choices=[],
                    label="Selecciona una pregunta o escribe la tuya",
                    interactive=True,
                    elem_id="suggested-questions"
                )
                usar_sugerida_btn = gr.Button(
                    "📝 Usar",
                    size="sm",
                    elem_id="use-suggested"
                )

            chatbot = gr.Chatbot(
                label="Conversación",
                height=400,
                elem_id="chatbot"
            )

            with gr.Row():
                pregunta = gr.Textbox(
                    label="Tu pregunta",
                    placeholder="Ej: ¿Cuál es el total de ventas por categoría?",
                    scale=4,
                    elem_id="question-input"
                )
                enviar = gr.Button(
                    "📤 Enviar",
                    scale=1,
                    variant="primary",
                    elem_id="send-btn"
                )

            with gr.Accordion("📊 Resultados Detallados", open=True):
                datos_display = gr.HTML(
                    label="Datos",
                    elem_id="data-display"
                )
                grafico_display = gr.Plot(
                    label="Visualización",
                    elem_id="chart-display"
                )

    with gr.Tab("📚 Ayuda", elem_id="help-tab"):
        gr.Markdown(
            """
            ## 🎯 Cómo usar GenBI Inteligente

            ### 1. **Carga tus datos**
            - Soporta archivos CSV, Excel y JSON
            - Los datos se analizan automáticamente
            - Se detectan tipos de columnas inteligentemente

            ### 2. **Haz preguntas en lenguaje natural**
            - "¿Cuál es el total de ventas?"
            - "Muéstrame la evolución mensual"
            - "¿Cuáles son los top 10 productos?"
            - "Compara las ventas por región"

            ### 3. **Tipos de análisis soportados**
            - 📊 **Agregaciones**: suma, promedio, máximo, mínimo
            - 📈 **Tendencias**: evolución temporal
            - 🥇 **Rankings**: top N elementos
            - 📊 **Distribuciones**: porcentajes y proporciones
            - 🔍 **Comparaciones**: entre categorías

            ### 💡 Tips
            - Usa las preguntas sugeridas como punto de partida
            - Sé específico con los nombres de columnas
            - Los gráficos se generan automáticamente
            """
        )

    # Eventos mejorados
    cargar_btn.click(
        cargar_archivo,
        inputs=[archivo_input],
        outputs=[resultado_carga, resumen_datos, preguntas_dropdown, chat_group]
    )

    usar_sugerida_btn.click(
        usar_pregunta_sugerida,
        inputs=[preguntas_dropdown],
        outputs=[pregunta]
    )

    # Eventos de chat
    enviar.click(
        chat_respuesta,
        inputs=[pregunta, chatbot],
        outputs=[chatbot, datos_display, grafico_display, pregunta]
    )

    pregunta.submit(
        chat_respuesta,
        inputs=[pregunta, chatbot],
        outputs=[chatbot, datos_display, grafico_display, pregunta]
    )

# 7. Lanzar aplicación
if __name__ == "__main__":
    print("✨ GenBI Inteligente está listo!")
    print("🌐 Abriendo interfaz web...")
    app.launch(
        share=True,
        server_port=7860,
        show_error=True
    )

🚀 Instalando componentes necesarios...
📦 Instalando Ollama...
📥 Descargando modelo llama3.1:8b...
✅ Instalación completada


  chatbot = gr.Chatbot(


✨ GenBI Inteligente está listo!
🌐 Abriendo interfaz web...
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://8d4b69cc238804de9d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
