# Laboratorio TECH: Chatbot RAG + S3 + Telegram
By **Ing. Engler Gonzalez**

### Novedades clave
- **LLM seleccionable**: OpenAI *o* Gemini. Si no hay API key, cae en *modo fragmento más relevante*.
- **Persistencia real** en S3: índice FAISS guardado/cargado por equipo + reconstrucción desde `docs/`.
- **Telegram completo**: los clientes escriben al bot, el admin ve todos los chats, activa/desactiva **Auto-responder** y puede **intervenir** en cualquier momento.
- **Transcripciones** por `chat_id` en `mini_chatbot_work/logs/`.


## 0) Instalación

In [1]:
# %%capture
!pip -q install sentence-transformers faiss-cpu pypdf gradio boto3 openai==1.* tiktoken requests google-generativeai


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m44.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m322.5/322.5 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.0/14.0 MB[0m [31m48.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.7/85.7 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h

## 1) Configuración

In [2]:
# A) Entrada interactiva
import os
from getpass import getpass
from google.colab import userdata

# --- AWS / S3 ---
AWS_REGION = os.getenv("AWS_REGION", "us-east-2")
S3_BUCKET  = os.getenv("S3_BUCKET", "talentotech2025")
S3_PREFIX  = os.getenv("S3_PREFIX", "IA-Innovador/")  # base prefix for the course


def _colab_secret(name):
    """Return the Colab secret *name*, or None when it cannot be read.

    ``userdata.get`` can raise instead of returning None (secret not defined,
    or notebook access to it not granted). An LLM key is optional in this
    notebook, so any failure here is treated as "no key" and the chat falls
    back to the 'fragmento más relevante' mode instead of aborting the cell.
    """
    try:
        return userdata.get(name)
    except Exception:  # best-effort lookup: a missing secret is not an error here
        return None


# --- LLM ---
LLM_PROVIDER = (os.getenv("LLM_PROVIDER") or "openai").lower()   # "openai" or "gemini"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or _colab_secret("OPENAI_API_KEY")
OPENAI_MODEL   = os.getenv("OPENAI_MODEL")   or "gpt-4o-mini"
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") or _colab_secret("GOOGLE_API_KEY")
GEMINI_MODEL   = os.getenv("GEMINI_MODEL")   or "gemini-1.5-flash"


# Prompt for the AWS credentials only when they are not already in the environment
AWS_ACCESS_KEY_ID     = os.getenv("AWS_ACCESS_KEY_ID")     or input("AWS_ACCESS_KEY_ID: ").strip()
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") or getpass("AWS_SECRET_ACCESS_KEY (oculto): ").strip()
AWS_SESSION_TOKEN     = os.getenv("AWS_SESSION_TOKEN")     # optional (STS/Academy only)

# Persist the AWS settings in the process environment so boto3 picks them up
env = {
"AWS_REGION":AWS_REGION, "S3_BUCKET":S3_BUCKET, "S3_PREFIX":S3_PREFIX,
"AWS_ACCESS_KEY_ID":AWS_ACCESS_KEY_ID, "AWS_SECRET_ACCESS_KEY":AWS_SECRET_ACCESS_KEY
}
if AWS_SESSION_TOKEN: env["AWS_SESSION_TOKEN"]=AWS_SESSION_TOKEN
for k,v in env.items(): os.environ[k]=v

# LLM settings; the API keys are only exported when a value was found
os.environ["LLM_PROVIDER"]=LLM_PROVIDER
if OPENAI_API_KEY: os.environ["OPENAI_API_KEY"]=OPENAI_API_KEY
os.environ["OPENAI_MODEL"]=OPENAI_MODEL
if GOOGLE_API_KEY: os.environ["GOOGLE_API_KEY"]=GOOGLE_API_KEY
os.environ["GEMINI_MODEL"]=GEMINI_MODEL

print("✔ AWS:", AWS_REGION, S3_BUCKET, S3_PREFIX)

if OPENAI_API_KEY or GOOGLE_API_KEY:
    print("✅ API Key(s) cargada(s) correctamente.")
    print("✔ LLM provider:", LLM_PROVIDER, "| OpenAI model:", OPENAI_MODEL, "| Gemini model:", GEMINI_MODEL)
else:
    print("⚠️ No se detectaron API Keys. El chat funcionará en modo 'fragmento más relevante'.")
    print("✔ LLM provider: Fragmento más relevante")

AWS_ACCESS_KEY_ID: AKIAQNBYZOKR56S2G57T
AWS_SECRET_ACCESS_KEY (oculto): ··········
✔ AWS: us-east-2 talentotech2025 IA-Innovador/
✅ API Key(s) cargada(s) correctamente.
✔ LLM provider: openai | OpenAI model: gpt-4o-mini | Gemini model: gemini-1.5-flash


## 2) S3 helpers (autoregión, listar carpetas/archivos, sync, prefijo efectivo)

In [3]:
import os, boto3, datetime, re, json, time, threading, requests, unicodedata
from pathlib import Path
from botocore.exceptions import ClientError
from typing import Any, Dict, List, Optional


def norm_prefix(p: str) -> str:
    """Normalize an S3 key prefix.

    Backslashes become forward slashes, surrounding whitespace and leading
    slashes are removed, and a non-empty result always ends with ``/``.
    ``None`` and blank input both yield the empty string.
    """
    if p is None:
        return ""
    cleaned = p.strip().replace("\\", "/").lstrip("/")
    if not cleaned or cleaned.endswith("/"):
        return cleaned
    return cleaned + "/"


def get_bucket_region(bucket: str) -> str:
    """Return the AWS region name where *bucket* lives.

    Uses ``GetBucketLocation`` and translates its two legacy answers into
    real region names: an empty/null constraint means ``us-east-1`` and the
    historical ``"EU"`` constraint means ``eu-west-1``. Any other value is
    already a region name and is returned unchanged.

    Raises whatever boto3 raises when the call fails (missing credentials,
    access denied, unknown bucket, ...).
    """
    s3g = boto3.client("s3")
    loc = s3g.get_bucket_location(Bucket=bucket).get("LocationConstraint")
    if not loc:
        return "us-east-1"
    if loc == "EU":
        # Legacy alias: building a client for "us-east-1" here would point at
        # the wrong region for these buckets.
        return "eu-west-1"
    return loc


def s3_client_autoregion(bucket: str):
    """Build an S3 client bound to the region of *bucket*.

    When the bucket region cannot be determined for any reason, the region
    from the ``AWS_REGION`` environment variable is used (default
    ``us-east-2``).
    """
    try:
        region_name = get_bucket_region(bucket)
    except Exception:
        # Region discovery is best-effort; fall back to the configured region.
        region_name = os.getenv("AWS_REGION", "us-east-2")
    client = boto3.client("s3", region_name=region_name)
    return client


def s3_list_objects(bucket: str, prefix: str, delimiter: str = None):
    """List everything under *prefix* in *bucket*, following pagination.

    Returns a ``(keys, folders)`` tuple: ``keys`` are the object keys found and
    ``folders`` are the common prefixes reported by S3, which are only
    populated when a *delimiter* is given.
    """
    client = s3_client_autoregion(bucket)
    request = {"Bucket": bucket, "Prefix": norm_prefix(prefix)}
    if delimiter:
        request["Delimiter"] = delimiter
    object_keys, common_prefixes = [], []
    pages = client.get_paginator("list_objects_v2").paginate(**request)
    for page in pages:
        object_keys.extend(item["Key"] for item in page.get("Contents") or [])
        common_prefixes.extend(item["Prefix"] for item in page.get("CommonPrefixes") or [])
    return object_keys, common_prefixes


def s3_list_immediate_folders(bucket: str, base_prefix: str):
    """Return the sorted, de-duplicated names of the folders directly under *base_prefix*."""
    _keys, prefixes = s3_list_objects(bucket, norm_prefix(base_prefix), delimiter="/")
    if not prefixes:
        return []
    names = set()
    for entry in prefixes:
        # Common prefixes end with "/", so the folder name is the second-to-last segment.
        names.add(entry.split("/")[-2])
    return sorted(names)


def s3_sync_docs_to_local(bucket: str, prefix_docs: str, local_folder: str):
    """Download every object under *prefix_docs* into *local_folder*.

    The key path relative to the prefix is recreated below *local_folder*.
    Keys ending in ``/`` (folder placeholders) are skipped. Returns the number
    of files downloaded.
    """
    client = s3_client_autoregion(bucket)
    prefix_docs = norm_prefix(prefix_docs)
    target_root = Path(local_folder)
    target_root.mkdir(parents=True, exist_ok=True)
    downloaded = 0
    pages = client.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix_docs)
    for page in pages:
        for entry in page.get("Contents") or []:
            key = entry["Key"]
            if key.endswith("/"):
                continue
            destination = target_root / key[len(prefix_docs):]
            destination.parent.mkdir(parents=True, exist_ok=True)
            client.download_file(bucket, key, str(destination))
            downloaded += 1
    return downloaded


def s3_sync_local_docs_to_s3(bucket: str, prefix_docs: str, local_folder: str):
    """Upload every file found under *local_folder* (recursively) to *prefix_docs*.

    Each object key is the normalized prefix plus the file's POSIX-style path
    relative to *local_folder*. Returns the number of files uploaded.
    """
    client = s3_client_autoregion(bucket)
    prefix_docs = norm_prefix(prefix_docs)
    uploaded = 0
    for current_dir, _subdirs, filenames in os.walk(local_folder):
        base = Path(current_dir)
        for filename in filenames:
            local_path = base / filename
            relative_key = local_path.relative_to(local_folder).as_posix()
            client.upload_file(str(local_path), bucket, prefix_docs + relative_key)
            uploaded += 1
    return uploaded


# Confirmation banner printed once the S3 helpers above have been defined.
print("✔ Helpers S3 OK")


# System prompt shared by both LLM back-ends (OpenAI system message / first
# section of the Gemini prompt). It restricts answers to the retrieved
# document context and sets the customer-facing tone.
SYS_PROMPT = (
    "Eres un Generador de Conocimiento Conversacional (CEREBRO) para un chatbot RAG orientado a clientes de Talento TECH. "
    "Hablas en español neutro con tono cercano, empático y proactivo. "
    "Usa únicamente la información del contexto documental proporcionado. "
    "Si la respuesta no está en el contexto, explica qué falta y solicita el dato necesario con amabilidad. "
    "Resume en pasos claros cuando aplique y evita sonar como administrador interno."
)


def format_context(hits):
    """Render retrieval hits as the context block handed to the LLM.

    *hits* is an iterable of ``(score, chunk)`` pairs where ``chunk`` exposes
    ``text`` and ``source_name``. Each hit becomes one paragraph tagged with
    its source and score; text longer than 350 characters is cut and marked
    with an ellipsis. Paragraphs are separated by a blank line.
    """
    max_chars = 350
    rendered = []
    for score, chunk in hits:
        body = chunk.text
        if len(body) > max_chars:
            body = body[:max_chars] + "…"
        rendered.append(f"[{chunk.source_name} | score={score:.3f}] {body}")
    return "\n\n".join(rendered)



def normalize_for_matching(text: str) -> str:
    """Return *text* lower-cased with diacritics removed, for accent-insensitive matching.

    The text is NFKD-decomposed and every combining mark is dropped. Falsy
    input (``None``, ``""``) yields the empty string.
    """
    decomposed = unicodedata.normalize("NFKD", text or "")
    base_chars = [char for char in decomposed if unicodedata.combining(char) == 0]
    return "".join(base_chars).lower()


def extract_relevant_sentences(question: str, hits, limit: int = 3):
    """Pick up to *limit* sentences from *hits* that mention a word of *question*.

    Question words longer than two characters act as keywords. A sentence is
    kept when its accent-insensitive form contains any keyword as a substring
    (or when the question yields no keywords at all). Duplicate sentences are
    skipped. If nothing matches, the first 400 characters of the first hit's
    text are returned as the single result.
    """
    if not hits:
        return []
    keywords = [
        word
        for word in re.findall(r"\w+", normalize_for_matching(question))
        if len(word) > 2
    ]
    picked = []
    already = set()
    for _score, chunk in hits:
        body = getattr(chunk, "text", "") or ""
        # Split on sentence-ending punctuation followed by whitespace, or on newlines.
        for fragment in re.split(r"(?<=[.!?])\s+|\n+", body):
            sentence = fragment.strip()
            if not sentence:
                continue
            fingerprint = normalize_for_matching(sentence)
            relevant = (not keywords) or any(word in fingerprint for word in keywords)
            if not relevant or fingerprint in already:
                continue
            already.add(fingerprint)
            picked.append(sentence)
            if len(picked) >= limit:
                return picked
    if not picked:
        first_text = (getattr(hits[0][1], "text", "") or "").strip()
        if first_text:
            picked.append(first_text[:400])
    return picked[:limit]

def available_sources() -> List[str]:
    """List the known document names, without duplicates, in first-seen order.

    Names come from two module-level globals when they exist: the JSON file
    at ``META_PATH`` (the ``source_name`` field of each dict entry) and the
    in-memory ``_chunks`` list. A metadata file that cannot be read or parsed
    is ignored.
    """
    collected: List[str] = []
    meta_path = globals().get("META_PATH")
    if meta_path:
        try:
            with open(meta_path, "r", encoding="utf-8") as handle:
                entries = json.load(handle) or []
            collected.extend(
                str(entry["source_name"])
                for entry in entries
                if isinstance(entry, dict) and entry.get("source_name")
            )
        except Exception:
            # Metadata is optional; fall through to the in-memory chunks.
            pass
    for chunk in globals().get("_chunks") or []:
        label = getattr(chunk, "source_name", "")
        if label:
            collected.append(str(label))
    # dict.fromkeys keeps the first occurrence of each name, preserving order.
    return list(dict.fromkeys(collected))


def build_fallback_message(question: str, hits=None) -> str:
    """Compose the reply used when no LLM answer is available.

    Preference order: (1) a bulleted list of the sentences from *hits* most
    related to *question*; (2) a pointer to the document names known to the
    index; (3) a generic "no information yet" message.
    """
    hit_list = list(hits or [])
    extractor = globals().get("extract_relevant_sentences")
    found: List[str] = []
    if callable(extractor):
        try:
            found = extractor(question, hit_list, limit=3)
        except Exception:
            found = []
    if found:
        listing = "\n".join(f"• {item}" for item in found)
        return "".join([
            "Esto es lo más cercano que encontré en tus archivos:\n",
            listing,
            "\n\nSi necesitas que profundice en algo, dime qué detalle buscas.",
        ])
    names = available_sources()
    if not names:
        return (
            "Aún no tengo información sobre ese tema en los archivos cargados. "
            "Comparte más detalles o sube un documento relacionado y con gusto investigo."
        )
    return "".join([
        "Aún no veo datos directos sobre eso. Revisa o comparte información en estos archivos disponibles: ",
        ", ".join(names),
        ". Si tienes más contexto, cuéntamelo y sigo buscando.",
    ])



class LLMClient:
    """Thin wrapper that answers a question with OpenAI, Gemini or a local fallback.

    The configured provider is only used when its API key is present in the
    environment; otherwise the client degrades to ``FALLBACK_PROVIDER``, which
    replies with the most relevant retrieved fragments instead of calling an LLM.
    """

    DEFAULT_PROVIDER = "openai"
    FALLBACK_PROVIDER = "fragmento"

    def __init__(self):
        # Lazily created SDK handles (see _ensure_openai / _ensure_gemini).
        self._oai = None
        self._gem = None
        self.provider = self.DEFAULT_PROVIDER
        self.temperature = 0.2
        self.max_tokens = 400
        self.openai_model = "gpt-4o-mini"
        self.gemini_model = "gemini-1.5-flash"
        self.effective_provider = self.DEFAULT_PROVIDER
        # Last provider/model names seen by configure(); used to invalidate the handles.
        self._cached_provider = None
        self._cached_openai_model = None
        self._cached_gemini_model = None
        self.configure(persist=False)

    @staticmethod
    def _env_number(name: str, cast, current):
        """Parse env var *name* with *cast*; keep *current* when unset, blank or malformed."""
        raw = os.getenv(name)
        if raw is None or not raw.strip():
            return current
        try:
            return cast(raw)
        except ValueError:
            # A bad environment value must not make the client unusable.
            return current

    def configure(self, provider: Optional[str] = None, temperature: Optional[float] = None,
                  max_tokens: Optional[int] = None, openai_model: Optional[str] = None,
                  gemini_model: Optional[str] = None, persist: bool = True) -> Dict[str, Any]:
        """Update the client settings and return the resulting ``status()``.

        Every argument left as ``None`` is read from its environment variable
        (``LLM_PROVIDER``, ``LLM_TEMPERATURE``, ``LLM_MAX_TOKENS``,
        ``OPENAI_MODEL``, ``GEMINI_MODEL``), falling back to the current value.
        Malformed numeric environment values are ignored; explicitly passed
        values are still converted with ``float``/``int`` and may raise
        ``ValueError``/``TypeError``.

        With ``persist=True`` the resolved settings are written back to the
        environment.
        """
        if provider is None:
            provider = os.getenv("LLM_PROVIDER", self.provider)
        provider = (provider or "").strip().lower()
        if provider not in {"openai", "gemini"}:
            provider = self.FALLBACK_PROVIDER
        self.provider = provider

        if temperature is None:
            self.temperature = float(self._env_number("LLM_TEMPERATURE", float, self.temperature))
        else:
            self.temperature = float(temperature)

        if max_tokens is None:
            self.max_tokens = int(self._env_number("LLM_MAX_TOKENS", int, self.max_tokens))
        else:
            self.max_tokens = int(max_tokens)

        if openai_model is None:
            openai_model = os.getenv("OPENAI_MODEL", self.openai_model)
        self.openai_model = (openai_model or "gpt-4o-mini").strip()

        if gemini_model is None:
            gemini_model = os.getenv("GEMINI_MODEL", self.gemini_model)
        self.gemini_model = (gemini_model or "gemini-1.5-flash").strip()

        if persist:
            os.environ["LLM_PROVIDER"] = self.provider
            os.environ["LLM_TEMPERATURE"] = str(self.temperature)
            os.environ["LLM_MAX_TOKENS"] = str(self.max_tokens)
            os.environ["OPENAI_MODEL"] = self.openai_model
            os.environ["GEMINI_MODEL"] = self.gemini_model

        # Drop cached SDK handles whenever the provider or a model name changed.
        if self.provider != self._cached_provider:
            self._oai = None
            self._gem = None
        if self.openai_model != self._cached_openai_model:
            self._oai = None
        if self.gemini_model != self._cached_gemini_model:
            self._gem = None

        self._cached_provider = self.provider
        self._cached_openai_model = self.openai_model
        self._cached_gemini_model = self.gemini_model

        return self.status()

    def status(self) -> Dict[str, Any]:
        """Return the current settings and refresh ``effective_provider``.

        ``provider`` in the result is the provider that will actually be used:
        the configured one when its API key is present, else the fallback.
        """
        has_openai = bool(os.getenv("OPENAI_API_KEY"))
        has_gemini = bool(os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY"))
        available: List[str] = []
        if has_openai:
            available.append("openai")
        if has_gemini:
            available.append("gemini")
        if not available:
            available = [self.FALLBACK_PROVIDER]
        effective = self.provider
        if effective not in {"openai", "gemini"}:
            effective = self.FALLBACK_PROVIDER
        elif effective not in available:
            effective = self.FALLBACK_PROVIDER
        self.effective_provider = effective
        return {
            "provider": effective,
            "configured_provider": self.provider,
            "available": available,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "openai_model": self.openai_model,
            "gemini_model": self.gemini_model,
            "has_openai_key": has_openai,
            "has_gemini_key": has_gemini,
        }

    def _ensure_openai(self):
        """Return a cached OpenAI client, or None when ``OPENAI_API_KEY`` is not set."""
        key = os.getenv("OPENAI_API_KEY")
        if not key:
            return None
        if self._oai is None:
            from openai import OpenAI
            self._oai = OpenAI(api_key=key)
        return self._oai

    def _ensure_gemini(self):
        """Return a cached Gemini model handle, or None when no Google/Gemini key is set."""
        key = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")
        if not key:
            return None
        if self._gem is None:
            import google.generativeai as genai
            genai.configure(api_key=key)
            self._gem = genai.GenerativeModel(self.gemini_model)
        return self._gem

    def _fallback_answer(self, question: str, hits) -> str:
        """Build the no-LLM answer from the retrieved fragments."""
        return build_fallback_message(question, hits)

    def generate(self, question: str, hits: Optional[List] = None,
                 temperature: Optional[float] = None, max_tokens: Optional[int] = None,
                 chat_history: Optional[List[Dict[str, str]]] = None) -> str:
        """Answer *question* using *hits* as the only allowed context.

        Args:
            question: Current user question.
            hits: ``(score, chunk)`` pairs from retrieval; may be empty.
            temperature: Sampling temperature; defaults to the configured one.
            max_tokens: Output token cap; defaults to the configured one.
            chat_history: Prior turns as ``{"role", "content"}`` dicts. Only the
                last 8 non-empty turns are forwarded; any role other than
                ``"assistant"`` is treated as ``"user"``.

        Returns:
            The model reply, a ``"⚠️ Error ..."`` string when the provider call
            fails, or the fragment-based fallback when no provider is usable.
        """
        hits = hits or []
        context = format_context(hits) if hits else "(sin fragmentos relevantes)"
        temp = float(self.temperature if temperature is None else temperature)
        mtok = int(self.max_tokens if max_tokens is None else max_tokens)
        provider = self.status()["provider"]

        history_messages = []
        for turn in (chat_history or [])[-8:]:
            role = "assistant" if turn.get("role") == "assistant" else "user"
            content = (turn.get("content") or "").strip()
            if content:
                history_messages.append((role, content))

        if provider == "openai":
            cli = self._ensure_openai()
            if cli:
                try:
                    msgs = [{"role": "system", "content": SYS_PROMPT}]
                    for role, content in history_messages:
                        msgs.append({"role": role, "content": content})
                    user_sections = [f"Pregunta actual del usuario: {question}"]
                    user_sections.append(f"Contexto documental relevante:\n{context}")
                    user_sections.append("Responde de forma conversacional y orientada al cliente usando solo ese contexto.")
                    msgs.append({"role": "user", "content": "\n\n".join(user_sections)})
                    resp = cli.chat.completions.create(
                        model=self.openai_model,
                        messages=msgs,
                        temperature=temp,
                        max_tokens=mtok,
                    )
                    return (resp.choices[0].message.content or "").strip()
                except Exception as exc:
                    return f"⚠️ Error OpenAI: {exc}"
        elif provider == "gemini":
            model = self._ensure_gemini()
            if model:
                try:
                    history_lines = "\n".join(
                        ("Usuario: " + content) if role == "user" else ("Asistente: " + content)
                        for role, content in history_messages
                    )
                    prompt_parts = [SYS_PROMPT]
                    if history_lines:
                        prompt_parts.append("Conversación previa:\n" + history_lines)
                    prompt_parts.append(f"Pregunta actual del usuario: {question}")
                    prompt_parts.append(f"Contexto documental relevante:\n{context}")
                    prompt_parts.append("Responde de forma conversacional y orientada al cliente usando solo ese contexto.")
                    prompt = "\n\n".join(prompt_parts)
                    # Forward the sampling settings so temperature/max_tokens
                    # apply to Gemini as they do to OpenAI.
                    out = model.generate_content(
                        prompt,
                        generation_config={"temperature": temp, "max_output_tokens": mtok},
                    )
                    return (getattr(out, "text", "") or "").strip()
                except Exception as exc:
                    return f"⚠️ Error Gemini: {exc}"
        return self._fallback_answer(question, hits)


# Module-level client shared by rag_answer() and the chat helpers; it is
# configured from the environment at construction time.
LLM = LLMClient()

def collect_sources(hits):
    """Return the sorted, unique, non-empty ``source_name`` values of the chunks in *hits*."""
    names = set()
    for hit in hits:
        label = getattr(hit[1], "source_name", "")
        if label:
            names.add(label)
    return sorted(names)


def rag_answer(question: str, top_k: int = 4, temperature: Optional[float] = None,
               max_tokens: Optional[int] = None, with_sources: bool = True, hits: Optional[List] = None,
               chat_history: Optional[List[Dict[str, str]]] = None):
    """Answer *question* from the indexed documents.

    Args:
        question: Current user question.
        top_k: Maximum number of fragments to use. ``None`` or a non-positive
            value means "no truncation" for caller-supplied *hits* and the
            default of 4 when retrieval runs here.
        temperature, max_tokens: Forwarded to the LLM client.
        with_sources: Append a ``Fuentes: ...`` line unless the answer already
            mentions "Fuentes".
        hits: Pre-computed ``(score, chunk)`` pairs; when given, no retrieval
            is performed.
        chat_history: Prior turns; the last two user turns are prepended to the
            retrieval query so follow-up questions keep their topic.

    Returns:
        ``(answer, sources)`` where ``sources`` is the sorted list of document
        names behind the fragments used (empty when nothing was retrieved).
    """
    limit = int(top_k) if top_k is not None else 0
    if hits is not None:
        use_hits = list(hits)
        if limit > 0:
            use_hits = use_hits[:limit]
    else:
        loader = globals().get("ensure_index_loaded")
        if callable(loader):
            try:
                loader()
            except Exception:
                # Best-effort warm-up; retrieve() reports an empty result if the index is unusable.
                pass
        # Only non-empty string contents can be joined into the query; turns
        # with missing/None content are skipped instead of raising TypeError.
        recent_user_turns = []
        for turn in chat_history or []:
            content = turn.get("content")
            if turn.get("role") == "user" and isinstance(content, str) and content.strip():
                recent_user_turns.append(content)
        retrieval_question = " ".join(recent_user_turns[-2:] + [question])
        if "retrieve" in globals():
            use_hits = retrieve(retrieval_question, top_k=limit if limit > 0 else 4)
        else:
            use_hits = []
    if not use_hits:
        return build_fallback_message(question, []), []
    answer = LLM.generate(
        question,
        hits=use_hits,
        temperature=temperature,
        max_tokens=max_tokens,
        chat_history=chat_history,
    )
    if not (answer or "").strip():
        answer = build_fallback_message(question, use_hits)
    sources = collect_sources(use_hits)
    if with_sources and sources and "Fuentes" not in answer:
        answer = answer.rstrip() + "\n\nFuentes: " + ", ".join(sources)
    return answer, sources

✔ Helpers S3 OK


## 3) Núcleo RAG (loaders → chunking → FAISS → retrieval)

In [4]:
import os, json, uuid, shutil
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
import pandas as pd
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
import numpy as np, faiss, time

# Local working tree, created under the current working directory:
#   docs_raw/            documents to index
#   faiss_index/         persisted FAISS index + chunks.json
#   logs/                per-chat JSONL logs
#   docs_metadata.json   one entry per indexed document
BASE_DIR = Path.cwd() / "mini_chatbot_work"
DOCS_DIR = BASE_DIR / "docs_raw"
INDEX_DIR = BASE_DIR / "faiss_index"
LOGS_DIR = BASE_DIR / "logs"
META_PATH = BASE_DIR / "docs_metadata.json"
# Make sure every directory exists before any helper tries to write into it.
for path in [BASE_DIR, DOCS_DIR, INDEX_DIR, LOGS_DIR]:
    path.mkdir(parents=True, exist_ok=True)


def load_txt(path: Path) -> str:
    """Read *path* as UTF-8 text, silently dropping undecodable bytes.

    Never raises: on failure the returned text is ``"[ERROR TXT] <reason>"``.
    """
    try:
        content = path.read_text(encoding="utf-8", errors="ignore")
    except Exception as exc:
        return f"[ERROR TXT] {exc}"
    return content


def load_pdf(path: Path) -> str:
    """Extract the text of every page of the PDF at *path*, one page per line block.

    Pages without extractable text contribute an empty string. Never raises:
    on failure the returned text is ``"[ERROR PDF] <reason>"``.
    """
    try:
        pages_text = []
        for page in PdfReader(str(path)).pages:
            pages_text.append(page.extract_text() or "")
        return "\n".join(pages_text)
    except Exception as exc:
        return f"[ERROR PDF] {exc}"


def load_csv(path: Path, n: int = 1500) -> str:
    """Summarize the CSV at *path* as indexable text.

    Reads at most *n* rows (retrying as latin-1 when UTF-8 decoding fails) and
    returns a header, the column list and a preview of the first 20 rows.

    Like ``load_txt``/``load_pdf`` this never raises: an unreadable, empty or
    malformed file yields ``"[ERROR CSV] <reason>"`` so one bad document does
    not abort the whole index build.
    """
    try:
        try:
            df = pd.read_csv(path, nrows=n)
        except UnicodeDecodeError:
            df = pd.read_csv(path, nrows=n, encoding="latin-1")
        sample = df.head(20)
        try:
            preview = sample.to_markdown(index=False)
        except ImportError:
            # to_markdown needs the optional 'tabulate' package; plain text works without it.
            preview = sample.to_string(index=False)
    except Exception as exc:
        return f"[ERROR CSV] {exc}"
    return "\n".join([
        f"# CSV: {path.name}",
        f"Columnas: {list(df.columns)}",
        "Muestra:\n" + preview,
    ])


def load_any(path: Path) -> str:
    """Load *path* with the loader matching its (case-insensitive) extension.

    ``.txt``/``.md`` go to the text loader, ``.pdf`` to the PDF loader and
    ``.csv`` to the CSV loader. Any other file produces a one-line
    ``[BINARIO] ...`` placeholder instead of its content.
    """
    suffix = path.suffix.lower()
    if suffix == ".csv":
        return load_csv(path)
    if suffix == ".pdf":
        return load_pdf(path)
    if suffix in (".txt", ".md"):
        return load_txt(path)
    return f"[BINARIO] {path.name} (no indexado)"


@dataclass
class ChunkedDoc:
    """One indexed text fragment together with the document it came from."""

    # Identifier shared by all chunks of the same document (a UUID4 string
    # when created by build_index_from_local).
    doc_id: str
    # File name of the originating document.
    source_name: str
    # Position of this chunk within its document.
    chunk_id: int
    # The chunk's text content.
    text: str


def chunk_text(text: str, chunk_size: int = 800, overlap: int = 150) -> List[str]:
    """Split *text* into overlapping chunks of whitespace-separated words.

    Args:
        text: Text to split; whitespace runs are collapsed to single spaces.
        chunk_size: Maximum words per chunk (coerced to an int of at least 1).
        overlap: Words shared between consecutive chunks (coerced to a
            non-negative int). When it is >= ``chunk_size`` the window still
            advances one word at a time.

    Returns:
        The list of chunks, empty for blank text. Chunking stops at the first
        chunk that reaches the end of the text, so no trailing chunk is just
        a suffix of the previous one.
    """
    tokens = text.split()
    if not tokens:
        return []
    # UI widgets may hand over floats; slicing needs ints.
    chunk_size = max(1, int(chunk_size))
    # A negative overlap would make the step exceed chunk_size and skip words.
    overlap = max(0, int(overlap))
    step = max(1, chunk_size - overlap)
    pieces: List[str] = []
    for start in range(0, len(tokens), step):
        pieces.append(" ".join(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break
    return pieces


# Sentence-embedding model used both to build the index and to encode queries.
EMB_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Module-level state, filled lazily:
_emb = None     # SentenceTransformer instance (see get_model)
_index = None   # FAISS index currently in memory, or None
_chunks: List[ChunkedDoc] = []  # chunk i corresponds to row i of _index


def get_model():
    """Return the shared embedding model, creating it on first use."""
    global _emb
    if _emb is not None:
        return _emb
    _emb = SentenceTransformer(EMB_MODEL_NAME)
    return _emb


def build_index_from_local(paths: List[Path], chunk_size: int = 800, overlap: int = 150):
    """Build the FAISS index from local files and persist it to disk.

    Each existing file in *paths* is loaded, chunked and embedded. The
    in-memory ``_index``/``_chunks`` globals are replaced and, when at least
    one chunk was produced, ``faiss.index``, ``chunks.json`` and the document
    metadata file are written.

    Returns:
        ``(n_chunks, n_docs)``. With no chunks the in-memory index is cleared
        and nothing is written.
    """
    global _index, _chunks
    metas = []
    chunks: List[ChunkedDoc] = []
    for path in paths:
        path = Path(path)
        if not path.is_file():
            continue
        raw = load_any(path)
        doc_id = str(uuid.uuid4())
        metas.append({"doc_id": doc_id, "source_name": path.name, "path": str(path)})
        for idx, chunk in enumerate(chunk_text(raw, chunk_size, overlap)):
            if not chunk.strip():
                continue
            chunks.append(ChunkedDoc(doc_id=doc_id, source_name=path.name, chunk_id=idx, text=chunk))
    if not chunks:
        _index = None
        _chunks = []
        return 0, len(metas)

    # Embeddings are L2-normalized, so the inner-product index ranks by cosine similarity.
    embeddings = get_model().encode(
        [chunk.text for chunk in chunks],
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=True,
    ).astype(np.float32)
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)
    _index = index
    _chunks = chunks

    INDEX_DIR.mkdir(parents=True, exist_ok=True)
    faiss.write_index(_index, str(INDEX_DIR / "faiss.index"))
    # Context managers guarantee the JSON files are flushed and closed before
    # any later step (e.g. an S3 upload) reads them.
    with open(INDEX_DIR / "chunks.json", "w", encoding="utf-8") as handle:
        json.dump([chunk.__dict__ for chunk in chunks], handle, ensure_ascii=False)
    with open(META_PATH, "w", encoding="utf-8") as handle:
        json.dump(metas, handle, ensure_ascii=False, indent=2)
    return len(chunks), len(metas)


def load_index_local() -> bool:
    """Load the persisted index and chunks from ``INDEX_DIR`` into memory.

    Returns ``False`` when either ``faiss.index`` or ``chunks.json`` is
    missing, ``True`` after both were loaded. Read/parse errors propagate to
    the caller.
    """
    global _index, _chunks
    fidx = INDEX_DIR / "faiss.index"
    fch = INDEX_DIR / "chunks.json"
    if not (fidx.exists() and fch.exists()):
        return False
    _index = faiss.read_index(str(fidx))
    # Close the file deterministically instead of relying on garbage collection.
    with open(fch, "r", encoding="utf-8") as handle:
        data = json.load(handle)
    _chunks = [ChunkedDoc(**entry) for entry in data]
    return True


def ensure_index_loaded() -> bool:
    """Make sure an index is available in memory.

    Returns ``True`` when an index with chunks is already loaded or could be
    loaded from disk. If loading fails with an error, the in-memory state is
    cleared and ``False`` is returned.
    """
    global _index, _chunks
    already_loaded = _index is not None and bool(_chunks)
    if already_loaded:
        return True
    try:
        loaded = load_index_local()
    except Exception:
        _index, _chunks = None, []
        return False
    return loaded


def retrieve(question: str, top_k: int = 4) -> List[Tuple[float, ChunkedDoc]]:
    """Return up to *top_k* ``(score, chunk)`` pairs most similar to *question*.

    Yields an empty list when no index is available. Negative positions in
    the search result (no match for that slot) are dropped.
    """
    if not ensure_index_loaded():
        return []
    encoded = get_model().encode([question], convert_to_numpy=True, normalize_embeddings=True)
    scores, positions = _index.search(encoded.astype(np.float32), top_k)
    return [
        (float(score), _chunks[position])
        for score, position in zip(scores[0], positions[0])
        if position >= 0
    ]


def log_event(chat_id: str, role: str, text: str):
    """Append one timestamped ``{t, role, text}`` JSON line to the chat's log file.

    The file is ``<LOGS_DIR>/<chat_id>.jsonl`` and is created on first use.
    """
    target = LOGS_DIR / f"{chat_id}.jsonl"
    payload = dict(t=time.time(), role=role, text=text)
    with open(target, "a", encoding="utf-8") as sink:
        line = json.dumps(payload, ensure_ascii=False)
        sink.write(line + "\n")


# Confirmation banner printed once the RAG core above has been defined.
print("✔ Núcleo RAG OK")


✔ Núcleo RAG OK


## 4) Cargar/Guardar índice en S3 + reconstrucción desde docs en S3

In [5]:
def effective_team_prefix(base_prefix: str, team_folder: str):
    """Return the normalized S3 prefix ``<base_prefix>/<team_folder>/`` for a team."""
    base = norm_prefix(base_prefix)
    team = norm_prefix(team_folder)
    return f"{base}{team}"


def s3_upload_index(bucket: str, base_prefix: str, team_folder: str):
    """Upload the local index files to the team's S3 prefix.

    The files are written twice: directly under the team prefix and under its
    ``index/`` sub-prefix. The metadata file is included when it exists.
    Returns a status message; nothing is uploaded if the local index is
    incomplete.
    """
    root_prefix = effective_team_prefix(base_prefix, team_folder)
    sub_prefix = root_prefix + "index/"
    required = ("faiss.index", "chunks.json")
    if not all((INDEX_DIR / name).exists() for name in required):
        return "❌ No hay índice local (faiss.index / chunks.json). Construye primero."
    client = s3_client_autoregion(bucket)
    for destination in (root_prefix, sub_prefix):
        for name in required:
            client.upload_file(str(INDEX_DIR / name), bucket, f"{destination}{name}")
        if META_PATH.exists():
            client.upload_file(str(META_PATH), bucket, f"{destination}docs_metadata.json")
    return f"☁️ Subido a: s3://{bucket}/{root_prefix}  y  s3://{bucket}/{sub_prefix}"


def _download_index_from_prefix(bucket: str, prefix: str):
    """Download any index files found under *prefix* and return the set of names fetched.

    Objects are matched by base name only. ``docs_metadata.json`` is saved to
    ``META_PATH``; the other files go into ``INDEX_DIR``.
    """
    client = s3_client_autoregion(bucket)
    wanted = ["faiss.index", "chunks.json", "docs_metadata.json"]
    downloaded = set()
    object_keys, _ = s3_list_objects(bucket, prefix)
    for key in object_keys:
        filename = key.rsplit("/", 1)[-1]
        if filename not in wanted:
            continue
        destination = META_PATH if filename == "docs_metadata.json" else INDEX_DIR / filename
        destination.parent.mkdir(parents=True, exist_ok=True)
        client.download_file(bucket, key, str(destination))
        downloaded.add(filename)
    return downloaded


def s3_download_index(bucket: str, base_prefix: str, team_folder: str):
    """Fetch the team's index from S3 and load it into memory.

    The team prefix is tried first and its ``index/`` sub-prefix second.
    Returns a status message describing the outcome.
    """
    root_prefix = effective_team_prefix(base_prefix, team_folder)
    sub_prefix = root_prefix + "index/"
    needed = {"faiss.index", "chunks.json"}
    found = _download_index_from_prefix(bucket, root_prefix)
    if not needed.issubset(found):
        found = _download_index_from_prefix(bucket, sub_prefix)
    if not needed.issubset(found):
        return f"❌ No encontré índice en {root_prefix} ni {sub_prefix}."
    if load_index_local():
        return "📥 Índice cargado."
    return "❌ Descargado pero falló carga local."


def s3_rebuild_from_docs(bucket: str, base_prefix: str, team_folder: str, chunk_size=800, overlap=150):
    """Download the team's ``docs/`` folder from S3 and rebuild the index from it.

    Returns a status message. Documents are collected recursively because the
    download step recreates the S3 sub-folder structure under ``DOCS_DIR``;
    a flat listing would leave nested documents out of the index.
    """
    docs_prefix = effective_team_prefix(base_prefix, team_folder) + "docs/"
    count = s3_sync_docs_to_local(bucket, docs_prefix, str(DOCS_DIR))
    if count == 0:
        return "❌ No hay documentos en S3 (carpeta 'docs/'). Sube alguno primero."
    # Sorted so the chunk order (and therefore the index) is reproducible.
    doc_paths = sorted(p for p in Path(DOCS_DIR).rglob("*") if p.is_file())
    n_chunks, n_docs = build_index_from_local(doc_paths, chunk_size, overlap)
    return f"✅ Reconstruido desde S3: {n_docs} docs → {n_chunks} chunks."


def s3_upload_local_docs(bucket: str, base_prefix: str, team_folder: str):
    """Upload everything in the local docs folder to the team's ``docs/`` prefix and report the count."""
    team_prefix = effective_team_prefix(base_prefix, team_folder)
    docs_prefix = team_prefix + "docs/"
    uploaded = s3_sync_local_docs_to_s3(bucket, docs_prefix, str(DOCS_DIR))
    return f"☁️ Subidos {uploaded} archivo(s) a s3://{bucket}/{docs_prefix}"


def s3_download_docs(bucket: str, base_prefix: str, team_folder: str):
    """Download the team's ``docs/`` prefix into the local docs folder and report the count."""
    team_prefix = effective_team_prefix(base_prefix, team_folder)
    downloaded = s3_sync_docs_to_local(bucket, team_prefix + "docs/", str(DOCS_DIR))
    return f"📥 Descargados {downloaded} archivo(s) a {DOCS_DIR}"


## 5) UI Gradio — S3/equipo, Docs, Índice, Persistencia, Preguntas, Telegram (Admin)

In [6]:
import os, re, json, time, threading, requests
from pathlib import Path
from typing import Any, Dict, Optional, List




def _tt_answer_logic(question: str, for_telegram: bool = False,
                       chat_history: Optional[List[Dict[str, str]]] = None) -> str:
    """Produce the reply for one incoming message, trying several strategies in order.

    Order of attempts:
      1. a global ``responder(question)`` hook, if one is defined;
      2. ``rag_answer`` with the fragments retrieved here;
      3. a direct ``LLM.generate`` call, reached only when ``rag_answer`` is
         not defined or raised.

    Args:
        question: Incoming user text; blank input gets a fixed prompt back.
        for_telegram: When true, the "Fuentes" section is stripped from, or
            not added to, the reply.
        chat_history: Prior turns as ``{"role", "content"}`` dicts; turns with
            empty content are discarded.

    Returns:
        The reply text (never empty).
    """
    question = (question or "").strip()
    if not question:
        return "Envíame un texto y responderé con lo que haya en tus archivos."

    # Keep only turns that actually carry text, with stripped content.
    chat_history = [
        {"role": item.get("role", "user"), "content": (item.get("content") or "").strip()}
        for item in (chat_history or [])
        if (item.get("content") or "").strip()
    ]

    # Last error from an earlier strategy; appended to the final answer if we get that far.
    fallback_error = None

    # Prepend the last two user turns so follow-up questions keep their topic during retrieval.
    retrieval_query = question
    recent_user_turns = [t["content"] for t in chat_history if t.get("role") == "user"]
    if recent_user_turns:
        retrieval_query = " ".join(recent_user_turns[-2:] + [question])

    hits = []
    top_hits = []
    if 'retrieve' in globals():
        try:
            # Over-fetch 6 candidates, then keep at most the 4 best.
            hits = retrieve(retrieval_query, top_k=6)
            if hits:
                limit = max(1, int(min(4, len(hits))))
                top_hits = hits[:limit]
        except Exception:
            hits = []
            top_hits = []

    # Strategy 1: optional hook. Note it receives only the question, not the
    # retrieved hits or the chat history.
    if 'responder' in globals():
        try:
            reply = responder(question)
            if for_telegram:
                # Drop everything from the "Fuentes:" line to the end of the reply.
                reply = re.sub(r"\n\s*Fuentes\s*:.*$", "", reply, flags=re.S)
            return reply
        except Exception as exc:
            fallback_error = f"⚠️ Error en responder(): {exc}"

    # Generation settings come from the environment; unparsable values mean "use the client default".
    try:
        temp_env = os.getenv("LLM_TEMPERATURE")
        temperature = float(temp_env) if temp_env is not None else None
    except Exception:
        temperature = None
    try:
        max_tok_env = os.getenv("LLM_MAX_TOKENS")
        max_tokens = int(max_tok_env) if max_tok_env is not None else None
    except Exception:
        max_tokens = None

    # Strategy 2: the regular RAG pipeline. With no hits here, hits=None lets
    # rag_answer run its own retrieval.
    if 'rag_answer' in globals():
        try:
            answer, _ = rag_answer(
                question,
                top_k=len(top_hits) or 4,
                temperature=temperature,
                max_tokens=max_tokens,
                with_sources=not for_telegram,
                hits=top_hits if top_hits else None,
                chat_history=chat_history,
            )
            return answer
        except Exception as exc:
            fallback_error = f"⚠️ Error en rag_answer(): {exc}"

    # Strategy 3: call the LLM client directly with whatever hits are available.
    chosen_hits = top_hits if top_hits else hits
    answer = LLM.generate(
        question,
        hits=chosen_hits,
        temperature=temperature,
        max_tokens=max_tokens,
        chat_history=chat_history,
    )
    if not for_telegram and chosen_hits:
        sources = collect_sources(chosen_hits)
        if sources and "Fuentes" not in answer:
            answer = answer.rstrip() + "\n\nFuentes: " + ", ".join(sources)
    # Surface the earlier failure unless the answer already carries a warning.
    if fallback_error and "⚠️" not in answer:
        answer = answer.rstrip() + "\n\n" + fallback_error
    return answer or "No encuentro esa información en mis archivos."


class TTGram:
    HOLD_AFTER_ADMIN = 60
    HOLD_AFTER_ALERT = 120
    AUTO_CLOSE_AFTER = 600
    HISTORY_LIMIT = 12
    ALERT_REGEX = re.compile(r"\b(asesor|ayuda)\b", re.I)

    def __init__(self):
        base_token = (
            os.getenv("TELEGRAM_BOT_TOKEN")
            or (userdata.get("TELEGRAM_BOT_TOKEN") if 'userdata' in globals() else None)
            or ""
        )
        self.token = base_token.strip()
        self.offset = None
        self.global_auto = True
        self.stop = threading.Event()
        self.thread = None
        self.known: Dict[str, Dict[str, Any]] = {}
        self.admin_chat_id = (os.getenv("ADMIN_CHAT_ID") or "").strip()
        self.logs = LOGS_DIR if 'LOGS_DIR' in globals() else Path.cwd() / "mini_chatbot_work" / "logs"
        self.logs.mkdir(parents=True, exist_ok=True)

    def shutdown(self):
        """Detiene hilos/polling activos para evitar duplicados al re-ejecutar la celda."""
        self.stop.set()
        thread = getattr(self, "thread", None)
        if thread and thread.is_alive():
            try:
                thread.join(timeout=2)
            except Exception:
                pass
        self.thread = None

    # --------------- core helpers ---------------
    def _known_meta(self, chat_id: str) -> Dict[str, Any]:
        return self.known.setdefault(chat_id, {
            "title": "",
            "last_text": "",
            "auto": None,
            "hold_until": 0.0,
            "last_bot_reply": "",
            "last_bot_ts": 0.0,
            "history": [],
        })

    def _update_known(self, chat_id: str, title: str = "") -> Dict[str, Any]:
        meta = self._known_meta(chat_id)
        if title and not meta.get("title"):
            meta["title"] = title
        return meta

    def _append_history(self, meta: Dict[str, Any], role: str, text: str) -> List[Dict[str, str]]:
        history = meta.setdefault("history", [])
        clean = (text or "").strip()
        if not clean:
            return history
        history.append({"role": role, "content": clean})
        if len(history) > self.HISTORY_LIMIT:
            del history[:-self.HISTORY_LIMIT]
        return history


    def _log(self, chat_id: str, role: str, text: str):
        record = {"t": time.time(), "role": role, "text": text}
        try:
            with open(self.logs / f"{chat_id}.jsonl", "a", encoding="utf-8") as handle:
                handle.write(json.dumps(record, ensure_ascii=False) + "\n")
        except Exception:
            pass

    def _send(self, chat_id: str, text: str):
        if not self.token:
            raise RuntimeError("No TELEGRAM_BOT_TOKEN configurado.")
        response = requests.post(
            f"https://api.telegram.org/bot{self.token}/sendMessage",
            json={"chat_id": chat_id, "text": text},
            timeout=15,
        )
        response.raise_for_status()
        return response.json()

    def _send_silent(self, chat_id: str, text: str, role: Optional[str] = None, dedupe: bool = False) -> bool:
        meta = None
        if dedupe or role == "bot":
            meta = self._known_meta(chat_id)
            if dedupe and role == "bot":
                last = (meta.get("last_bot_reply") or "").strip().lower()
                if last and last == text.strip().lower():
                    return False
        try:
            self._send(chat_id, text)
            if role == "bot":
                if meta is None:
                    meta = self._known_meta(chat_id)
                meta["last_bot_reply"] = text.strip()
                meta["last_bot_ts"] = time.time()
            elif role == "admin":
                if meta is None:
                    meta = self._known_meta(chat_id)
                meta["last_admin_reply"] = text.strip()
            return True
        except Exception as exc:
            self._log(chat_id, "error", f"send fail: {exc}")
            return False


    def _mirror_to_admin(self, message: str, origin_chat: Optional[str] = None):
        admin = getattr(self, "admin_chat_id", "")
        if not admin or admin == origin_chat:
            return
        self._send_silent(admin, message)

    def _set_hold(self, chat_id: str, seconds: float):
        meta = self._known_meta(chat_id)
        meta["hold_until"] = time.time() + float(seconds)

    def _set_auto_chat(self, chat_id: str, flag: bool):
        meta = self._known_meta(chat_id)
        meta["auto"] = bool(flag)
        meta["hold_until"] = 0.0
        meta["awaiting_admin"] = False
        meta["closed_notified"] = False

    def _should_auto(self, chat_id: str) -> bool:
        meta = self._known_meta(chat_id)
        if meta.get("awaiting_admin"):
            return False
        hold_until = meta.get("hold_until", 0)
        if hold_until and time.time() < hold_until:
            return False
        auto = meta.get("auto")
        if auto is None:
            auto = self.global_auto
        return bool(auto)

    def _delete_webhook(self):
        if not self.token:
            return
        try:
            requests.get(f"https://api.telegram.org/bot{self.token}/deleteWebhook", timeout=10)
        except Exception:
            pass

    def _me(self):
        if not self.token:
            return "No TELEGRAM_BOT_TOKEN configurado."
        try:
            resp = requests.get(f"https://api.telegram.org/bot{self.token}/getMe", timeout=10)
            resp.raise_for_status()
            data = resp.json().get("result", {})
            username = data.get("username", "(sin username)")
            return "Bot: @" + username
        except Exception as exc:
            return f"Token/Bot error: {exc}"

    # --------------- admin commands ---------------
    def _handle_admin_command(self, text: str) -> bool:
        cid = getattr(self, "admin_chat_id", "")
        if not cid:
            return False
        cmd = text.strip()
        if cmd == "/admin":
            self._send_silent(cid, "Ya eres canal admin ✅", role="bot")
            return True
        if cmd.startswith("/auto_on"):
            parts = cmd.split()
            if len(parts) >= 2:
                self._set_auto_chat(parts[1], True)
                self._send_silent(cid, f"Auto ON para {parts[1]}", role="bot")
            return True
        if cmd.startswith("/auto_off"):
            parts = cmd.split()
            if len(parts) >= 2:
                self._set_auto_chat(parts[1], False)
                self._send_silent(cid, f"Auto OFF para {parts[1]}", role="bot")
            return True
        if cmd.startswith("/say"):
            parts = cmd.split(maxsplit=2)
            if len(parts) >= 3:
                target, message = parts[1], parts[2]
                if self._send_silent(target, message, role="admin"):
                    self._log(target, "admin", message)
                self._set_hold(target, self.HOLD_AFTER_ADMIN)
                meta = self._known_meta(target)
                meta["awaiting_admin"] = False
                meta["closed_notified"] = False
                self._mirror_to_admin(f"[ADMIN→{target}] {message}", origin_chat=cid)
            return True
        if cmd.startswith("/close"):
            parts = cmd.split(maxsplit=1)
            if len(parts) >= 2:
                self.close_chat(parts[1], notify_user=True)
            return True
        return False

    # --------------- public API ---------------
    def set_token(self, token: str):
        token = (token or self.token or os.getenv("TELEGRAM_BOT_TOKEN")
                 or (userdata.get("TELEGRAM_BOT_TOKEN") if 'userdata' in globals() else "")).strip()
        if token.upper().startswith("BOT:"):
            token = token.split(":", 1)[1].strip()
        self.token = token
        if token:
            os.environ["TELEGRAM_BOT_TOKEN"] = token
            if 'userdata' in globals():
                try:
                    userdata["TELEGRAM_BOT_TOKEN"] = token
                except Exception:
                    pass
        self._delete_webhook()
        return self._me()

    def toggle_poll(self, flag: bool):
        if flag:
            if self.thread and self.thread.is_alive():
                return "Auto-escuchar: ya activo."
            self.stop.clear()
            self._delete_webhook()
            self.thread = threading.Thread(target=self._loop, daemon=True)
            self.thread.start()
            return "Auto-escuchar: ON"
        self.stop.set()
        thread = getattr(self, "thread", None)
        if thread and thread.is_alive():
            try:
                thread.join(timeout=2)
            except Exception:
                pass
        self.thread = None
        return "Auto-escuchar: OFF"

    def poll_once(self):
        if not self.token:
            return "Sin TELEGRAM_BOT_TOKEN"
        try:
            params = {"timeout": 10}
            if self.offset is not None:
                params["offset"] = self.offset
            resp = requests.get(
                f"https://api.telegram.org/bot{self.token}/getUpdates",
                params=params,
                timeout=15,
            )
            if resp.status_code == 409:
                self._delete_webhook()
                self.offset = None
                try:
                    detail = resp.json().get("description", "")
                except Exception:
                    detail = resp.text
                return (
                    "⚠️ Telegram devolvió 409 (posible webhook activo). "
                    "Se eliminó el webhook; vuelve a pulsar 'Leer ahora'. "
                    f"Detalle: {detail.strip()}"
                )
            resp.raise_for_status()
            data = resp.json()
            for item in data.get("result", []):
                self.offset = item["update_id"] + 1
                self.handle_update(item)
            return f"OK, updates: {len(data.get('result', []))}"
        except Exception as exc:
            return f"❌ Poll error: {exc}"

    def poll_now(self):
        return self.poll_once()

    def _loop(self):
        while not self.stop.is_set():
            self.poll_once()
            self._check_auto_close()
            for _ in range(10):
                if self.stop.is_set():
                    break
                time.sleep(0.3)

    def _check_auto_close(self):
        now = time.time()
        for cid, meta in list(self.known.items()):
            alert_ts = meta.get("alert_ts")
            awaiting = meta.get("awaiting_admin", False)
            closed = meta.get("closed_notified", False)
            if awaiting and alert_ts and not closed and now - alert_ts > self.AUTO_CLOSE_AFTER:
                message = (
                    "Cierro esta conversación por inactividad. Si necesitas más ayuda, "
                    "escríbeme de nuevo y retomaré la consulta."
                )
                if self._send_silent(cid, message, role="bot"):
                    self._log(cid, "bot", message)
                meta["awaiting_admin"] = False
                meta["closed_notified"] = True
                meta["hold_until"] = 0.0
                meta["auto"] = self.global_auto
                if self.admin_chat_id:
                    self._send_silent(self.admin_chat_id, f"ℹ️ Conversación {cid} cerrada por inactividad.")

    def list_chats(self):
        if not self.known:
            return "(sin chats aún — envía /start al bot y pulsa 'Leer ahora')"
        now = time.time()
        lines = []
        for cid, meta in self.known.items():
            hold_remaining = max(0, int(meta.get("hold_until", 0) - now))
            auto = meta.get("auto")
            if auto is None:
                auto = self.global_auto
            awaiting = meta.get("awaiting_admin") and not meta.get("closed_notified")
            state = "esperando asesor" if awaiting else "activo"
            alert_age = ""
            if awaiting:
                age = max(0, int(now - meta.get("alert_ts", now)))
                alert_age = f" (hace {age}s)"
            prefix = "⚠️ " if awaiting else ""
            lines.append(
                f"{prefix}{cid} | {meta.get('title','')} | auto={'ON' if auto else 'OFF'} | hold={hold_remaining}s | {state}{alert_age} | último: {meta.get('last_text','')[:60]}"
            )
        return "\n".join(lines)

    def admin_send(self, chat_id: str, text: str):
        chat_id = (chat_id or "").strip()
        if not chat_id:
            return "Selecciona un chat."
        message = text or "(mensaje vacío)"
        try:
            self._send(chat_id, message)
            self._log(chat_id, "admin", message)
            meta = self._known_meta(chat_id)
            meta["last_text"] = message
            meta["awaiting_admin"] = False
            meta["closed_notified"] = False
            self._set_hold(chat_id, self.HOLD_AFTER_ADMIN)
            self._mirror_to_admin(f"[ADMIN→{chat_id}] {message}", origin_chat=self.admin_chat_id)
            return "✅ Mensaje enviado."
        except Exception as exc:
            return f"❌ Error: {exc}"

    def auto_toggle(self, flag: bool):
        self.global_auto = bool(flag)
        return f"Auto-responder (global): {'ON' if self.global_auto else 'OFF'}"

    def view_chat(self, chat_id: str, limit: int = 40):
        chat_id = (chat_id or "").strip()
        if not chat_id:
            return "Indica un chat_id."
        path = self.logs / f"{chat_id}.jsonl"
        if not path.exists():
            return "No hay historial para ese chat."
        try:
            raw = path.read_text(encoding="utf-8")
        except Exception as exc:
            return f"No se pudo leer el historial: {exc}"
        entries = [json.loads(line) for line in raw.splitlines() if line.strip()]
        entries = entries[-limit:]
        formatted = []
        for entry in entries:
            ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(entry.get("t", time.time())))
            role = entry.get("role", "").upper()
            text = entry.get("text", "")
            formatted.append(f"{ts} | {role}: {text}")
        return "\n".join(formatted)

    def release_hold(self, chat_id: str):
        chat_id = (chat_id or "").strip()
        if not chat_id:
            return "Indica un chat_id."
        meta = self._known_meta(chat_id)
        meta["hold_until"] = 0.0
        meta["awaiting_admin"] = False
        meta["closed_notified"] = False
        if meta.get("auto") is None:
            meta["auto"] = self.global_auto
        return f"Hold liberado y auto={'ON' if meta.get('auto') else 'OFF'}"

    def close_chat(self, chat_id: str, notify_user: bool = True):
        chat_id = (chat_id or "").strip()
        if not chat_id:
            return "Indica un chat_id."
        meta = self._known_meta(chat_id)
        closing = "Cierro esta conversación. Si necesitas más ayuda, escríbeme nuevamente." if notify_user else "Soporte cerrado."
        if notify_user:
            self._send_silent(chat_id, closing, role="bot")
            self._log(chat_id, "bot", closing)
        meta["awaiting_admin"] = False
        meta["hold_until"] = time.time() + 2
        meta["auto"] = self.global_auto
        meta["closed_notified"] = True
        if self.admin_chat_id:
            self._send_silent(self.admin_chat_id, f"ℹ️ Conversación {chat_id} marcada como cerrada.")
        return "✅ Conversación cerrada."

    def handle_update(self, item):
        """Process one Telegram update: record it, apply admin/handoff rules, auto-reply if allowed.

        Only updates that carry a ``message`` with a chat id and non-empty text
        are handled; messages whose sender is flagged ``is_bot`` are ignored.

        Args:
            item: One element of the ``result`` list returned by ``getUpdates``.
        """
        message = item.get("message") or {}
        chat = message.get("chat") or {}
        chat_id = str(chat.get("id")) if chat.get("id") is not None else ""
        text = (message.get("text") or "").strip()
        title = chat.get("username") or chat.get("title") or ""
        if not chat_id or not text:
            return

        sender = message.get("from") or {}
        if sender.get("is_bot"):
            return

        meta = self._update_known(chat_id, title)
        meta["last_text"] = text
        # Prefer the timestamp carried by the message; fall back to local time.
        if message.get("date"):
            meta["last_user_ts"] = message["date"]
        else:
            meta["last_user_ts"] = time.time()

        self._log(chat_id, "user", text)
        admin_chat = getattr(self, "admin_chat_id", "")
        is_admin_chat = chat_id == admin_chat
        # Every message from a non-admin chat is forwarded to the admin chat.
        if not is_admin_chat:
            self._mirror_to_admin(f"[USER→{chat_id}] {text}", origin_chat=chat_id)

        # Commands typed in the admin chat are consumed here; any other admin
        # text falls through to the rules below.
        if is_admin_chat:
            if self._handle_admin_command(text):
                return

        # While no admin chat is configured, the first chat sending /admin becomes it.
        if text == "/admin" and not admin_chat:
            self.admin_chat_id = chat_id
            os.environ["ADMIN_CHAT_ID"] = chat_id
            if 'userdata' in globals():
                # Best-effort write-back; any failure is ignored.
                try:
                    userdata["ADMIN_CHAT_ID"] = chat_id
                except Exception:
                    pass
            self._send_silent(chat_id, "Este chat queda configurado como canal admin ✅", role="bot")
            return

        # Conversation history is kept for non-admin chats only. The model gets
        # the turns *before* the message just appended.
        history_ref = None
        history_for_model: List[Dict[str, str]] = []
        if not is_admin_chat:
            history_ref = self._append_history(meta, "user", text)
            history_for_model = history_ref[:-1]

        # A keyword match (ALERT_REGEX) hands the chat over to a human: the
        # per-chat auto-reply is switched off and a hold is set. There is no
        # return here, but _should_auto() below is False while awaiting_admin
        # is set.
        if self.ALERT_REGEX.search(text):
            handoff = (
                "He activado soporte humano. Un asesor se unirá en breve. "
                "También puedes seguir escribiendo y lo revisaré."
            )
            if self._send_silent(chat_id, handoff, role="bot"):
                self._log(chat_id, "bot", handoff)
            meta["awaiting_admin"] = True
            meta["alert_ts"] = time.time()
            meta["auto"] = False
            meta["closed_notified"] = False
            self._set_hold(chat_id, self.HOLD_AFTER_ALERT)
            if admin_chat:
                self._send_silent(
                    admin_chat,
                    f"⚠️ ALERTA: {chat_id} pidió ayuda (‘{text}’). Usa /say {chat_id} <mensaje> o /auto_on {chat_id} para reanudar el bot.",
                    role="bot",
                )

        # /start always gets the fixed welcome text and never reaches the model.
        if text.lower() == "/start":
            welcome = (
                "Hola, soy tu asistente de Talento TECH. Escríbeme tu duda y consultaré los archivos de tu equipo."
            )
            if self._send_silent(chat_id, welcome, role="bot"):
                self._log(chat_id, "bot", welcome)
            return

        if self._should_auto(chat_id):
            reply = _tt_answer_logic(text, for_telegram=True, chat_history=history_for_model)
            # dedupe=True: an answer identical to the previous bot reply is not sent again.
            if self._send_silent(chat_id, reply, role="bot", dedupe=True):
                self._log(chat_id, "bot", reply)
                if history_ref is not None:
                    self._append_history(meta, "assistant", reply)
            # NOTE(review): the reply is mirrored to the admin even when
            # _send_silent() returned False (send failure or duplicate) —
            # confirm this is intended.
            self._mirror_to_admin(f"[BOT→{chat_id}] {reply}", origin_chat=chat_id)

    def status(self):
        data = {
            "token_configured": bool(self.token),
            "admin_chat": self.admin_chat_id or "(sin configurar)",
            "global_auto": self.global_auto,
            "known_chats": len(self.known),
            "awaiting_admin": [
                cid
                for cid, meta in self.known.items()
                if meta.get("awaiting_admin") and not meta.get("closed_notified")
            ],
        }
        return json.dumps(data, ensure_ascii=False, indent=2)

    def list_alerts(self):
        waiting = []
        now = time.time()
        for cid, meta in self.known.items():
            if not meta.get("awaiting_admin") or meta.get("closed_notified"):
                continue
            age = max(0, int(now - meta.get("alert_ts", now)))
            title = meta.get("title", "")
            last = meta.get("last_text", "")
            waiting.append(
                f"⚠️ {cid} | {title} | espera desde hace {age}s | último: {last[:80]}"
            )
        if not waiting:
            return "(sin chats solicitando asesor)"
        return "\n".join(waiting)


# Stop the poller of a bot left over from a previous run of this cell before
# creating a fresh instance. On the very first run `_TTG` does not exist yet;
# the resulting NameError is swallowed together with any shutdown failure.
try:
    _TTG.shutdown()
except Exception:
    pass

_TTG = TTGram()


def ui_set_token(token: str):
    """UI callback: hand ``token`` to the shared bot and return the text it reports."""
    outcome = _TTG.set_token(token)
    return outcome


def ui_poll_toggle(flag: bool):
    """UI callback: start or stop background polling on the shared bot."""
    enabled = bool(flag)
    return _TTG.toggle_poll(enabled)


def ui_poll_once():
    """UI callback: poll Telegram once and return the resulting status text."""
    outcome = _TTG.poll_now()
    return outcome


def ui_list_chats():
    """UI callback: return the shared bot's chat summary text."""
    summary = _TTG.list_chats()
    return summary


def ui_admin_send(chat_id: str, text: str):
    """UI callback: send ``text`` to ``chat_id`` as the admin and return the status text."""
    outcome = _TTG.admin_send(chat_id, text)
    return outcome


def ui_auto_toggle(flag: bool):
    """UI callback: switch the global auto-reply default on the shared bot."""
    enabled = bool(flag)
    return _TTG.auto_toggle(enabled)


def ui_view_chat(chat_id: str):
    """UI callback: return the formatted transcript of ``chat_id`` (default entry limit)."""
    transcript = _TTG.view_chat(chat_id)
    return transcript


def ui_close_chat(chat_id: str):
    """UI callback: close ``chat_id`` on the shared bot, notifying the user."""
    outcome = _TTG.close_chat(chat_id, notify_user=True)
    return outcome


def ui_release_hold(chat_id: str):
    """UI callback: clear the hold of ``chat_id`` on the shared bot."""
    outcome = _TTG.release_hold(chat_id)
    return outcome


def ui_list_alerts():
    """UI callback: return the list of chats waiting for a human, as text."""
    alerts = _TTG.list_alerts()
    return alerts


def ui_bot_status():
    """UI callback: return the shared bot's status snapshot as JSON text."""
    snapshot = _TTG.status()
    return snapshot


In [7]:
# Active S3 route used by the UI callbacks below; seeded from the environment.
STATE = dict(
    bucket=os.getenv("S3_BUCKET", ""),
    base_prefix=norm_prefix(os.getenv("S3_PREFIX", "")),
    team_folder="",
)


def apply_route(bucket, base_prefix, team_folder):
    """Update the active S3 route in ``STATE`` and create its ``docs/`` and ``index/`` placeholders.

    A blank ``bucket`` or ``base_prefix`` keeps the current value (the prefix
    finally falls back to the ``S3_PREFIX`` environment variable);
    ``team_folder`` is always replaced, so a blank value clears it.

    Returns:
        A confirmation line with the effective ``s3://`` route, followed by a
        warning line when the placeholder objects could not be written.
    """
    STATE["bucket"] = (bucket or '').strip() or STATE["bucket"]
    STATE["base_prefix"] = (
        norm_prefix(base_prefix or STATE["base_prefix"])
        or norm_prefix(os.getenv("S3_PREFIX", ""))
    )
    STATE["team_folder"] = (team_folder or '').strip()

    eff_root = effective_team_prefix(STATE["base_prefix"], STATE["team_folder"])
    bucket_name = STATE["bucket"]
    summary = f"✔ Ruta: s3://{bucket_name}/{eff_root}"
    if not bucket_name:
        return summary
    try:
        s3 = s3_client_autoregion(bucket_name)
        for sub in ("docs/", "index/"):
            s3.put_object(Bucket=bucket_name, Key=eff_root + sub, Body=b'')
    except Exception as exc:
        # Still best-effort (the route is applied either way), but tell the
        # user instead of silently reporting success.
        summary += f"\n⚠️ No se pudieron crear docs/ e index/ en S3: {exc}"
    return summary


In [None]:
import gradio as gr
import shutil # Import shutil for copying files


def format_llm_status(status: Dict[str, Any]) -> str:
    """Render the LLM status mapping as a multi-line, human-readable summary."""

    def key_badge(present) -> str:
        return 'llave ✅' if present else 'sin llave'

    providers = ", ".join(status.get("available", [])) or 'fragmento'
    return "\n".join((
        f"Proveedor activo: {status.get('provider')}",
        f"Proveedor configurado: {status.get('configured_provider')}",
        f"Disponibles: {providers}",
        f"Temperatura: {status.get('temperature')}",
        f"Máx. tokens: {status.get('max_tokens')}",
        f"Modelo OpenAI: {status.get('openai_model')} ({key_badge(status.get('has_openai_key'))})",
        f"Modelo Gemini: {status.get('gemini_model')} ({key_badge(status.get('has_gemini_key'))})",
    ))


def ui_apply_s3(bucket, base_prefix, team_folder):
    """UI callback: apply the S3 route entered in the form and return the confirmation text."""
    confirmation = apply_route(bucket, base_prefix, team_folder)
    return confirmation


def ui_list_teams(bucket, base_prefix):
    """UI callback: list the folders reported by ``s3_list_immediate_folders``, one per line.

    Blank inputs fall back to the ``S3_BUCKET`` / ``S3_PREFIX`` environment variables.
    """
    bucket_name = (bucket or '').strip() or os.getenv("S3_BUCKET")
    prefix = norm_prefix(base_prefix or os.getenv("S3_PREFIX") or "")
    folders = s3_list_immediate_folders(bucket_name, prefix)
    if not folders:
        return "(sin subcarpetas)"
    return "\n".join(folders)


def ui_upload(files):
    """Copy the uploaded files into ``DOCS_DIR`` and report which ones were saved.

    Args:
        files: A path or a list of paths, as delivered by
            ``gr.File(type="filepath")``; entries that are not paths are skipped.

    Returns:
        A status line listing the saved file names, or a notice when nothing
        was saved. Per-file errors are printed and do not abort the batch.
    """
    if isinstance(files, (str, os.PathLike)):
        # A lone path would otherwise be iterated character by character.
        files = [files]
    saved = []
    DOCS_DIR.mkdir(parents=True, exist_ok=True)
    for file_path in files or []:
        if not isinstance(file_path, (str, os.PathLike)):
            continue
        try:
            name = Path(file_path).name
            # copyfile streams the data instead of loading the whole file in
            # memory, and refuses to copy a file onto itself (which would
            # otherwise truncate it).
            shutil.copyfile(file_path, DOCS_DIR / name)
            saved.append(name)
        except shutil.SameFileError:
            # Already in place: nothing to copy, but it is available locally.
            saved.append(name)
        except Exception as e:
            print(f"Error saving file {file_path}: {e}")  # surfaced in the notebook output for debugging
    if not saved:
        return "No se cargaron archivos."
    return f"Guardados local: {saved}"


def ui_sync_local_to_s3():
    """UI callback: call ``s3_upload_local_docs`` for the active route stored in ``STATE``."""
    route = (STATE["bucket"], STATE["base_prefix"], STATE["team_folder"])
    return s3_upload_local_docs(*route)


def ui_sync_s3_to_local():
    """UI callback: call ``s3_download_docs`` for the active route stored in ``STATE``."""
    route = (STATE["bucket"], STATE["base_prefix"], STATE["team_folder"])
    return s3_download_docs(*route)


def ui_build_index(chunk_size, overlap):
    """UI callback: build the local index from every entry in ``DOCS_DIR`` and report the counts."""
    doc_paths = [*Path(DOCS_DIR).glob("*")]
    if not doc_paths:
        return "Sube o sincroniza documentos primero."
    size, step = int(chunk_size), int(overlap)
    n_chunks, n_docs = build_index_from_local(doc_paths, size, step)
    return f"✅ Índice local: {n_docs} docs → {n_chunks} chunks."


def ui_rebuild_from_s3(chunk_size, overlap):
    """UI callback: call ``s3_rebuild_from_s3`` for the active route with integer chunk settings."""
    route = (STATE["bucket"], STATE["base_prefix"], STATE["team_folder"])
    return s3_rebuild_from_s3(*route, int(chunk_size), int(overlap))


def ui_save_index():
    """UI callback: call ``s3_upload_index`` for the active route stored in ``STATE``."""
    route = (STATE["bucket"], STATE["base_prefix"], STATE["team_folder"])
    return s3_upload_index(*route)


def ui_load_index():
    """UI callback: call ``s3_download_index`` for the active route stored in ``STATE``."""
    route = (STATE["bucket"], STATE["base_prefix"], STATE["team_folder"])
    return s3_download_index(*route)


def ui_ask(question, top_k, temp, max_tokens, history=None):
    """UI callback: answer ``question`` with ``rag_answer``.

    Returns:
        ``(answer, sources, history)`` where ``sources`` is the de-duplicated,
        sorted source list joined by newlines and ``history`` is a new list
        holding at most the 12 most recent turns. A blank question or a
        missing index yields a hint instead of an answer and leaves the
        history contents unchanged.
    """
    question = (question or "").strip()
    turns = list(history or [])
    if not question:
        return "Escribe una pregunta.", "", turns
    if not load_index_local():
        return "Primero carga/crea un índice.", "", turns
    prior_turns = list(turns)  # the model receives only the turns before this question
    turns.append({"role": "user", "content": question})
    answer, sources = rag_answer(
        question,
        top_k=int(top_k),
        temperature=float(temp),
        max_tokens=int(max_tokens),
        with_sources=False,
        chat_history=prior_turns,
    )
    reply = (answer or "").strip() or "No encuentro esa información en mis archivos."
    turns.append({"role": "assistant", "content": reply})
    if len(turns) > 12:
        turns = turns[-12:]
    unique_sources = sorted(set(sources))
    return reply, "\n".join(unique_sources), turns


def ui_llm_status():
    """UI callback: return the current ``LLM.status()`` formatted as text."""
    current = LLM.status()
    return format_llm_status(current)


def ui_llm_config(provider, openai_model, gemini_model, temperature, max_tokens):
    """UI callback: pass the chosen LLM settings to ``LLM.configure`` with ``persist=True``.

    Returns:
        A confirmation message and the formatted status returned by the call.
    """
    settings = dict(
        provider=provider,
        openai_model=openai_model,
        gemini_model=gemini_model,
        temperature=float(temperature),
        max_tokens=int(max_tokens),
        persist=True,
    )
    status = LLM.configure(**settings)
    return "✅ Configuración guardada.", format_llm_status(status)


def ui_available_sources():
    """UI callback: return the names from ``available_sources()`` as a bullet list, or a hint when empty."""
    names = available_sources()
    if names:
        return "\n".join(f"- {name}" for name in names)
    return "No hay documentos indexados todavía. Usa las pestañas de Docs y Índice para cargarlos y construir el índice."


def ui_view_chat_history(chat_id):
    """UI callback: alias of ``ui_view_chat`` for the history panel."""
    transcript = ui_view_chat(chat_id)
    return transcript


def ui_close_chat_from_ui(chat_id):
    """UI callback: alias of ``ui_close_chat``."""
    outcome = ui_close_chat(chat_id)
    return outcome


def ui_release_hold_from_ui(chat_id):
    """UI callback: alias of ``ui_release_hold``."""
    outcome = ui_release_hold(chat_id)
    return outcome


def ui_bot_status_box():
    """UI callback: return whatever ``ui_bot_status`` reports."""
    report = ui_bot_status()
    return report


# Build the Gradio admin UI. Components are created inside nested context
# managers, so the on-screen layout follows the statement order below; each tab
# wires its buttons to the ui_* callbacks.
with gr.Blocks(title="Chatbot RAG + S3 + Telegram (Admin & equipos)", theme=gr.themes.Soft()) as demo:
    gr.Markdown("### 🧠 Flujo: Docs → Índice → Preguntas → Telegram (Admin)")

    # Tab 0: choose the active S3 route (bucket / base prefix / team folder).
    with gr.Tab("0) S3 (Equipo)"):
        with gr.Row():
            # os.getenv is called without a default here, so these fields start
            # empty when the environment variables are not set.
            bucket = gr.Textbox(value=os.getenv("S3_BUCKET"), label="S3_BUCKET")
            base_prefix = gr.Textbox(value=os.getenv("S3_PREFIX"), label="Prefijo base (p.ej. IA-Innovador/)")
            team_folder = gr.Textbox(label="Carpeta del equipo (p.ej. Daniel)")
        s3_state = gr.Textbox(label="Estado de la ruta activa", lines=3)
        teams_out = gr.Textbox(label="Carpetas encontradas", lines=6)
        with gr.Row():
            btn_apply = gr.Button("Aplicar ruta")
            btn_list = gr.Button("Listar carpetas existentes")
        btn_apply.click(ui_apply_s3, inputs=[bucket, base_prefix, team_folder], outputs=s3_state)
        btn_list.click(ui_list_teams, inputs=[bucket, base_prefix], outputs=teams_out)

    # Tab 1: upload documents and sync them between local storage and S3.
    # All three buttons report into the same status box.
    with gr.Tab("1) Docs"):
        files = gr.File(label="Sube PDF/TXT/CSV", file_count="multiple", type="filepath")
        out_docs = gr.Textbox(label="Estado", lines=3)
        with gr.Row():
            btn_upload = gr.Button("Subir a local")
            btn_s3_to_local = gr.Button("S3 → Local")
            btn_local_to_s3 = gr.Button("Local → S3")
        btn_upload.click(ui_upload, inputs=files, outputs=out_docs)
        btn_s3_to_local.click(ui_sync_s3_to_local, outputs=out_docs)
        btn_local_to_s3.click(ui_sync_local_to_s3, outputs=out_docs)
        # Collapsed-by-default panel showing a suggested prompt (static text only;
        # nothing in this block reads it back).
        with gr.Accordion("Prompt base sugerido para generar el JSONL", open=False):
            gr.Markdown(
                """```text
Actúa como un Generador de Conocimiento Conversacional (CEREBRO) para un chatbot RAG.
Idioma: {español_neutro | otro}
Dominio/tema principal: {describe el tema o uso del bot}
Nombre del bot (solo para tono): {NomadaAI | MiBot | ...}

Tarea:
1) Lee TODOS los archivos adjuntos (PDF, Word, CSV/Excel, imágenes con tablas/texto) y extrae contenido.
2) Construye un corpus conversacional en formato JSONL (UNA línea por objeto) optimizado para recuperación semántica:
   - Entradas de varios niveles: documento, sección/página, párrafo/ítem, fila de tabla/registro.
   - Cada entrada debe poder responder mensajes libres (no FAQ rígido).

Esquema de CADA línea JSONL:
{
  "id": "slug-unico",
  "granularity": "doc|section|paragraph|record|definition|howto",
  "topic": "tema breve (máx. 6 palabras)",
  "user_variants": [
    "3–7 formas naturales de pedir esto en lenguaje libre"
  ],
  "assistant_reply": "respuesta fluida (80–160 palabras) basada SOLO en el contenido adjunto. Sin alucinar.",
  "facts": ["3–8 hechos puntuales y verificables (con unidades/fechas)"],
  "keywords": ["5–12 términos y sinónimos relevantes"],
  "entities": ["personas|lugares|organizaciones|variables|columnas si aplica"],
  "source": {
    "file": "nombre_del_archivo.ext",
    "locator": "pág. X | sección Y | hoja:Z fila:W | figura:N",
    "quote": "cita corta opcional (<=25 palabras) que respalda el dato"
  },
  "units": "estándar/unificado si había mezclas (%, años, muertes/1000, km, etc.)",
  "valid_for": "fechas o versión si aplica",
  "confidence": 0.0-1.0
}

Reglas de extracción y redacción:
- No inventes datos; si un dato no está en las fuentes, indica en assistant_reply qué falta y pide con amabilidad lo necesario.
- Normaliza números y unidades (2 decimales cuando corresponda). Explica siglas la primera vez.
- Para tablas/CSV: crea entradas "record" por fila importante con user_variants que un usuario real escribiría ("muéstrame la expectativa de vida de {país} en {año}", etc.).
- Para PDFs/Word: crea entradas "doc" (resumen ejecutivo), "section" y "paragraph" con títulos claros.
- Divide en fragmentos de 80–220 palabras máx. (ó 600–1000 caracteres) para favorecer embeddings. Evita párrafos gigantes.
- Incluye **fuente y locator SIEMPRE** (archivo y página/hoja/fila).
- Deduplica contenido parecido; si son equivalentes, conserva la versión más clara y marca las demás con "confidence" menor.
- Mantén tono didáctico, empático y directo; no menciones “soy una IA”.
- Si detectas PII sensible, anonimiza (ej.: “Juan P.”) a menos que el archivo indique uso explícito.

Cobertura mínima (si aplica al dominio):
1) Presentación/alcance del bot (qué hace y no hace)
2) Cómo consultar/qué aportar (fechas, archivo, contexto)
3) Resúmenes ejecutivos por documento
4) Definiciones y glosario
5) Procedimientos/How-to paso a paso
6) Datos tabulares claves como “record”
7) Políticas/limitaciones/privacidad
8) Fallbacks de desambiguación y escalamiento (“asesor”, “ayuda”)

Validación (al final, agrega 1 ÚLTIMA línea especial con id="__validation__"):
{
  "id": "__validation__",
  "stats": {
    "files_leidos": N,
    "entradas_total": N,
    "por_granularity": {"doc":n,"section":n,"paragraph":n,"record":n,"definition":n,"howto":n},
    "entradas_sin_source": n
  },
  "issues": ["lista breve de posibles problemas (páginas sin texto, columnas ambiguas, unidades mezcladas, etc.)"],
  "sugerencias": ["mejoras para próxima iteración (más metadatos, aclarar siglas, etc.)"]
}

Salida:
- ENTREGAR EXCLUSIVAMENTE líneas JSONL válidas (sin comentarios ni encabezados).
- Genera entre {80 y 200} entradas totales según el tamaño de los archivos.
```"""
            )

    # Tab 2: build the index; both buttons receive the same chunk_size/overlap inputs.
    with gr.Tab("2) Índice"):
        out_index = gr.Textbox(label="Estado", lines=4)
        with gr.Row():
            chunk_size = gr.Number(value=800, precision=0, label="chunk_size")
            overlap = gr.Number(value=150, precision=0, label="overlap")
        with gr.Row():
            btn_build = gr.Button("Construir índice desde local")
            btn_rebuild = gr.Button("Reconstruir índice desde S3")
        btn_build.click(ui_build_index, inputs=[chunk_size, overlap], outputs=out_index)
        btn_rebuild.click(ui_rebuild_from_s3, inputs=[chunk_size, overlap], outputs=out_index)

    # Tab 3: save the index to S3 or load it back, using the active route.
    with gr.Tab("3) Persistencia"):
        out_persist = gr.Textbox(label="Estado", lines=4)
        with gr.Row():
            btn_save = gr.Button("⬆️ Guardar índice en S3")
            btn_load = gr.Button("⬇️ Cargar índice desde S3")
        btn_save.click(ui_save_index, outputs=out_persist)
        btn_load.click(ui_load_index, outputs=out_persist)
        gr.Markdown("Se intenta en `.../<equipo>/` y en `.../<equipo>/index/`.")

    # Tab 4: LLM provider and default generation parameters. Initial widget
    # values are read from LLM once, when this UI is constructed.
    with gr.Tab("4) LLM y parámetros"):
        llm_status_box = gr.Textbox(value=ui_llm_status(), label="Estado actual", lines=7)
        with gr.Row():
            provider = gr.Dropdown(
                choices=["openai", "gemini", "fragmento"],
                value=LLM.status().get("configured_provider", "openai"),
                label="Proveedor",
            )
            temperature = gr.Slider(value=LLM.temperature, minimum=0.0, maximum=1.5, step=0.05, label="Temperatura")
            max_tokens = gr.Slider(value=LLM.max_tokens, minimum=50, maximum=1200, step=50, label="Máx. tokens")
        with gr.Row():
            openai_model = gr.Textbox(value=LLM.openai_model, label="Modelo OpenAI (si aplica)")
            gemini_model = gr.Textbox(value=LLM.gemini_model, label="Modelo Gemini (si aplica)")
        llm_feedback = gr.Textbox(label="Resultado", lines=2)
        btn_save_llm = gr.Button("Guardar configuración LLM")
        btn_refresh_llm = gr.Button("Actualizar estado")
        btn_save_llm.click(
            ui_llm_config,
            inputs=[provider, openai_model, gemini_model, temperature, max_tokens],
            outputs=[llm_feedback, llm_status_box],
        )
        # Refresh clears the feedback box and re-reads the status text.
        btn_refresh_llm.click(lambda: ("", ui_llm_status()), outputs=[llm_feedback, llm_status_box])

    # Tab 5: ask questions through ui_ask.
    with gr.Tab("5) Preguntar"):
        gr.Markdown("Realiza pruebas con el mismo motor que usa Telegram.")
        gr.Markdown("**Guía rápida:** `top_k` controla cuántos fragmentos del índice se mandan al modelo y `Máx. tokens` limita la longitud de la respuesta generada.")
        question = gr.Textbox(label="Pregunta")
        with gr.Row():
            top_k = gr.Slider(value=4, minimum=1, maximum=10, step=1, label="top_k")
            # NOTE(review): these per-query ranges (0.0-1.2 and 100-1000) are
            # narrower than the ones in tab 4 (0.0-1.5 and 50-1200), so a
            # configured LLM value could fall outside them — confirm this is intended.
            temp_slider = gr.Slider(value=LLM.temperature, minimum=0.0, maximum=1.2, step=0.05, label="Temperatura (solo esta consulta)")
            max_tokens_slider = gr.Slider(value=LLM.max_tokens, minimum=100, maximum=1000, step=50, label="Máx. tokens (solo esta consulta)")
        answer_box = gr.Markdown("Respuesta")
        sources_box = gr.Textbox(label="Fuentes", lines=4, interactive=False)
        # Conversation turns are passed into ui_ask and replaced by the list it returns.
        history_state = gr.State([])
        ask_button = gr.Button("Consultar")
        ask_button.click(
            ui_ask,
            inputs=[question, top_k, temp_slider, max_tokens_slider, history_state],
            outputs=[answer_box, sources_box, history_state],
        )
        btn_reset_chat = gr.Button("Nueva conversación")
        # Reset clears the answer, the sources and the stored history.
        btn_reset_chat.click(lambda: ("", "", []), outputs=[answer_box, sources_box, history_state])

        with gr.Row():
            btn_list_sources = gr.Button("Ver archivos indexados")
            catalog_box = gr.Textbox(label="Archivos disponibles", lines=4, interactive=False)
        btn_list_sources.click(ui_available_sources, outputs=catalog_box)

    # Tab 6: Telegram administration (token, auto-reply, polling, chats, manual replies).
    with gr.Tab("6) Telegram (Admin)"):
        gr.Markdown(
            "Cuando un usuario escribe 'asesor' o 'ayuda', el bot pausa las respuestas automáticas, avisa al admin y espera intervención humana."
        )
        with gr.Row():
            token_box = gr.Textbox(label="BOT TOKEN", type="password", value=os.getenv("TELEGRAM_BOT_TOKEN", ""))
            token_result = gr.Textbox(label="Bot", interactive=False)
            btn_set_token = gr.Button("Guardar token / Ver bot")
            btn_use_saved = gr.Button("Usar token existente")
        btn_set_token.click(ui_set_token, inputs=token_box, outputs=token_result)
        # "Use existing token" calls ui_set_token with an empty string.
        btn_use_saved.click(lambda: ui_set_token(""), outputs=token_result)

        with gr.Row():
            # NOTE(review): the checkboxes start at fixed values (True / False);
            # whether they match the bot's real state at load time is not visible here.
            auto_checkbox = gr.Checkbox(label="Auto-responder (bot responde solo)", value=True)
            auto_state = gr.Textbox(label="Estado", interactive=False)
            auto_checkbox.change(ui_auto_toggle, inputs=auto_checkbox, outputs=auto_state)
        with gr.Row():
            poll_checkbox = gr.Checkbox(label="Polling (escuchar mensajes)", value=False)
            poll_state = gr.Textbox(label="Estado", interactive=False)
            poll_checkbox.change(ui_poll_toggle, inputs=poll_checkbox, outputs=poll_state)
            btn_poll_once = gr.Button("Leer ahora")
            poll_once_state = gr.Textbox(label="Resultado", interactive=False)
            btn_poll_once.click(ui_poll_once, outputs=poll_once_state)

        with gr.Row():
            btn_list_chats = gr.Button("Listar chats conocidos")
            chats_box = gr.Textbox(label="Chats", lines=8)
            btn_alerts = gr.Button("Chats pidiendo ayuda")
            alerts_box = gr.Textbox(label="Alertas", lines=6)
        btn_list_chats.click(ui_list_chats, outputs=chats_box)
        btn_alerts.click(ui_list_alerts, outputs=alerts_box)
        with gr.Row():
            status_button = gr.Button("Estado del bot")
            status_box = gr.Textbox(label="Status", lines=7)
        status_button.click(lambda: (ui_bot_status_box()), outputs=status_box)

        # Per-chat actions: view, resume and close all write into history_box.
        with gr.Row():
            chat_id_box = gr.Textbox(label="chat_id seleccionado")
            history_box = gr.Textbox(label="Historial reciente", lines=10)
        with gr.Row():
            btn_view = gr.Button("Ver historial")
            btn_release = gr.Button("Reanudar bot en chat")
            btn_close = gr.Button("Cerrar chat")
        btn_view.click(ui_view_chat_history, inputs=chat_id_box, outputs=history_box)
        btn_release.click(ui_release_hold_from_ui, inputs=chat_id_box, outputs=history_box)
        btn_close.click(ui_close_chat_from_ui, inputs=chat_id_box, outputs=history_box)

        # Manual reply sent by the admin to the chat id typed above.
        message_box = gr.Textbox(label="Mensaje del admin", value="Hola, soy soporte. ¿En qué te ayudo?")
        send_result = gr.Textbox(label="Resultado", interactive=False)
        send_button = gr.Button("Enviar como admin")
        send_button.click(ui_admin_send, inputs=[chat_id_box, message_box], outputs=send_result)


# share=True publishes the app on a public gradio.live URL; debug=True keeps the
# Colab cell running so errors and logs stay visible (both shown in the cell output).
# NOTE(review): no auth is configured on this launch call, so anyone with the
# public link can reach the admin tabs — confirm this is acceptable.
demo.launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://cc29ca6fae01c0fe4c.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/gradio/queueing.py", line 745, in process_events
    response = await route_utils.call_process_api(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/gradio/route_utils.py", line 353, in call_process_api
    output = await app.get_blocks().process_api(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/gradio/blocks.py", line 2116, in process_api
    result = await self.call_function(
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/gradio/blocks.py", line 1623, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]