# Chatbot con **tus archivos** + Gemini + Telegram

By: **Ing. Engler Gonzalez**


**Sube todo a la carpeta `/content`**. El cuaderno ya la usa por defecto.
Flujo:
1) Instalar →
2) Subir archivos →
3) Crear KB →
4) Entrenar →
5) Gemini →
6) Telegram.


In [None]:
# ⬇️ 1) Instalación
%%bash
pip -q install pdfplumber pillow pytesseract speechrecognition pydub python-telegram-bot google-generativeai duckduckgo_search scikit-learn > /dev/null
if command -v apt >/dev/null 2>&1; then
  apt -qq update >/dev/null && apt -qq install -y tesseract-ocr >/dev/null || true
fi
echo 'Instalación lista ✅'

## 🔑 Claves / Tokens
- **Gemini (opcional)**: pega tu `GOOGLE_API_KEY`. Si está vacío, usarás TF‑IDF.
- **Telegram**: crea un bot con **@BotFather** y pega el `TELEGRAM_TOKEN`.

In [None]:
import os
from google.colab import userdata

GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
TELEGRAM_TOKEN = userdata.get('TELEGRAM_TOKEN')

if GOOGLE_API_KEY:
   os.environ['GOOGLE_API_KEY'] = GOOGLE_API_KEY.strip()
print('Gemini KEY cargada?', bool(os.getenv('GOOGLE_API_KEY')))

if TELEGRAM_TOKEN:
   os.environ['TELEGRAM_TOKEN'] = TELEGRAM_TOKEN.strip()
print('Telegram TOKEN cargado?', bool(os.getenv('TELEGRAM_TOKEN')))


In [None]:

import os, pathlib
if os.path.isdir('/content'):
    os.chdir('/content')
print('📂 Carpeta de trabajo:', pathlib.Path('.').resolve())
print('📄 Archivos actuales:', os.listdir())

In [None]:
import re
from typing import List, Dict
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Detectar si estamos en Colab para subir archivos
IN_COLAB = False
try:
    import google.colab  # type: ignore
    from google.colab import files  # type: ignore
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# Gemini
USE_GEMINI = False
GEM_MODEL = None
try:
    import google.generativeai as genai
    if os.getenv('GOOGLE_API_KEY'):
        genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))
        GEM_MODEL = genai.GenerativeModel('gemini-1.5-flash')
        USE_GEMINI = True
        print('✅ Gemini listo.')
    else:
        print('ℹ️ Sin GOOGLE_API_KEY')
except Exception as e:
    print('ℹ️ Gemini no disponible', e)

# Estado global
KB_CHUNKS: List[Dict] = []
_VECTORIZER = None
_MATRIX = None

## 📥 3) Subir y leer archivos (usa **/content**)
### A) Subir desde el panel *Files*
En la izquierda, entra a **content** → icono de **subir** (flecha ↑) → elige tus archivos. Quedan en `/content`.

### B) O usar la función (abre diálogo): `subir_archivos()`


In [15]:
def subir_archivos() -> List[str]:
    """Diálogo para subir archivos a /content en Colab."""
    if not IN_COLAB:
        print('No estás en Colab. Copia tus archivos a /content manualmente.')
        return []
    up = files.upload()
    guardados = []
    for nombre, datos in up.items():
        with open(nombre, 'wb') as f:
            f.write(datos)
        guardados.append(nombre)
    print('✅ Subidos:', guardados)
    return guardados

def listar_utiles():
    import glob
    print('📄 Aquí hay:')
    print(glob.glob('*.pdf')+glob.glob('*.csv')+glob.glob('*.txt')+glob.glob('*.png')+glob.glob('*.jpg')+glob.glob('*.jpeg')+glob.glob('*.wav')+glob.glob('*.mp3'))

def extraer_texto_archivo(path: str) -> str:
    import os
    texto = ''
    ext = os.path.splitext(path)[1].lower()
    try:
        if ext == '.txt':
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                texto = f.read()
        elif ext == '.csv':
            tmp = pd.read_csv(path, encoding='utf-8', errors='ignore')
            texto = '\n'.join(tmp.astype(str).agg(' '.join, axis=1).tolist())
        elif ext == '.pdf':
            import pdfplumber
            with pdfplumber.open(path) as pdf:
                for page in pdf.pages:
                    texto += (page.extract_text() or '') + '\n'
        elif ext in ('.png', '.jpg', '.jpeg'):
            from PIL import Image
            import pytesseract
            texto = pytesseract.image_to_string(Image.open(path), lang='spa')
        elif ext == '.wav':
            import speech_recognition as sr
            r = sr.Recognizer()
            with sr.AudioFile(path) as source:
                audio = r.record(source)
            texto = r.recognize_google(audio, language='es-ES')
        elif ext == '.mp3':
            from pydub import AudioSegment
            import speech_recognition as sr
            wav = path + '.wav'
            AudioSegment.from_mp3(path).export(wav, format='wav')
            r = sr.Recognizer()
            with sr.AudioFile(wav) as source:
                audio = r.record(source)
            texto = r.recognize_google(audio, language='es-ES')
        else:
            print('Formato no soportado:', ext); return ''
    except Exception as e:
        print('Error leyendo', path, ':', e); return ''
    print('✅ Extraído', len(texto), 'caracteres de', path)
    return texto

def trocear_texto(texto: str, max_len: int = 900) -> List[str]:
    texto = re.sub(r'\s+', ' ', texto).strip()
    return [texto[i:i+max_len] for i in range(0, len(texto), max_len)]

def agregar_a_kb(path: str, texto: str, max_len: int = 900):
    global KB_CHUNKS
    for ch in trocear_texto(texto, max_len=max_len):
        if ch:
            KB_CHUNKS.append({'source': path, 'text': ch})
    print('KB ahora tiene', len(KB_CHUNKS), 'fragmentos.')

## 🧠 4) Entrenar índice y responder


In [29]:
def entrenar_indice():
    global _VECTORIZER, _MATRIX
    if not KB_CHUNKS:
        print('Primero agrega contenido a la KB.'); return
    corpus = [c['text'] for c in KB_CHUNKS]
    _VECTORIZER = TfidfVectorizer(lowercase=True, ngram_range=(1,2))
    _MATRIX = _VECTORIZER.fit_transform(corpus)
    print('✅ Índice TF‑IDF entrenado con', len(corpus), 'fragmentos.')

def buscar_contexto(query: str, top_k: int = 4):
    if _VECTORIZER is None or _MATRIX is None:
        entrenar_indice()
    if _VECTORIZER is None or _MATRIX is None:
        return []
    vec = _VECTORIZER.transform([query])
    sims = cosine_similarity(vec, _MATRIX).ravel()
    idx = sims.argsort()[::-1][:top_k]
    return [KB_CHUNKS[i] | {'score': float(sims[i])} for i in idx]

def responder(query: str) -> str:
    ctx = buscar_contexto(query)
    if not ctx:
        return 'No tengo información aún. Sube archivos y entrena el índice.'
    if USE_GEMINI and GEM_MODEL:
        context_text = '\n\n'.join([c['text'] for c in ctx])
        prompt = (
            'Responde en español, breve y claro, usando SOLO el contexto.\n'
            "Si no está en el contexto, di: 'No tengo esa información aún'.\n\n"
            + 'Contexto:\n' + context_text + '\n\n'
            + 'Pregunta: ' + query + '\nRespuesta:'
        )
        try:
            resp = GEM_MODEL.generate_content(prompt)
            txt = getattr(resp, 'text', '')
            return txt.strip() or 'No tengo esa información aún.'
        except Exception as e:
            print('Gemini falló; uso TF‑IDF:', e)
    best = max(ctx, key=lambda x: x['score'])
    return best['text'][:500]

## ⚙️ 5) Demo rápida
1)  Ejecuta `subir_archivos()`
2) Procesa y agrega a la KB.
3) Entrena.  
4) Pregunta.

In [None]:
# Subir (abre diálogo en Colab) y listar
archivos = subir_archivos() if IN_COLAB else []
listar_utiles()

In [None]:
# Procesar lo subido y cargar a KB
for a in archivos:
    t = extraer_texto_archivo(a)
    if t:
        agregar_a_kb(a, t)
print('Fragmentos en KB:', len(KB_CHUNKS))

In [30]:
entrenar_indice()
print(responder('¿Cuál es la actividad?'))

✅ Índice TF‑IDF entrenado con 18 fragmentos.
Las actividades son: Introducción a la representación de imágenes como matrices de píxeles; Introducción a scikit-image; y Presentación de OpenCV y sus características principales.


## 💬 6) Telegram
Responde a **cualquier texto** que reciba usando `responder(query)`.
Antes de lanzar: asegura **KB > 0** y ejecutaste `entrenar_indice()`.

In [None]:
# ==== BLOQUE ÚNICO: Mantiene tu estructura, responde saludos y usa Gemini SOLO con tu KB ====
# Requisitos opcionales: pip install google-generativeai
# Variables esperadas (ya las tienes en tu cuaderno): TELEGRAM_TOKEN, KB_CHUNKS, _VECTORIZER, responder(q)
# Si faltan, aquí ponemos valores seguros para que no truene.

import os, re, time, asyncio, nest_asyncio
from typing import List, Tuple, Optional
from telegram.ext import Application, MessageHandler, filters
from telegram import constants

# -------- Defaults seguros si algo no existe ----------
TELEGRAM_TOKEN = globals().get("TELEGRAM_TOKEN", os.getenv("TELEGRAM_TOKEN", ""))
if "KB_CHUNKS" not in globals(): KB_CHUNKS = []
if "_VECTORIZER" not in globals(): _VECTORIZER = None

if "responder" not in globals():
    def responder(q: str) -> str:
        # Respaldo mínimo si no tienes definida tu función
        return "Aún no tengo mi índice entrenado. Carga tus archivos y ejecuta entrenar_indice()."

def _trained_ok() -> bool:
    # Considera entrenado si hay al menos algo en la KB, aunque no haya vectorizador
    return bool(KB_CHUNKS)

# -------- Branding automático (saludo contextual) ----------
def _infer_branding_from_kb() -> str:
    # Busca pistas en la KB para construir el saludo
    text = " ".join([(t if isinstance(ch,(list,tuple)) and len(ch)==2 else "") for ch in KB_CHUNKS for t in (ch if isinstance(ch,(list,tuple)) else [str(ch)])])
    t = text.lower()
    base = "asistente virtual"
    if "procesamiento de datos" in t and ("ia" in t or "inteligencia artificial" in t):
        return f"{base} de procesamiento de datos con IA de bootcamps"
    if "talento tech" in t:
        return f"{base} de Talento Tech"
    if "botcamp" in t or "bootcamp" in t:
        return f"{base} del bootcamp"
    return f"{base}"

_BRANDING = _infer_branding_from_kb()

# -------- Gemini (opcional y solo con tu KB; NO web) ----------
HAVE_GEMINI = False
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-1.5-flash")
try:
    import google.generativeai as genai
    _GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "").strip()
    if _GEMINI_API_KEY:
        genai.configure(api_key=_GEMINI_API_KEY)
        HAVE_GEMINI = True
except Exception:
    HAVE_GEMINI = False

def _is_low_conf(txt: Optional[str]) -> bool:
    if not txt: return True
    t = re.sub(r"\s+"," ",txt.strip().lower())
    band = [
        "kb vacía","kb vacia","índice sin entrenar","indice sin entrenar",
        "no estoy seguro","no tengo información","no encontr","error","excepción","exception","traceback"
    ]
    return (len(t) < 25) or any(b in t for b in band)

def _top_terms(s: str, k=12) -> List[str]:
    return re.findall(r"[a-zA-ZáéíóúñÁÉÍÓÚÑ0-9_-]{3,}", s or "")[:k]

def _retrieve_from_kb(q: str, k=3) -> List[Tuple[str,str,float]]:
    # Ranqueo ligero por superposición de términos (no depende de _VECTORIZER)
    qset = set(w.lower() for w in _top_terms(q, 20))
    scored=[]
    for ch in KB_CHUNKS:
        if isinstance(ch,(list,tuple)) and len(ch)==2:
            title, body = ch
        else:
            title, body = "", str(ch)
        tset = set(w.lower() for w in _top_terms(f"{title} {body}", 80))
        j = len(qset & tset) / max(1, len(qset | tset))
        if j>0:
            scored.append((title, body, j))
    scored.sort(key=lambda x: x[2], reverse=True)
    return scored[:k]

def _gemini_with_kb(q: str) -> Optional[str]:
    if not HAVE_GEMINI: return None
    hits = _retrieve_from_kb(q, k=4)
    if not hits and not _trained_ok():
        return None
    ctx = "\n\n".join([f"Fragmento {i+1}:\n{b}" for i,(_,b,_) in enumerate(hits)]) if hits else ""
    sys = ("Eres un asistente en español, breve y claro. Responde SOLO con base en el contexto proporcionado "
           "(fragmentos de KB del usuario). Si falta información, dilo con precisión.")
    try:
        model = genai.GenerativeModel(GEMINI_MODEL, system_instruction=sys)
        prompt = f"Pregunta:\n{q}\n\nContexto del usuario:\n{ctx}\n\nResponde en español de forma directa."
        r = model.generate_content(prompt)
        return (r.text or "").strip()
    except Exception:
        return None

# -------- Pipeline: usa tu responder(q) y, si queda corto, Gemini con tu KB ----------
def responder_contextual(q: str) -> str:
    # Saludo contextual (no mudo)
    if re.fullmatch(r"\s*(hola|buenas|hey|hi)\s*[!.]?\s*", q, flags=re.I):
        return f"Hola, soy tu {_BRANDING}. ¿En qué te ayudo?"

    # 1) Tu motor original
    try:
        base = responder(q)
    except Exception as e:
        base = f"(Tu buscador interno falló: {e})"

    if not _is_low_conf(base):
        return base

    # 2) Gemini con SOLO tu KB
    g = _gemini_with_kb(q)
    if g and not _is_low_conf(g):
        return g

    # 3) Último recurso: mensaje útil sin web
    if not _trained_ok():
        return ("Aún no tengo tu KB entrenada. Carga tus archivos y ejecuta 'entrenar_indice()'. "
                "Mientras tanto, puedo darte una orientación general si me das más detalle.")
    return base or "Necesito un poco más de detalle o material en tu KB para responder mejor."

# -------- Handlers (manteniendo tu estructura) ----------
async def on_message(update, context):
    q = (update.message.text or '').strip()
    if not q:
        await update.message.reply_text('Envíame un texto y responderé con lo que haya en tus archivos.')
        return
    # Ejecuta el pipeline en un hilo para no bloquear el loop
    ans = await asyncio.to_thread(responder_contextual, q)
    await update.message.reply_text(ans)

async def main_telegram():
    if not TELEGRAM_TOKEN:
        print('⚠️ Falta TELEGRAM_TOKEN. Pégalo en la celda de Claves.')
        return
    if not _trained_ok():
        print("ℹ️ KB vacía o índice sin entrenar. Puedes cargar archivos y ejecutar 'entrenar_indice()' cuando gustes.")

    app = Application.builder().token(TELEGRAM_TOKEN).build()
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, on_message))
    print('🤖 Bot escuchando… (envíale un mensaje en Telegram)')
    await app.run_polling()

# --- Arranque (igual que tu código) ---
nest_asyncio.apply()
# 👉 Descomenta para lanzar el bot (si tu entorno lo requiere ya está listo):
await main_telegram()


🤖 Bot escuchando… (envíale un mensaje en Telegram)


## 🆘 Ayuda rápida
- **¿Dónde subo?** A **/content** (panel Files → *content* → subir). Este cuaderno ya usa esa carpeta.
- **No encuentra archivo**: ejecuta la celda de ruta (muestra `/content`), vuelve a subir y usa el **nombre exacto**.
- **Gemini**: pega `GOOGLE_API_KEY` sin espacios extras y re‑ejecuta la celda de claves.
- **OCR/audio**: repite instalación si falla; usa imágenes claras o audio `.wav`.
- **Telegram**: pega `TELEGRAM_TOKEN`, asegúrate de KB>0 y de haber entrenado el índice.

In [None]:
import asyncio, nest_asyncio
from telegram.ext import Application, MessageHandler, filters

async def on_message(update, context):
    q = (update.message.text or '').strip()
    if not q:
        await update.message.reply_text('Envíame un texto y responderé con lo que haya en tus archivos.')
        return
    ans = responder(q)
    await update.message.reply_text(ans)

async def main_telegram():
    if not TELEGRAM_TOKEN:
        print('⚠️ Falta TELEGRAM_TOKEN. Pégalo en la celda de Claves.')
        return
    if not KB_CHUNKS or _VECTORIZER is None:
        print("⚠️ KB vacía o índice sin entrenar. Carga archivos y ejecuta 'entrenar_indice()' primero.")
    app = Application.builder().token(TELEGRAM_TOKEN).build()
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, on_message))
    print('🤖 Bot escuchando…')
    await app.run_polling()

nest_asyncio.apply()
# 👉 Descomenta para lanzar el bot:
await main_telegram()