# Building Custom RAG Components from Scratch

This notebook demonstrates how to create your own custom RAG pipeline components by implementing the base class interfaces. This is useful when you want to:

- Integrate a custom embedding model or API
- Use a different vector database backend
- Implement specialized chunking strategies
- Create custom reranking logic


In [1]:
from typing import List, Dict, Any, Optional
import numpy as np

# Base classes to extend
from rag_search.parameter_impls.embedding_impls.embedding_base import BaseEmbedding
from rag_search.parameter_impls.storage_impls.storage_base import BaseStorage

# RAGClient and built-in components we'll mix with our custom ones
from rag_search.rag_client import RAGClient
from rag_search.parameter_impls.ingestion_impls import SimpleIngestion
from rag_search.parameter_impls.chunking_impls import SlidingWindowChunking
from rag_search.parameter_impls.retriever_impls import SimpleRetriever

## 1. Custom Embedding: Wrapping Sentence Transformers

Let's create a custom embedding component that wraps the `sentence-transformers` library directly. This shows how you can integrate any embedding model or API.

**Required methods:**
- `embed_text(text: str) -> List[float]` - Embed a single text
- `embed_batch(texts: List[str]) -> List[List[float]]` - Embed multiple texts efficiently
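
To make the two-method contract concrete without downloading a model, here is a minimal sketch of a toy embedding based on feature hashing. The name `HashingEmbedding` and its `dim` parameter are hypothetical, and the class is kept standalone so it runs anywhere; in the pipeline it would presumably subclass `BaseEmbedding` instead. It is not meant to produce useful semantic vectors, only to show the expected input and output shapes.

```python
import hashlib
import math
from typing import List


class HashingEmbedding:
    """Toy embedding: hashes each token into one of `dim` buckets.

    Hypothetical stand-in for illustration only; a real component
    would subclass BaseEmbedding and wrap an actual model or API.
    """

    def __init__(self, dim: int = 64):
        self.dim = dim

    def embed_text(self, text: str) -> List[float]:
        """Return one L2-normalized vector of length `dim`."""
        vector = [0.0] * self.dim
        for token in text.lower().split():
            digest = hashlib.md5(token.encode("utf-8")).digest()
            bucket = int.from_bytes(digest[:4], "big") % self.dim
            vector[bucket] += 1.0
        norm = math.sqrt(sum(v * v for v in vector))
        # An empty text has no tokens, so it stays an all-zero vector
        return [v / norm for v in vector] if norm > 0 else vector

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Return one vector per input text, in the same order."""
        return [self.embed_text(t) for t in texts]
```

The key property any implementation should preserve is that `embed_batch` returns exactly one vector per input, in input order, with the same dimension that `embed_text` produces.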

In [2]:
from sentence_transformers import SentenceTransformer


class CustomSentenceTransformerEmbedding(BaseEmbedding):
    """Custom embedding implementation using sentence-transformers directly.
    
    This demonstrates how to wrap any embedding model or API to work
    with the RAG pipeline.
    """
    
    def __init__(self, model_name: str = "all-mpnet-base-v2"):
        """Initialize with a sentence-transformers model.
        
        Args:
            model_name: Name of the sentence-transformers model to use.
                       See https://www.sbert.net/docs/pretrained_models.html
        """
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)
        print(f"Loaded embedding model: {model_name}")
        print(f"Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
    
    def embed_text(self, text: str) -> List[float]:
        """Embed a single text string."""
        embedding = self.model.encode(text, convert_to_numpy=True)
        return embedding.tolist()
    
    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed multiple texts efficiently in a batch."""
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        return embeddings.tolist()


# Test the custom embedding
custom_embedding = CustomSentenceTransformerEmbedding(model_name="all-MiniLM-L6-v2")

test_text = "This is a test sentence for embedding."
embedding = custom_embedding.embed_text(test_text)
print(f"\nEmbedding for: '{test_text}'")
print(f"Vector length: {len(embedding)}")
print(f"First 5 values: {embedding[:5]}")


Loaded embedding model: all-MiniLM-L6-v2
Embedding dimension: 384

Embedding for: 'This is a test sentence for embedding.'
Vector length: 384
First 5 values: [0.027824116870760918, 0.0017025501001626253, 0.08005548268556595, 0.046662840992212296, 0.03852203115820885]


## 2. Custom Storage: FAISS Vector Store

Now let's create a custom storage backend using [FAISS](https://github.com/facebookresearch/faiss) for efficient similarity search. FAISS is optimized for fast nearest-neighbor search on large vector datasets.

**Required methods:**
- `add_documents(chunks, embeddings, metadata)` - Store documents with their embeddings
- `search(query_embedding, top_k, ...)` - Find similar documents
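
The storage class in the next cell relies on one identity: the inner product of two L2-normalized vectors equals their cosine similarity. The following pure-Python sketch shows the same idea as a brute-force search, with no FAISS involved. The function names are hypothetical and the code is only meant to illustrate the scoring, not to replace an index.

```python
import math
from typing import List, Tuple


def l2_normalize(vector: List[float]) -> List[float]:
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / norm for x in vector] if norm > 0 else list(vector)


def top_k_by_inner_product(
    query: List[float], vectors: List[List[float]], top_k: int
) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the top_k most similar vectors.

    Because both sides are normalized first, each score is a cosine similarity.
    """
    q = l2_normalize(query)
    scored = [
        (i, sum(a * b for a, b in zip(q, l2_normalize(v))))
        for i, v in enumerate(vectors)
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


hits = top_k_by_inner_product([3.0, 0.0], [[1.0, 0.0], [0.0, 1.0], [2.0, 2.0]], top_k=2)
```

A flat inner-product index performs the same exhaustive comparison, just with optimized vector arithmetic, which is why vectors must be normalized both when they are added and when they are queried.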

In [3]:
import faiss


class FAISSStorage(BaseStorage):
    """Custom storage backend using FAISS for efficient similarity search.
    
    This demonstrates how to integrate a custom vector database or
    search backend with the RAG pipeline.
    """
    
    def __init__(self, use_gpu: bool = False):
        """Initialize FAISS storage.
        
        Args:
            use_gpu: Whether to use GPU acceleration (requires faiss-gpu)
        """
        self.use_gpu = use_gpu
        self.index = None
        self.dimension = None
        self.documents: List[Dict[str, Any]] = []
    
    def add_documents(
        self,
        chunks: List[str],
        embeddings: List[List[float]],
        metadata: Optional[List[Dict[str, Any]]] = None
    ) -> None:
        """Add documents to the FAISS index."""
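        if not chunks:
            return
        if len(chunks) != len(embeddings):
            raise ValueError("chunks and embeddings must have the same length")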
        
        # Convert embeddings to numpy array
        vectors = np.array(embeddings, dtype=np.float32)
        
        # Initialize index on first call
        if self.index is None:
            self.dimension = vectors.shape[1]
            # Using IndexFlatIP for inner product (cosine similarity with normalized vectors)
            self.index = faiss.IndexFlatIP(self.dimension)
            if self.use_gpu and faiss.get_num_gpus() > 0:
                self.index = faiss.index_cpu_to_gpu(faiss.StandardGpuResources(), 0, self.index)
        
        # Normalize vectors for cosine similarity
        faiss.normalize_L2(vectors)
        
        # Add vectors to index
        self.index.add(vectors)
        
        # Store document metadata
        start_idx = len(self.documents)
        for i, chunk in enumerate(chunks):
            doc = {
                "text": chunk,
                "chunk_id": start_idx + i,
            }
            if metadata and i < len(metadata):
                doc.update(metadata[i])
            self.documents.append(doc)
        
        print(f"Added {len(chunks)} documents. Total: {len(self.documents)}")
    
    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        similarity_func: str = "cosine",
        filter_metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any
    ) -> List[Dict[str, Any]]:
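        """Search for similar documents using FAISS.

        Scores are always cosine similarities (inner product of L2-normalized
        vectors); `similarity_func` is accepted but not used by this backend.
        """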
        if self.index is None or len(self.documents) == 0:
            return []
        
        # Prepare query vector
        query_vector = np.array([query_embedding], dtype=np.float32)
        faiss.normalize_L2(query_vector)
        
        # Search
        scores, indices = self.index.search(query_vector, min(top_k, len(self.documents)))
        
        # Build results
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < 0:  # FAISS returns -1 for empty slots
                continue
            
            doc = self.documents[idx].copy()
            doc["score"] = float(score)
            
            # Apply the metadata filter after the search (post-filtering), so
            # fewer than top_k results may be returned when a filter is set
            if filter_metadata and not all(
                doc.get(k) == v for k, v in filter_metadata.items()
            ):
                continue
            results.append(doc)
        
        return results
    
    def clear(self) -> None:
        """Clear all documents from storage."""
        self.index = None
        self.dimension = None
        self.documents = []
    
    def count(self) -> int:
        """Return the number of documents in storage."""
        return len(self.documents)


# Test the custom storage
custom_storage = FAISSStorage()
print("FAISS storage initialized")

FAISS storage initialized
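
Because `FAISSStorage.search` applies `filter_metadata` only after the nearest neighbors have been fetched, a restrictive filter can leave fewer than `top_k` results. One common workaround is to over-fetch and retry with a larger `k` until enough matches are found. The sketch below shows that loop in isolation; `filtered_top_k`, `search_fn`, and `growth` are hypothetical names, and `search_fn(k)` stands in for any ranked search that returns the best `k` documents as dictionaries.

```python
from typing import Any, Callable, Dict, List


def filtered_top_k(
    search_fn: Callable[[int], List[Dict[str, Any]]],
    total: int,
    top_k: int,
    filter_metadata: Dict[str, Any],
    growth: int = 2,
) -> List[Dict[str, Any]]:
    """Fetch progressively more candidates until top_k pass the filter.

    `total` is the number of stored documents; `growth` must be >= 2.
    """
    if top_k <= 0 or total <= 0:
        return []
    k = min(top_k, total)
    while True:
        hits = search_fn(k)
        matches = [
            doc for doc in hits
            if all(doc.get(key) == value for key, value in filter_metadata.items())
        ]
        # Stop once enough matches exist or every document has been examined
        if len(matches) >= top_k or k >= total:
            return matches[:top_k]
        k = min(k * growth, total)


# Hypothetical ranked results, best first
ranked = [{"chunk_id": i, "doc_id": d} for i, d in enumerate("aaabab")]
found = filtered_top_k(lambda k: ranked[:k], len(ranked), 2, {"doc_id": "b"})
```

The trade-off is extra search calls in exchange for a full result list; for large collections, a backend with native filtered search would likely be a better fit.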


## 3. Sample Corpus

Let's define a sample document corpus for testing our custom components.

In [4]:
documents = [
    {
        "doc_id": "doc_1",
        "text": "Retrieval-Augmented Generation (RAG) combines information retrieval with text generation to improve factual accuracy. It works by first retrieving relevant documents from a knowledge base, then using those documents as context for a language model to generate responses."
    },
    {
        "doc_id": "doc_2",
        "text": "Vector databases store embeddings and enable efficient similarity search for retrieval systems. They use specialized indexing structures like HNSW or IVF to find nearest neighbors in high-dimensional spaces quickly."
    },
    {
        "doc_id": "doc_3",
        "text": "Chunking strategies such as sliding windows affect recall and precision in RAG pipelines. Smaller chunks provide more precise retrieval but may lose context, while larger chunks preserve context but may include irrelevant information."
    },
    {
        "doc_id": "doc_4",
        "text": "Embedding models map text into dense vector representations used for semantic search. Models like BERT, sentence-transformers, and OpenAI embeddings capture semantic meaning, allowing retrieval based on meaning rather than exact keyword matches."
    },
    {
        "doc_id": "doc_5",
        "text": "FAISS (Facebook AI Similarity Search) is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that possibly do not fit in RAM."
    },
]

print(f"Corpus contains {len(documents)} documents")

Corpus contains 5 documents


## 4. Building the Pipeline with Custom Components

Now we'll wire up our custom components with the built-in ones to create a complete RAG pipeline.

In [5]:
# Create fresh instances of our custom components
custom_embedding = CustomSentenceTransformerEmbedding(model_name="all-MiniLM-L6-v2")
custom_storage = FAISSStorage()

# Create a retriever using our custom components
custom_retriever = SimpleRetriever(
    storage=custom_storage,
    embedding=custom_embedding,
    top_k=3,
    reranker=None  # No reranking for this example
)

# Build the RAGClient mixing custom and built-in components
client = RAGClient(
    ingestion=SimpleIngestion(),           # Built-in: simple text ingestion
    chunking=SlidingWindowChunking(        # Built-in: sliding window chunking
        chunk_size=256,
        chunk_overlap=50
    ),
    embedding=custom_embedding,             # Custom: our sentence-transformers wrapper
    storage=custom_storage,                 # Custom: our FAISS storage
    retriever=custom_retriever,             # Uses our custom embedding + storage
)

print("\nRAGClient created with custom embedding and storage!")

Loaded embedding model: all-MiniLM-L6-v2
Embedding dimension: 384

RAGClient created with custom embedding and storage!


In [6]:
# Upload documents to the pipeline
print("Uploading documents...\n")
client.upload_documents(documents)
print(f"\nTotal chunks in storage: {custom_storage.count()}")

Uploading documents...

Added 10 documents. Total: 10

Total chunks in storage: 10
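
Five documents become ten chunks because each text is a little longer than one window step. The following is a minimal sketch of a character-based sliding window that would behave this way, assuming a step of `chunk_size - chunk_overlap` (256 - 50 = 206 characters). This is an inference from the outputs in this notebook, not the confirmed implementation of `SlidingWindowChunking`, and `sliding_window_chunks` is a hypothetical name.

```python
from typing import List


def sliding_window_chunks(
    text: str, chunk_size: int = 256, chunk_overlap: int = 50
) -> List[str]:
    """Split text into overlapping character windows.

    A new window starts every (chunk_size - chunk_overlap) characters,
    so any text longer than one step produces a short trailing chunk.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]


# A synthetic 245-character text: longer than one step (206), shorter than one window (256)
chunks = sliding_window_chunks("x" * 245)
chunk_lengths = [len(c) for c in chunks]
```

Under this assumption, a text of about 245 characters yields one full chunk plus a 39-character tail, which matches the short fragment "ning rather than exact keyword matches." that appears in the retrieval output later in the notebook. Such tails are fully contained in the previous chunk, so a production chunker might choose to drop or merge them.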


## 5. Test Retrieval

Let's test our custom pipeline with some queries.

In [7]:
test_queries = [
    "What is RAG and how does it work?",
    "How do vector databases enable fast search?",
    "What is FAISS used for?",
]

for query in test_queries:
    print(f"Query: {query}")
    print("-" * 60)
    
    results = client.retrieve(query)
    
    for i, result in enumerate(results, 1):
        score = result.get('score', 0)
        full_text = result.get('text', '')
        # Show at most 100 characters, marking truncated previews with "..."
        text = full_text[:100] + "..." if len(full_text) > 100 else full_text
        doc_id = result.get('doc_id', 'N/A')
        print(f"  {i}. [Score: {score:.4f}] ({doc_id}) {text}")
    
    print()

Query: What is RAG and how does it work?
------------------------------------------------------------
  1. [Score: 0.3457] (doc_1) Retrieval-Augmented Generation (RAG) combines information retrieval with text generation to improve ...
  2. [Score: 0.3354] (doc_3) Chunking strategies such as sliding windows affect recall and precision in RAG pipelines. Smaller ch...
  3. [Score: 0.1150] (doc_4) ning rather than exact keyword matches.

Query: How do vector databases enable fast search?
------------------------------------------------------------
  1. [Score: 0.6646] (doc_2) Vector databases store embeddings and enable efficient similarity search for retrieval systems. They...
  2. [Score: 0.4354] (doc_5) FAISS (Facebook AI Similarity Search) is a library for efficient similarity search and clustering of...
  3. [Score: 0.3953] (doc_4) Embedding models map text into dense vector representations used for semantic search. Models like BE...

Query: What is FAISS used for?
-------------------