### RAG Pipeline - Data Ingestion to Vector DB Pipeline

### Data Ingestion

In [32]:
import os
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

In [33]:
### Leer todos los PDFs del directorio data/pdf/

def process_all_pdfs(pdf_directory):
    # Procesar todos los pdfs en el directorio especificado
    all_documents = []
    pdf_dir = Path(pdf_directory)

    # Find all PDF files in the directory
    pdf_files = list(pdf_dir.glob("*.pdf"))

    print(f"Found {len(pdf_files)} PDF files.")

    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file.name}")
        try:
            # Cargar el PDF usando PyMuPDFLoader
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()

            #Agregar la fuente de informacion al metadata
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'
            
            all_documents.extend(documents)
            print(f"Loaded {len(documents)} pages from {pdf_file.name}")
        except  Exception as e:
            print(f"Error loading {pdf_file.name}: {e}")
    print(f"Total pages loaded: {len(all_documents)}")
    return all_documents


In [34]:
all_pdf_documents = process_all_pdfs("../data/pdf/")

Found 3 PDF files.
Processing file: STAR stories.pdf
Loaded 2 pages from STAR stories.pdf
Processing file: SRE DevOps Interview CheatSheet.pdf
Loaded 64 pages from SRE DevOps Interview CheatSheet.pdf
Processing file: Diego Ramos - Cover Letter.pdf
Loaded 1 pages from Diego Ramos - Cover Letter.pdf
Total pages loaded: 67


In [35]:
all_pdf_documents

[Document(metadata={'producer': 'Skia/PDF m143 Google Docs Renderer', 'creator': 'PyPDF', 'creationdate': '', 'title': 'STAR stories', 'source': '../data/pdf/STAR stories.pdf', 'total_pages': 2, 'page': 0, 'page_label': '1', 'source_file': 'STAR stories.pdf', 'file_type': 'pdf'}, page_content='Historia  1:  “Automatizando  la  Validación  de  CPLDs  con  \nIA\n \nLocal”\n \n(Principios:  Invent  and  Simplify  |  Deliver  Results  |  Ownership)  \nS  –  Situation  \nDurante  la  etapa  crítica  de  validación  del  nuevo  servidor  Xeon,  el  proceso  manual  de  \nvalidación\n \nde\n \nCPLDs\n \ntomaba\n \nmás\n \nde\n \n10\n \nhoras\n \npor\n \nbuild.\n \nCada\n \niteración\n \ndependía\n \nde\n \nscripts\n \nfragmentados\n \ny\n \nvalidaciones\n \nmanuales,\n \ngenerando\n \nerrores\n \nhumanos\n \ny\n \nretrasos\n \nsignificativos\n \njusto\n \nantes\n \ndel\n \nPower\n \nOn\n.\n \nT  –  Task  \nComo  ingeniero  senior  de  automatización,  debía  diseñar  un  sistema  que  redujer

### Text splitting into CHUNKS

In [36]:
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Split documents into smaller chunks for better RAG performance."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )
    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks.")

    # Mostrar ejemplo de chunk
    if split_docs:
        print(f"\nExample chunk:")
        print(f"Content: {split_docs[0].page_content[:50]}...")  # Mostrar los primeros 500 caracteres del primer chunk
        print(f"Metadata: {split_docs[0].metadata}")  # Mostrar los primeros 500 caracteres del primer chunk
    
    return split_docs


In [37]:
chunks = split_documents(all_pdf_documents)
chunks

Split 67 documents into 113 chunks.

Example chunk:
Content: Historia  1:  “Automatizando  la  Validación  de  ...
Metadata: {'producer': 'Skia/PDF m143 Google Docs Renderer', 'creator': 'PyPDF', 'creationdate': '', 'title': 'STAR stories', 'source': '../data/pdf/STAR stories.pdf', 'total_pages': 2, 'page': 0, 'page_label': '1', 'source_file': 'STAR stories.pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'Skia/PDF m143 Google Docs Renderer', 'creator': 'PyPDF', 'creationdate': '', 'title': 'STAR stories', 'source': '../data/pdf/STAR stories.pdf', 'total_pages': 2, 'page': 0, 'page_label': '1', 'source_file': 'STAR stories.pdf', 'file_type': 'pdf'}, page_content='Historia  1:  “Automatizando  la  Validación  de  CPLDs  con  \nIA\n \nLocal”\n \n(Principios:  Invent  and  Simplify  |  Deliver  Results  |  Ownership)  \nS  –  Situation  \nDurante  la  etapa  crítica  de  validación  del  nuevo  servidor  Xeon,  el  proceso  manual  de  \nvalidación\n \nde\n \nCPLDs\n \ntomaba\n \nmás\n \nde\n \n10\n \nhoras\n \npor\n \nbuild.\n \nCada\n \niteración\n \ndependía\n \nde\n \nscripts\n \nfragmentados\n \ny\n \nvalidaciones\n \nmanuales,\n \ngenerando\n \nerrores\n \nhumanos\n \ny\n \nretrasos\n \nsignificativos\n \njusto\n \nantes\n \ndel\n \nPower\n \nOn\n.\n \nT  –  Task  \nComo  ingeniero  senior  de  automatización,  debía  diseñar  un  sistema  que  redujer

### Embedding and Vector StoreDB

In [38]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
### Embeddingg and Vector StoreDB
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

### Embedding layer

In [39]:
# Creating class for Embedding Manager

class EmbeddingManager:

    # Handles document embeddings using SentenceTransformer
    # Models from HuggingFace
    # all-MiniLM-L6-v2 - Model responsible in converting text into vectors (embeddings) 


    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """"
        Initialize the Embedding manager with a specified model.

        Args:
            model_name (str): Name of the SentenceTransformer model to use.

        """   

        self.model_name = model_name
        self.model = None
        self._load_model()
    
    def _load_model(self):
        """Load the SentenceTransformer model."""
        try:
            print(f"Loading embedding model: {self.model_name}s")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model {self.model_name} loaded successfully. Emedding dimension: {self.model.get_sentence_embedding_dimension()}s")
            
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise e
        
    def generate_embedding(self, texts: List[str]) -> np.ndarray:
        """
        Generate embedding for a given text.

        Args:
            text (str): List of text strings to be embedded.

        Returns:
            numpy array: Embedding vector for the input text. Wihth shape (len(texts), embedding_dim).
           
        """

        if not self.model:
            raise ValueError("Model is not loaded.")
        
        print(f"Generating embedding for text of length {len(texts)}")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embedding of shape: {embeddings.shape}")
        return embeddings

## Initialize Embedding Manager
embedding_manager = EmbeddingManager()
embedding_manager


Loading embedding model: all-MiniLM-L6-v2s
Model all-MiniLM-L6-v2 loaded successfully. Emedding dimension: 384s


<__main__.EmbeddingManager at 0x30abbd6a0>

### Vector Store

In [40]:
class VectorStoreDB:
    """Vector Store Database using ChromaDB to store and retrieve document embeddings."""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store_db"):
        """
        Initialize the Vector Store Database.

        Args:
            collection_name (str): Name of the ChromaDB collection.
            persist_directory (str): Directory to persist the ChromaDB database.
        """
        self.collection_name = collection_name
        # Use an absolute path in the current directory
        self.persist_directory = os.path.abspath(persist_directory)
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize ChromaDB client and collection."""
        try:
            # Create persist directory with full permissions
            os.makedirs(self.persist_directory, mode=0o777, exist_ok=True)
            print(f"Using database directory: {self.persist_directory}")
            
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF Document Embeddings for RAG"}
            )
            print(f"Vector StoreDB initialized with collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing ChromaDB: {e}")
            raise e
    
    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store.

        Args:
            documents: List of Langchain documents.
            embeddings: Corresponding embeddings for the documents.
        """
        
        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_texts = []
        embedings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Generate a unique ID for each document
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            # Prepare metadata and text
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            # Document content
            documents_texts.append(doc.page_content)

            # Embedding
            embedings_list.append(embedding.tolist())

        try:
            # Add to ChromaDB collection
            self.collection.add(
                ids=ids,
                metadatas=metadatas,
                documents=documents_texts,
                embeddings=embedings_list
            )
            print(f"Succesfully added {len(documents)} documents to Vector StoreDB.")
            print(f"Total documents in collection now: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to Vector StoreDB: {e}")
            raise e
    
# Initialize Vector StoreDB
vector_store_db = VectorStoreDB()
vector_store_db

Using database directory: /Users/diegoolmedocr/Documents/RAG/second_repo/data/vector_store_db
Vector StoreDB initialized with collection: pdf_documents
Existing documents in collection: 452


<__main__.VectorStoreDB at 0x16d709e80>

In [41]:
# Add chunks to Vector StoreDB
texts = [doc.page_content for doc in chunks]

# Generate embeddings for the chunks
embeddings = embedding_manager.generate_embedding(texts)

#store into the vector database
vector_store_db.add_documents(chunks, embeddings)

Generating embedding for text of length 113


Batches: 100%|██████████| 4/4 [00:00<00:00,  6.69it/s]


Generated embedding of shape: (113, 384)
Succesfully added 113 documents to Vector StoreDB.
Total documents in collection now: 565


### RAG Retriever Pipeline from VectorStore

In [42]:
class RAGRetriever:
    """Class to handle Retrieval-Augmented Generation (RAG) operations."""
    
    def __init__(self, vector_store_db: VectorStoreDB, embedding_manager: EmbeddingManager):
        """
        Initialize RAG Retriever.

        Args:
            vector_store_db (VectorStoreDB): Vector store containing document embeddings.
            embedding_manager (EmbeddingManager): Manager for generating query embeddings.
        """
        self.vector_store_db = vector_store_db
        self.embedding_manager = embedding_manager
    
    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve top-k relevant documents for a given query.

        Args:
            query: the search query string.
            top_k: Number of top results to return.
            score_threshold: Minimum similarity score to consider a document relevant.
        
        Returns:
            List of dictionaries containing retrieved documents and metadata.  
        """
        print(f"Retrieving documents for query: {query}")
        print(f"Top K: {top_k}, Score Threshold: {score_threshold}")
        
        # Generate query embedding
        query_embedding = self.embedding_manager.generate_embedding([query])[0]

        # Search in vector store
        try:
            results = self.vector_store_db.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )  

            # Process results
            retrieved_docs = []
            
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # Convert distance to similarity score (ChromaDB uses cosine distance)
                    similarity_score = 1 - distance  # Convert distance to similarity score
                    
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'document': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })
                print(f"Retrieved {len(retrieved_docs)} documents after applying score threshold.")
            else:
                print("No documents found for the given query.")

            return retrieved_docs

        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

# Initialize RAG Retriever
rag_retriever = RAGRetriever(vector_store_db, embedding_manager)
rag_retriever

<__main__.RAGRetriever at 0x30abbd550>

In [43]:
rag_retriever.retrieve("El Primer Lanzamiento a Tiempo en una Década")

Retrieving documents for query: El Primer Lanzamiento a Tiempo en una Década
Top K: 5, Score Threshold: 0.0
Generating embedding for text of length 1


Batches: 100%|██████████| 1/1 [00:00<00:00, 132.61it/s]

Generated embedding of shape: (1, 384)
Retrieved 5 documents after applying score threshold.





[{'id': 'doc_4d60b923_3',
  'document': 'Historia  2:  “El  Primer  Lanzamiento  a  Tiempo  en  una  \nDécada”\n \n(Principios:  Deliver  Results  |  Ownership  |  Earn  Trust  |  Dive  Deep)  \nS  –  Situation  \nEl  programa  de  la  5.ª  Generación  Xeon  enfrentaba  múltiples  retrasos  acumulados  en  \ndesarrollo\n \ny\n \npruebas.\n \nHistóricamente,\n \nninguna\n \ngeneración\n \nanterior\n \nhabía\n \nalcanzado\n \nel\n \ntime-to-market\n \nprometido.\n \nLos\n \ncuellos\n \nde\n \nbotella\n \nen\n \nautomatización\n \ny\n \ncomunicación\n \nentre\n \nequipos\n \nglobales\n \namenazaban\n \ncon\n \nrepetir\n \nel\n \npatrón.\n \nT  –  Task  \nComo  Product  Owner  y  Tech  Lead ,  debía  coordinar  equipos  de  diseño,  validación  y  \nmanufactura\n \nen\n \ntres\n \npaíses,\n \nmejorar\n \nla\n \neficiencia\n \nde\n \nlos\n \nflujos\n \nde\n \nautomatización\n \ny\n \nasegurar\n \nun\n \nentregable\n \na\n \ntiempo\n \npor\n \nprimera\n \nvez\n \nen\n \n10\n \naños.\n \nA  –

### Integration VectorDB Context Pipeline with LLM Output

In [None]:
# Simple RAG pipeline with GROQ LLM
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()

# Initialize GROQ LLM
groq_api_key = os.getenv("GROQ_API_KEY")

llm = ChatGroq(api_key=groq_api_key, model_name="llama-3.1-8b-instant", temperature=0.1, max_tokens=1024)

# Simple RAG function: retrieve + generate response
def rag_simple(query, retriever, llm, top_k=3):
    # Retrieve the context
    results = retriever.retrieve(query, top_k=top_k)
    context = "\n\n".join(doc['document'] for doc in results)
    if not context:
        return "No relevant documents found."
    
    # Generate response using Groq LLM
    prompt = """Use the following context to answer the question:
            Context:
            {context}
            
            Question: 
            {query}
            
            Answer:"""
    response = llm.invoke(prompt.format(context=context, query=query))
    return response.content
    

In [45]:
answer = rag_simple("Hablame sobre el Primer Lanzamiento a Tiempo en una Década", rag_retriever, llm)

print(answer)

Retrieving documents for query: Hablame sobre el Primer Lanzamiento a Tiempo en una Década
Top K: 3, Score Threshold: 0.0
Generating embedding for text of length 1


Batches: 100%|██████████| 1/1 [00:00<00:00, 134.37it/s]


Generated embedding of shape: (1, 384)
Retrieved 3 documents after applying score threshold.
La Historia 2: "El Primer Lanzamiento a Tiempo en una Década" es un relato sobre la experiencia de un equipo en la implementación de prácticas de Agile/Scrum en un entorno tradicionalmente rígido. El objetivo era mejorar la eficiencia de los flujos de automatización y asegurar un entregable a tiempo por primera vez en 10 años.

El programa de la 5.ª Generación Xeon enfrentaba múltiples retrasos acumulados en desarrollo y pruebas. Históricamente, ninguna generación anterior había alcanzado el time-to-market prometido. Los cuellos de botella en automatización y comunicación entre equipos globales amenazaban con repetir el patrón.

Como Product Owner y Tech Lead, el equipo debía coordinar equipos de diseño, validación y manufactura en tres países, mejorar la eficiencia de los flujos de automatización y asegurar un entregable a tiempo por primera vez en 10 años.

Para lograr este objetivo, se intro

### Enhanced RAG Pipeline Features

In [46]:
def rag_advanced(query, retriever, llm, top_k=5, min_score=0.0, return_context=False):

    results = retriever.retrieve(query, top_k=top_k, score_threshold=min_score)
    if not results:
        return {
            "answer": "No relevant documents found.",
            "sources": [],
            "confidence_score": 0.0,
            "context": ''
        }
    
    # Prepare context and sources
    context = "\n\n".join(doc['document'] for doc in results)

    sources = [{
        'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
        'page': doc['metadata'].get('page', 'unknown'),
        'score': doc['similarity_score'],
        'preview': doc['document'][:300] + "..."
    } for doc in results]


    # Generate answer
    prompt = """Use the following context to answer the question:
            Context:
            {context}
            Question: 
            {query}
            Answer:
            """
    
    response = llm.invoke(prompt.format(context=context, query=query))  

    output = {
        "answer": response.content,
        "sources": sources,
        "confidence_score": confidence
    }
    if return_context:
        output["context"] = context
    return output

In [47]:
from textwrap import dedent

def rag_advanced(query, retriever, llm, top_k=5, min_score=0.0, return_context=False):
    """
    Advanced RAG pipeline:
    - Retrieves docs
    - Builds clean context
    - Sends safe prompt
    - Returns answer, sources, confidence, and optional context
    """

    # 1. Retrieve documents
    results = retriever.retrieve(query, top_k=top_k, score_threshold=min_score)

    if not results:
        return {
            "answer": "No relevant documents found.",
            "sources": [],
            "confidence_score": 0.0,
            "context": "" if return_context else None
        }

    # 2. Build context
    context = "\n\n".join(doc["document"] for doc in results)

    # Escape { } to avoid .format() breaking the prompt if docs contain JSON/code
    safe_context = context.replace("{", "{{").replace("}", "}}")

    # 3. Build prompt (clean, no indentation issues)
    prompt_template = dedent("""
    Use the following context to answer the question:

    Context:
    {context}

    Question:
    {query}

    Answer:
    """)

    final_prompt = prompt_template.format(context=safe_context, query=query)

    # 4. Call the LLM
    response = llm.invoke(final_prompt)

    # Extract text depending on the model type
    if hasattr(response, "content"):
        answer_text = response.content
    elif hasattr(response, "message"):
        answer_text = response.message.content
    else:
        answer_text = str(response)

    # 5. Prepare sources
    sources = []
    for doc in results:
        meta = doc["metadata"]
        source_name = (
            meta.get("source_file")
            or meta.get("source")
            or meta.get("filename")
            or meta.get("file")
            or "unknown"
        )

        sources.append({
            "source": source_name,
            "page": meta.get("page", "unknown"),
            "score": doc["similarity_score"],
            "preview": doc["document"][:300] + "..."
        })

    # 6. Confidence score (average)
    confidence = sum(doc["similarity_score"] for doc in results) / len(results)

    # 7. Build output
    output = {
        "answer": answer_text,
        "sources": sources,
        "confidence_score": confidence,
    }

    if return_context:
        output["context"] = context

    return output



In [48]:
# Example usage
result = rag_advanced("Hablame sobre el Primer Lanzamiento a Tiempo en una Década", rag_retriever, llm, top_k=3, min_score=0.0, return_context=True)
print("Answer:", result['answer'])
print("Sources:", result['sources'])
print("Confindece:", result['confidence_score'])
print("Context Preview:", result['context'][:500])

Retrieving documents for query: Hablame sobre el Primer Lanzamiento a Tiempo en una Década
Top K: 3, Score Threshold: 0.0
Generating embedding for text of length 1


Batches: 100%|██████████| 1/1 [00:00<00:00, 106.63it/s]

Generated embedding of shape: (1, 384)
Retrieved 3 documents after applying score threshold.





Answer: "El Primer Lanzamiento a Tiempo en una Década" es una historia de éxito que describe cómo se logró superar múltiples retrasos acumulados en el desarrollo del programa de la 5.ª Generación Xeon. A continuación, te presento los detalles clave de esta historia:

**Situación**: El programa de la 5.ª Generación Xeon enfrentaba retrasos significativos en el desarrollo y pruebas, lo que amenazaba con repetir el patrón histórico de no alcanzar el time-to-market prometido. Además, los cuellos de botella en automatización y comunicación entre equipos globales dificultaban la coordinación y la eficiencia.

**Tarea**: Como Product Owner y Tech Lead, se me encargó coordinar equipos de diseño, validación y manufactura en tres países, mejorar la eficiencia de los flujos de automatización y asegurar un entregable a tiempo por primera vez en 10 años.

**Acción**: Para superar estos desafíos, introduje prácticas de Agile/Scrum en un entorno tradicionalmente rígido, implementando Jira y CA Agile 