# LLM Contextual ETL + RAG Pipeline

This ETL pipeline is designed to ingest unstructured documents and prepare
LLM-ready context for a personal AI assistant using retrieval-augmented generation (RAG).

## Source Code
The source code for this ETL demo is also maintained on GitHub:
https://github.com/AaljinAntony/Fairy_Assistant


**Pipeline Stages:**
1.  **Extract**: Load unstructured text data.
2.  **Transform**: Clean and chunk the text into manageable pieces.
3.  **Load**: Generate vector embeddings and store them in a vector database.
4.  **Retrieval Test**: Verify the pipeline by querying the database.

In [None]:
# @title Setup & Installation
# Install necessary libraries
!pip install sentence-transformers chromadb numpy


In [None]:
import numpy as np
import chromadb
from sentence_transformers import SentenceTransformer
import re
from typing import List, Dict

## 1. Extract

In a real-world scenario, this stage would involve parsing PDFs, scraping websites, or connecting to APIs. For this demonstration, we use a sample raw text simulating a technical article.
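
As a minimal sketch of what a file-based extract step could look like, the function below reads plain-text files from a folder and returns one record per file. The function name `extract_text_files`, the `*.txt` default pattern, and the `source`/`text` record keys are assumptions made for illustration; they are not taken from the repository.

```python
from pathlib import Path
from typing import Dict, List


def extract_text_files(folder: str, pattern: str = "*.txt") -> List[Dict[str, str]]:
    """Read every file under `folder` matching `pattern` into source/text records."""
    records = []
    # rglob walks sub-folders too; sorting keeps the order deterministic.
    for path in sorted(Path(folder).rglob(pattern)):
        if not path.is_file():
            continue
        # errors="replace" keeps the run going when a file has invalid UTF-8 bytes.
        text = path.read_text(encoding="utf-8", errors="replace")
        records.append({"source": path.name, "text": text})
    return records
```

Each record's `text` could then be passed through the Transform stage in place of the hard-coded sample, with `source` kept as chunk metadata.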

In [None]:
# Sample Raw Text Data (Simulating a technical document)
raw_text = """
Deep learning is a subset of machine learning, which is essentially a neural network with three or more layers.
These neural networks attempt to simulate the behavior of the human brain—albeit far from matching its ability—allowing it to "learn" from large amounts of data.
While a neural network with a single layer can still make approximate predictions, additional hidden layers can help to optimize and refine for accuracy.

Deep learning drives many artificial intelligence (AI) applications and services that improve automation, performing analytical and physical tasks without human intervention.
Deep learning technology lies behind everyday products and services (such as digital assistants, voice-enabled TV remotes, and credit card fraud detection) as well as emerging technologies (such as self-driving cars).

RAG (Retrieval-Augmented Generation) is a technique for enhancing the accuracy and reliability of generative AI models with facts fetched from external sources.
By constructing a prompt that includes both the user's query and relevant retrieved data, RAG allows the model to generate responses that are grounded in specific, up-to-date information.
"""

print(f"Extracted {len(raw_text)} characters of raw text.")

## 2. Transform (Cleaning & Chunking)

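LLM context windows are limited, so we split (chunk) the text into smaller segments. We use a separator-aware chunker in the style of **recursive chunking**: it tries separators from coarsest to finest (paragraph breaks, newlines, sentence ends, spaces) so that chunks end at natural boundaries, and it overlaps consecutive chunks so that context is not lost where one chunk ends and the next begins.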
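
The idea of falling back through separators can also be written recursively. The sketch below is a stand-alone illustration, not the chunker used in this notebook: `recursive_split` and its parameters are hypothetical names, it has no overlap, and it drops the separator at each split point.

```python
from typing import List, Sequence


def recursive_split(
    text: str,
    max_len: int,
    separators: Sequence[str] = ("\n\n", "\n", ". ", " "),
) -> List[str]:
    """Split `text` into pieces of at most `max_len` characters.

    Tries the coarsest separator first and recurses with finer ones
    only for pieces that are still too long.
    """
    if len(text) <= max_len:
        return [text] if text else []
    if not separators:
        # No separators left: fall back to a hard cut.
        return [text[i:i + max_len] for i in range(0, len(text), max_len)]

    sep, finer = separators[0], separators[1:]
    chunks: List[str] = []
    current = ""
    for part in text.split(sep):
        candidate = part if not current else current + sep + part
        if len(candidate) <= max_len:
            current = candidate  # keep merging small parts
            continue
        if current:
            chunks.append(current)
        if len(part) <= max_len:
            current = part
        else:
            # This part alone is too long: retry with finer separators.
            chunks.extend(recursive_split(part, max_len, finer))
            current = ""
    if current:
        chunks.append(current)
    return chunks


sample = "Para one is short.\n\nPara two has a first sentence. And a second sentence."
pieces = recursive_split(sample, max_len=30)
```

Here the paragraph break is used first, and only the second paragraph, which exceeds 30 characters, is split again at the sentence boundary.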

In [None]:
def clean_text(text: str) -> str:
    """Basic text cleaning to normalize whitespace."""
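    # Collapse every run of whitespace (spaces, tabs, newlines) into one space.
    # Note: this also removes paragraph breaks, so the "\n\n" and "\n" separators
    # in recursive_chunker never match on cleaned text; only ". " and " " apply.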
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def recursive_chunker(text: str, chunk_size: int = 150, chunk_overlap: int = 20) -> List[str]:
    """
    Splits text into chunks of approximately `chunk_size` characters.
    Prioritizes splitting by separators to keep sentences intact.
    Ensures splits happen at word boundaries.
    """
    separators = ["\n\n", "\n", ". ", " ", ""]
    chunks = []
    start = 0
    
    while start < len(text):
        end = start + chunk_size
        if end >= len(text):
            chunks.append(text[start:])
            break
            
        # Find the best separator near the end of the chunk
        best_split = end
        found_separator = False
        
        # Look backwards from 'end' to find a safe split point
        for sep in separators:
            if sep == "":
                continue  # skip the empty separator; the no-separator case is handled below
            # Search only within the last 50% of the chunk to avoid too small chunks
            search_start = max(start, end - chunk_size // 2)
            # Find separator using rfind
            split_idx = text.rfind(sep, search_start, end)
            
            if split_idx != -1:
                best_split = split_idx + len(sep)
                found_separator = True
                break
        
        # If no separator found, try to find the nearest space to avoid cutting words
        if not found_separator:
            # Look for the last space within the final 50 characters of the chunk
            space_idx = text.rfind(" ", max(start, end - 50), end)
            if space_idx != -1:
                best_split = space_idx + 1
        
        chunk = text[start:best_split]
        chunks.append(chunk.strip())
        
        # Calculate next start position with overlap
        next_start = best_split - chunk_overlap
        
        # Adjust next_start to align with a word boundary (space)
        # We search backwards from next_start to find the nearest space
        if next_start > 0 and next_start < len(text):
            space_idx = text.rfind(" ", max(0, next_start - 20), next_start)
            if space_idx != -1:
                next_start = space_idx + 1
                
        # Always advance by at least one character so the loop terminates,
        # even if the overlap is as large as the chunk itself
        start = max(start + 1, next_start)
    
    return chunks

# Execution
cleaned_text = clean_text(raw_text)
chunks = recursive_chunker(cleaned_text, chunk_size=200, chunk_overlap=30)

print(f"Generated {len(chunks)} chunks:")
for i, c in enumerate(chunks):
    print(f"Chunk {i+1}: {c[:50]}...")

## 3. Load (Embeddings & Vector DB)

We convert the text chunks into numerical vectors (embeddings) using the pre-trained `all-MiniLM-L6-v2` sentence-transformers model, which produces 384-dimensional vectors, and store them in **ChromaDB**, an open-source vector database.
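
To make "numerical vectors" more concrete, here is a small sketch of how two embeddings are compared. The 3-dimensional vectors are made-up toy values, not model output, and `l2_normalize` is a helper introduced only for this illustration. It shows that for unit-length vectors, squared L2 distance and cosine similarity carry the same ranking information, since the squared distance equals `2 - 2 * cosine`.

```python
import numpy as np

# Toy 3-dimensional "embeddings" (made-up values, not model output).
a = np.array([1.0, 2.0, 2.0])
b = np.array([2.0, 0.0, 1.0])


def l2_normalize(v: np.ndarray) -> np.ndarray:
    """Scale a vector to unit length."""
    return v / np.linalg.norm(v)


a_unit, b_unit = l2_normalize(a), l2_normalize(b)

# For unit vectors the dot product is the cosine similarity.
cosine = float(np.dot(a_unit, b_unit))

# Squared Euclidean distance between the same two unit vectors.
squared_l2 = float(np.sum((a_unit - b_unit) ** 2))

print(f"cosine={cosine:.4f}, squared_l2={squared_l2:.4f}, 2-2cos={2 - 2 * cosine:.4f}")
```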

In [None]:
# 1. Initialize Embedding Model
print("Loading embedding model...")
model = SentenceTransformer('all-MiniLM-L6-v2')

# 2. Generate Embeddings
embeddings = model.encode(chunks)
print(f"Generated embeddings shape: {embeddings.shape}")

# 3. Initialize ChromaDB (Ephemeral - In-memory)
client = chromadb.Client()

# Handle case where collection already exists (for re-running notebook)
try:
    client.delete_collection(name="demo_knowledge_base")
except Exception:
    pass

collection = client.create_collection(name="demo_knowledge_base")

# 4. Load Data into Vector DB
ids = [f"id_{i}" for i in range(len(chunks))]
metadatas = [{"source": "sample_text", "chunk_index": i} for i in range(len(chunks))]

collection.add(
    documents=chunks,
    embeddings=embeddings.tolist(),
    metadatas=metadatas,
    ids=ids
)

print(f"Successfully loaded {len(chunks)} items into ChromaDB.")

## 4. Retrieval Test

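We verify the pipeline by simulating a user query. The query is embedded with the same model as the chunks, and ChromaDB returns the chunks whose vectors are nearest to it (nearest-neighbor search). A smaller distance means a closer match.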
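
Conceptually, nearest-neighbor retrieval can be sketched as an exhaustive scan over all stored vectors. The version below assumes squared L2 distance and uses made-up 2-dimensional vectors; `top_k_nearest` is a hypothetical helper, not a ChromaDB function. A vector database typically replaces this scan with an approximate index so that it scales to large collections.

```python
import numpy as np


def top_k_nearest(query_vec: np.ndarray, doc_vecs: np.ndarray, k: int = 2):
    """Return (indices, squared L2 distances) of the k rows of doc_vecs closest to query_vec."""
    diffs = doc_vecs - query_vec           # broadcast the query against every row
    dists = np.sum(diffs ** 2, axis=1)     # squared L2 distance per document
    order = np.argsort(dists)[:k]          # indices of the k smallest distances
    return order.tolist(), dists[order].tolist()


# Toy 2-dimensional vectors (illustrative values only).
doc_vecs = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 3.0]])
query_vec = np.array([0.9, 0.1])

indices, distances = top_k_nearest(query_vec, doc_vecs, k=2)
print(indices, distances)
```

The returned indices play the same role as the ids in `results`, and the distances correspond to the values printed in the retrieval cell.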

In [None]:
query = "What is RAG?"

# 1. Embed Query
query_embedding = model.encode([query]).tolist()

# 2. Search Database
results = collection.query(
    query_embeddings=query_embedding,
    n_results=2
)

# 3. Display Results
print(f"Query: '{query}'\n")
print("Top Retrieved Results:")
for i, doc in enumerate(results['documents'][0]):
    print(f"--- Result {i+1} ---")
    print(f"Text: {doc}")
    print(f"Distance: {results['distances'][0][i]:.4f}")