### RAG with Python (Langchain)
![](https://miro.medium.com/v2/resize:fit:1100/format:webp/1*_Rjw0DOvOO6tfAotfKsG_g.png)
![](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F2496e7c6fedd7ffaa043895c23a4089638b0c21b-3840x2160.png&w=3840&q=75)
![](https://cohere.com/_next/image?url=https%3A%2F%2Flh7-us.googleusercontent.com%2FmY4nN_0I3bcslVlC-dlw8tWsMBqsA33ai2spUc4PSodgcQFr0hlLsazK4MVeAIvEqmp8yk6QgbnKW0MR5CfyibybSpW2A7aGd-UHE7V3XVX-gtQkN8gscwk8Q3gUK5EmXLAwjumTCqWpc-DuyPJNRF8&w=2048&q=75)

In [None]:
!pip install -U langchain langchain-openai langchain-community faiss-cpu rank_bm25 langchain-cohere

In [1]:
import logging
import os
from dotenv import load_dotenv
from langchain.document_loaders import TextLoader
from langchain.prompts import PromptTemplate
from langchain.retrievers import BM25Retriever
from langchain.vectorstores import FAISS
from langchain_cohere import CohereRerank
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import BaseDocumentCompressor
from langchain_core.retrievers import BaseRetriever
import tqdm as tqdm
from langchain.docstore.document import Document
import json
import re
import uuid
import warnings
from typing import Dict, List, Tuple
from pydantic import BaseModel
load_dotenv()
logging.disable(level=logging.INFO)
os.environ["TOKENIZERS_PARALLELISM"] = "true"

In [2]:
### .env
# OPENAI_API_KEY="your api key"
# COHERE_API_KEY="your api key"

In [3]:
llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
embeddings = OpenAIEmbeddings()
loader = TextLoader("./paul_graham_essay.txt")
documents = loader.load()
WHOLE_DCOUMENT = documents[0].page_content

In [4]:
prompt_document = PromptTemplate(
    input_variables=["WHOLE_DCOUMENT"], template="{WHOLE_DCOUMENT}"
)

prompt_chunk = PromptTemplate(
    input_variables=["CHUNK_CONTENT"], template="Here is the chunk we want to situate within the whole document\n\n{CHUNK_CONTENT}\n\n"
    "Please give a short succinct context to situate this chunk within the overall document for "
    "the purposes of improving search retrieval of the chunk. Answer only with the succinct context and nothing else.",
)

In [5]:
def split_text(text, chunk_size=256):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=200)
    doc_chunks = text_splitter.create_documents(text)
    for i, doc in enumerate(doc_chunks):
        doc.metadata = {"doc_id":f"doc_{i}"}
    return doc_chunks

def create_embedding_retriever(documents_):
    faiss_index = FAISS.from_documents(documents_, embedding=embeddings)
    return faiss_index.as_retriever(search_kwargs={"k": 4})

def create_bm25_retriever(documents_):
    bm25 = BM25Retriever.from_documents(documents_,language="en")
    return bm25

In [6]:
class EmbeddingBM25RerankerRetriever:
    def __init__(self,
                 embedding_retriever: BaseRetriever,
                 bm25_retriever: BaseRetriever,
                 reranker: CohereRerank):

        self.embedding_retriever = embedding_retriever
        self.bm25_retriever = bm25_retriever
        self.reranker = reranker

    def invoke(self, query):
        vector_docs = self.embedding_retriever.invoke(query)
        bm25_docs = self.bm25_retriever.invoke(query)

        combined_docs = vector_docs + [
            doc for doc in bm25_docs if doc not in vector_docs
        ]

        reranked_docs = self.reranker.compress_documents(combined_docs, query)

        return reranked_docs

In [7]:
# none contextual retriever
chunks = split_text(documents[0].page_content)
embedding_retriever = create_embedding_retriever(chunks)
bm25_retriever = create_bm25_retriever(chunks)
reranker= CohereRerank(top_n=3,model='rerank-english-v2.0')

embedding_bm25_retriever_rerank = EmbeddingBM25RerankerRetriever(
    embedding_retriever=embedding_retriever,
    bm25_retriever=bm25_retriever,
    reranker=reranker
)

In [8]:
# contextual retriever
def create_contextual_chunks(chunks_):
    contextual_chunks = []
    for chunk in tqdm.tqdm(chunks_):
        context = prompt_document.format(WHOLE_DCOUMENT=WHOLE_DCOUMENT)
        chunk_content = prompt_chunk.format(CHUNK_CONTENT=chunk)
        llm_response = llm.invoke(context + chunk_content).content
        page_content = f"""Text: {chunk.page_content}\nContext: {llm_response}"""
        doc = Document(page_content=page_content, metadata=chunk.metadata)
        contextual_chunks.append(doc)
    return contextual_chunks

contextual_chunks = create_contextual_chunks(chunks)

100%|██████████| 1004/1004 [23:36<00:00,  1.41s/it] 


In [9]:
contextual_embedding_retriever = create_embedding_retriever(contextual_chunks)
contextual_bm25_retriever = create_bm25_retriever(contextual_chunks)
embedding_bm25_retriever_rerank = EmbeddingBM25RerankerRetriever(
    embedding_retriever=contextual_embedding_retriever,
    bm25_retriever=contextual_bm25_retriever,
    reranker=reranker
)

In [22]:
## generate question-context paris
#123456789
#123
#456
#789
# llm - prompt
# given the {#123} generate question and answer pairs 
# test dataset

DEFAULT_QA_GENERATE_PROMPT_TMPL = """\
Context information is below.

---------------------
[Context]
{context_str}
---------------------

Given the context information and no prior knowledge.
generate only questions based on the below query.

Your task is to setup {num_questions_per_chunk} questions based on the [Context]. 
The questions should be diverse in nature \
across the document, and questions must be relevant to the [Context]. 
Restrict the questions to the [Context] information provided above. Do not fabricate questions outside of the [Context]"
"""

class QuestionContextEvalDataset(BaseModel):
    queries: Dict[str, str]
    corpus: Dict[str, str]
    relevant_docs: Dict[str, List[str]]
    mode: str = "text"

    @property
    def query_docid_pairs(self) -> List[Tuple[str, str]]:
        return [
            (query,self.relevant_docs[query_id])
            for query_id, query in self.queries.items()
        ]

    def save_json(self, path: str):
        with open(path, "w") as f:
            json.dump(self.dict(), f, indent=2)


    @classmethod
    def load_json(cls, path: str):
        with open(path, "r") as f:
            data = json.load(f)
        return cls(**data)

def generate_question_context_pairs(documents:List[Document],
                                    llm,
                                    qa_generate_prompt_tmpl,
                                    num_questions_per_chunk:int=5):
    doc_dict = {doc.metadata["doc_id"]:doc.page_content for doc in documents}
    queries = {}
    relevant_docs = {}
    for doc_id, text in tqdm.tqdm(doc_dict.items()):
        query = qa_generate_prompt_tmpl.format(
            context_str=text,
            num_questions_per_chunk=num_questions_per_chunk,
        )
        response = llm.invoke(query).content
        print(response)
        result = re.split(r"\n+",response.strip())
        print(result)
        questions = [
            re.sub(r"^\d+[\).\s]", "",question).strip() for question in result
        ]

        questions = [question for question in questions if len(question) > 0][:num_questions_per_chunk]
        for question in questions:
            question_id = str(uuid.uuid4())
            queries[question_id] = question
            relevant_docs[question_id] = doc_id

    return QuestionContextEvalDataset(
        queries=queries,
        corpus=doc_dict,
        relevant_docs=relevant_docs,
    )

In [None]:
qa_pairs = generate_question_context_pairs(
    documents=chunks,
    llm=llm,
    qa_generate_prompt_tmpl=DEFAULT_QA_GENERATE_PROMPT_TMPL,
    num_questions_per_chunk=2)

  0%|          | 0/1004 [00:00<?, ?it/s]

  0%|          | 1/1004 [00:02<48:25,  2.90s/it]

1. What does the letter "W" represent in the context provided?  
2. How might the context of "W" be interpreted in different scenarios?
['1. What does the letter "W" represent in the context provided?  ', '2. How might the context of "W" be interpreted in different scenarios?']


  0%|          | 2/1004 [00:04<38:28,  2.30s/it]

1. What does the letter "h" represent in the context provided?  
2. How might the single character "h" be interpreted in different contexts?
['1. What does the letter "h" represent in the context provided?  ', '2. How might the single character "h" be interpreted in different contexts?']


  0%|          | 3/1004 [00:05<27:36,  1.65s/it]

1. What does the letter "a" represent in the context provided?  
2. How might the single character "a" be interpreted in different contexts?
['1. What does the letter "a" represent in the context provided?  ', '2. How might the single character "a" be interpreted in different contexts?']


  0%|          | 4/1004 [00:06<23:59,  1.44s/it]

1. What does the letter "t" represent in the context provided?  
2. How might the single character "t" be interpreted in different contexts?
['1. What does the letter "t" represent in the context provided?  ', '2. How might the single character "t" be interpreted in different contexts?']


  0%|          | 5/1004 [00:07<21:03,  1.26s/it]

1. What does the letter "I" represent in the context provided?  
2. How might the single letter "I" be interpreted in different contexts or fields?
['1. What does the letter "I" represent in the context provided?  ', '2. How might the single letter "I" be interpreted in different contexts or fields?']


In [None]:
def compute_hit_rage(expected_ids,retrieved_ids):
    is_hit = any(id in expected_ids for id in retrieved_ids)
    return 1.0 if is_hit else 0.0

def compute_mrr(expected_ids,retrieved_ids):
    if i,id in enumerate(retrieved_ids):
        if id in expected_ids:
            return 1/(i+1)
    return 0.0

def compute_ndcg(expected_ids,retrieved_ids):
    dcg = 0.0
    idcg = 0.0
    for i,id in enumerate(retrieved_ids):
        if id in expected_ids:
            dcg += 1.0/(i+1)
        idcg += 1.0/(i+1)
    return dcg/idcg

In [None]:
def extract_queries(dataset):
    values = []
    for value in dataset.queries.values():
        values.append(value)
    return values

def extract_doc_ids(documents_):
    doc_ids = []
    for doc in documents_:
        doc_ids.append(doc.metadata["doc_id"])
    return doc_ids

def evaluate(retriever,dataset):
    hit_rate_result = []
    ndcg = []
    mrr = []
    for i in tqdm.tqdm(range(len(dataset.queries))):
        context = retriever.invoke(extract_queries(dataset)[i])
        
        expected_ids = dataset.relevant_docs[List(dataset.queries.keys())[i]]
        retrieved_ids = extract_doc_ids(context)
        mrr = compute_mrr(expected_ids,retrieved_ids)
        ndcg = compute_ndcg(expected_ids,retrieved_ids)
        hit_rate_result.append(compute_hit_rage(expected_ids,retrieved_ids))
        ndcg.append(ndcg)
        mrr.append(mrr)
        array = np.array([mrr,hit_rate_result,ndcg])
        mean_result = np.mean(array)
        mean_result.index = ["MRR","Hit Rate","NDCG"]
        return mean_result

In [None]:
embedding_bm25_rerank_results = evaluate(embedding_bm25_retriever_rerank,qa_pairs)

In [None]:
contextual_embedding_bm25_rerank_results = evaluate(contextual_embedding_bm25_retriever_rerank,qa_pairs)

In [None]:
embedding_retriever_results = evaluate(embedding_retriever,qa_pairs)

In [None]:
contextual_embedding_retriever_results = evaluate(contextual_embedding_retriever,qa_pairs)

In [None]:
bm25_results = evaluate(bm25_retriever,qa_pairs)

In [None]:
contextual_bm25_retriever_results = evaluate(contextual_bm25_retriever,qa_pairs)

In [None]:
def display_results(name,eval_result):
    metrics = ['MRR','Hit Rate','NDCG']
    
    columns = ["Retriever": [name],
               **{metric: val for metric,val in zip(metrics,eval_result.values)}]
    metrics_df = pd.DataFrame(columns)
    return metrics_df

pd.concat([display_results("embedding retriever",embedding_retriever_results),
           display_results("embedding bm25 retriever",embedding_bm25_rerank_results),
           display_results("Contextual embedding retriever",contextual_embedding_retriever_results),]
           ,ignore_index=True,axis=0)