In [15]:
# Instalación de dependencias necesarias
!pip install -q langchain langchain-community sentence-transformers faiss-cpu PyPDF2 transformers ipywidgets huggingface_hub

In [16]:
# Importar bibliotecas
import os
import zipfile
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from transformers import pipeline
from huggingface_hub import login
from getpass import getpass

In [19]:
# Pedir el token de forma segura (no se mostrará mientras escribes)
print("Por favor, introduce tu token de Hugging Face:")
print("(Si no tienes un token, puedes obtenerlo en: https://huggingface.co/settings/tokens)")
print("(Si no deseas utilizar un token específico, simplemente presiona Enter para continuar)")
HF_TOKEN = getpass()

if HF_TOKEN.strip():
    # Configurar token de Hugging Face
    login(token=HF_TOKEN)  # Esto te autenticará con Hugging Face
    os.environ["HUGGINGFACE_TOKEN"] = HF_TOKEN
    print("Token de Hugging Face configurado correctamente.")
else:
    print("Continuando sin token específico. Solo se podrá acceder a modelos públicos.")

Por favor, introduce tu token de Hugging Face:
(Si no tienes un token, puedes obtenerlo en: https://huggingface.co/settings/tokens)
(Si no deseas utilizar un token específico, simplemente presiona Enter para continuar)
··········
Token de Hugging Face configurado correctamente.


In [20]:
# Configurar ruta del archivo ZIP
zip_path = "/content/docs.zip"
temp_dir = "/content/temp_docs"

print("Verificando rutas...")
# Crear directorio temporal si no existe
if not os.path.exists(temp_dir):
    os.makedirs(temp_dir)
    print(f"Directorio temporal creado: {temp_dir}")


Verificando rutas...


In [21]:
# Verificar si el archivo ZIP existe
if not os.path.exists(zip_path):
    print(f"ADVERTENCIA: No se encontró el archivo {zip_path}")
    print("Por favor, asegúrate de subir el archivo docs.zip a /content/")
else:
    # Extraer archivos del ZIP
    print(f"Extrayendo archivos de {zip_path}...")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(temp_dir)
    print(f"Archivos extraídos en {temp_dir}")

# Función para cargar documentos
def load_documents(directory):
    documents = []

    if not os.path.exists(directory):
        print(f"El directorio {directory} no existe")
        return documents

    for root, _, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)

            # Cargar según tipo de archivo
            if file.lower().endswith('.pdf'):
                try:
                    loader = PyPDFLoader(file_path)
                    documents.extend(loader.load())
                    print(f"Cargado PDF: {file}")
                except Exception as e:
                    print(f"Error al cargar {file}: {str(e)}")

            elif file.lower().endswith(('.txt', '.md', '.csv')):
                try:
                    loader = TextLoader(file_path, encoding='utf-8')
                    documents.extend(loader.load())
                    print(f"Cargado texto: {file}")
                except UnicodeDecodeError:
                    try:
                        # Intentar con otra codificación si utf-8 falla
                        loader = TextLoader(file_path, encoding='latin-1')
                        documents.extend(loader.load())
                        print(f"Cargado texto con latin-1: {file}")
                    except Exception as e:
                        print(f"Error al cargar {file}: {str(e)}")

    return documents


Extrayendo archivos de /content/docs.zip...
Archivos extraídos en /content/temp_docs


In [22]:
# Cargar documentos
print("\nCargando documentos...")
docs = load_documents(temp_dir)
print(f"Se cargaron {len(docs)} documentos")

if len(docs) == 0:
    print("No se encontraron documentos para procesar. Verifica el contenido del archivo ZIP.")
else:
    # Dividir documentos en chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len
    )
    chunks = text_splitter.split_documents(docs)
    print(f"Documentos divididos en {len(chunks)} chunks")

    # Usar embeddings en español
    print("\nGenerando embeddings (esto puede tardar un poco)...")
    try:
        embeddings = HuggingFaceEmbeddings(
            model_name="hiiamsid/sentence_similarity_spanish_es"
        )

        # Crear índice vectorial
        print("Creando índice vectorial...")
        vectorstore = FAISS.from_documents(chunks, embeddings)
        print("Índice vectorial creado exitosamente")

        # Configurar modelo generativo (pequeño para Colab)
        print("\nConfigurando el modelo generativo...")
        try:
            # Modelo pequeño en español (disponible públicamente)
            pipe = pipeline("text-generation", model="flax-community/spanish-gpt2")
            print("Modelo generativo cargado: flax-community/spanish-gpt2")
        except Exception as e:
            print(f"Error al cargar el modelo generativo: {str(e)}")
            print("Intentando cargar un modelo alternativo...")
            try:
                pipe = pipeline("text-generation", model="DeepESP/gpt2-spanish")
                print("Modelo alternativo cargado: DeepESP/gpt2-spanish")
            except Exception as e:
                print(f"Error al cargar el modelo alternativo: {str(e)}")
                print("Utilizando modelos más básicos...")
                try:
                    # Si tienes acceso a IIC/hf-lluvia-prueba con tu token
                    pipe = pipeline("text-generation", model="IIC/hf-lluvia-prueba", token=HF_TOKEN)
                    print("Modelo privado cargado exitosamente con tu token")
                except:
                    pipe = pipeline("text-generation", model="distilgpt2")
                    print("Modelo básico cargado (nota: este modelo es en inglés)")


        def generate_text(prompt, max_length=150):
            try:
                result = pipe(prompt, max_length=max_length, num_return_sequences=1)
                return result[0]['generated_text']
            except Exception as e:
                print(f"Error en la generación de texto: {str(e)}")
                return f"Error en la generación: {str(e)}"

        # Función para buscar documentos relevantes y generar respuestas
        def consultar_rag(pregunta, k=3):
            # Recuperar documentos relevantes
            docs_similares = vectorstore.similarity_search(pregunta, k=k)

            # Extraer contenido de documentos relevantes
            contexto = "\n".join([doc.page_content for doc in docs_similares])

            # Generar prompt para el modelo
            prompt = f"""
            Contexto: {contexto}

            Pregunta: {pregunta}

            Respuesta:"""

            # Generar respuesta
            respuesta = generate_text(prompt, max_length=300)

            return {
                "documentos_relevantes": docs_similares,
                "respuesta": respuesta
            }

        # Interfaz simple
        try:
            from IPython.display import display, HTML
            import ipywidgets as widgets

            def mostrar_interfaz():
                titulo = widgets.HTML("<h2>Sistema RAG para Documentos en Español</h2>")
                entrada = widgets.Text(
                    value='',
                    placeholder='Escribe tu pregunta aquí',
                    description='Pregunta:',
                    disabled=False,
                    layout=widgets.Layout(width='80%')
                )
                boton = widgets.Button(description="Consultar")
                salida = widgets.Output()

                def on_button_clicked(b):
                    with salida:
                        salida.clear_output()
                        if entrada.value:
                            print("Buscando respuesta...")
                            resultado = consultar_rag(entrada.value)
                            print("\nRespuesta:")
                            print(resultado["respuesta"])
                            print("\nFuentes:")
                            for i, doc in enumerate(resultado["documentos_relevantes"]):
                                print(f"\nFuente {i+1}:")
                                print(doc.page_content[:200] + "...")

                boton.on_click(on_button_clicked)
                display(titulo, entrada, boton, salida)

            # Ejemplo de uso
            print("\n--- Sistema RAG listo para usar ---")
            print("Puedes hacer consultas directamente con consultar_rag('tu pregunta')")
            print("O usar la interfaz gráfica con mostrar_interfaz()")

            # Mostrar la interfaz automáticamente
            mostrar_interfaz()

        except Exception as e:
            print(f"Error al crear la interfaz: {str(e)}")
            print("Puedes hacer consultas directamente con consultar_rag('tu pregunta')")

    except Exception as e:
        print(f"Error al generar embeddings: {str(e)}")
        print("No se pudo crear el sistema RAG completo.")

print("\n--- Proceso completado ---")


Cargando documentos...
Cargado texto: parte3.txt
Cargado texto: parte2.txt
Cargado texto: parte1.txt
Se cargaron 3 documentos
Documentos divididos en 38 chunks

Generando embeddings (esto puede tardar un poco)...
Creando índice vectorial...
Índice vectorial creado exitosamente

Configurando el modelo generativo...
Error al cargar el modelo generativo: flax-community/spanish-gpt2 is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `huggingface-cli login` or by passing `token=<your_token>`
Intentando cargar un modelo alternativo...


config.json:   0%|          | 0.00/914 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/261M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/261M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/115 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/840k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/499k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/262 [00:00<?, ?B/s]

Device set to use cpu


Modelo alternativo cargado: DeepESP/gpt2-spanish

--- Sistema RAG listo para usar ---
Puedes hacer consultas directamente con consultar_rag('tu pregunta')
O usar la interfaz gráfica con mostrar_interfaz()


HTML(value='<h2>Sistema RAG para Documentos en Español</h2>')

Text(value='', description='Pregunta:', layout=Layout(width='80%'), placeholder='Escribe tu pregunta aquí')

Button(description='Consultar', style=ButtonStyle())

Output()


--- Proceso completado ---
