RAG PIPELINE - DATA INGESTION TO VECTOR DB

In [1]:
import os 
from langchain_community.document_loaders import PyPDFLoader,PyMuPDFLoader,TextLoader, CSVLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import JSONLoader
from langchain_community.document_loaders import Docx2txtLoader
from langchain_community.document_loaders.excel import UnstructuredExcelLoader
from pathlib import Path
from typing import List, Any

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Reads every supported file and converts it into LangChain Document objects
def _load_files_with(data_path: Path, pattern: str, label: str, make_loader) -> List[Any]:
    """
    Load every file under ``data_path`` that matches ``pattern`` with a single loader type.

    Args:
        data_path: Directory that is searched with ``Path.glob``.
        pattern: Glob pattern, e.g. ``'**/*.pdf'``.
        label: Short file-type name used only in the log messages.
        make_loader: Callable that receives the file path as ``str`` and returns
            an object exposing ``load()``.

    Returns:
        The documents produced by all files that loaded successfully. A file
        whose loader raises is reported with an ``[ERROR]`` line and skipped.
    """
    matches = list(data_path.glob(pattern))
    print(f"[DEBUG] Found {len(matches)} {label} files: {[str(f) for f in matches]}")

    collected = []
    for file_path in matches:
        print(f"[DEBUG] Loading {label}: {file_path}")
        try:
            loaded = make_loader(str(file_path)).load()
            print(f"[DEBUG] Loaded {len(loaded)} {label} docs from {file_path}")
            collected.extend(loaded)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole ingestion.
            print(f"[ERROR] Failed to load {label} {file_path}: {e}")
    return collected


def load_all_documents(data_dir: str) -> List[Any]:
    """
    Load all supported files from the data directory and convert to LangChain document structure.
    Supported: PDF, TXT, CSV, Excel, Word, JSON

    Args:
        data_dir: Directory that is searched recursively; resolved to an absolute path.

    Returns:
        A flat list with the documents of every file that could be loaded, in the
        order PDF, TXT, CSV, Excel, Word, JSON. Returns an empty list when the
        directory does not exist.
    """
    data_path = Path(data_dir).resolve()
    print(f"[DEBUG] Data path: {data_path}")

    if not data_path.is_dir():
        print(f"[WARN] Data path is not an existing directory: {data_path}")
        return []

    # (glob pattern, label for logs, loader factory). The factories are lambdas so
    # each loader class is only looked up when a matching file is actually found.
    loader_specs = [
        ('**/*.pdf', "PDF", lambda path: PyPDFLoader(path)),
        ('**/*.txt', "TXT", lambda path: TextLoader(path, encoding="utf-8")),
        ('**/*.csv', "CSV", lambda path: CSVLoader(path)),
        ('**/*.xlsx', "Excel", lambda path: UnstructuredExcelLoader(path)),
        ('**/*.docx', "Word", lambda path: Docx2txtLoader(path)),
        # JSONLoader requires jq_schema; "." selects the whole document and
        # text_content=False lets non-string JSON values be serialised as text.
        ('**/*.json', "JSON", lambda path: JSONLoader(path, jq_schema=".", text_content=False)),
    ]

    documents = []
    for pattern, label, make_loader in loader_specs:
        documents.extend(_load_files_with(data_path, pattern, label, make_loader))

    print(f"[DEBUG] Total loaded documents: {len(documents)}")
    return documents

In [3]:
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """
    Break LangChain documents into smaller, overlapping chunks for RAG.

    Args:
        documents: Documents to split.
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Number of characters shared between consecutive chunks.

    Returns:
        The list of chunks produced by ``RecursiveCharacterTextSplitter``.
    """
    total_documents = len(documents)
    print(f"[INFO] Splitting {total_documents} documents...")

    # Separators are tried in order: paragraph, line, word, then single characters.
    splitter_options = dict(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    chunks = RecursiveCharacterTextSplitter(**splitter_options).split_documents(documents)

    print(f"[INFO] Generated {len(chunks)} chunks from {total_documents} documents.")

    if not chunks:
        return chunks

    # Show the first chunk as an example (for learning purposes only)
    sample = chunks[0]
    print("\n[DEBUG] Example chunk:")
    print("Content:", sample.page_content[:200], "...")
    print("Metadata:", sample.metadata)
    return chunks

Embedding and VectorStoreDB

In [4]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [5]:
class EmbeddingManager:
    """Generate embeddings using SentenceTransformer."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager and load the model immediately.

        Args:
            model_name: HuggingFace model name for sentence embeddings
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """
        Load the SentenceTransformer model named by ``self.model_name``.

        Raises:
            Exception: Whatever ``SentenceTransformer`` raises; it is logged and re-raised.
        """
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts.

        Args:
            texts: List of strings to convert into embeddings

        Returns:
            The array returned by ``self.model.encode(texts)``; its shape is printed.

        Raises:
            ValueError: If no model has been loaded (``self.model`` is ``None``).
        """
        # Compare against None explicitly: a truthiness test would go through
        # the model's __len__/__bool__ and could reject a valid loaded model.
        if self.model is None:
            raise ValueError("El modelo no está cargado")

        print(f"Generando embeddings para {len(texts)} textos...")

        embeddings = self.model.encode(texts, show_progress_bar=True)

        print(f"Embeddings generados con forma: {embeddings.shape}")

        return embeddings
    
# Initialize the embedding manager; the constructor loads the default model.
# The bare name on the last line makes the notebook display the object's repr.
embedding_manager = EmbeddingManager()
embedding_manager
    

Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x22f6a206660>

VectorStore

In [6]:
class VectorStore:
    """Manage document embeddings in a ChromaDB vector database."""

    def __init__(self, collection_name: str = "base_conocimiento", persist_directory: str = "../data/vector_store"):
        """
        Store the configuration and open (or create) the collection.

        Args:
            collection_name: Name of the ChromaDB collection to use.
            persist_directory: Directory passed to ``chromadb.PersistentClient``.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """
        Initialize the ChromaDB client and create/load the collection.

        Raises:
            Exception: Any error raised by ChromaDB; it is logged and re-raised.
        """
        try:
            # PersistentClient keeps the data on disk
            self.client = chromadb.PersistentClient(path=self.persist_directory)
            # Create the collection if it does not exist, otherwise reuse it
            # (collection = a container holding the texts + embeddings)
            self.collection = self.client.get_or_create_collection(name=self.collection_name)
            print(f"[INFO] Colección '{self.collection_name}' cargada correctamente en {self.persist_directory}")
        except Exception as e:
            print(f"[ERROR] Fallo al inicializar ChromaDB: {e}")
            raise

    @staticmethod
    def _stable_ids(chunks: List[Any]) -> List[str]:
        """
        Build one deterministic id per chunk from its text and metadata.

        Identical chunks inside the same batch get distinct ids through an
        occurrence counter, so ids stay unique within a call while remaining
        the same across repeated calls with the same input.
        """
        occurrences = {}
        ids = []
        for chunk in chunks:
            metadata = getattr(chunk, "metadata", None) or {}
            # Sort by key only, so metadata values never need to be comparable.
            ordered_metadata = sorted(metadata.items(), key=lambda item: str(item[0]))
            fingerprint = repr((chunk.page_content, ordered_metadata))
            seen_before = occurrences.get(fingerprint, 0)
            occurrences[fingerprint] = seen_before + 1
            ids.append(str(uuid.uuid5(uuid.NAMESPACE_URL, f"{seen_before}:{fingerprint}")))
        return ids

    def add_documents(self, chunks: List[Any], embeddings: np.ndarray):
        """
        Add documents (chunks) and their embeddings to the database.

        Ids are derived from each chunk's content and metadata and written with
        ``upsert``, so re-running the ingestion over the same chunks overwrites
        the existing entries instead of storing duplicates.

        Args:
            chunks: Objects exposing ``page_content`` and ``metadata``.
            embeddings: One embedding per chunk, as a numpy array or nested list.

        Returns:
            None. Storage errors are logged with an ``[ERROR]`` line, not raised.
        """
        if not chunks:
            print("[WARN] No hay documentos para agregar.")
            return

        print(f"[INFO] Añadiendo {len(chunks)} documentos a ChromaDB...")

        # Prepare the data for Chroma
        ids = self._stable_ids(chunks)                          # deterministic id per chunk
        documents = [chunk.page_content for chunk in chunks]    # raw text of each chunk
        metadatas = [chunk.metadata for chunk in chunks]        # metadata (source, page, ...)

        # Hand Chroma plain Python lists rather than a numpy array
        embeddings_list = embeddings.tolist() if isinstance(embeddings, np.ndarray) else embeddings

        try:
            self.collection.upsert(
                ids=ids,
                documents=documents,
                embeddings=embeddings_list,
                metadatas=metadatas
            )
            print(f"[INFO] Éxito: {len(chunks)} chunks guardados.")
        except Exception as e:
            print(f"[ERROR] Fallo al agregar documentos: {e}")

# Create the vector store with its default collection name and persist directory.
# The bare name on the last line makes the notebook display the object's repr.
vectorstore = VectorStore()
vectorstore

[INFO] Colección 'base_conocimiento' cargada correctamente en ../data/vector_store


<__main__.VectorStore at 0x22f7c6b8ad0>

In [7]:
def search(self, query_embedding: np.ndarray, k: int = 5):
    """
    Find the stored documents most similar to the query embedding.

    Written as a method: ``self`` must expose a ``collection`` with a
    ``query(query_embeddings=..., n_results=...)`` method.

    Args:
        query_embedding: A single embedding (1-D) or a batch of embeddings (2-D),
            given as a numpy array or nested list.
        k: Number of results requested per query embedding (default 5).

    Returns:
        Whatever ``self.collection.query`` returns, or ``None`` if the search
        fails (the error is logged with an ``[ERROR]`` line).
    """
    print(f"[INFO] Buscando los {k} fragmentos más relevantes...")
    try:
        # Always send a batch (list of embeddings): a single 1-D embedding is
        # wrapped into a one-row batch instead of being passed as a flat list.
        query_batch = np.asarray(query_embedding, dtype=float)
        if query_batch.ndim == 1:
            query_batch = query_batch.reshape(1, -1)

        # Ask the collection for the k closest stored embeddings
        return self.collection.query(
            query_embeddings=query_batch.tolist(),
            n_results=k
        )
    except Exception as e:
        print(f"[ERROR] Fallo en la búsqueda: {e}")
        return None


# This function takes ``self`` but lives at module level, so by itself it is not
# reachable as ``vectorstore.search(...)``. Bind it to the class when the class
# has already been defined in this kernel.
if "VectorStore" in globals():
    VectorStore.search = search

In [8]:
# 1) Load every supported file found under ../data
docs = load_all_documents("../data")
# 2) Split the loaded documents into chunks (default chunk_size / chunk_overlap)
chunks = split_documents(docs)
# 3) Pull the raw text out of each chunk
texts = [doc.page_content for doc in chunks ]
# 4) Turn the texts into embeddings
embeddings = embedding_manager.generate_embeddings(texts)
# 5) Store the chunks together with their embeddings in the vector store
vectorstore.add_documents(chunks,embeddings)

[DEBUG] Data path: C:\Users\HP\Desktop\Practica\Proyectos\Chatbot-RAG-with-Langchain-\data
[DEBUG] Found 2 PDF files: ['C:\\Users\\HP\\Desktop\\Practica\\Proyectos\\Chatbot-RAG-with-Langchain-\\data\\pdf\\Cv_Derecho.pdf', 'C:\\Users\\HP\\Desktop\\Practica\\Proyectos\\Chatbot-RAG-with-Langchain-\\data\\pdf\\Frank Casana Casimiro CV.pdf']
[DEBUG] Loading PDF: C:\Users\HP\Desktop\Practica\Proyectos\Chatbot-RAG-with-Langchain-\data\pdf\Cv_Derecho.pdf
[DEBUG] Loaded 2 PDF docs from C:\Users\HP\Desktop\Practica\Proyectos\Chatbot-RAG-with-Langchain-\data\pdf\Cv_Derecho.pdf
[DEBUG] Loading PDF: C:\Users\HP\Desktop\Practica\Proyectos\Chatbot-RAG-with-Langchain-\data\pdf\Frank Casana Casimiro CV.pdf
[DEBUG] Loaded 2 PDF docs from C:\Users\HP\Desktop\Practica\Proyectos\Chatbot-RAG-with-Langchain-\data\pdf\Frank Casana Casimiro CV.pdf
[DEBUG] Found 1 TXT files: ['C:\\Users\\HP\\Desktop\\Practica\\Proyectos\\Chatbot-RAG-with-Langchain-\\data\\text_files\\IA.txt']
[DEBUG] Loading TXT: C:\Users\HP\De

Batches: 100%|██████████| 1/1 [00:02<00:00,  2.72s/it]


Embeddings generados con forma: (14, 384)
[INFO] Añadiendo 14 documentos a ChromaDB...
[INFO] Éxito: 14 chunks guardados.


Retriever pipeline from VectorStore 

In [9]:
class RAGRetriever:
    """Retrieve information from the vector store according to user queries."""

    def __init__(self, vector_store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """
        Args:
            vector_store: Vector store holding the indexed information
            embedding_manager: Converts the query text into an embedding vector
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def _distance_space(self) -> str:
        """
        Return the distance space declared in the collection metadata.

        Falls back to ``"l2"`` when the collection declares no ``hnsw:space``.
        """
        metadata = getattr(self.vector_store.collection, "metadata", None) or {}
        return metadata.get("hnsw:space", "l2")

    @staticmethod
    def _to_similarity(distance: float, space: str) -> float:
        """
        Convert a ChromaDB distance into a similarity score.

        For ``"l2"`` the distance is treated as a squared Euclidean distance, which
        for unit-length vectors equals ``2 - 2*cos``; hence ``cos = 1 - distance / 2``.
        NOTE(review): this assumes the embeddings are L2-normalized — confirm for the
        model in use. For any other space the score is ``1 - distance``.
        """
        if space == "l2":
            return 1 - distance / 2
        return 1 - distance

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve the stored chunks most relevant to the query.

        Args:
            query: User query
            top_k: Number of results requested from the vector store
            score_threshold: Minimum similarity a result needs to be kept

        Returns:
            List of dicts with keys ``id``, ``content``, ``metadata``,
            ``similarity_score``, ``distance`` and ``rank`` (1-based position in
            the unfiltered result list). Empty list if nothing matches or the
            query fails.
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")

        # Turn the user query into an embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        # Search the vector database
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )
            space = self._distance_space()

            # Filtered results are collected here
            retrieved_docs = []

            if results['documents'] and results['documents'][0]:
                # Index 0 selects the results of the single query that was sent
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # ChromaDB reports distances; convert according to the collection's space
                    similarity_score = self._to_similarity(distance, space)
                    # Quality filter: keep only results at or above the threshold
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })

                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")

            return retrieved_docs

        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

# Build the retriever from the vector store and embedding manager created above
rag_retriever=RAGRetriever(vectorstore,embedding_manager)