# Corrective RAG Implementation

## Problem Statement
Traditional RAG systems fail when:
- Retrieved documents are irrelevant
- Knowledge gaps exist in the local corpus
- Answers require multi-document synthesis

## GenAI Solution
This notebook implements a self-correcting RAG system that:
1. Dynamically evaluates retrieval quality
2. Augments with web search when needed
3. Focuses on key facts through decomposition
4. Provides source-aware responses

## 0. Importing Libraries

In [199]:
import os
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core import StorageContext
from qdrant_client import QdrantClient
from duckduckgo_search import DDGS
from google import genai
from llama_index.core.prompts import PromptTemplate
from typing import TypedDict, Literal
from google.genai import types
import json
from dotenv import load_dotenv
load_dotenv()

True

In [200]:
genai.__version__

'1.7.0'

In [201]:
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

client = genai.Client(api_key=GOOGLE_API_KEY)

In [202]:
DATA_DIR = 'data'
COLLECTION_NAME = "coffee-recipes"

## 1. Document Processing Pipeline

LlamaIndex reads the PDFs in the `data` folder and converts them into an index, which stores a vector embedding for each chunk of text.

- `VectorStoreIndex`: embeds the text and indexes the resulting vectors
- `SimpleDirectoryReader`: reads files from a directory

In [203]:
# Load the PDFs from the data directory
reader = SimpleDirectoryReader(DATA_DIR)
documents = reader.load_data()

In [204]:
# Each Document holds the text of one page plus the file's metadata
documents

[Document(id_='2a4ce260-26e6-4d9e-9dc9-a14617ed3d5e', embedding=None, metadata={'page_label': '1', 'file_name': 'Brochure_Basic-Creative-coffee-recipes.pdf', 'file_path': '/Users/aasth/Desktop/Linkedin Post/corrective_rag/data/Brochure_Basic-Creative-coffee-recipes.pdf', 'file_type': 'application/pdf', 'file_size': 926043, 'creation_date': '2025-04-20', 'last_modified_date': '2025-04-03'}, excluded_embed_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], excluded_llm_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], relationships={}, metadata_template='{key}: {value}', metadata_separator='\n', text_resource=MediaResource(embeddings=None, data=None, text='Ultimate coffee pleasure\nCOFFEE ACADEMY\n', path=None, url=None, mimetype=None), image_resource=None, audio_resource=None, video_resource=None, text_template='{metadata_str}\n\n{content}'),
 Document(id_='

## 2. Vector Knowledge Base Setup

In [205]:
# Load an embedding model that converts text into vectors for similarity search
embed_model = FastEmbedEmbedding(model_name="BAAI/bge-base-en-v1.5")

# Initialize an in-memory Qdrant database to store and search the vector embeddings
qdrant_client = QdrantClient(location=":memory:")

# Connect the Qdrant client to LlamaIndex as a vector store, using the collection name defined earlier
vector_store = QdrantVectorStore(client=qdrant_client, collection_name=COLLECTION_NAME)

# Create a storage context, which tells LlamaIndex where to store
# and retrieve the indexed data
storage_context = StorageContext.from_defaults(vector_store=vector_store)

In [206]:
# Create a vector index by embedding the loaded documents and storing them in the Qdrant vector store
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context, embed_model=embed_model)

# Set up a retriever that fetches the 4 most similar documents from the index for a given query
retriever = VectorIndexRetriever(index=index, similarity_top_k=4)
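
The idea of returning the 4 most similar items can be illustrated without any libraries. This is a minimal sketch that assumes cosine similarity as the metric (the notebook does not state which distance the Qdrant collection uses). `cosine_similarity`, `top_k`, and the toy vectors are illustrative names and data, not part of LlamaIndex or Qdrant.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec, doc_vecs, k=4):
    """Return the indices of the k vectors most similar to query_vec, best first."""
    scored = [(cosine_similarity(query_vec, vec), i) for i, vec in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [i for _, i in scored[:k]]


# Toy 2-dimensional "embeddings"; real embeddings have hundreds of dimensions.
toy_docs = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [-1.0, 0.0]]
print(top_k([1.0, 0.0], toy_docs, k=2))
```

A real vector store uses an approximate nearest-neighbour index rather than scoring every vector, so this shows only the ranking idea.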

## 3. Intelligent Query Processing

Before retrieval, the LLM rewrites the user's query into a concise form intended to retrieve more relevant results.

In [207]:
DEFAULT_TRANSFORM_QUERY_TEMPLATE = PromptTemplate(
    template="""Your task is to refine a query to ensure it is highly effective for retrieving relevant search results.

    Analyze the given input to grasp the core semantic intent or meaning.

    Original Query:
    -------
    {query_str}
    -------
    Your goal is to rephrase or enhance this query to improve its search performance. Ensure the revised query is concise and directly aligned with the intended search objective.

    Respond with the optimized query only:"""
)


In [208]:
def transform_query(original_query):
    prompt = DEFAULT_TRANSFORM_QUERY_TEMPLATE.format(query_str=original_query)
    response = client.models.generate_content(
        model="gemini-1.5-flash-001",
        contents=prompt
    )
    return response.text.strip()


## 4. Key Fact Extraction

Extracts the facts in the retrieved context that bear on the query. The final answer is generated from these facts rather than from the raw documents.

In [209]:
KEY_FACTS_PROMPT = """
Extract the most relevant facts from the context that directly answer the query.
Focus on concrete information and be faithful to the source.

Query: {query}

Context:
{context}

Return only the most relevant facts as bullet points:
"""

In [210]:
def extract_key_facts(context, query):
    prompt = KEY_FACTS_PROMPT.format(
        context=context,
        query=query
    )
    response = client.models.generate_content(
        model="gemini-1.5-flash-001",
        config=types.GenerateContentConfig(
            temperature=0.0
        ),        
        contents=prompt
    )
    return response.text.strip()

## 4. Self-Correcting Retrieval Flow

Each retrieved document is judged relevant or not relevant to the query with a yes/no prompt. The verdicts drive the correction step:
- Retrieve the top 4 documents
- Judge each one for relevance to the query
- Confidence = (relevant docs) / (total retrieved)
- Fall back to web search if confidence < 25%, which with 4 retrieved documents means none was judged relevant
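
The confidence rule above can be sketched in isolation. This is a minimal illustration, not the notebook's own code: the function names and the `threshold` parameter are assumptions introduced here.

```python
def retrieval_confidence(num_relevant: int, num_retrieved: int) -> float:
    """Fraction of retrieved documents judged relevant (0.0 when nothing was retrieved)."""
    if num_retrieved <= 0:
        return 0.0
    return num_relevant / num_retrieved


def needs_web_fallback(confidence: float, threshold: float = 0.25) -> bool:
    """True when confidence falls strictly below the threshold."""
    return confidence < threshold


# With 4 retrieved documents, only the "0 relevant" case triggers the fallback.
for relevant in range(5):
    score = retrieval_confidence(relevant, 4)
    print(relevant, score, needs_web_fallback(score))
```

Because the comparison is strict, a single relevant document out of four (confidence 0.25) keeps the answer on the local corpus.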

In [211]:
DEFAULT_RELEVANCY_PROMPT_TEMPLATE = """
Analyze whether this document excerpt contains ANY information that could help answer the query. 
Respond ONLY with "yes" or "no".

Document Excerpt:
{context_str}

Query:
{query_str}
"""


In [212]:
def evaluate_retrieval_quality(nodes, query):
    relevant_nodes = []
    for node in nodes:
        prompt = DEFAULT_RELEVANCY_PROMPT_TEMPLATE.format(
            context_str=node.text[:2000],  # Cap the excerpt at 2000 characters
            query_str=query  # Grade against the original query, not the transformed one
        )
        response = client.models.generate_content(
            model="gemini-1.5-flash-001",
            contents=prompt
        )
        # Accept "yes", "Yes." and similar; an empty response counts as "no"
        verdict = (response.text or "").strip().lower()
        if verdict.startswith("yes"):
            relevant_nodes.append(node)
    return relevant_nodes

## 5. Knowledge Augmentation

If the vector database does not contain relevant information, web search is used as a fallback to supply context for the final answer.

In [213]:
def web_search(query):
    with DDGS() as ddgs:
        results = ddgs.text(query)
        return [r['body'] for r in results if 'body' in r][:3]
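
Search snippets can be empty, duplicated, or long, so it may help to tidy them before they go into a prompt. The following is a hedged sketch of such a cleanup step: `top_snippets`, `limit`, and `max_chars` are hypothetical names, and only the `body` key is taken from the notebook's code.

```python
def top_snippets(results, limit=3, max_chars=500):
    """Pick up to `limit` unique, non-empty 'body' snippets, each cut to `max_chars` characters."""
    seen = set()
    snippets = []
    for result in results:
        body = (result.get("body") or "").strip()
        if not body or body in seen:
            continue  # skip results with no text and exact duplicates
        seen.add(body)
        snippets.append(body[:max_chars])
        if len(snippets) == limit:
            break
    return snippets


# Made-up results in the same dict shape, for illustration only.
fake_results = [
    {"title": "A", "body": "First snippet."},
    {"title": "B", "body": "First snippet."},
    {"title": "C"},
    {"title": "D", "body": "Second snippet."},
]
print(top_snippets(fake_results))
```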

## 6. Response Generation

- Define a response schema
- Use system and chain-of-thought (CoT) prompting, and ask the model to return JSON
- Retrieve relevant documents
- Evaluate relevance using the original query
- If any document is relevant, extract key facts from the relevant documents and generate the answer from those facts
- If no document is relevant (i.e. the confidence score is 0), use web search
- Return the answer in JSON format
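
The branching in these steps can be sketched with the retrieval, grading, search, and generation steps passed in as plain functions, so the control flow runs without any API calls. All names below (`corrective_answer` and the `toy_*` stand-ins) are hypothetical, and the key-fact extraction step is omitted for brevity.

```python
from typing import Callable, List, Tuple


def corrective_answer(
    query: str,
    retrieve: Callable[[str], List[str]],
    is_relevant: Callable[[str, str], bool],
    search_web: Callable[[str], List[str]],
    generate: Callable[[str, str], str],
) -> Tuple[str, str]:
    """Return (answer, source_name), using web search only when no document is relevant."""
    docs = retrieve(query)
    relevant = [doc for doc in docs if is_relevant(doc, query)]
    if relevant:
        return generate("\n\n".join(relevant), query), "vector database"
    return generate("\n".join(search_web(query)), query), "web search"


# Toy stand-ins so the sketch is runnable; a real system would call the LLM and the index.
corpus = ["Flat white: coffee first, then warm milk.", "Cleaning the drip tray."]


def toy_retrieve(query):
    return corpus


def toy_is_relevant(doc, query):
    return query.lower() in doc.lower()


def toy_search_web(query):
    return ["Snippet about the president."]


def toy_generate(context, query):
    return f"Answer based on: {context}"


print(corrective_answer("flat white", toy_retrieve, toy_is_relevant, toy_search_web, toy_generate))
print(corrective_answer("president", toy_retrieve, toy_is_relevant, toy_search_web, toy_generate))
```

Passing the steps in as arguments is a design choice made for this sketch; it keeps the branching testable on its own.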

In [214]:
class Answer(TypedDict):
    answer: str
    reasoning: str
    source_name: Literal["vector database", "web search"]
    citations: str
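
A `TypedDict` is not enforced at runtime, so a parsed response may be worth checking before it is used. The sketch below is one possible check, assuming the four fields and two source names defined in `Answer`; `validate_answer` and its constants are hypothetical helpers, not part of the notebook or the Gemini SDK.

```python
ALLOWED_SOURCES = {"vector database", "web search"}
REQUIRED_FIELDS = ("answer", "reasoning", "source_name", "citations")


def validate_answer(payload: dict) -> list:
    """Return a list of problems found in a parsed response; an empty list means it looks valid."""
    problems = []
    for field in REQUIRED_FIELDS:
        if field not in payload:
            problems.append(f"missing field: {field}")
        elif not isinstance(payload[field], str):
            problems.append(f"field is not a string: {field}")
    source = payload.get("source_name")
    if isinstance(source, str) and source not in ALLOWED_SOURCES:
        problems.append(f"unexpected source_name: {source}")
    return problems


good = {"answer": "a", "reasoning": "r", "source_name": "web search", "citations": "c"}
bad = {"answer": "a", "source_name": "wikipedia", "citations": 3}
print(validate_answer(good))
print(validate_answer(bad))
```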

In [215]:
# System and CoT prompting with JSON output
STRUCTURED_ANSWER_PROMPT = """
You are a helpful assistant answering the user's question using the documents below. Respond in JSON format:

Documents:
{context}

User Question:
{query}

Example Response:
{{
  "answer": "your answer here",
  "reasoning": "step-by-step thought process here",
  "source_name": "vector database or web search",
  "citations": "Quoted content from web search or key facts"
}}

Let's think step by step.
"""
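
The doubled braces in the example response matter because the prompt is filled in with `str.format`, which treats single braces as placeholders. A small illustration with a shortened, made-up template (`TEMPLATE` is not the notebook's prompt):

```python
TEMPLATE = """Question: {query}

Example Response:
{{
  "answer": "your answer here"
}}
"""

# {query} is substituted; {{ and }} come out as literal { and }.
rendered = TEMPLATE.format(query="How is a flat white made?")
print(rendered)
```

Writing the JSON example with single braces would make `format` raise an error, since it would try to treat the JSON body as a placeholder name.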


In [216]:
def answer_query(query):
    # Step 1: Retrieve and evaluate nodes
    transformed_query = transform_query(query)
    nodes = retriever.retrieve(transformed_query)
    
    # Print nodes for debugging
    # for i, node in enumerate(nodes):
    #     print(f"\n--- Node {i+1} ---\n{node.text[:500]}...")
    
    # Evaluate relevance using original query
    relevant_nodes = evaluate_retrieval_quality(nodes, query)
    confidence = len(relevant_nodes) / len(nodes) if nodes else 0.0
    print(f"Confidence score: {confidence}")
    
    # Step 2: Process relevant nodes
    if relevant_nodes:
        # Combine relevant nodes, labelling each with a document number
        combined_context = "\n\n".join(
            [f"[Document {i+1}]\n{n.text}" 
             for i, n in enumerate(relevant_nodes)]
        )
        
        # Extract key facts first
        key_facts = extract_key_facts(combined_context, query)
        print(f"\nKey Facts:\n{key_facts}")
        
        # Generate answer from key facts
        final_prompt = STRUCTURED_ANSWER_PROMPT.format(
            context=f"[Key Facts]\n{key_facts}",
            query=query
        )
        response = client.models.generate_content(
            model="gemini-1.5-flash-001",
            config=types.GenerateContentConfig(
                response_mime_type='application/json',
                response_schema=Answer,
                temperature=0.0
            ),
            contents=final_prompt
        )
        return json.loads(response.text), key_facts
    
    # Step 3: Web fallback only if no relevant nodes
    web_results = web_search(query)
    web_context = "\n".join([f"[Web {i+1}]\n{r}" for i, r in enumerate(web_results)])
    
    final_prompt = STRUCTURED_ANSWER_PROMPT.format(
        context=f"[Web Results]\n{web_context}",
        query=query
    )
    response = client.models.generate_content(
        model="gemini-1.5-flash-001",
        config=types.GenerateContentConfig(
            response_mime_type='application/json',
            response_schema=Answer,
            temperature=0.0
        ),
        contents=final_prompt
    )
    return json.loads(response.text), web_context

In [223]:
question = "How is Flat white different from cappuccino?"
final_response, relevant_text = answer_query(question)
print("\nFinal Response:")
print(json.dumps(final_response, indent=4))

Confidence score: 0.75

Key Facts:
- Flat white is prepared by adding coffee to the cup first, followed by warm milk.
- The milk foam is prepared in the final stage and lies under the crema, taking on its color and taste. 
- Cappuccino is prepared by adding hot milk and milk foam first, then the coffee flows through the milk foam at the top.

Final Response:
{
    "answer": "The main difference between a flat white and a cappuccino is the order in which the ingredients are added. In a flat white, the coffee is added first, followed by warm milk and then a thin layer of milk foam. In a cappuccino, the hot milk and milk foam are added first, and then the coffee is poured on top, creating a layer of coffee on top of the milk foam.",
    "reasoning": "The key facts provide information about the order of ingredients in both drinks. The flat white has coffee added first, followed by warm milk and then milk foam. The cappuccino has hot milk and milk foam added first, followed by coffee on top

## 7. Model Evaluation using LLM-as-a-Judge

The evaluation prompt asks a judge model for a single score from 1 to 5, based on four criteria:
- Instruction following
- Groundedness
- Completeness
- Fluency

In [218]:
# Evaluation prompt for RAG responses (uses role prompting)
RAG_EVAL_PROMPT = """\
# Instruction
You are an expert evaluator for Retrieval-Augmented Generation (RAG) systems. Your task is to evaluate the quality of the AI-generated response based on the given user prompt and the retrieved context.

You will assess how well the response:
- Follows the instructions in the user prompt
- Is grounded in the context
- Is complete and provides a helpful answer
- Is fluent and easy to read

Please give step-by-step reasoning and assign a score using the Rating Rubric.

# Evaluation
## Metric Definition
You will assess question answering quality in a RAG setting, where the model is expected to answer the user's query using only the provided context. Responses should be relevant, well-structured, and avoid hallucinations.

## Criteria
- Instruction Following: Does the response fulfill the prompt's requirements (e.g., format, answer type, word limits)?
- Groundedness: Does the response rely solely on the context provided? No outside or hallucinated info?
- Completeness: Does it fully and correctly answer the user query using the context?
- Fluency: Is the response well-written, clear, and grammatically correct?

## Rating Rubric
5: (Very good) Follows instructions, grounded, complete, and fluent.
4: (Good) Mostly grounded and complete, minor issues in fluency or relevance.
3: (Fair) Partially complete, some hallucination or vague phrasing, moderate fluency issues.
2: (Bad) Lacks completeness or relevance; possible hallucinations or missed instructions.
1: (Very bad) Hallucinated, off-topic, ignores prompt/context.

## Evaluation Steps
STEP 1: Review the prompt, context, and response.
STEP 2: Assess the 4 criteria.
STEP 3: Justify your score.
STEP 4: Select a score from 5, 4, 3, 2, or 1.

# User Inputs and AI-generated Response
## User Prompt
{prompt}

## Retrieved Context
{context}

## AI-generated Response
{response}
"""


def eval_rag_response(prompt, context, ai_response):
    """Evaluate a RAG-generated response using Gemini chat."""
    chat = client.chats.create(model='gemini-2.0-flash')

    # Construct the full evaluation prompt
    full_prompt = RAG_EVAL_PROMPT.format(prompt=prompt, context=context, response=ai_response)

    # Get the step-by-step explanation and the final score as free text
    response = chat.send_message(message=full_prompt)
    return response.text


In [219]:
text_eval = eval_rag_response(
    prompt=question,
    context=relevant_text,
    ai_response=final_response['answer']
)

print(text_eval)   


STEP 1: Review the prompt, context, and response.
The prompt asks for the difference between a flat white and a cappuccino. The context provides information about how each drink is prepared, specifically the order of ingredients. The response summarizes these differences.

STEP 2: Assess the 4 criteria.
- Instruction Following: The response directly answers the question.
- Groundedness: The response is based entirely on the provided context.
- Completeness: The response provides a complete answer based on the information given in the context.
- Fluency: The response is fluent and easy to read.

STEP 3: Justify your score.
The response is well-written, accurately answers the prompt based on the provided context, and follows instructions. Therefore, it deserves a high score.

STEP 4: Select a score from 5, 4, 3, 2, or 1.
5
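
The judge returns free text, so the numeric score has to be pulled out before it can be aggregated across queries. A minimal sketch, assuming the score appears on a line of its own as in the output above; `extract_score` is a hypothetical helper, and a judge that formats its answer differently would need a different pattern.

```python
import re
from typing import Optional


def extract_score(evaluation_text: str) -> Optional[int]:
    """Return the last line that consists only of a digit 1-5, or None if there is none."""
    for line in reversed(evaluation_text.strip().splitlines()):
        match = re.fullmatch(r"\s*([1-5])\s*", line)
        if match:
            return int(match.group(1))
    return None


sample = "STEP 4: Select a score from 5, 4, 3, 2, or 1.\n5\n"
print(extract_score(sample))
```

Returning `None` rather than a default score makes unparseable evaluations easy to spot.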



## 8. More examples of queries passed to CRAG

In [222]:
final_response, relevant_text = answer_query("What are the brewing processes used by JURA")
print("\nFinal Response:")
print(json.dumps(final_response, indent=4))

Confidence score: 0.75

Key Facts:
- JURA automatic speciality coffee machines use three different brewing processes: Standard brewing process, Pulse Extraction Process (P.E.P.®), and One-Touch Lungo function.
- The Standard brewing process is used for classic coffee drinks with medium cup sizes, such as café crème.
- The I.P.B.A.S.© (Intelligent Pre-Brew Aroma System) makes classic speciality coffees highly aromatic.
- P.E.P.® is used when preparing short speciality coffees such as ristretto, espresso, macchiato and cortado.
- P.E.P.® pulses the water through the ground coffee at the optimum frequency for the amount of water.
- The One-Touch Lungo function is ideal for preparing long speciality coffees.
- The One-Touch Lungo function prevents over-extraction by mixing hot water with long coffees during the preparation phase.
- The Cold Extraction Process is used for preparing genuine cold brew specialities.
- The Cold Extraction Process uses cold water, pulsed at high pressure, to ex

In [225]:
final_response, relevant_text = answer_query("Who is the current president of USA in 2025")
print("\nFinal Response:")
print(json.dumps(final_response, indent=4))

Confidence score: 0.0

Final Response:
{
    "answer": "Donald J. Trump is the current president of the United States in 2025.",
    "reasoning": "The provided documents state that Donald Trump was sworn in as the 47th President of the United States on January 20, 2025.  Since he took office in 2025, he is the current president.",
    "source_name": "web search",
    "citations": "Donald Trump became the 47th president of the United States on January 20, 2025 (Credit: The Trump White House, Public Domain/ Wikimedia Commons) On January 20, 2025, Donald Trump was sworn in as the 47th President of the United States. He is only the second President to serve non-consecutive terms since Grover Cleveland in 1893. At 78 years ..."
}
