crag.svg

# Package Installation and Imports

In [None]:
# Install required packages
!pip install langchain langchain-openai faiss-cpu pypdf langchain_community ddgs

In [3]:
import os
import sys
from langchain.vectorstores import FAISS
from langchain.document_loaders import PyPDFLoader
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.text_splitter import RecursiveCharacterTextSplitter
from ddgs import DDGS
from typing import List, Dict, Any, Tuple
import json
from ddgs import DDGS
from dotenv import load_dotenv

# Load environment variables from a .env file
load_dotenv()

# Set the OpenAI API key environment variable
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')

In [4]:
# Download required data files
import os
os.makedirs('data', exist_ok=True)

# Download the PDF document used in this notebook
!wget -O data/Understanding_Climate_Change.pdf https://raw.githubusercontent.com/NirDiamant/RAG_TECHNIQUES/main/data/Understanding_Climate_Change.pdf

path = "data/Understanding_Climate_Change.pdf"

--2025-07-14 14:31:14--  https://raw.githubusercontent.com/NirDiamant/RAG_TECHNIQUES/main/data/Understanding_Climate_Change.pdf
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 206372 (202K) [application/octet-stream]
Saving to: ‘data/Understanding_Climate_Change.pdf’


2025-07-14 14:31:14 (5.77 MB/s) - ‘data/Understanding_Climate_Change.pdf’ saved [206372/206372]



### Create a vector store

In [5]:
def replace_t_with_space(list_of_documents):
    """
    Replaces all tab characters ('\t') with spaces in the page content of each document

    Args:
        list_of_documents: A list of document objects, each with a 'page_content' attribute.

    Returns:
        The modified list of documents with tab characters replaced by spaces.
    """

    for doc in list_of_documents:
        doc.page_content = doc.page_content.replace('\t', ' ')  # Replace tabs with spaces
    return list_of_documents

def encode_pdf(path, chunk_size=600, chunk_overlap=200):
    # Load PDF documents
    loader = PyPDFLoader(path)
    documents = loader.load()

    # Split documents into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
    )
    texts = text_splitter.split_documents(documents)
    cleaned_texts = replace_t_with_space(texts)

    # Create embeddings and vector store
    embeddings = OpenAIEmbeddings()
    vectorstore = FAISS.from_documents(cleaned_texts, embeddings)

    return vectorstore

vectorstore = encode_pdf(path)

### Initialize OpenAI language model and search tool

In [6]:
llm = ChatOpenAI(model="gpt-4o-mini", max_tokens=1000, temperature=0)
search = DDGS()

### Define retrieval evaluator, knowledge refinement and query rewriter llm chains

In [7]:
# Retrieval Evaluator
class RetrievalEvaluatorInput(BaseModel):
    relevance_score: float = Field(..., description="The relevance score of the document to the query. the score should be between 0 and 1.")
def retrieval_evaluator(query: str, document: str) -> float:
    prompt = PromptTemplate(
        input_variables=["query", "document"],
        template="On a scale from 0 to 1, how relevant is the following document to the query? Query: {query}\nDocument: {document}\nRelevance score:"
    )
    chain = prompt | llm.with_structured_output(RetrievalEvaluatorInput)
    input_variables = {"query": query, "document": document}
    result = chain.invoke(input_variables).relevance_score
    return result

# Knowledge Refinement
class KnowledgeRefinementInput(BaseModel):
    key_points: str = Field(..., description="The document to extract key information from.")
def knowledge_refinement(document: str):
    prompt = PromptTemplate(
        input_variables=["document"],
        template="Extract the key information from the following document in bullet points:\n{document}\nKey points:"
    )
    chain = prompt | llm.with_structured_output(KnowledgeRefinementInput)
    input_variables = {"document": document}
    result = chain.invoke(input_variables).key_points
    return [point.strip() for point in result.split('\n') if point.strip()]

# Web Search Query Rewriter
class QueryRewriterInput(BaseModel):
    query: str = Field(..., description="The query to rewrite.")
def rewrite_query(query: str) -> str:
    prompt = PromptTemplate(
        input_variables=["query"],
        template="Rewrite the following query to make it more suitable for a web search:\n{query}\nRewritten query:"
    )
    chain = prompt | llm.with_structured_output(QueryRewriterInput)
    input_variables = {"query": query}
    return chain.invoke(input_variables).query.strip()

### Helper function to parse search results

In [8]:
def parse_search_results(results_string: str) -> List[Tuple[str, str]]:
    """
    Parse a JSON string of search results into a list of title-link tuples.

    Args:
        results_string (str): A JSON-formatted string containing search results.

    Returns:
        List[Tuple[str, str]]: A list of tuples, where each tuple contains the title and link of a search result.
                               If parsing fails, an empty list is returned.
    """
    try:
        # Attempt to parse the JSON string
        results = json.dumps(results_string)
        results = json.loads(results)
        # Extract and return the title and link from each result
        return [(result.get('title', 'Untitled'), result.get('href', '')) for result in results]
    except json.JSONDecodeError:
        # Handle JSON decoding errors by returning an empty list
        print("Error parsing search results. Returning empty list.")
        return []

### Define sub functions for the CRAG process

In [9]:
def retrieve_documents(query: str, faiss_index: FAISS, k: int = 3) -> List[str]:
    """
    Retrieve documents based on a query using a FAISS index.

    Args:
        query (str): The query string to search for.
        faiss_index (FAISS): The FAISS index used for similarity search.
        k (int): The number of top documents to retrieve. Defaults to 3.

    Returns:
        List[str]: A list of the retrieved document contents.
    """
    docs = faiss_index.similarity_search(query, k=k)
    return [doc.page_content for doc in docs]

def evaluate_documents(query: str, documents: List[str]) -> List[float]:
    """
    Evaluate the relevance of documents based on a query.

    Args:
        query (str): The query string.
        documents (List[str]): A list of document contents to evaluate.

    Returns:
        List[float]: A list of relevance scores for each document.
    """
    return [retrieval_evaluator(query, doc) for doc in documents]

def perform_web_search(query: str) -> Tuple[List[str], List[Tuple[str, str]]]:
    """
    Perform a web search based on a query.

    Args:
        query (str): The query string to search for.

    Returns:
        Tuple[List[str], List[Tuple[str, str]]]:
            - A list of refined knowledge obtained from the web search.
            - A list of tuples containing titles and links of the sources.
    """
    rewritten_query = rewrite_query(query)
    web_results = search.text(rewritten_query, max_results=3)
    web_knowledge = knowledge_refinement(web_results)
    sources = parse_search_results(web_results)
    return web_knowledge, sources

def generate_response(query: str, knowledge: str, sources: List[Tuple[str, str]]) -> str:
    """
    Generate a response to a query using knowledge and sources.

    Args:
        query (str): The query string.
        knowledge (str): The refined knowledge to use in the response.
        sources (List[Tuple[str, str]]): A list of tuples containing titles and links of the sources.

    Returns:
        str: The generated response.
    """
    response_prompt = PromptTemplate(
        input_variables=["query", "knowledge", "sources"],
        template="Based on the following knowledge, answer the query. Include the sources with their links (if available) at the end of your answer:\nQuery: {query}\nKnowledge: {knowledge}\nSources: {sources}\nAnswer:"
    )
    input_variables = {
        "query": query,
        "knowledge": knowledge,
        "sources": "\n".join(f"{title}: {link}" if link else title for link, title in sources)
    }
    response_chain = response_prompt | llm
    return response_chain.invoke(input_variables).content

### CRAG process

In [19]:
def crag_process(query: str, faiss_index: FAISS) -> str:
    """
    Process a query by retrieving, evaluating, and using documents or performing a web search to generate a response.

    Args:
        query (str): The query string to process.
        faiss_index (FAISS): The FAISS index used for document retrieval.

    Returns:
        str: The generated response based on the query.
    """
    print(f"\nProcessing query: {query}")

    # Retrieve and evaluate documents
    retrieved_docs = retrieve_documents(query, faiss_index)
    eval_scores = evaluate_documents(query, retrieved_docs)

    print(f"\nRetrieved {len(retrieved_docs)} documents")
    print(f"Evaluation scores: {eval_scores}")

    max_score = max(eval_scores)
    min_score = min(eval_scores)
    final_knowledge= ""
    sources = []

    if min_score > 0.7:
        print("\nAll high quality - Using all retrieved documents directly")
        final_knowledge = "\n".join(retrieved_docs)
    elif max_score < 0.3:
        print("\nAll low quality - Using web search only")
        final_knowledge, sources = perform_web_search(query)
    else:
        print("\nMixed quality - Combining different treatments")
        refined_knowledge=[]
        direct_knowledge=[]

        for doc, score in zip(retrieved_docs, eval_scores):
            if score >= 0.7:
              direct_knowledge.append(doc)
            elif 0.3 < score < 0.7:
                refined = knowledge_refinement(doc)
                refined_knowledge.append(refined)
            web_knowledge, web_sources = perform_web_search(query)
            final_knowledge = "\n".join(direct_knowledge+refined_knowledge+web_knowledge)
            sources =  web_sources


    print("\nFinal knowledge:")
    print(final_knowledge)

    print("\nSources:")
    for title, link in sources:
        print(f"{title}: {link}" if link else title)

    # Generate response
    print("\nGenerating response...")
    response = generate_response(query, final_knowledge, sources)

    print("\nResponse generated")
    return response

### Example query with high relevance to the document


In [20]:
query = "What are the main causes of climate change?"
result = crag_process(query, vectorstore)
print(f"Query: {query}")
print(f"Answer: {result}")


Processing query: What are the main causes of climate change?





Retrieved 3 documents
Evaluation scores: [0.9, 0.9, 0.7]

Mixed quality - Combining different treatments




[]




[]




[]

Final knowledge:
Chapter 2: Causes of Climate Change 
Greenhouse Gases 
The primary cause of recent climate change is the increase in greenhouse gases in the 
atmosphere. Greenhouse gases, such as carbon dioxide (CO2), methane (CH4), and nitrous 
oxide (N2O), trap heat from the sun, creating a "greenhouse effect." This effect is essential 
for life on Earth, as it keeps the planet warm enough to support life. However, human 
activities have intensified this natural process, leading to a warmer climate. 
Fossil Fuels 
Burning fossil fuels for energy releases large amounts of CO2. This includes coal, oil, and
and extreme weather events. The Intergovernmental Panel on Climate Change (IPCC) has 
documented these changes extensively. Ice core samples, tree rings, and ocean sediments 
provide a historical record that scientists use to understand past climate conditions and 
predict future trends. The evidence overwhelmingly shows that recent changes are primarily 
driven by human activit

### Example query with low relevance to the document

In [18]:
query = "how did harry beat quirrell?"
result = crag_process(query, vectorstore)
print(f"Query: {query}")
print(f"Answer: {result}")


Processing query: how did harry beat quirrell?





Retrieved 3 documents
Evaluation scores: [0.0, 0.0, 0.0]

All low quality - Using web search only





Final knowledge:
["- In 'Sorcerer's Stone', Harry destroys Professor Quirrell, who is possessed by Voldemort, with his touch.", "- Quirrell and Harry fought for possession of the stone in the film, and Quirrell is defeated by Harry's physical touch, which burns him.", "- Harry's ability to burn Quirrell is linked to Lily Potter's magic, which prevents Voldemort from touching Harry."]

Sources:
Why Couldn't Harry Destroy the Horcruxes With His Bare ...: https://scifi.stackexchange.com/questions/7953/why-couldnt-harry-destroy-the-horcruxes-with-his-bare-hands
Quirinus Quirrell | Harry Potter Wiki | Fandom: https://harrypotter.fandom.com/wiki/Quirinus_Quirrell
How was Harry Potter able to burn Professor Quirrell's ...: https://www.quora.com/How-was-Harry-Potter-able-to-burn-Professor-Quirrells-body-with-his-hands-in-Harry-Potter-and-the-Sorcerers-Philosophers-Stone

Generating response...

Response generated
Query: how did harry beat quirrell?
Answer: Harry Potter defeated Professor Quir