<a href="https://colab.research.google.com/github/MustaphaBZ/Proyecto-IA-AgenteRag/blob/main/Proyecto_Final_AgenteRAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PROYECTO AGENTE RAG

## Conectar con Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Pip

In [None]:
# 1. Instalar dependencias necesarias
!pip install pinecone
!pip install sentence-transformers
!pip install transformers
!pip install tqdm
!pip install bitsandbytes
!pip install langchain-community
!pip install torch
!pip install pdfplumber
!pip install langgraph==0.3.1
!pip install gradio

##1: Importaciones

In [None]:
# === IMPORTS ===
from typing import Optional, List
from typing_extensions import TypedDict
from pydantic import BaseModel
from langgraph.graph import StateGraph
from langgraph.prebuilt import ToolNode
from langchain.docstore.document import Document as LangchainDocument
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.tools import StructuredTool
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
from sentence_transformers import SentenceTransformer
from pinecone import Pinecone
import torch
import os
import unicodedata
import requests
import gradio as gr


##2: Tipos y clases

In [None]:
# === TIPOS ===
class WorkflowState(TypedDict, total=False):
    directorio: str
    carpeta: str
    archivos: Optional[str]
    archivos_txt: Optional[str]
    conversion: Optional[str]
    indexacion: Optional[str]
    documentos: Optional[List[LangchainDocument]]
    pregunta: Optional[str]
    respuesta: Optional[str]
    mensaje_final: Optional[str]

class DirectorioInput(BaseModel):
    directorio: str

class DirectorioCarpetaInput(BaseModel):
    directorio: str
    carpeta: str

class CarpetaInput(BaseModel):
    carpeta: str

class PreguntaInput(BaseModel):
    pregunta: str

class PreguntaConDocsInput(BaseModel):
    pregunta: str
    documentos: List[LangchainDocument]


##3: Configuración de modelos

In [None]:
# === MODELOS Y CONFIGURACIÓN ===
model_name = "NousResearch/Nous-Hermes-2-Mistral-7B-DPO"
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_use_double_quant=True,
                                bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16)
llm_model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb_config)
tokenizer = AutoTokenizer.from_pretrained(model_name)
llm_pipeline = pipeline(model=llm_model, tokenizer=tokenizer, task="text-generation",
                        do_sample=True, temperature=0.2, repetition_penalty=1.1,
                        return_full_text=False, max_new_tokens=2048)

pinecone_api_key = "pcsk_6mZWLa_TF8H4JrD3FpzxXxBW91rkzYxKB66sv7YUFJ89nFdFDbV6gEVACuVNzSLdw2NJKW"
index_name = "info-knowledge"
pc = Pinecone(api_key=pinecone_api_key)

if not pc.has_index(index_name):
    pc.create_index_for_model(
        name=index_name,
        cloud="aws",
        region="us-east-1",
        embed={"model": "llama-text-embed-v2", "field_map": {"text": "chunk_text"}, "dimension": 384}
    )

index = pc.Index(index_name)


##4: Herramientas (tools)

In [None]:
# === TOOLS ===
def listar_pdfs_tool(directorio: str) -> dict:
    """
    Lista todos los archivos PDF en un directorio dado y devuelve sus nombres relativos.
    """
    archivos = []
    for root, _, files in os.walk(directorio):
        for file in files:
            if file.lower().endswith(".pdf"):
                archivos.append(os.path.relpath(os.path.join(root, file), directorio))
    return {"archivos": ", ".join(archivos)}

def convertir_pdfs_tool(directorio: str, carpeta: str) -> dict:
    """
    Convierte todos los archivos PDF en un directorio a archivos TXT y los guarda en una carpeta de salida.
    """
    import pdfplumber

    def extract_text_from_pdf(pdf_path):
        text = ""
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                extracted = page.extract_text()
                if extracted:
                    text += extracted + "\n"
        return text

    def process_large_pdf(pdf_path, input_folder, output_folder):
        relative_path = os.path.relpath(pdf_path, input_folder)
        txt_output_path = os.path.join(output_folder, os.path.splitext(relative_path)[0] + ".txt")
        os.makedirs(os.path.dirname(txt_output_path), exist_ok=True)
        text = extract_text_from_pdf(pdf_path)
        with open(txt_output_path, "w", encoding="utf-8") as f:
            f.write(text)
        return txt_output_path

    archivos_convertidos = []
    for root, _, files in os.walk(directorio):
        for file in files:
            if file.lower().endswith(".pdf"):
                pdf_path = os.path.join(root, file)
                txt_path = process_large_pdf(pdf_path, directorio, carpeta)
                archivos_convertidos.append(txt_path)

    return {"conversion": "TXT generados: " + ', '.join(archivos_convertidos)} if archivos_convertidos else {"conversion": "No se generaron archivos."}

##5: Indexación y recuperación

In [None]:
def indexar_tool(carpeta: str) -> dict:
    """
    Indexa todos los archivos TXT en una carpeta usando embeddings y los sube a Pinecone.
    """
    def extract_text_from_txt(txt_path):
        with open(txt_path, "r", encoding="utf-8") as f:
            return f.read()

    RAW_KB = []
    for root, _, files in os.walk(carpeta):
        for file in files:
            if file.lower().endswith(".txt"):
                path = os.path.join(root, file)
                content = extract_text_from_txt(path)
                RAW_KB.append(LangchainDocument(page_content=content, metadata={"source": path}))

    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    split_docs = []
    for doc in RAW_KB:
        split_docs.extend(splitter.split_documents([doc]))

    if not pc.has_index(index_name):
        pc.create_index_for_model(
            name=index_name,
            cloud="aws",
            region="us-east-1",
            embed={"model": "llama-text-embed-v2", "field_map": {"text": "chunk_text"}, "dimension": 384}
        )

    idx = pc.Index(index_name)
    for doc in split_docs:
        source = doc.metadata["source"]
        safe_id = unicodedata.normalize("NFKD", source).encode("ascii", "ignore").decode("ascii").replace(os.sep, "_")
        embedding = embedding_model.encode(doc.page_content).tolist()
        metadata = doc.metadata.copy()
        metadata["chunk_text"] = doc.page_content
        idx.upsert([(safe_id, embedding, metadata)])

    return {"indexacion": f"✅ {len(RAW_KB)} archivos, {len(split_docs)} chunks indexados en Pinecone."}

def recuperar_contexto_tool(pregunta: str) -> dict:
    """
    Recupera los documentos más relevantes para una pregunta usando Pinecone.
    """
    embedding = embedding_model.encode(pregunta).tolist()
    result = index.query(vector=embedding, top_k=5, include_metadata=True)
    documentos = []
    for match in result["matches"]:
        metadata = match.get("metadata", {})
        texto = metadata.get("chunk_text", "") or metadata.get("text", "")
        if texto.strip():
            documentos.append(LangchainDocument(page_content=texto, metadata=metadata))
    return {"documentos": documentos, "pregunta": pregunta}


##6: Generación de respuestas y subida a Moodle

In [None]:
def generar_respuesta_tool(pregunta: str, documentos: List[LangchainDocument]) -> dict:
    """
    Genera 5 preguntas tipo test en formato AIKEN usando el contexto proporcionado.
    """
    contexto = "\n".join([doc.page_content for doc in documentos[:3]])
    prompt = f"""Eres un experto en redacción de exámenes tipo test. Usa SOLO el contexto siguiente para generar 5 preguntas tipo test en formato AIKEN...

Contexto:
{contexto}

Formato AIKEN sigue estrictamente el siguiente formato:
Pregunta
A) Opcion A
B) Opcion B
C) Opcion C
D) Opcion D
ANSWER: [Letra correcta]

Genera las 5 preguntas a continuación:
"""
    output = llm_pipeline(prompt)[0]["generated_text"].strip()
    with open("test_aiken.txt", "w", encoding="utf-8") as f:
        f.write(output)
    return {"respuesta": output}

def subir_aiken_a_moodle(nombre_archivo_txt, moodle_token, moodle_url_base):
    """
    Sube un archivo AIKEN a Moodle usando su API.
    """
    with open(nombre_archivo_txt, 'rb') as archivo:
        response = requests.post(
            f"{moodle_url_base}/webservice/upload.php",
            params={"token": moodle_token},
            files={"file": (nombre_archivo_txt, archivo, "text/plain")}
        )

    if response.status_code != 200:
        return {"error": f"Error HTTP {response.status_code}", "detalle": response.text}

    try:
        data = response.json()
    except Exception as e:
        return {"error": "No se pudo decodificar JSON", "detalle": str(e), "respuesta_cruda": response.text}

    if not isinstance(data, list) or len(data) == 0 or "itemid" not in data[0]:
        return {"error": "Respuesta inesperada al subir archivo", "respuesta_completa": data}

    return {"estado": " Archivo subido correctamente al draft de Moodle.", "itemid": data[0]["itemid"]}


def mover_a_privados_moodle(itemid_draft, moodle_token, moodle_url_base):
    """
    Mueve un archivo subido como draft al área de archivos privados del usuario en Moodle.
    """
    url = f"{moodle_url_base}/webservice/rest/server.php"
    params = {
        "wstoken": moodle_token,
        "wsfunction": "core_user_add_user_private_files",
        "moodlewsrestformat": "json",
        "draftid": itemid_draft
    }

    response = requests.post(url, params=params)

    try:
        try:
            if response.headers.get("Content-Type", "").startswith("application/json"):
                data = response.json()
            else:
                return {
                    "error": "Respuesta no es JSON",
                    "status_code": response.status_code,
                    "content_type": response.headers.get("Content-Type", ""),
                    "respuesta_cruda": response.text
                }
        except Exception as e:
            return {
                "error": "Excepción al decodificar JSON",
                "detalle": str(e),
                "respuesta_cruda": response.text
            }

    except Exception as e:
        return {
            "error": "No se pudo decodificar JSON al mover a privados",
            "detalle": str(e),
            "respuesta_cruda": response.text
        }

    if isinstance(data, dict) and "exception" in data:
        return {"error": data["errorcode"], "detalle": data["message"]}

    return {"estado": " Archivo movido correctamente a archivos privados."}

##7: Grafo y aplicación

In [None]:
# === GRAFO ===
listar_pdfs_structured = StructuredTool.from_function(listar_pdfs_tool, args_schema=DirectorioInput)
convertir_pdfs_structured = StructuredTool.from_function(convertir_pdfs_tool, args_schema=DirectorioCarpetaInput)
indexar_structured = StructuredTool.from_function(indexar_tool, args_schema=CarpetaInput)
recuperar_structured = StructuredTool.from_function(recuperar_contexto_tool, args_schema=PreguntaInput)
generar_respuesta_structured = StructuredTool.from_function(generar_respuesta_tool, args_schema=PreguntaConDocsInput)

grafo = StateGraph(WorkflowState)
grafo.add_node("ListarPDF", ToolNode.bind(listar_pdfs_structured))
grafo.add_node("ConvertirPDF", ToolNode.bind(convertir_pdfs_structured))
grafo.add_node("Indexar", ToolNode.bind(indexar_structured))
grafo.add_node("Recuperar", ToolNode.bind(recuperar_structured))
grafo.add_node("Generar", ToolNode.bind(generar_respuesta_structured))
grafo.set_entry_point("ListarPDF")
grafo.add_edge("ListarPDF", "ConvertirPDF")
grafo.add_edge("ConvertirPDF", "Indexar")
grafo.add_edge("Indexar", "Recuperar")
grafo.add_edge("Recuperar", "Generar")
grafo.set_finish_point("Generar")

app = grafo.compile()


## 8: Funciones principales

In [None]:
# === FUNCIONES PRINCIPALES ===
def generar_aiken(directorio: str, carpeta: str, pregunta: str, nombre_archivo: str):
    """Genera preguntas AIKEN y las guarda en el archivo especificado."""
    estado = {"directorio": directorio, "carpeta": carpeta, "pregunta": pregunta}
    result = app.invoke(estado)
    with open(nombre_archivo, "w", encoding="utf-8") as f:
        f.write(result.get("respuesta", ""))
    return result.get("respuesta", "Error al generar")

def subir_a_moodle(nombre_archivo: str, moodle_url: str, moodle_token: str):
    """Sube el archivo AIKEN especificado a Moodle."""
    try:
        subir_result = subir_aiken_a_moodle(nombre_archivo, moodle_token, moodle_url)
        if "itemid" not in subir_result:
            return f" Error al subir: {subir_result}"
        mover_result = mover_a_privados_moodle(subir_result["itemid"], moodle_token, moodle_url)
        return f" Subido a Moodle!\nDetalles: {mover_result}"
    except Exception as e:
        return f" Error crítico: {str(e)}"

def ejecutar_automatico(directorio: str, carpeta: str, pregunta: str, nombre_archivo: str,
                        moodle_url: str, moodle_token: str) -> tuple[str, str]:
    try:
        preguntas = generar_aiken(directorio, carpeta, pregunta, nombre_archivo)
        if not preguntas or "Error" in preguntas:
            return preguntas or " No se generaron preguntas", ""
        estado_moodle = subir_a_moodle(nombre_archivo, moodle_url, moodle_token)
        return preguntas, estado_moodle
    except Exception as e:
        return f" Error inesperado: {str(e)}", ""


## 9: Interfaz Gradio

In [None]:
# === INTERFAZ GRADIO ===
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 Agente RAG")

    with gr.Row():
        directorio = gr.Textbox(label="* Carpeta con PDFs")
        carpeta_txt = gr.Textbox(label="* Carpeta de salida TXT")
        pregunta = gr.Textbox(label="* Tema para generar preguntas")
        nombre_archivo = gr.Textbox(label="* Nombre del archivo TXT")

    gr.Markdown("### Generar Preguntas")
    btn_generar = gr.Button(" Generar Preguntas")
    salida_preguntas = gr.Textbox(label=" Preguntas Generadas", lines=10)

    gr.Markdown("### + Subir a Moodle (Solo profesores)")
    with gr.Row():
        moodle_url = gr.Textbox(label=" URL de Moodle")
        moodle_token = gr.Textbox(label=" Token API")

    with gr.Row():
        btn_subir = gr.Button(" Subir Archivo a Moodle")
        btn_automatico = gr.Button(" Automático (Generar + Subir)", variant="primary")

    salida_moodle = gr.Textbox(label=" Estado de Moodle", lines=3)

    btn_generar.click(fn=generar_aiken,
                     inputs=[directorio, carpeta_txt, pregunta, nombre_archivo],
                     outputs=salida_preguntas)

    btn_subir.click(fn=subir_a_moodle,
                   inputs=[nombre_archivo, moodle_url, moodle_token],
                   outputs=salida_moodle)

    btn_automatico.click(fn=ejecutar_automatico,
                         inputs=[directorio, carpeta_txt, pregunta, nombre_archivo, moodle_url, moodle_token],
                         outputs=[salida_preguntas, salida_moodle])

demo.launch(share=True, debug=True)
