# Let's go PRO!

Advanced RAG Techniques!

Let's start by digging into ingest:

1. No LangChain! Just native for maximum flexibility
2. Let's use an LLM to divide up chunks in a sensible way
3. Let's use the best chunk size and encoder from yesterday
4. Let's also have the LLM rewrite chunks in a way that's most useful ("document pre-processing")

In [1]:
from pathlib import Path
from openai import OpenAI
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from chromadb import PersistentClient
from tqdm import tqdm
from litellm import completion
import numpy as np
from sklearn.manifold import TSNE
import plotly.graph_objects as go

load_dotenv(override=True)

# LiteLLM model string: provider prefix (nvidia_nim/) followed by the model name
MODEL = "nvidia_nim/meta/llama-3.1-8b-instruct"




DB_NAME = "preprocessed_db"
collection_name = "docs"
embedding_model = "text-embedding-3-large"
KNOWLEDGE_BASE_PATH = Path("knowledge-base")
AVERAGE_CHUNK_SIZE = 400

openai = OpenAI()  # used later for embeddings; requires OPENAI_API_KEY in the environment
# Quick test call to confirm the chat model responds
messages = [{"role":"user", "content":"Say hi and identify the model used in one short sentence."}]

resp = completion(model=MODEL, messages=messages)

print("Status:", resp.choices[0].message.content)


Status: Hi! I'm a large language model, specifically based on the transformer architecture.


In [2]:
import os
from dotenv import load_dotenv
load_dotenv(override=True)
print("NVIDIA_API_KEY:", bool(os.getenv("NVIDIA_API_KEY")))  # prints True if present, False if missing


NVIDIA_API_KEY: True


In [3]:
# Inspired by LangChain's Document - let's have something similar

class Result(BaseModel):
    page_content: str
    metadata: dict

In [4]:
# A class to perfectly represent a chunk

class Chunk(BaseModel):
    headline: str = Field(description="A brief heading for this chunk, typically a few words, that is most likely to be surfaced in a query")
    summary: str = Field(description="A few sentences summarizing the content of this chunk to answer common questions")
    original_text: str = Field(description="The original text of this chunk from the provided document, exactly as is, not changed in any way")

    def as_result(self, document):
        metadata = {"source": document["source"], "type": document["type"]}
        return Result(page_content=self.headline + "\n\n" + self.summary + "\n\n" + self.original_text, metadata=metadata)


class Chunks(BaseModel):
    chunks: list[Chunk]

## Three steps:

1. Fetch documents from the knowledge base, like LangChain did
2. Call an LLM to turn documents into Chunks
3. Store the Chunks in Chroma

That's it!

### Let's start with Step 1

In [5]:
def fetch_documents():
    """A homemade version of the LangChain DirectoryLoader"""

    documents = []

    for folder in KNOWLEDGE_BASE_PATH.iterdir():
        doc_type = folder.name
        for file in folder.rglob("*.md"):
            with open(file, "r", encoding="utf-8") as f:
                documents.append({"type": doc_type, "source": file.as_posix(), "text": f.read()})

    print(f"Loaded {len(documents)} documents")
    return documents

In [6]:
documents = fetch_documents()

Loaded 30 documents


### Donezo! On to Step 2 - make the chunks

In [7]:
def make_prompt(document):
    how_many = (len(document["text"]) // AVERAGE_CHUNK_SIZE) + 1
    return f"""
You are an expert RAG pre-processor whose job is to split documents into high-quality, overlapping chunks for Ayush's Personal Knowledge Base.

**Owner:** Ayush Tyagi  
**Document Type:** {document["type"]}  
**Source Location:** {document["source"]}

---

### 📌 Your Goal
Split the document into clean, retrieval-optimized chunks that will be used by Ayush’s personal chatbot to answer questions about his life, projects, interests, and background.

### 📌 Required Guidelines
1. **Cover the entire document. Do not omit any content.**
2. The document should ideally be split into **~{how_many} chunks** (but choose more or fewer if needed for quality).
3. Use **overlapping chunks**:
   - About **25% overlap**, OR
   - About **50 words** of shared text.
4. Each chunk must include:
   - **Chunk Title / Headline**
   - **Chunk Summary** (2–4 sentences)
   - **Chunk Text** (the exact original text of the chunk)

### 📌 Quality Requirements
- Chunks must be coherent and readable.
- Do not cut sentences in unnatural places.
- Ensure important sections appear in multiple chunks via overlap.
- Maintain the original order of the document.
- Do NOT invent or alter information. Use only the text given.

---

### 📄 Document to Chunk:

{document["text"]}

---

### 📌 Output Format (IMPORTANT)

Produce the chunks in the following structure:

CHUNK 1  
Headline: <title>  
Summary: <summary>  
Text: <original chunked text>

CHUNK 2  
Headline: <title>  
Summary: <summary>  
Text: <original chunked text>

(Continue for all chunks)

---

Now create the chunks.
"""


In [8]:
print(make_prompt(documents[9]))


You are an expert RAG pre-processor whose job is to split documents into high-quality, overlapping chunks for Ayush's Personal Knowledge Base.

**Owner:** Ayush Tyagi  
**Document Type:** extras  
**Source Location:** knowledge-base/extras/achievements.md

---

### 📌 Your Goal
Split the document into clean, retrieval-optimized chunks that will be used by Ayush’s personal chatbot to answer questions about his life, projects, interests, and background.

### 📌 Required Guidelines
1. **Cover the entire document. Do not omit any content.**
2. The document should ideally be split into **~6 chunks** (but choose more or fewer if needed for quality).
3. Use **overlapping chunks**:
   - About **25% overlap**, OR
   - About **50 words** of shared text.
4. Each chunk must include:
   - **Chunk Title / Headline**
   - **Chunk Summary** (2–4 sentences)
   - **Chunk Text** (the exact original text of the chunk)

### 📌 Quality Requirements
- Chunks must be coherent and readable.
- Do not
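
The suggested chunk count in `make_prompt` is plain integer arithmetic on the character length of the document. Here is a minimal sketch of that calculation; the helper name `estimate_chunk_count` is hypothetical and exists only for illustration.

```python
AVERAGE_CHUNK_SIZE = 400  # characters, same value as the notebook constant


def estimate_chunk_count(text: str, average_chunk_size: int = AVERAGE_CHUNK_SIZE) -> int:
    """Mirror the how_many calculation: floor division plus one."""
    return (len(text) // average_chunk_size) + 1


# Try a few document lengths to see how the suggestion scales
for length in (0, 399, 400, 2300):
    print(length, "characters ->", estimate_chunk_count("x" * length), "chunk(s)")
```

Because of the `+ 1`, a document of exactly 400 characters is asked for 2 chunks rather than 1. That is harmless here, since the prompt treats the number as a suggestion.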

In [9]:
def make_messages(document):  # wrap the prompt in a list of message dicts
    return [
        {"role": "user", "content": make_prompt(document)},
    ]

In [10]:
make_messages(documents[9])

[{'role': 'user',
  'content': "\nYou are an expert RAG pre-processor whose job is to split documents into high-quality, overlapping chunks for Ayush's Personal Knowledge Base.\n\n**Owner:** Ayush Tyagi  \n**Document Type:** extras  \n**Source Location:** knowledge-base/extras/achievements.md\n\n---\n\n### 📌 Your Goal\nSplit the document into clean, retrieval-optimized chunks that will be used by Ayush’s personal chatbot to answer questions about his life, projects, interests, and background.\n\n### 📌 Required Guidelines\n1. **Cover the entire document. Do not omit any content.**\n2. The document should ideally be split into **~6 chunks** (but choose more or fewer if needed for quality).\n3. Use **overlapping chunks**:\n   - About **25% overlap**, OR\n   - About **50 words** of shared text.\n4. Each chunk must include:\n   - **Chunk Title / Headline**\n   - **Chunk Summary** (2–4 sentences)\n   - **Chunk Text** (the exact original text of the chunk)\n\n### 📌 Quality Requir

In [16]:
import asyncio
from litellm import acompletion

async def process_document_async(document):
    messages = [
        {
            "role": "system",
            "content": """
You MUST respond ONLY with valid JSON in the following exact format:

{
  "chunks": [
    {
      "headline": "...",
      "summary": "...",
      "original_text": "..."
    }
  ]
}

No explanations, no extra text, no formatting outside the JSON.
If the content is too long, split it cleanly into multiple chunks.
"""
        },
        *make_messages(document)
    ]

    response = await acompletion(
        model=MODEL,
        messages=messages,
        response_format={"type": "json_object"}
    )

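    reply = response.choices[0].message.content
    # Validate the JSON reply, then convert each Chunk to a Result so that
    # create_embeddings can read page_content and metadata
    return [chunk.as_result(document) for chunk in Chunks.model_validate_json(reply).chunks]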
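
Smaller models do not always honour a "JSON only" instruction and may wrap the reply in a Markdown code fence or add a sentence around it. Here is a minimal sketch of a defensive parser, assuming that failure mode. `extract_json` is a hypothetical helper, not part of LiteLLM or Pydantic, and the sample reply is made up.

```python
import json


def extract_json(reply: str) -> dict:
    """Parse the outermost JSON object found in a model reply."""
    start = reply.find("{")
    end = reply.rfind("}")
    if start == -1 or end < start:
        raise ValueError("No JSON object found in reply")
    return json.loads(reply[start : end + 1])


# A made-up reply for illustration: JSON wrapped in a code fence with chatter
fence = "`" * 3
payload = '{"chunks": [{"headline": "H", "summary": "S", "original_text": "T"}]}'
reply = "Here you go:\n" + fence + "json\n" + payload + "\n" + fence

data = extract_json(reply)
print(data["chunks"][0]["headline"])
```

The parsed dict could then be passed to `Chunks.model_validate(data)` rather than calling `model_validate_json` on the raw reply.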


In [17]:
from litellm import completion
import os

model = "moonshot/kimi-k2"

try:
    print("Testing MOONSHOT_API_KEY...")
    os.environ["MOONSHOT_API_KEY"] = os.getenv("MOONSHOT_API_KEY", "")  # read from .env; never hard-code API keys in a notebook
    resp = completion(model=model, messages=[{"role": "user", "content": "hello"}])
    print("MOONSHOT_API_KEY WORKS!")
    print(resp.choices[0].message.content)
except Exception as e:
    print("FAILED:", e)

print("-" * 40)

try:
    print("Testing kimi_k2_api_key...")
    os.environ["MOONSHOT_API_KEY"] = os.getenv("kimi_k2_api_key", "")  # assumed env var name, taken from the label printed above; never hard-code API keys
    resp = completion(model=model, messages=[{"role": "user", "content": "hello"}])
    print("kimi_k2_api_key WORKS!")
    print(resp.choices[0].message.content)
except Exception as e:
    print("FAILED:", e)


Testing MOONSHOT_API_KEY...
FAILED: litellm.AuthenticationError: AuthenticationError: MoonshotException - Invalid Authentication
----------------------------------------
Testing kimi_k2_api_key...
FAILED: litellm.AuthenticationError: AuthenticationError: MoonshotException - Invalid Authentication


In [18]:
async def create_chunks_parallel(documents):
    tasks = [process_document_async(doc) for doc in documents]
    results = await asyncio.gather(*tasks)

    # flatten
    all_chunks = [chunk for doc_chunks in results for chunk in doc_chunks]
    return all_chunks


In [None]:
# process_document(documents[0])

In [None]:
# def create_chunks(documents):
#     chunks = []
#     for doc in tqdm(documents):
#         chunks.extend(process_document(doc))
#     return chunks

In [None]:
#chunks = create_chunks(documents)

In [19]:
chunks = await create_chunks_parallel(documents)
chunks[:2]

RateLimitError: litellm.RateLimitError: RateLimitError: Nvidia_nimException - Error code: 429 - {'status': 429, 'title': 'Too Many Requests'}

In [None]:
print(len(chunks))

### Well that was easy! If a bit slow.

In the Python module version, I sneakily use a multiprocessing Pool to run this in parallel,
but if you get a Rate Limit Error you can turn this off in the code.
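
If launching every request at once triggers a Rate Limit Error, one option short of turning parallelism off is to cap the number of requests in flight. Here is a minimal sketch using `asyncio.Semaphore`. `MAX_CONCURRENCY`, `gather_bounded`, and the stand-in `fake_process` are assumptions for illustration; in this notebook the worker would be `process_document_async`.

```python
import asyncio

MAX_CONCURRENCY = 3  # assumed cap; tune to your provider's rate limit


async def gather_bounded(worker, items, limit=MAX_CONCURRENCY):
    """Run worker(item) for every item with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(item):
        async with semaphore:
            return await worker(item)

    # gather preserves input order, so results line up with items
    return await asyncio.gather(*(bounded(item) for item in items))


in_flight = 0
peak = 0


async def fake_process(doc):
    """Stand-in for an LLM call: records concurrency and returns one 'chunk'."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return [f"chunk of {doc}"]


results = asyncio.run(gather_bounded(fake_process, [f"doc{i}" for i in range(10)]))
all_chunks = [chunk for doc_chunks in results for chunk in doc_chunks]
print(len(all_chunks), "chunks, peak concurrency", peak)
```

Inside a notebook cell an event loop is already running, so you would write `await gather_bounded(...)` instead of `asyncio.run(...)`.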

### Finally, Step 3 - save the embeddings

In [None]:
def create_embeddings(chunks):
    chroma = PersistentClient(path=DB_NAME)
    if collection_name in [c.name for c in chroma.list_collections()]:
        chroma.delete_collection(collection_name)

    texts = [chunk.page_content for chunk in chunks]
    emb = openai.embeddings.create(model=embedding_model, input=texts).data
    vectors = [e.embedding for e in emb]

    collection = chroma.get_or_create_collection(collection_name)

    ids = [str(i) for i in range(len(chunks))]
    metas = [chunk.metadata for chunk in chunks]

    collection.add(ids=ids, embeddings=vectors, documents=texts, metadatas=metas)
    print(f"Vectorstore created with {collection.count()} documents")
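
`create_embeddings` sends every chunk text in a single embeddings request. Embedding endpoints generally cap the number of inputs per request, so a larger knowledge base may need batching. Here is a minimal sketch of the slicing; `batched` and `BATCH_SIZE` are hypothetical, and the right batch size depends on your provider.

```python
BATCH_SIZE = 100  # assumed value; check your provider's per-request input limit


def batched(items, size=BATCH_SIZE):
    """Yield consecutive slices of `items`, each at most `size` long."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


texts = [f"chunk {i}" for i in range(250)]
print([len(batch) for batch in batched(texts)])
```

Each batch would go through `openai.embeddings.create(model=embedding_model, input=batch)`, with the resulting vectors concatenated in order before calling `collection.add`.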

In [None]:
create_embeddings(chunks)

# Nothing more to do here... right?

Wait! Didja think I'd forget??

In [None]:
chroma = PersistentClient(path=DB_NAME)
collection = chroma.get_or_create_collection(collection_name)
result = collection.get(include=['embeddings', 'documents', 'metadatas'])
vectors = np.array(result['embeddings'])
documents = result['documents']
metadatas = result['metadatas']
doc_types = [metadata['type'] for metadata in metadatas]
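# Build the colour map from the types actually present, so an unexpected type cannot raise ValueError
palette = ['blue', 'green', 'red', 'orange', 'purple', 'brown', 'pink', 'gray']
type_to_color = {t: palette[i % len(palette)] for i, t in enumerate(sorted(set(doc_types)))}
colors = [type_to_color[t] for t in doc_types]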

In [None]:
tsne = TSNE(n_components=2, random_state=42)
reduced_vectors = tsne.fit_transform(vectors)

# Create the 2D scatter plot
fig = go.Figure(data=[go.Scatter(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    mode='markers',
    marker=dict(size=5, color=colors, opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

fig.update_layout(title='2D Chroma Vector Store Visualization',
    xaxis_title='x', yaxis_title='y',  # `scene` only applies to 3D plots
    width=800,
    height=600,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show()

In [None]:
tsne = TSNE(n_components=3, random_state=42)
reduced_vectors = tsne.fit_transform(vectors)

# Create the 3D scatter plot
fig = go.Figure(data=[go.Scatter3d(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    z=reduced_vectors[:, 2],
    mode='markers',
    marker=dict(size=5, color=colors, opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

fig.update_layout(
    title='3D Chroma Vector Store Visualization',
    scene=dict(xaxis_title='x', yaxis_title='y', zaxis_title='z'),
    width=900,
    height=700,
    margin=dict(r=10, b=10, l=10, t=40)
)

fig.show()

## And now - let's build an Advanced RAG!

We will use these techniques:

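1. Reranking - have an LLM reorder the retrieved chunks by relevance to the question
2. Query re-writing - rephrase the user's question before searching the Knowledge Base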

In [None]:
class RankOrder(BaseModel):  # Pydantic model for the reranker's structured reply
    order: list[int] = Field(
        description="The order of relevance of chunks, from most relevant to least relevant, by chunk id number"
    )

In [None]:
def rerank(question, chunks):
    system_prompt = """
You are a document re-ranker.
You are provided with a question and a list of relevant chunks of text from a query of a knowledge base.
The chunks are provided in the order they were retrieved; this should be approximately ordered by relevance, but you may be able to improve on that.
You must rank order the provided chunks by relevance to the question, with the most relevant chunk first.
Reply only with the list of ranked chunk ids, nothing else. Include all the chunk ids you are provided with, reranked.
"""
    user_prompt = f"The user has asked the following question:\n\n{question}\n\nOrder all the chunks of text by relevance to the question, from most relevant to least relevant. Include all the chunk ids you are provided with, reranked.\n\n"
    user_prompt += "Here are the chunks:\n\n"
    for index, chunk in enumerate(chunks):
        user_prompt += f"# CHUNK ID: {index + 1}:\n\n{chunk.page_content}\n\n"  # label each chunk with its 1-based id
    user_prompt += "Reply only with the list of ranked chunk ids, nothing else."
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    response = completion(model=MODEL, messages=messages, response_format=RankOrder)
    reply = response.choices[0].message.content
    order = RankOrder.model_validate_json(reply).order
    print(order)
    return [chunks[i - 1] for i in order]
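
`rerank` trusts the model to return every chunk id exactly once. A reply with an out-of-range id would raise an `IndexError`, and a missing id would silently drop a chunk. Here is a minimal sketch of a defensive step, assuming that is a concern for your model; `sanitize_order` is a hypothetical helper.

```python
def sanitize_order(order: list[int], n_chunks: int) -> list[int]:
    """Return a valid permutation of 1..n_chunks based on the model's order.

    Out-of-range and repeated ids are dropped; ids the model left out are
    appended in their original retrieval order.
    """
    seen = set()
    cleaned = []
    for chunk_id in order:
        if 1 <= chunk_id <= n_chunks and chunk_id not in seen:
            seen.add(chunk_id)
            cleaned.append(chunk_id)
    cleaned.extend(i for i in range(1, n_chunks + 1) if i not in seen)
    return cleaned


print(sanitize_order([3, 3, 9, 1], n_chunks=4))
```

With this in place, `rerank` could end with `return [chunks[i - 1] for i in sanitize_order(order, len(chunks))]`.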

In [None]:
RETRIEVAL_K = 10

def fetch_context_unranked(question):
    query = openai.embeddings.create(model=embedding_model, input=[question]).data[0].embedding
    results = collection.query(query_embeddings=[query], n_results=RETRIEVAL_K)
    chunks = []
    for result in zip(results["documents"][0], results["metadatas"][0]):
        chunks.append(Result(page_content=result[0], metadata=result[1]))
    return chunks

In [None]:
question = "Who won the IIOTY award?"
chunks = fetch_context_unranked(question)

In [None]:
for chunk in chunks:
    print(chunk.page_content[:15]+"...")

In [None]:
reranked = rerank(question, chunks)

In [None]:
for chunk in reranked:
    print(chunk.page_content[:15]+"...")

In [None]:
question = "Who went to Manchester University?"
RETRIEVAL_K = 20
chunks = fetch_context_unranked(question)
for index, c in enumerate(chunks):
    if "manchester" in c.page_content.lower():
        print(index)

In [None]:
reranked = rerank(question, chunks)

In [None]:
for index, c in enumerate(reranked):
    if "manchester" in c.page_content.lower():
        print(index)

In [None]:
reranked[0].page_content

In [None]:
def fetch_context(question):
    chunks = fetch_context_unranked(question)
    return rerank(question, chunks)

In [None]:
SYSTEM_PROMPT = """
You are a knowledgeable, friendly assistant representing the company Insurellm.
You are chatting with a user about Insurellm.
Your answer will be evaluated for accuracy, relevance and completeness, so make sure it only answers the question and fully answers it.
If you don't know the answer, say so.
For context, here are specific extracts from the Knowledge Base that might be directly relevant to the user's question:
{context}

With this context, please answer the user's question. Be accurate, relevant and complete.
"""

In [None]:
# In the context, include the source of the chunk

def make_rag_messages(question, history, chunks):
    context = "\n\n".join(f"Extract from {chunk.metadata['source']}:\n{chunk.page_content}" for chunk in chunks)
    system_prompt = SYSTEM_PROMPT.format(context=context)
    return [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": question}]
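
To see the shape of what `make_rag_messages` returns, here is a minimal sketch with a stand-in for `Result` and a shortened prompt. `FakeResult`, `DEMO_PROMPT`, and the sample extracts are invented for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class FakeResult:
    """Stand-in for the notebook's Result model (illustration only)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


DEMO_PROMPT = "Answer using these extracts:\n{context}"


def make_rag_messages(question, history, chunks):
    # Each extract is prefixed with its source so the model can attribute facts
    context = "\n\n".join(
        f"Extract from {chunk.metadata['source']}:\n{chunk.page_content}" for chunk in chunks
    )
    system_prompt = DEMO_PROMPT.format(context=context)
    return [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": question}]


chunks = [
    FakeResult("First extract.", {"source": "kb/a.md"}),
    FakeResult("Second extract.", {"source": "kb/b.md"}),
]
history = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello!"},
]
messages = make_rag_messages("What is in a.md?", history, chunks)
print([m["role"] for m in messages])
print(messages[0]["content"])
```

The system message always comes first and carries the context, the prior turns follow unchanged, and the new question goes last.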

In [None]:
def rewrite_query(question, history=None):
    """Rewrite the user's question to be a more specific question that is more likely to surface relevant content in the Knowledge Base."""
    history = history or []  # avoid a shared mutable default argument
    message = f"""
You are in a conversation with a user, answering questions about the company Insurellm.
You are about to look up information in a Knowledge Base to answer the user's question.

This is the history of your conversation so far with the user:
{history}

And this is the user's current question:
{question}

Respond only with a single, refined question that you will use to search the Knowledge Base.
It should be a VERY short specific question most likely to surface content. Focus on the question details.
Don't mention the company name unless it's a general question about the company.
IMPORTANT: Respond ONLY with the knowledgebase query, nothing else.
"""
    response = completion(model=MODEL, messages=[{"role": "system", "content": message}])
    return response.choices[0].message.content
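
`rewrite_query` interpolates `history` straight into the prompt, so the model sees the Python repr of a list of dicts. Here is a minimal sketch of an alternative that renders the history as a plain transcript; `format_history` is a hypothetical helper and the sample turns are invented.

```python
def format_history(history: list[dict]) -> str:
    """Render chat history as 'Role: content' lines for use inside a prompt."""
    if not history:
        return "(no previous messages)"
    return "\n".join(f"{turn['role'].capitalize()}: {turn['content']}" for turn in history)


history = [
    {"role": "user", "content": "Tell me about the awards."},
    {"role": "assistant", "content": "Which award do you mean?"},
]
print(format_history(history))
```

In the prompt, `{history}` would then become `{format_history(history)}`.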

In [None]:
rewrite_query("Who won the IIOTY award?", [])

In [None]:
def answer_question(question: str, history: list[dict] | None = None) -> tuple[str, list]:
    """
    Answer a question using RAG and return the answer and the retrieved context
    """
    history = history or []  # avoid a shared mutable default argument
    query = rewrite_query(question, history)
    print(query)
    chunks = fetch_context(query)
    messages = make_rag_messages(question, history, chunks)
    response = completion(model=MODEL, messages=messages)
    return response.choices[0].message.content, chunks

In [None]:
answer_question("Who won the IIOTY award?", [])

In [None]:
answer_question("Who went to Manchester University?", [])