### Sharded & Scattered Retrieval
As a knowledge base grows from thousands to millions or billions of documents, a single, monolithic vector store becomes a major bottleneck: search latency increases, and the index becomes unwieldy to manage and update.

<p align="center">
  <img src="../../figures/shard_retrieval.png" width="800">
</p>

One architectural solution to this problem is Sharded & Scattered Retrieval. Instead of maintaining one massive index, we partition (or shard) the knowledge base into multiple smaller, independent vector stores.

These shards can be organized by any logical division, such as topic, date, or data source. When a user query arrives, a central orchestrator “scatters” the query to all shards, which perform their searches in parallel. The results are then gathered and re-ranked to find the globally best documents.
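The scatter-gather pattern described above can be sketched in plain Python. This is a minimal, self-contained illustration, not the implementation used later in this notebook: the toy shards, the three-dimensional vectors, and the helper names `query_shard` and `scatter_gather` are hypothetical. Each shard returns its local top-k, and the orchestrator keeps the global top-k by score. That final merge assumes scores are comparable across shards (for example, every shard uses the same embedding model and similarity metric).

```python
import heapq
import math
from concurrent.futures import ThreadPoolExecutor

# Hypothetical toy shards: each maps a document id to a small embedding vector.
# In a real system each shard would be a separate vector store, possibly on another machine.
SHARDS = {
    "eng": {"eng-1": [0.9, 0.1, 0.0], "eng-2": [0.2, 0.8, 0.0]},
    "mkt": {"mkt-1": [0.8, 0.2, 0.1], "mkt-2": [0.0, 0.1, 0.9]},
}

def cosine(a, b):
    """Cosine similarity between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def query_shard(shard_name, query_vec, k):
    """Local top-k inside one shard; returns (score, shard, doc_id) tuples."""
    scored = [(cosine(query_vec, vec), shard_name, doc_id)
              for doc_id, vec in SHARDS[shard_name].items()]
    return heapq.nlargest(k, scored)

def scatter_gather(query_vec, k=2):
    """Scatter the query to every shard in parallel, gather the hits, keep the global top-k."""
    with ThreadPoolExecutor(max_workers=len(SHARDS)) as pool:
        futures = [pool.submit(query_shard, name, query_vec, k) for name in SHARDS]
        candidates = [hit for f in futures for hit in f.result()]
    # Global re-rank: only meaningful if scores are comparable across shards.
    return heapq.nlargest(k, candidates)

print(scatter_gather([1.0, 0.0, 0.0], k=2))
```

Asking each shard for the same `k` as the final result guarantees the global top-k is among the gathered candidates, since no document outside a shard's local top-k can beat all of that shard's local winners.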

We will build a simulated two-shard system (Engineering vs. Marketing) and compare it to a monolithic system to understand the benefits in both latency and answer quality.

In [None]:
from dotenv import load_dotenv
load_dotenv()

True

In [None]:
from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace

llm = ChatHuggingFace(
    llm=HuggingFaceEndpoint(
        model="Qwen/Qwen3-4B-Instruct-2507"
    )
)


### Creating the Knowledge Base Shards
We will create two separate vector stores to simulate our shards. Each will contain domain-specific information.

In [3]:
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings  # replaces the deprecated langchain_community import
from langchain_core.documents import Document

# Engineering KB Documents
eng_docs = [
    Document(page_content="The QuantumLeap V3 processor utilizes a 3nm process node and features a dedicated AI accelerator core with 128 tensor units. API endpoint `/api/v3/status` provides real-time thermal throttling data.", metadata={"source": "eng-kb"}),
    Document(page_content="Firmware update v2.1 for the Aura Smart Ring optimizes the photoplethysmography (PPG) sensor algorithm for more accurate sleep stage detection. The update is deployed via the mobile app.", metadata={"source": "eng-kb"}),
    Document(page_content="The Smart Mug's heating element is a nickel-chromium coil controlled by a PID controller. It maintains temperature within +/- 1 degree Celsius. Battery polling is done via the `getBattery` function.", metadata={"source": "eng-kb"})
]

# Marketing KB Documents
mkt_docs = [
    Document(page_content="Press Release: Unveiling the QuantumLeap V3, the AI processor that redefines speed. 'It's a game-changer for creative professionals,' says CEO Jane Doe. Available Q4.", metadata={"source": "mkt-kb"}),
    Document(page_content="Product Page: The Aura Smart Ring is your personal wellness companion. Crafted from aerospace-grade titanium, it empowers you to unlock your full potential by understanding your body's signals.", metadata={"source": "mkt-kb"}),
    Document(page_content="Blog Post: 'Five Ways Our Smart Mug Supercharges Your Morning Routine.' The perfect temperature, from the first sip to the last, means your coffee is always perfect.", metadata={"source": "mkt-kb"})
]

# Create embedding model
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Create the two vector store shards
eng_vectorstore = FAISS.from_documents(eng_docs, embedding=embeddings)
mkt_vectorstore = FAISS.from_documents(mkt_docs, embedding=embeddings)

eng_retriever = eng_vectorstore.as_retriever(search_kwargs={"k": 2})
mkt_retriever = mkt_vectorstore.as_retriever(search_kwargs={"k": 2})

print(f"Knowledge Base shards created: Engineering KB ({len(eng_docs)} docs), Marketing KB ({len(mkt_docs)} docs).")


Knowledge Base shards created: Engineering KB (3 docs), Marketing KB (3 docs).
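Here the two shards were written by hand. With a mixed collection, documents could instead be routed to shards by a metadata field such as topic, date, or source. The sketch below is a hypothetical helper (`partition_by_key`) that uses plain dicts in place of LangChain `Document` objects so it runs on its own; it is one possible routing rule, not the notebook's method.

```python
from collections import defaultdict

def partition_by_key(docs, key):
    """Group documents into shards keyed by a metadata field (e.g. topic, date, or source)."""
    shards = defaultdict(list)
    for doc in docs:
        # Documents that lack the key land in a catch-all shard
        shards[doc["metadata"].get(key, "unassigned")].append(doc)
    return dict(shards)

# Hypothetical mixed collection
docs = [
    {"page_content": "QuantumLeap V3 uses a 3nm process node.", "metadata": {"source": "eng-kb"}},
    {"page_content": "Unveiling the QuantumLeap V3.", "metadata": {"source": "mkt-kb"}},
    {"page_content": "Smart Mug PID controller details.", "metadata": {"source": "eng-kb"}},
    {"page_content": "Untagged note.", "metadata": {}},
]

shards = partition_by_key(docs, "source")
print({name: len(items) for name, items in shards.items()})
```

With real `Document` objects, each group could then be passed to `FAISS.from_documents(...)` to build one vector store per shard, as done above for the engineering and marketing documents.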


### The Baseline - A Monolithic RAG System
To establish a baseline, we'll first create a traditional RAG system with a single, combined knowledge base. We will add a simulated latency to its retrieval step to mimic searching a much larger index.

In [4]:
import time
from langchain_core.runnables import RunnableLambda

# 1. Create the monolithic vector store
all_docs = eng_docs + mkt_docs
monolithic_vectorstore = FAISS.from_documents(all_docs, embedding=embeddings)
monolithic_retriever = monolithic_vectorstore.as_retriever(search_kwargs={"k": 4})

# 2. Simulate the increased latency of a large index
def slow_retrieval(query):
    print("--- [Monolithic Retriever] Searching large index... (simulating high latency) ---")
    time.sleep(2.5) # Simulate latency
    return monolithic_retriever.invoke(query)

slow_monolithic_retriever = RunnableLambda(slow_retrieval)

# 3. Create the monolithic RAG chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

generator_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an expert technical and marketing support agent. Answer the user's question based *only* on the provided context.\n\nContext:\n{context}"),
    ("human", "Question: {question}")
])

def format_docs(docs):
    return "\n\n".join(f"[Source: {doc.metadata.get('source', 'N/A')}] {doc.page_content}" for doc in docs)

monolithic_rag_chain = (
    {"context": slow_monolithic_retriever | format_docs, "question": RunnablePassthrough()}
    | generator_prompt
    | llm
    | StrOutputParser()
)

### Building the Sharded RAG Graph
Now let's build the sharded system. Its core is a node that scatters the query to both shards in parallel and gathers the results.

In [5]:
import time
from typing import TypedDict, List
from concurrent.futures import ThreadPoolExecutor
from langchain_core.documents import Document

class ShardedRAGState(TypedDict):
    question: str
    retrieved_docs: List[Document]
    final_answer: str

# Node 1: Parallel Retrieval (Scatter-Gather)
def parallel_retrieval_node(state: ShardedRAGState):
    """Scatters the query to all shards and gathers the results."""
    print("--- [Meta-Retriever] Scattering query to Engineering and Marketing shards in parallel... ---")
    
    # Search one shard; the sleep simulates a network hop plus a search over a smaller index
    def search_shard(retriever):
        time.sleep(0.5)
        return retriever.invoke(state['question'])

    # Run both shard searches concurrently in a thread pool, then gather the results
    all_docs = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        for shard_docs in executor.map(search_shard, [eng_retriever, mkt_retriever]):
            all_docs.extend(shard_docs)
    
    # In a real system, you'd add a re-ranking step here. For now, we'll just deduplicate.
    unique_docs = list({doc.page_content: doc for doc in all_docs}.values())
    print(f"--- [Meta-Retriever] Gathered {len(unique_docs)} unique documents from 2 shards. ---")
    return {"retrieved_docs": unique_docs}

# Node 2: Generation (same prompt and LLM as the monolithic chain)
def generation_node(state: ShardedRAGState):
    """Synthesizes the final answer from the gathered documents."""
    print("--- [Generator] Synthesizing final answer... ---")
    context = format_docs(state['retrieved_docs'])
    answer = (
        generator_prompt 
        | llm 
        | StrOutputParser()
    ).invoke({"context": context, "question": state['question']})
    return {"final_answer": answer}
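The retrieval node above only deduplicates. A re-ranking step could merge scored hits from all shards and keep the best ones globally. The sketch below assumes each shard returns `(page_content, distance)` pairs; with LangChain's FAISS wrapper, `similarity_search_with_score` returns `(Document, score)` pairs, and under the default Euclidean distance strategy a lower score means a closer match. The helper name `merge_and_rerank` and the sample hits are hypothetical.

```python
def merge_and_rerank(shard_results, k):
    """Deduplicate hits by content, keep each one's best (lowest) distance, return the global top-k."""
    best = {}
    for hits in shard_results:
        for content, distance in hits:
            if content not in best or distance < best[content]:
                best[content] = distance
    # Ascending sort: a smaller distance means a closer match
    ranked = sorted(best.items(), key=lambda item: item[1])
    return ranked[:k]

# Hypothetical per-shard results as (page_content, distance) pairs
eng_hits = [("QuantumLeap V3 spec, /api/v3/status", 0.41), ("Aura Ring firmware", 1.32)]
mkt_hits = [("QuantumLeap V3 press release", 0.37), ("QuantumLeap V3 spec, /api/v3/status", 0.45)]

print(merge_and_rerank([eng_hits, mkt_hits], k=2))
```

Merging raw distances is reasonable here because both shards embed with the same `all-MiniLM-L6-v2` model. If shards used different embedding models, their scores would likely need normalization, or a separate re-ranker such as a cross-encoder, before they could be compared.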

In [6]:
from langgraph.graph import StateGraph, END

workflow = StateGraph(ShardedRAGState)
workflow.add_node("parallel_retrieval", parallel_retrieval_node)
workflow.add_node("generate_answer", generation_node)

workflow.set_entry_point("parallel_retrieval")
workflow.add_edge("parallel_retrieval", "generate_answer")
workflow.add_edge("generate_answer", END)

sharded_rag_app = workflow.compile()

### Head-to-Head Comparison
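Now we will ask both systems a question that can only be answered completely and accurately with information from both the engineering and the marketing knowledge bases. The monolithic system may struggle here: a single top-k search over the combined index can be crowded out by documents that match the query's dominant keywords, leaving out less prominent but still relevant context.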
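One simple way to judge whether a retrieval step found "both halves" of the answer is to check the retrieved context for the facts the question needs. The sketch below is a crude, hypothetical substring check (`coverage_report`), not an established metric; the same function could be applied to the `retrieved_context_mono` and `retrieved_context_sharded` strings captured in the cells below.

```python
def coverage_report(context, required_facts):
    """Map each required fact to whether it appears (case-insensitively) in the retrieved context."""
    lowered = context.lower()
    return {fact: fact.lower() in lowered for fact in required_facts}

# Facts the query needs: one from the marketing shard, one from the engineering shard
required = ["game-changer for creative professionals", "/api/v3/status"]

# Hypothetical contexts: one covering both shards, one covering only marketing
both = "It's a game-changer for creative professionals. API endpoint `/api/v3/status` provides thermal data."
mkt_only = "It's a game-changer for creative professionals. Available Q4."

print(coverage_report(both, required))
print(coverage_report(mkt_only, required))
```

Exact substring matching is brittle against paraphrase; it is only meant as a quick sanity check on which shard's facts made it into the context.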

In [7]:
# This query has strong marketing keywords ('game-changer', 'creative professionals')
# but also a specific technical question ('API status endpoint').
user_query = "I heard the new QuantumLeap V3 is a 'game-changer for creative professionals'. Can you tell me more about it, and is there an API endpoint to check its status?"

### Running the Monolithic RAG System

In [8]:
print("--- [MONOLITHIC RAG] Starting run... ---")
start_time = time.time()

# We'll capture the context to inspect it
retrieved_context_mono = ""
def capture_context_mono(docs):
    global retrieved_context_mono
    retrieved_context_mono = format_docs(docs)
    return retrieved_context_mono

monolithic_rag_chain_instrumented = (
    {"context": slow_monolithic_retriever | capture_context_mono, "question": RunnablePassthrough()}
    | generator_prompt
    | llm
    | StrOutputParser()
)
monolithic_answer = monolithic_rag_chain_instrumented.invoke(user_query)
monolithic_time = time.time() - start_time

print("="*60)
print("               MONOLITHIC RAG SYSTEM OUTPUT")
print("="*60 + "\n")
print("Retrieved Context:")
print(retrieved_context_mono + "\n")
print("Final Answer:")
print(monolithic_answer)

--- [MONOLITHIC RAG] Starting run... ---
--- [Monolithic Retriever] Searching large index... (simulating high latency) ---


               MONOLITHIC RAG SYSTEM OUTPUT

Retrieved Context:
[Source: mkt-kb] Press Release: Unveiling the QuantumLeap V3, the AI processor that redefines speed. 'It's a game-changer for creative professionals,' says CEO Jane Doe. Available Q4.

[Source: eng-kb] The QuantumLeap V3 processor utilizes a 3nm process node and features a dedicated AI accelerator core with 128 tensor units. API endpoint `/api/v3/status` provides real-time thermal throttling data.

[Source: eng-kb] Firmware update v2.1 for the Aura Smart Ring optimizes the photoplethysmography (PPG) sensor algorithm for more accurate sleep stage detection. The update is deployed via the mobile app.

[Source: mkt-kb] Product Page: The Aura Smart Ring is your personal wellness companion. Crafted from aerospace-grade titanium, it empowers you to unlock your full potential by understanding your body's signals.

Final Answer:
Yes, the QuantumLeap V3 is described as a "game-changer for creative professionals" by CEO Jane Doe, hi

### Running the Sharded RAG System

In [9]:
print("--- [SHARDED RAG] Starting run... ---")
start_time = time.time()
inputs = {"question": user_query}
sharded_result = None
for output in sharded_rag_app.stream(inputs, stream_mode="values"):
    sharded_result = output
sharded_time = time.time() - start_time

retrieved_context_sharded = format_docs(sharded_result['retrieved_docs'])
sharded_answer = sharded_result['final_answer']

print("="*60)
print("                 SHARDED RAG SYSTEM OUTPUT")
print("="*60 + "\n")
print("Retrieved Context:")
print(retrieved_context_sharded + "\n")
print("Final Answer:")
print(sharded_answer)

--- [SHARDED RAG] Starting run... ---
--- [Meta-Retriever] Scattering query to Engineering and Marketing shards in parallel... ---
--- [Meta-Retriever] Gathered 4 unique documents from 2 shards. ---
--- [Generator] Synthesizing final answer... ---
                 SHARDED RAG SYSTEM OUTPUT

Retrieved Context:
[Source: eng-kb] The QuantumLeap V3 processor utilizes a 3nm process node and features a dedicated AI accelerator core with 128 tensor units. API endpoint `/api/v3/status` provides real-time thermal throttling data.

[Source: eng-kb] Firmware update v2.1 for the Aura Smart Ring optimizes the photoplethysmography (PPG) sensor algorithm for more accurate sleep stage detection. The update is deployed via the mobile app.

[Source: mkt-kb] Press Release: Unveiling the QuantumLeap V3, the AI processor that redefines speed. 'It's a game-changer for creative professionals,' says CEO Jane Doe. Available Q4.

[Source: mkt-kb] Product Page: The Aura Smart Ring is your personal wellness compa

### Analysis

In [11]:
# The query contains strong marketing keywords ('game-changer') and a specific technical question ('API status endpoint').
user_query = "I heard the new QuantumLeap V3 is a 'game-changer for creative professionals'. Can you tell me more about it, and is there an API endpoint to check its status?"

# --- Run Monolithic RAG ---
print("--- [MONOLITHIC RAG] Starting run... ---")
start_time = time.time()

monolithic_answer = monolithic_rag_chain.invoke(user_query)
monolithic_time = time.time() - start_time

# --- Run Sharded RAG ---
print("\n--- [SHARDED RAG] Starting run... ---")
start_time = time.time()
inputs = {"question": user_query}
sharded_rag_app.invoke(inputs)
sharded_time = time.time() - start_time

# --- Final Analysis ---
print("\n" + "="*60)
print("                      ACCURACY & RECALL ANALYSIS")
print("="*60 + "\n")


print("="*60)
print("                      PERFORMANCE ANALYSIS")
print("="*60 + "\n")
print(f"Monolithic RAG Total Time: {monolithic_time:.2f} seconds")
print(f"Sharded RAG Total Time: {sharded_time:.2f} seconds\n")
latency_improvement = ((monolithic_time - sharded_time) / monolithic_time) * 100
print(f"Latency Improvement: {latency_improvement:.0f}%\n")

--- [MONOLITHIC RAG] Starting run... ---
--- [Monolithic Retriever] Searching large index... (simulating high latency) ---

--- [SHARDED RAG] Starting run... ---
--- [Meta-Retriever] Scattering query to Engineering and Marketing shards in parallel... ---
--- [Meta-Retriever] Gathered 4 unique documents from 2 shards. ---
--- [Generator] Synthesizing final answer... ---

                      ACCURACY & RECALL ANALYSIS

                      PERFORMANCE ANALYSIS

Monolithic RAG Total Time: 5.39 seconds
Sharded RAG Total Time: 3.00 seconds

Latency Improvement: 44%
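The measured gap is consistent with the simulated delays. The monolithic retriever sleeps 2.5 s once, while the two shard searches each sleep 0.5 s concurrently, so the expected saving is roughly 2.5 - 0.5 = 2.0 s. The observed difference is 5.39 - 3.00 = 2.39 s; the extra ~0.4 s is most likely run-to-run variance in the LLM generation call that both systems share. The reported figure is (5.39 - 3.00) / 5.39 ≈ 0.44, i.e. 44%. Because the retrieval latencies here are simulated with `time.sleep`, this demonstrates the shape of the benefit rather than a measurement of real index search.

The underlying reason is that concurrent shard searches cost roughly the slowest shard (the max), whereas sequential searches cost the sum. A minimal sketch with hypothetical per-shard delays:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_shard_search(delay):
    """Stand-in for one shard search: sleeps for `delay` seconds and returns it."""
    time.sleep(delay)
    return delay

def timed(fn):
    """Return the wall-clock seconds taken by calling fn()."""
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start

delays = [0.2, 0.2, 0.3]  # hypothetical per-shard latencies in seconds

sequential = timed(lambda: [fake_shard_search(d) for d in delays])
with ThreadPoolExecutor(max_workers=len(delays)) as pool:
    parallel = timed(lambda: list(pool.map(fake_shard_search, delays)))

print(f"sequential: {sequential:.2f}s (sum of delays = {sum(delays):.2f}s)")
print(f"parallel:   {parallel:.2f}s (max of delays = {max(delays):.2f}s)")
```

Threads work well here because the waiting is I/O-like (sleeping, or network calls to remote shards); CPU-bound local searches would not necessarily speed up the same way in CPython.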

