Source: From Search to Synthesis: Enhancing RAG with BM25 and Reciprocal Rank Fusion

- BM25
- Reciprocal Rank Fusion (RRF)
- Sparse Priming Representation (SPR)

link: https://medium.com/@kachari.bikram42/from-search-to-synthesis-enhancing-rag-with-bm25-and-reciprocal-rank-fusion-872d21dc4ca7

In [None]:
OPENAI_API_KEY = "sk-proj---"

In [1]:
# pip install -r requirements.txt

# need to run "bash /Applications/Python*/Install\ Certificates.command" in terminal

In [None]:
from typing import List, Dict
from dataclasses import dataclass
from pydantic import validator

from langchain.chat_models import ChatOpenAI
from langchain.chat_models.base import BaseChatModel

import warnings
warnings.filterwarnings('ignore')

In [3]:
@dataclass
class Llm():
    llm: BaseChatModel
    llm_name: str
    llm_args: dict

    class Config:
        arbitrary_types_allowed = True

    def __str__(self):
        return f"llm: {self.llm_name} \n llm_args: {self.llm_args}"

In [4]:
class OpenAI(ChatOpenAI):
    model_name: str = "gpt-3.5-turbo"
    temperature: float = 0
    openai_api_key: str 
    streaming: bool = True

    @staticmethod
    def get_display_name():
        return "OpenAI"
    
    @staticmethod
    def get_valid_model_names():
        valid_model_names = {"gpt-3.5-turbo", "gpt-3.5-turbo-0125", "gpt-3.5-turbo-16k", "gpt-3.5-turbo-0613", "gpt-3.5-turbo-16k-0613", "gpt-4", "gpt-4-0613", "gpt-4-32k-0613", "gpt-4-32k"}
        return valid_model_names

    @validator("model_name")
    def valid_model_name(cls, request):
        valid_model_names = cls.get_valid_model_names()
        if request not in valid_model_names:
            raise ValueError(f"invalid model name given - {request}, valid ones are {valid_model_names}")
        return request

/var/folders/y3/lt1n9cfd58g_3nf7mrytx7_r0000gp/T/ipykernel_47143/1778150020.py:16: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  @validator("model_name")


In [5]:
class LangChainLlms:

    def __init__(self):
        self.__llms = { "OpenAI":{ "llm":OpenAI, "schema": OpenAI } }
    
    def get_llm(self, llm_name: str, **llm_kwargs) -> Llm:
        if llm_name not in self.__llms:
            raise ValueError(f"invalid llm name given - {llm_name}, must be one of {list(self.__llms.keys)}")
        
        llm = self.__llms[llm_name]["llm"]
        print(llm_kwargs)

        llm_args = self.__llms[llm_name]["schema"](**llm_kwargs)
        llm_obj = llm(**dict(llm_args))

        return Llm(llm = llm_obj, llm_args=dict(llm_args), llm_name=llm_name)

In [6]:
from langchain_community.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

from langchain.document_loaders import DirectoryLoader, PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter

from rank_bm25 import BM25Okapi

In [None]:
CHROMA_PATH = "chroma_data"

embeddings = OpenAIEmbeddings(openai_api_key = OPENAI_API_KEY)

# prepare chroma vector db
vector_db = Chroma("langchain_store", embeddings, persist_directory=CHROMA_PATH)

  embeddings = OpenAIEmbeddings(openai_api_key = openai_api_key)
  vector_db = Chroma("langchain_store", embeddings, persist_directory=CHROMA_PATH)
Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given
Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given


In [9]:
# prepare bm25 corpus

DATA_PATH_1 = "01_data/batch_1/"
DATA_PATH_2 = "01_data/batch_2/"
DATA_PATH_3 = "01_data/batch_3/"

# split it into chunks
text_splitter = CharacterTextSplitter(chunk_size=3000, chunk_overlap=50)

# prepare data in resp. batch
pdf_docs_1 = DirectoryLoader(DATA_PATH_1, glob = './*.pdf', loader_cls = PyPDFLoader).load()
chunk_1 = text_splitter.split_documents(pdf_docs_1)

pdf_docs_2 = DirectoryLoader(DATA_PATH_2, glob = './*.pdf', loader_cls = PyPDFLoader).load()
chunk_2 = text_splitter.split_documents(pdf_docs_2)

pdf_docs_3 = DirectoryLoader(DATA_PATH_3, glob = './*.pdf', loader_cls = PyPDFLoader).load()
chunk_3 = text_splitter.split_documents(pdf_docs_3)


bm25_corpus = [doc.page_content for doc in chunk_1] + [doc.page_content for doc in chunk_2] + [doc.page_content for doc in chunk_3]

Ignoring wrong pointing object 6 0 (offset 0)
Ignoring wrong pointing object 8 0 (offset 0)
Ignoring wrong pointing object 12 0 (offset 0)
Ignoring wrong pointing object 16 0 (offset 0)
Ignoring wrong pointing object 208 0 (offset 0)
Ignoring wrong pointing object 583 0 (offset 0)
Ignoring wrong pointing object 585 0 (offset 0)
Ignoring wrong pointing object 589 0 (offset 0)
Ignoring wrong pointing object 591 0 (offset 0)
Ignoring wrong pointing object 6 0 (offset 0)
Ignoring wrong pointing object 12 0 (offset 0)
Ignoring wrong pointing object 15 0 (offset 0)
Ignoring wrong pointing object 6 0 (offset 0)
Ignoring wrong pointing object 8 0 (offset 0)
Ignoring wrong pointing object 10 0 (offset 0)
Ignoring wrong pointing object 12 0 (offset 0)
Ignoring wrong pointing object 17 0 (offset 0)
Ignoring wrong pointing object 18 0 (offset 0)
Ignoring wrong pointing object 20 0 (offset 0)
Ignoring wrong pointing object 22 0 (offset 0)
Ignoring wrong pointing object 24 0 (offset 0)
Ignoring wron

In [11]:
class VectorDbWithBM25:
    def __init__(self):
        self.__vector_db = vector_db
        self.__bm25_corpus = bm25_corpus

        tokenized_corpus = [doc.split(" ") for doc in bm25_corpus]
        self.__bm25 = BM25Okapi(tokenized_corpus)

    def vector_db_search(self, query: str, k = 3) -> Dict[str, float]:
        search_result = dict()
        docs_and_scores = self.__vector_db.similarity_search_with_relevance_scores(query=query, k=k)
        for doc, score in docs_and_scores:
            search_result[doc.page_content] = score

        return {doc: score for doc, score in sorted(search_result.items(), key = lambda x: x[1], reverse=True)}
    
    def bm25_search(self, query: str, k = 3) -> Dict[str, float]:
        tokenized_query = query.split(" ")
        doc_scores = self.__bm25.get_scores(tokenized_query)
        docs_with_scores = dict(zip(self.__bm25_corpus, doc_scores))
        sorted_docs_with_scores = sorted(docs_with_scores.items(), key = lambda x: x[1], reverse=True)
        return dict(sorted_docs_with_scores[:k])
    
    def combine_results(self, vector_db_search_results: Dict[str, float], bm25_search_results: Dict[str, float]) -> Dict[str, float]:
        
        def normalize_dict(input_dict):
            epsilon = 0.05
            min_value = min(input_dict.values())
            max_value = max(input_dict.values())
            a, b = 0.05, 1

            if max_value == min_value:
                print("min-max values are the same!")
                return {k: b if max_value > 0.5 else a for k in input_dict.keys()}
            
            return {k: a + ((v - min_value) / (max_value - min_value)) * (b - a) for k,v in input_dict.items()}
        
        norm_vector_db_search_results = normalize_dict(vector_db_search_results)
        norm_bm25_search_results = normalize_dict(bm25_search_results)

        # combine the dictionaries
        combined_dict = {}
        for k, v in norm_vector_db_search_results.items():
            combined_dict[k] = v

        for k, v in norm_bm25_search_results.items():
            if k in combined_dict:
                combined_dict[k] = max(combined_dict[k], v)
            else:
                combined_dict[k] = v
            
        return combined_dict

    def search(self, query: str, k=3, do_bm25_search = True) -> Dict[str, float]:
        
        print()
        print('Query: ', query)
        vector_db_search_results = self.vector_db_search(query, k=k)

        print()
        print(" --------vector db search results --------")
        print(vector_db_search_results)

        if do_bm25_search:

            bm25_search_results = self.bm25_search(query, k=k)

            print()
            print(" --------bm25 search results --------")
            print(bm25_search_results)

            if bm25_search_results:
                combined_search_results = self.combine_results(vector_db_search_results, bm25_search_results)
                sorted_docs_with_scores = sorted(combined_search_results.items(), key = lambda x: x[1], reverse=True)
                return dict(sorted_docs_with_scores)
            
        return vector_db_search_results

In [12]:
vector_db_with_bm25 = VectorDbWithBM25()
LangChain_llm = LangChainLlms()

In [13]:
import re

from langchain.schema import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    LLMResult
)

In [14]:
# this is for the LLM generated 4 search queries
def remove_bullet_points(text):
    lines = text.strip().split('\n')
    cleaned_lines = [re.sub(r'^[\d\.\-\*\s]+', '', line).strip() for line in lines]
    return cleaned_lines

In [None]:
class RagFusion:
    def __init__(self, vector_store):
        self.__vectorstore = vector_store
        self.__llm = LangChain_llm.get_llm("OpenAI",
                                           openai_api_key = OPENAI_API_KEY, model_name = "gpt-3.5-turbo-16k").llm
        
    def generate_queries(self, query: str) -> List[str]:
        system_prompt = "You are a helpful assistant that generates multiple search queries based on a single input query."
        human_message = f"Generate 3 search queries related to: {query}"
        
        messages = []
        messages.append(SystemMessage(content = system_prompt))
        messages.append(HumanMessage(content = human_message))

        response = self.__llm.generate(messages=[messages])

        if response and isinstance(response, LLMResult):
            generations = response.flatten()
            llm_result = generations[0].generations[0][0].text
            parsed_result = remove_bullet_points(llm_result)
            return parsed_result
        return []
    
    def rewrite_query(self, query: str) -> str:
        prompt = f"""Provide a better search query for web search engine to answer the given question.\
            End the queries with '**'. Question: ``` {query} ``` """
        messages = [HumanMessage(content=prompt)]

        response = self.__llm.generate(messages=[messages])

        if response and isinstance(response, LLMResult):
            generations = response.flatten()
            llm_result = generations[0].generations[0][0].text
            return llm_result.strip("**")
        return ""
    
    def vector_db_search(self, query: str, k=3) -> Dict[str, float]:
        search_result = dict()

        docs_and_scores = self.__vectorstore.search(query, do_bm25_search = True, k=k)
        for doc, score in docs_and_scores.items():
            search_result[doc] = score

        query_search_result = {f'The answer of the question {query} might be in the following content: {k}': v for k, v in search_result.items()}

        return {doc: score for doc, score in sorted(query_search_result.items(), key=lambda x: x[1], reverse=True)}
    
    def retrieve_multiple_responses(self, similar_queries: List[str], k=3) -> Dict[str, Dict[str, float]]:
        all_results = dict()
        for query in similar_queries:
            search_results = self.vector_db_search(query, k=k)
            all_results[query] = search_results
        
        return all_results
    
    def reciprocal_rank_fusion(self, search_results_dict, k=60) -> Dict[str, float]:
        # k=60 is taken from the paper https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
        
        fused_scores = {}
        for query, doc_scores in search_results_dict.items():
            for rank, (doc, score) in enumerate(sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)):
                if doc not in fused_scores:
                    fused_scores[doc] = 0
                
                previous_score = fused_scores[doc]
                fused_scores[doc] += 1/(rank + k)
        
        reranked_results = {doc: score for doc, score in sorted(fused_scores.items())}
        return reranked_results
    
    def run_spr(self, content_to_compress: str) -> str:
        spr_system_prompt = """
            # MISSION
            You are a Sparse Priming Representation (SPR) writer.
            An SPR is a particular kind of use of language for advanced NLP, NLU, and NLG tasks, particularly useful for the latest generation Large Language Models (LLMs). 
            You will be given information by the USER which you are to render as an SPR.

            # THEORY
            LLMs are a kind of deep neural network.
            They have been demonstrated to embed knowledge, abilities, and concepts, ranging from reasoning to planning, and even to theory of mind.
            These are called latent abilities and latent content, collectively referred to as latent space. 
            The latent space of a LLM can be activated with the correct series of words as input, which will create a useful internal state of the neural network.
            This is not unlike how the right shorthand cues can prime a human mind to think in a certain way.
            Like human minds, LLMs are associative, meaning you only need to use the correct associations to "prime" another model to think in the same way.

            # METHODOLOGY
            Render the input as a distilled list of succinct statements, assertions, associations, concepts, analogies, and metaphors. 
            The idea is to capture as much, conceptually, as possible but with as few words as possible.
            Write it in a way that makes sense to you, as the future audience will be another language model, not a human.
            """
        
        human_message = f"this is the input content that you need to distill - ``` {content_to_compress} ``` "

        messages = []
        messages.append(SystemMessage(content=spr_system_prompt))
        messages.append(HumanMessage(content=human_message))

        response = self.__llm.generate(messages=[messages])

        if response and isinstance(response, LLMResult):
            generations = response.flatten()
            llm_result = generations[0].generations[0][0].text
            return llm_result
        return ""
    
    def form_final_result(self, spr_results: List[str], original_query: str) -> str:

        spr_results = "\n ****************** \n".join(spr_results)

        prompt = f"""Answer the user's question based only on the following context:
        
            <context>
            {spr_results}
            </context>

            Question: ``` {original_query} ```
            DO NOT MAKE UP ANY FALSE INFORMATION. USE ONLY THE GIVEN CONTEXT
        """

        messages = [HumanMessage(content=prompt)]

        response = self.__llm.generate(messages=[messages])
        if response and isinstance(response, LLMResult):
            generations = response.flatten()
            llm_result = generations[0].generations[0][0].text
            return llm_result
        return ""
    
    def arun(self, query: str, rewrite_original_query=False):

        if rewrite_original_query:
            rephrased_query = self.rewrite_query(query)
            if rephrased_query:
                query = rephrased_query
                print("rephrased_query: ", rephrased_query)
                print()

        similar_queries_list = self.generate_queries(query)
        print("similar_queries_list: ", similar_queries_list)
        print()

        if similar_queries_list:
            search_results = self.retrieve_multiple_responses(similar_queries_list)
            print('hi')
            reranked_results = self.reciprocal_rank_fusion(search_results)

            # here I am using all the reranked results, you can select the top N
            spr_tasks = []
            spr_results = []

            print('--------- reranked results ---------')
            print(reranked_results)

            for result, score in reranked_results.items():
                print('--- before spr ---')
                print(result)
                spr_result = self.run_spr(result)
                spr_results.append(spr_result)

            if spr_results:
                for spr_content in spr_results:
                    print('---- spr content ---')
                    print(spr_content)
                    print()
                print("*" * 100)

                final_result = self.form_final_result(spr_results, query)
                print("final result: ")
                print(final_result)

In [16]:
rag = RagFusion(vector_store = vector_db_with_bm25)

{'openai_api_key': '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', 'model_name': 'gpt-3.5-turbo-16k'}


In [17]:
query = "Who is Mr. Noppadol Pinsupa?"

rag.arun(query, rewrite_original_query=True)

# total runtime = 1m 9s; query = "Who is Mr. Noppadol Pinsupa?",
# total runtime = 1m 31s; query = "Who is Chief Operating Officer (Downstream Petroleum Business Group) of PTT?"

rephrased_query:  1. "Biography of Mr. Noppadol Pinsupa"**
2. "Background information on Mr. Noppadol Pinsupa"**
3. "Mr. Noppadol Pinsupa profile"

similar_queries_list:  ['"Life story of Mr. Noppadol Pinsupa"', '"Personal history of Mr. Noppadol Pinsupa"', '"Mr. Noppadol Pinsupa background and achievements"']


Query:  "Life story of Mr. Noppadol Pinsupa"


Failed to send telemetry event CollectionQueryEvent: capture() takes 1 positional argument but 3 were given



 --------vector db search results --------
{'Education/ CertificationWorking Experiences in the 5 Preceding Years (As of December 31, 2023)\nTime PeriodPositionOrganization/ Company\n• Certiﬁcate, Executive Program in Energy  Literacy for a Sustainable Future  (Class 16), Thailand Energy Academy (TEA)•  Diploma, The National Defence Course  (Class 62), National Defence College\nRelevant Important Positions in Non-Listed Company (In the Previous Year)\nJanuary 11, 2023 - PresentChairman, PTT LNG Company Limited \nJuly 20, 2021 - December 31, 2022Chairman, B.GRIMM Power LNG JV Company Limited\nOther Experiences/ Other Activities/ Other Organizations (In the Previous Year)\nOctober 1, 2022 - PresentExecutive Director, Thailand Energy Academy\nMr. Noppadol PinsupaChief Operating Ofﬁcer, Downstream Petroleum Business GroupAge (Year) 59\n•  Chief Operating Ofﬁcer,  Downstream Petroleum Business Group (Appointed on October 1, 2022)\nFamily Relationship among Directors and Executives\n-None-\