In [1]:
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_groq import ChatGroq
from langchain_community.vectorstores import FAISS

from langchain_text_splitters import (
    TokenTextSplitter, 
    RecursiveCharacterTextSplitter
)
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_chroma import Chroma
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.retrievers.document_compressors import EmbeddingsFilter
from langchain.chains import (
    RetrievalQAWithSourcesChain, 
    RetrievalQA
)
from langchain.retrievers.document_compressors import LLMChainFilter
from langchain_community.document_transformers import LongContextReorder
from langchain.retrievers.merger_retriever import MergerRetriever

from dotenv import load_dotenv, find_dotenv
import os
from langchain.chains import RetrievalQA
import langchain
# Leave LangChain's global debug flag switched off.
langchain.debug = False


# Load variables from the nearest .env file into the process environment.
load_dotenv(
    find_dotenv()
)

# The keys are read from the environment, so re-assigning os.getenv() back into
# os.environ added nothing when a key existed and raised a cryptic
# "TypeError: str expected, not NoneType" when it did not. Verify explicitly.
_missing_api_keys = [
    key for key in ("GOOGLE_API_KEY", "GROQ_API_KEY") if os.getenv(key) is None
]
if _missing_api_keys :
    raise RuntimeError(
        "Missing required environment variable(s): {}. "
        "Define them in a .env file or in the shell before running this cell.".format(
            ", ".join(_missing_api_keys)
        )
    )

# Candidate model identifiers; the index used below selects the active one.
model_name = [
    "llama3-70b-8192", 
    "gemma2-9b-it", 
    "qwen-qwq-32b"
]

# Chat model used by the RAG pipeline (index 1 -> "gemma2-9b-it").
llm_model = ChatGroq(
    model=model_name[1], 
    temperature=0.4, 
    max_tokens=512, 
    model_kwargs={
        "top_p" : 0.9,
    },
)


def printDocuments(docs) :
    """
    Print the ``page_content`` of every document, numbered from 1.

    Consecutive documents are separated by a line of 100 dashes.

    Args:
        docs: Iterable of objects exposing a string ``page_content`` attribute.
    """
    divider = "\n" + "-" * 100 + "\n"
    rendered = []
    for position, document in enumerate(docs, start=1) :
        rendered.append("Document {}:- \n\n".format(position) + document.page_content)
    print(divider.join(rendered))

In [None]:
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_groq import ChatGroq
from langchain_community.vectorstores import FAISS

from langchain_text_splitters import (
    TokenTextSplitter, 
    RecursiveCharacterTextSplitter
)
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_chroma import Chroma
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.retrievers.document_compressors import EmbeddingsFilter
from langchain.chains import (
    RetrievalQAWithSourcesChain, 
    RetrievalQA
)
from langchain.retrievers.document_compressors import LLMChainFilter
from langchain_community.document_transformers import LongContextReorder
from langchain.retrievers.merger_retriever import MergerRetriever

from dotenv import load_dotenv, find_dotenv
import os
from langchain.chains import RetrievalQA
import langchain
# Leave LangChain's global debug flag switched off.
langchain.debug = False


# Load variables from the nearest .env file into the process environment.
load_dotenv(
    find_dotenv()
)

# The keys are read from the environment, so re-assigning os.getenv() back into
# os.environ added nothing when a key existed and raised a cryptic
# "TypeError: str expected, not NoneType" when it did not. Verify explicitly.
_missing_api_keys = [
    key for key in ("GOOGLE_API_KEY", "GROQ_API_KEY") if os.getenv(key) is None
]
if _missing_api_keys :
    raise RuntimeError(
        "Missing required environment variable(s): {}. "
        "Define them in a .env file or in the shell before running this cell.".format(
            ", ".join(_missing_api_keys)
        )
    )

# Candidate model identifiers; the index used below selects the active one.
model_name = [
    "llama3-70b-8192", 
    "gemma2-9b-it", 
    "qwen-qwq-32b"
]

# Chat model used by the RAG pipeline (index 1 -> "gemma2-9b-it").
llm_model = ChatGroq(
    model=model_name[1], 
    temperature=0.4, 
    max_tokens=512, 
    model_kwargs={
        "top_p" : 0.9,
    },
)


def printDocuments(docs) :
    """
    Print the ``page_content`` of every document, numbered from 1.

    Consecutive documents are separated by a line of 100 dashes.

    Args:
        docs: Iterable of objects exposing a string ``page_content`` attribute.
    """
    divider = "\n" + "-" * 100 + "\n"
    rendered = []
    for position, document in enumerate(docs, start=1) :
        rendered.append("Document {}:- \n\n".format(position) + document.page_content)
    print(divider.join(rendered))


def DataAugmentationWithDualRetriever(
    data_path, embedding_provider, chunk_size, chunk_overlap, 
    faiss_retriever_search_type, faiss_retriever_k_documents, 
    chroma_retriever_search_type, chroma_retriever_k_documents, **kwargs) :
    """
    Load a PDF, split it into chunks and index the chunks in FAISS and Chroma.

    Args:
        data_path: Path of the PDF handed to PyMuPDFLoader.
        embedding_provider: "Google" or "OpenAI"; selects the embedding model.
        chunk_size: Chunk size passed to RecursiveCharacterTextSplitter.
        chunk_overlap: Chunk overlap passed to RecursiveCharacterTextSplitter.
        faiss_retriever_search_type: ``search_type`` for the FAISS retriever.
        faiss_retriever_k_documents: ``k`` for the FAISS retriever.
        chroma_retriever_search_type: ``search_type`` for the Chroma retriever.
        chroma_retriever_k_documents: ``k`` for the Chroma retriever.
        **kwargs: ``execute_function`` (bool) must be truthy for the pipeline
            to run; it is treated as False when omitted.

    Returns:
        tuple: ``(faissRetriever, chromaRetriever, embeddings)``. When
        ``execute_function`` is falsy the tuple is ``(None, None, None)`` so
        that callers can always unpack three values.

    Raises:
        ValueError: If ``embedding_provider`` is not "Google" or "OpenAI".
    """
    if not kwargs.get("execute_function", False) :
        print("[INFO] ----> Change the execute_function argument to True and then run...")
        # Same arity as the success path; the old 2-tuple broke 3-way unpacking.
        return None, None, None

    # Reject unknown providers before any expensive work; previously this
    # surfaced later as an unbound ``embeddings`` variable.
    if embedding_provider not in ("Google", "OpenAI") :
        raise ValueError(
            "Unsupported embedding_provider {!r}; expected \"Google\" or \"OpenAI\".".format(
                embedding_provider
            )
        )

    print("\n[INFO] ----> Running the Data Augmentation Pipeline for RAG, please wait....\n")

    documents = PyMuPDFLoader(
        data_path,
    ).load()
    print("\n[INFO] ----> Total Pages in the original document  are :- {}".format(len(documents)))

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, 
        chunk_overlap=chunk_overlap,
        length_function=len,
    )

    document_chunks = splitter.split_documents(
        documents
    )

    print("[INFO] ----> Total Document Chunks Created are :- {}\n".format(len(document_chunks)))

    if embedding_provider == "Google" :
        print("[INFO] ----> Using the Google-AI Embedding model...")
        embeddings = GoogleGenerativeAIEmbeddings(
            model="models/embedding-001"
        )
        print("[INFO] ----> Google-AI Embedding model loaded.\n")
    else :
        print("[INFO] ----> Using the Open-AI Embedding model....")
        embeddings = OpenAIEmbeddings()
        print("[INFO] ----> OpenAI Embedding model loaded.\n")

    print("[INFO] ----> Creating the FAISS Vectorstore, please wait.....")
    faissRetriever = FAISS.from_documents(
        document_chunks, embeddings).as_retriever(
            search_type=faiss_retriever_search_type, 
            search_kwargs={
                "k" : faiss_retriever_k_documents
            }
        )
    print("[INFO] ----> FAISS Retriever Created successfully.....\n")

    print("[INFO] ----> Creating the Chroma Vectorstore, please wait.....")
    chromaRetriever = Chroma.from_documents(
        document_chunks, 
        embeddings,
    ).as_retriever(
        search_type=chroma_retriever_search_type, 
        search_kwargs={
            "k" : chroma_retriever_k_documents, 
        }
    )
    print("[INFO] ----> Chroma Retriever Created successfully.....\n")
    print("[INFO] ----> RAG Data Augmentation completed......")

    return faissRetriever, chromaRetriever, embeddings


def EnsembleContextualCompressionRetriever(**kwargs) :
    """
    Wrap the FAISS and Chroma retrievers in compression pipelines and merge them.

    Each base retriever is wrapped in a ContextualCompressionRetriever whose
    compressor runs, in order: the redundancy filter, a similarity-threshold
    filter, and long-context reordering. The two wrapped retrievers are then
    combined with a MergerRetriever.

    Keyword Args:
        execute_pipeline (bool): Must be truthy for the retriever to be built;
            treated as False when omitted.
        embeddings: Embedding model used by the filters.
        faiss_embeddingfilter_threshold (float): Similarity threshold for the
            FAISS branch.
        chroma_embeddingfilter_threshold (float): Similarity threshold for the
            Chroma branch.
        faissRetriever: Base retriever for the FAISS branch.
        chromaRetriever: Base retriever for the Chroma branch.

    Returns:
        The MergerRetriever, or None when ``execute_pipeline`` is falsy.
    """
    if not kwargs.get("execute_pipeline", False) :
        # The message used to name "execute_function", which this function does not accept.
        print("[INFO] ----> Change the execute_pipeline argument to True and then run...")
        return None

    embeddings = kwargs['embeddings']

    # These two transformers are shared by both branches, as before.
    redundant_filter = EmbeddingsRedundantFilter(
        embeddings=embeddings,
    )
    reordering = LongContextReorder()

    def _compressed_retriever(base_retriever, similarity_threshold) :
        # Build one branch: redundancy filter -> relevance filter -> reorder.
        relevant_filter = EmbeddingsFilter(
            embeddings=embeddings, 
            similarity_threshold=similarity_threshold,
        )
        pipeline_compressor = DocumentCompressorPipeline(
            transformers=[
                redundant_filter, 
                relevant_filter, 
                reordering,
            ]
        )
        return ContextualCompressionRetriever(
            base_compressor=pipeline_compressor, 
            base_retriever=base_retriever,
        )

    # First Compression Retriever with FAISS
    compression_retriever_with_faiss = _compressed_retriever(
        kwargs['faissRetriever'], 
        kwargs['faiss_embeddingfilter_threshold'],
    )
    # Second Compression Retriever with Chroma
    compression_retriever_with_chroma = _compressed_retriever(
        kwargs['chromaRetriever'], 
        kwargs['chroma_embeddingfilter_threshold'],
    )

    lotrCompression = MergerRetriever(
        retrievers=[
            compression_retriever_with_faiss, 
            compression_retriever_with_chroma
        ]
    )
    return lotrCompression


def generateAnswerFunction(
    question, llm_model, retriever_function, 
    verbose=-1, **kwargs) :
    """
    Answer a question with a retrieval-QA chain built on the given retriever.

    Args:
        question: Question passed to the chain's ``invoke``.
        llm_model: Chat model passed to the chain as ``llm``.
        retriever_function: Retriever passed to the chain as ``retriever``.
        verbose: The result is printed when this is greater than -1.
        **kwargs:
            parse_function (bool): Must be truthy for the chain to run;
                treated as False when omitted.
            ragPipelineConfig (str): "RetrievalQAWithSourcesChain" or
                "RetrievalQAChain".
            chain_type (str): Chain type for ``from_chain_type``; defaults
                to "stuff" when omitted.

    Returns:
        The generated answer, or None when ``parse_function`` is falsy or
        ``ragPipelineConfig`` is not one of the supported names (a message is
        printed in both cases instead of failing silently).
    """
    if not kwargs.get("parse_function", False) :
        print("[INFO] ----> Change the parse_function argument to True and then run...")
        return None

    rag_pipeline_type = kwargs.get("ragPipelineConfig")

    # Pick the chain class and the key under which it returns the answer text.
    if rag_pipeline_type == "RetrievalQAWithSourcesChain" :
        chain_class, answer_key = RetrievalQAWithSourcesChain, "answer"
    elif rag_pipeline_type == "RetrievalQAChain" :
        chain_class, answer_key = RetrievalQA, "result"
    else :
        print(
            "[ERROR] ----> Unsupported ragPipelineConfig {!r}; choose "
            "\"RetrievalQAWithSourcesChain\" or \"RetrievalQAChain\".".format(rag_pipeline_type)
        )
        return None

    print("[INFO] ----> Running the {} Pipeline.....\n".format(rag_pipeline_type))
    rag_pipeline = chain_class.from_chain_type(
        llm = llm_model,
        chain_type = kwargs.get("chain_type", "stuff"), 
        retriever = retriever_function,
        return_source_documents=True
    )
    print("[INFO] ----> Generating the answer for:- \nQuestion:- {}".format(question))
    generated_answer = rag_pipeline.invoke(question)
    result = generated_answer[answer_key]
    print("[INFO] ----> Result Answer generated.....\n")
    if verbose > -1 :
        print("Result :- {}{}".format("\n", result))
    return result



# Build the path with os.path.join so the separator matches the host OS;
# the previous literal used a Windows-only backslash.
dataPath = os.path.join("Data", "ReAct.pdf")
embedding = "Google"

# Index the PDF chunks in both vector stores.
faissRetriever, chromaRetriever, embeddings = DataAugmentationWithDualRetriever(
    data_path = dataPath,
    embedding_provider = embedding,
    chunk_size = 512,
    chunk_overlap = 128,
    faiss_retriever_search_type = "similarity",
    faiss_retriever_k_documents = 3,
    chroma_retriever_search_type = "similarity",
    chroma_retriever_k_documents = 4,
    execute_function = True
)

# Wrap both retrievers in compression pipelines and merge them.
retriever = EnsembleContextualCompressionRetriever(
    faiss_embeddingfilter_threshold = 0.75,
    chroma_embeddingfilter_threshold = 0.6,
    embeddings=embeddings, 
    execute_pipeline=True,
    faissRetriever=faissRetriever, 
    chromaRetriever=chromaRetriever,
)


query = "Explain in details about the Chain of thought prompting as mentioned in ReAct Paper."

answer = generateAnswerFunction(
    question=query,
    llm_model=llm_model, 
    retriever_function=retriever,
    parse_function=True, 
    chain_type="stuff",

    # Choices are "RetrievalQAWithSourcesChain" or "RetrievalQAChain"
    ragPipelineConfig="RetrievalQAWithSourcesChain"
)

print("\n")
print("===" * 100)
print("Final Answer :- {}{}".format("\n", answer))



[INFO] ----> Running the Data Augmentation Pipeline for RAG, please wait....


[INFO] ----> Total Pages in the original document  are :- 33
[INFO] ----> Total Document Chunks Created are :- 293

[INFO] ----> Using the Google-AI Embedding model...
[INFO] ----> Google-AI Embedding model loaded.

[INFO] ----> Creating the FAISS Vectorstore, please wait.....
[INFO] ----> FAISS Retriever Created successfully.....

[INFO] ----> Creating the Chroma Vectorstore, please wait.....
[INFO] ----> Chroma Retriever Created successfully.....

[INFO] ----> RAG Data Augmentation completed......


In [4]:
# Question put to the RAG pipeline built in the earlier cell.
query = "Explain in details about the Chain of thought prompting as mentioned in ReAct Paper."

# ragPipelineConfig choices are "RetrievalQAWithSourcesChain" or "RetrievalQAChain".
answer = generateAnswerFunction(
    query, llm_model, retriever,
    parse_function=True, chain_type="stuff", ragPipelineConfig="RetrievalQAWithSourcesChain",
)

# Blank line, a 300-character rule, then the answer on its own lines.
print("\n", "===" * 100, "Final Answer :- {}{}".format("\n", answer), sep="\n")


[INFO] ----> Running the RetrievalQAWithSourcesChain Pipeline.....

[INFO] ----> Generating the answer for:- 
Question:- Explain in details about the Chain of thought prompting as mentioned in ReAct Paper.
[INFO] ----> Result Answer generated.....



Final Answer :- 
Chain-of-thought prompting, as mentioned in the ReAct paper, involves providing the model with a sequence of thoughts leading to a solution. 

The paper describes it as a "static black box" because while the model uses chain-of-thought reasoning, the process is not transparent. We don't know exactly how the model arrives at each step in the chain.

The ReAct paper proposes a new prompting method called "ReAct" that builds upon chain-of-thought prompting by incorporating both reasoning (thoughts) and actions taken by the model. This allows for a more dynamic and interpretable understanding of the model's decision-making process.




In [1]:
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader

from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_groq import ChatGroq

from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.retrievers import ContextualCompressionRetriever
from langchain_chroma import Chroma
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain.retrievers.document_compressors import EmbeddingsFilter
from langchain_community.document_transformers import LongContextReorder
from langchain.retrievers.merger_retriever import MergerRetriever
from langchain.chains import RetrievalQAWithSourcesChain, RetrievalQA

from dotenv import load_dotenv, find_dotenv
import os


class EnsembleMergerRetriever:
    """
    Chainable builder for a RAG pipeline backed by FAISS and Chroma retrievers.

    Intended call order: ``setup_llm`` -> ``load_and_split_documents`` ->
    ``setup_embeddings`` -> ``create_retrievers`` -> ``build_merger_retriever``
    -> ``generate_answer``. Every setup method returns ``self`` so the calls
    can be chained. Setup problems are reported with an ``[ERROR]`` print and
    leave the instance unchanged rather than raising.
    """

    # Environment variables checked after the .env file has been loaded.
    _REQUIRED_ENV_VARS = ("GOOGLE_API_KEY", "GROQ_API_KEY")

    def __init__(self, load_env=True):
        """
        Initialize the EnsembleMergerRetriever class.
        
        Args:
            load_env (bool): Whether to load environment variables from .env file

        Raises:
            RuntimeError: If ``load_env`` is True and a required API key is
                still missing after the .env file has been loaded.
        """
        if load_env:
            load_dotenv(find_dotenv())
            # Re-assigning os.getenv() into os.environ raised a cryptic
            # TypeError when a key was missing; check explicitly instead.
            self._require_env_vars(self._REQUIRED_ENV_VARS)
        
        self.faiss_retriever = None
        self.chroma_retriever = None
        self.embeddings = None
        self.merger_retriever = None
        self.llm_model = None
        self.documents = None
        self.document_chunks = None

    @staticmethod
    def _require_env_vars(names):
        """
        Raise RuntimeError naming every variable in ``names`` that is unset.

        Args:
            names (iterable of str): Environment variable names to check
        """
        missing = [name for name in names if os.getenv(name) is None]
        if missing:
            raise RuntimeError(
                "Missing required environment variable(s): {}. "
                "Define them in a .env file or in the shell.".format(", ".join(missing))
            )
    
    def setup_llm(self, model_name="gemma2-9b-it", temperature=0.4, max_tokens=512, top_p=0.9):
        """
        Set up the LLM model.
        
        Args:
            model_name (str): Model name (options: "llama3-70b-8192", "gemma2-9b-it", "qwen-qwq-32b")
            temperature (float): Temperature for generation
            max_tokens (int): Maximum tokens to generate
            top_p (float): Top-p value for generation
            
        Returns:
            EnsembleMergerRetriever: self instance for method chaining
        """
        self.llm_model = ChatGroq(
            model=model_name,
            temperature=temperature,
            max_tokens=max_tokens,
            model_kwargs={
                "top_p": top_p,
            },
        )
        return self
    
    def load_and_split_documents(self, data_path, chunk_size=512, chunk_overlap=128):
        """
        Load and split documents for retrieval.
        
        Args:
            data_path (str): Path to the PDF document
            chunk_size (int): Size of chunks for splitting
            chunk_overlap (int): Overlap between chunks
            
        Returns:
            EnsembleMergerRetriever: self instance for method chaining
        """
        print("\n[INFO] ----> Loading and splitting the document, please wait....\n")
        
        # Load documents
        self.documents = PyMuPDFLoader(data_path).load()
        print("\n[INFO] ----> Total Pages in the original document are: {}".format(len(self.documents)))
        
        # Split documents
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )
        
        self.document_chunks = splitter.split_documents(self.documents)
        print("[INFO] ----> Total Document Chunks Created are: {}\n".format(len(self.document_chunks)))
        
        return self
    
    def setup_embeddings(self, embedding_provider="Google"):
        """
        Set up embeddings for vectorstores.
        
        Args:
            embedding_provider (str): Provider for embeddings ("Google" or "OpenAI")
            
        Returns:
            EnsembleMergerRetriever: self instance for method chaining. For an
            unrecognised provider an error is printed and ``self.embeddings``
            is left untouched.
        """
        if embedding_provider == "Google":
            print("[INFO] ----> Using the Google-AI Embedding model...")
            self.embeddings = GoogleGenerativeAIEmbeddings(
                model="models/embedding-001"
            )
            print("[INFO] ----> Google-AI Embedding model loaded.\n")
        
        elif embedding_provider == "OpenAI":
            print("[INFO] ----> Using the Open-AI Embedding model...")
            self.embeddings = OpenAIEmbeddings()
            print("[INFO] ----> OpenAI Embedding model loaded.\n")

        else:
            # Previously a typo in the provider name was ignored without a word.
            print(
                "[ERROR] ----> Unsupported embedding_provider {!r}; "
                "expected \"Google\" or \"OpenAI\"".format(embedding_provider)
            )
        
        return self
    
    def create_retrievers(self, 
                         faiss_search_type="similarity", 
                         faiss_k_documents=3,
                         chroma_search_type="similarity", 
                         chroma_k_documents=4):
        """
        Create FAISS and Chroma retrievers.
        
        Args:
            faiss_search_type (str): Search type for FAISS retriever
            faiss_k_documents (int): Number of documents to retrieve with FAISS
            chroma_search_type (str): Search type for Chroma retriever
            chroma_k_documents (int): Number of documents to retrieve with Chroma
            
        Returns:
            EnsembleMergerRetriever: self instance for method chaining
        """
        if self.document_chunks is None or self.embeddings is None:
            print("[ERROR] ----> Documents and embeddings must be set up first")
            return self
            
        # Create FAISS retriever
        print("[INFO] ----> Creating the FAISS Vectorstore, please wait.....")
        self.faiss_retriever = FAISS.from_documents(
            self.document_chunks, 
            self.embeddings
        ).as_retriever(
            search_type=faiss_search_type,
            search_kwargs={
                "k": faiss_k_documents
            }
        )
        print("[INFO] ----> FAISS Retriever Created successfully.....\n")
        
        # Create Chroma retriever
        print("[INFO] ----> Creating the Chroma Vectorstore, please wait.....")
        self.chroma_retriever = Chroma.from_documents(
            self.document_chunks,
            self.embeddings,
        ).as_retriever(
            search_type=chroma_search_type,
            search_kwargs={
                "k": chroma_k_documents,
            }
        )
        print("[INFO] ----> Chroma Retriever Created successfully.....\n")
        
        return self

    def _build_compression_retriever(self, base_retriever, similarity_threshold,
                                     redundant_filter, reordering):
        """
        Wrap ``base_retriever`` in a ContextualCompressionRetriever whose
        compressor runs: redundancy filter -> relevance filter -> reordering.
        """
        relevant_filter = EmbeddingsFilter(
            embeddings=self.embeddings,
            similarity_threshold=similarity_threshold,
        )
        pipeline_compressor = DocumentCompressorPipeline(
            transformers=[
                redundant_filter,
                relevant_filter,
                reordering,
            ]
        )
        return ContextualCompressionRetriever(
            base_compressor=pipeline_compressor,
            base_retriever=base_retriever,
        )
    
    def build_merger_retriever(self, 
                              faiss_embedding_filter_threshold=0.75,
                              chroma_embedding_filter_threshold=0.6):
        """
        Build the ensemble merger retriever with contextual compression.
        
        Args:
            faiss_embedding_filter_threshold (float): Similarity threshold for FAISS embeddings filter
            chroma_embedding_filter_threshold (float): Similarity threshold for Chroma embeddings filter
            
        Returns:
            EnsembleMergerRetriever: self instance for method chaining
        """
        if self.faiss_retriever is None or self.chroma_retriever is None:
            print("[ERROR] ----> FAISS and Chroma retrievers must be created first")
            return self
            
        print("[INFO] ----> Building the ensemble merger retriever, please wait.....")
        
        # Create shared filters
        redundant_filter = EmbeddingsRedundantFilter(embeddings=self.embeddings)
        reordering = LongContextReorder()
        
        # Create compression retrievers (one per vector store)
        compression_retriever_with_faiss = self._build_compression_retriever(
            self.faiss_retriever,
            faiss_embedding_filter_threshold,
            redundant_filter,
            reordering,
        )
        compression_retriever_with_chroma = self._build_compression_retriever(
            self.chroma_retriever,
            chroma_embedding_filter_threshold,
            redundant_filter,
            reordering,
        )
        
        # Create merger retriever
        self.merger_retriever = MergerRetriever(
            retrievers=[
                compression_retriever_with_faiss,
                compression_retriever_with_chroma
            ]
        )
        
        print("[INFO] ----> Ensemble merger retriever built successfully.....")
        return self
    
    def generate_answer(self, question, pipeline_type="RetrievalQAWithSourcesChain", chain_type="stuff", verbose=False):
        """
        Generate answer using the RAG pipeline.
        
        Args:
            question (str): Question to answer
            pipeline_type (str): Type of RAG pipeline ("RetrievalQAWithSourcesChain" or "RetrievalQAChain")
            chain_type (str): Type of chain for retrieval QA
            verbose (bool): Whether to print the result
            
        Returns:
            str: Generated answer, or None (after printing an error) when the
            retriever/LLM are not set up or ``pipeline_type`` is not supported
        """
        if self.merger_retriever is None or self.llm_model is None:
            print("[ERROR] ----> Merger retriever and LLM must be set up first")
            return None

        # Resolve the chain class and the key that holds its answer text. An
        # unknown name used to fall through both branches and crash on the
        # unbound ``result`` variable.
        if pipeline_type == "RetrievalQAWithSourcesChain":
            chain_class, answer_key = RetrievalQAWithSourcesChain, "answer"
        elif pipeline_type == "RetrievalQAChain":
            chain_class, answer_key = RetrievalQA, "result"
        else:
            print(
                "[ERROR] ----> Unsupported pipeline_type {!r}; expected "
                "\"RetrievalQAWithSourcesChain\" or \"RetrievalQAChain\"".format(pipeline_type)
            )
            return None
            
        print(f"[INFO] ----> Running the {pipeline_type} Pipeline.....\n")
        
        rag_pipeline = chain_class.from_chain_type(
            llm=self.llm_model,
            chain_type=chain_type,
            retriever=self.merger_retriever,
            return_source_documents=True
        )
        print(f"[INFO] ----> Generating the answer for:\nQuestion: {question}")
        generated_answer = rag_pipeline.invoke(question)
        result = generated_answer[answer_key]
        
        print("[INFO] ----> Result Answer generated.....\n")
        
        if verbose:
            print(f"Result:\n{result}")
            
        return result
    
    def print_documents(self, docs):
        """
        Print documents for inspection.
        
        Args:
            docs (list): List of documents to print
        """
        print(
            f"\n{'-' * 100}\n".join(
                [
                    f"Document {index + 1}:- \n\n" + d.page_content for index, d in enumerate(docs)
                ]
            )
        )


# Example usage:
if __name__ == "__main__":
    # Create the ensemble retriever
    retriever = EnsembleMergerRetriever()
    
    # Set up the LLM model
    retriever.setup_llm(model_name="gemma2-9b-it")
    
    # Load and split documents.
    # os.path.join picks the host's path separator; the previous literal
    # used a Windows-only backslash.
    retriever.load_and_split_documents(
        data_path=os.path.join("Data", "ReAct.pdf"),
        chunk_size=512,
        chunk_overlap=128
    )
    
    # Set up embeddings
    retriever.setup_embeddings(embedding_provider="Google")
    
    # Create retrievers
    retriever.create_retrievers(
        faiss_search_type="similarity",
        faiss_k_documents=3,
        chroma_search_type="similarity",
        chroma_k_documents=4
    )
    
    # Build the merger retriever
    retriever.build_merger_retriever(
        faiss_embedding_filter_threshold=0.75,
        chroma_embedding_filter_threshold=0.6
    )
    
    # Generate answer
    query = "Explain in details about the Chain of thought prompting as mentioned in ReAct Paper."
    answer = retriever.generate_answer(
        question=query,
        pipeline_type="RetrievalQAWithSourcesChain",
        chain_type="stuff",
        verbose=True
    )
    
    print("\n")
    print("===" * 100)
    print(f"Final Answer:\n{answer}")


[INFO] ----> Loading and splitting the document, please wait....


[INFO] ----> Total Pages in the original document are: 33
[INFO] ----> Total Document Chunks Created are: 293

[INFO] ----> Using the Google-AI Embedding model...
[INFO] ----> Google-AI Embedding model loaded.

[INFO] ----> Creating the FAISS Vectorstore, please wait.....
[INFO] ----> FAISS Retriever Created successfully.....

[INFO] ----> Creating the Chroma Vectorstore, please wait.....
[INFO] ----> Chroma Retriever Created successfully.....

[INFO] ----> Building the ensemble merger retriever, please wait.....
[INFO] ----> Ensemble merger retriever built successfully.....
[INFO] ----> Running the RetrievalQAWithSourcesChain Pipeline.....

[INFO] ----> Generating the answer for:
Question: Explain in details about the Chain of thought prompting as mentioned in ReAct Paper.
[INFO] ----> Result Answer generated.....

Result:
The ReAct paper proposes a new prompting method called ReAct that encourages models to generate 

In [4]:
# Follow-up question for the retriever built in the previous cell.
query = "Explain about LLMs with reference to the ReAct Algorithm"

# verbose=True makes generate_answer print the result itself.
answer = retriever.generate_answer(
    query, pipeline_type="RetrievalQAWithSourcesChain", chain_type="stuff", verbose=True
)

[INFO] ----> Running the RetrievalQAWithSourcesChain Pipeline.....

[INFO] ----> Generating the answer for:
Question: Explain about LLMs with reference to the ReAct Algorithm
[INFO] ----> Result Answer generated.....

Result:
ReAct is a paradigm that combines reasoning and acting with language models (LLMs) to solve diverse language reasoning and decision-making tasks. 

Unlike other methods that rely on expensive datasets and human feedback for policy learning, ReAct learns a policy in a more cost-effective way by prompting LLMs to generate both verbal reasoning traces and actions. 

This approach is inspired by the observation that humans often verbalize their reasoning process before taking action. 

ReAct's key contributions include:

* Demonstrating the feasibility of combining reasoning and action with LLMs in an interactive environment within a closed-loop system.
* Exploring the value of internal reasoning versus external feedback for improving task performance.
* Providing a g

In [9]:
retriever = EnsembleMergerRetriever()

# Every setup method returns the instance, so the whole pipeline can be
# configured in one chain using each method's default arguments.
# os.path.join picks the host's path separator; the previous literal used a
# Windows-only backslash.
(
    retriever.setup_llm()
    .load_and_split_documents(os.path.join("Data", "Attention.pdf"))
    .setup_embeddings()
    .create_retrievers()
    .build_merger_retriever()
)


answer = retriever.generate_answer("Explain the concept of Self-Attention mechanism")


[INFO] ----> Loading and splitting the document, please wait....


[INFO] ----> Total Pages in the original document are: 15
[INFO] ----> Total Document Chunks Created are: 108

[INFO] ----> Using the Google-AI Embedding model...
[INFO] ----> Google-AI Embedding model loaded.

[INFO] ----> Creating the FAISS Vectorstore, please wait.....
[INFO] ----> FAISS Retriever Created successfully.....

[INFO] ----> Creating the Chroma Vectorstore, please wait.....
[INFO] ----> Chroma Retriever Created successfully.....

[INFO] ----> Building the ensemble merger retriever, please wait.....
[INFO] ----> Ensemble merger retriever built successfully.....
[INFO] ----> Running the RetrievalQAWithSourcesChain Pipeline.....

[INFO] ----> Generating the answer for:
Question: Explain the concept of Self-Attention mechanism
[INFO] ----> Result Answer generated.....



In [7]:
# Show the text currently stored in `answer` (set by an earlier generate_answer call).
print(answer)

Self-attention is an attention mechanism that relates different positions of a single sequence to compute a representation of the sequence. 

It allows the model to weigh the importance of different words in a sentence when understanding its meaning. 

Self-attention has been successfully used in various natural language processing tasks, such as reading comprehension and abstractive summarization.




In [None]:
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader

from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_groq import ChatGroq

from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.retrievers import ContextualCompressionRetriever
from langchain_chroma import Chroma
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain.retrievers.document_compressors import EmbeddingsFilter
from langchain_community.document_transformers import LongContextReorder
from langchain.retrievers.merger_retriever import MergerRetriever
from langchain.chains import RetrievalQAWithSourcesChain, RetrievalQA

from dotenv import load_dotenv, find_dotenv
import os
import asyncio
from typing import List, Dict, Any, Optional


class AsyncEnsembleMergerRetriever:
    """
    Async-friendly RAG helper that merges a FAISS retriever and a Chroma retriever.

    Typical order of calls (each returns ``self`` so they can be chained after
    awaiting): ``setup_llm`` -> ``load_and_split_documents`` -> ``setup_embeddings``
    -> ``create_retrievers`` -> ``build_merger_retriever`` -> ``generate_answer``.
    The ``create`` classmethod runs the first five steps in that order.

    Blocking library calls (PDF loading, splitting, vectorstore creation, chain
    invocation) are pushed to a worker thread with ``asyncio.to_thread``.
    """

    # Supported values of ``pipeline_type`` in generate_answer, mapped to the key
    # under which the corresponding chain output holds the answer text.
    _PIPELINE_RESULT_KEYS = {
        "RetrievalQAWithSourcesChain": "answer",
        "RetrievalQAChain": "result",
    }

    def __init__(self, load_env=True):
        """
        Initialize the AsyncEnsembleMergerRetriever class.

        Args:
            load_env (bool): Whether to load environment variables from .env file
        """
        if load_env:
            # load_dotenv already places the values into os.environ, so there is
            # nothing to copy back. Re-assigning os.environ[key] = os.getenv(key)
            # would raise TypeError whenever a key is missing, so only warn.
            load_dotenv(find_dotenv())
            for key_name in ("GOOGLE_API_KEY", "GROQ_API_KEY"):
                if not os.getenv(key_name):
                    print(f"[WARNING] ----> {key_name} is not set in the environment.")

        self.faiss_retriever = None
        self.chroma_retriever = None
        self.embeddings = None
        self.merger_retriever = None
        self.llm_model = None
        self.documents = None
        self.document_chunks = None

    @staticmethod
    async def _run_in_thread(func, *args, failure_message, retry_on=Exception):
        """
        Run ``func(*args)`` in a worker thread; on failure, retry it once inline.

        Args:
            func (callable): Blocking callable to execute
            *args: Positional arguments forwarded to ``func``
            failure_message (str): Message printed when the threaded call fails;
                ``{error}`` is replaced with the caught exception
            retry_on (type | tuple): Exception type(s) that trigger the inline retry

        Returns:
            Any: Whatever ``func`` returns

        Raises:
            Exception: Anything raised by the inline retry, or by the threaded
                call when it is not an instance of ``retry_on``
        """
        try:
            return await asyncio.to_thread(func, *args)
        except retry_on as error:
            print(failure_message.format(error=error))
            # Best-effort fallback: this call blocks the event loop while it runs.
            return func(*args)

    async def setup_llm(self, model_name="gemma2-9b-it", temperature=0.4, max_tokens=512, top_p=0.9):
        """
        Set up the LLM model asynchronously.

        Args:
            model_name (str): Model name (options: "llama3-70b-8192", "gemma2-9b-it", "qwen-qwq-32b")
            temperature (float): Temperature for generation
            max_tokens (int): Maximum tokens to generate
            top_p (float): Top-p value for generation

        Returns:
            AsyncEnsembleMergerRetriever: self instance for method chaining
        """
        self.llm_model = ChatGroq(
            model=model_name,
            temperature=temperature,
            max_tokens=max_tokens,
            model_kwargs={
                "top_p": top_p,
            },
        )
        return self

    async def load_and_split_documents(self, data_path, chunk_size=512, chunk_overlap=128):
        """
        Load and split documents for retrieval asynchronously.

        Args:
            data_path (str): Path to the PDF document
            chunk_size (int): Size of chunks for splitting
            chunk_overlap (int): Overlap between chunks

        Returns:
            AsyncEnsembleMergerRetriever: self instance for method chaining
        """
        print("\n[INFO] ----> Loading and splitting the document, please wait....\n")

        loader = PyMuPDFLoader(data_path)
        self.documents = await self._run_in_thread(
            loader.load,
            failure_message="[ERROR] ----> Error loading document: {error}",
        )
        print("\n[INFO] ----> Total Pages in the original document are: {}".format(len(self.documents)))

        # chunk_size / chunk_overlap are measured with len(), i.e. in characters.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )
        self.document_chunks = await self._run_in_thread(
            splitter.split_documents,
            self.documents,
            failure_message="[ERROR] ----> Error splitting documents: {error}",
        )
        print("[INFO] ----> Total Document Chunks Created are: {}\n".format(len(self.document_chunks)))

        return self

    async def setup_embeddings(self, embedding_provider="Google"):
        """
        Set up embeddings for vectorstores asynchronously.

        An unsupported provider is reported and leaves ``self.embeddings`` untouched,
        so a later ``create_retrievers`` call reports the missing embeddings too.

        Args:
            embedding_provider (str): Provider for embeddings ("Google" or "OpenAI")

        Returns:
            AsyncEnsembleMergerRetriever: self instance for method chaining
        """
        if embedding_provider == "Google":
            print("[INFO] ----> Using the Google-AI Embedding model...")
            self.embeddings = GoogleGenerativeAIEmbeddings(
                model="models/embedding-001"
            )
            print("[INFO] ----> Google-AI Embedding model loaded.\n")

        elif embedding_provider == "OpenAI":
            print("[INFO] ----> Using the Open-AI Embedding model...")
            self.embeddings = OpenAIEmbeddings()
            print("[INFO] ----> OpenAI Embedding model loaded.\n")

        else:
            # Previously this case fell through silently.
            print(
                f"[ERROR] ----> Unsupported embedding provider: {embedding_provider!r}. "
                'Use "Google" or "OpenAI".'
            )

        return self

    async def create_retrievers(self,
                         faiss_search_type="similarity",
                         faiss_k_documents=3,
                         chroma_search_type="similarity",
                         chroma_k_documents=4):
        """
        Create FAISS and Chroma retrievers asynchronously.

        Both vectorstores are built concurrently from the same chunks and the
        same embedding object.

        Args:
            faiss_search_type (str): Search type for FAISS retriever
            faiss_k_documents (int): Number of documents to retrieve with FAISS
            chroma_search_type (str): Search type for Chroma retriever
            chroma_k_documents (int): Number of documents to retrieve with Chroma

        Returns:
            AsyncEnsembleMergerRetriever: self instance for method chaining
        """
        # An empty chunk list is rejected as well: there is nothing to index.
        if not self.document_chunks or self.embeddings is None:
            print("[ERROR] ----> Documents and embeddings must be set up first")
            return self

        print("[INFO] ----> Creating FAISS and Chroma Vectorstores, please wait.....")

        # NOTE(review): Chroma.from_documents is called without a collection name;
        # confirm whether repeated calls in one kernel accumulate into a shared
        # default collection.
        faiss_db, chroma_db = await asyncio.gather(
            self._run_in_thread(
                FAISS.from_documents,
                self.document_chunks,
                self.embeddings,
                failure_message="[ERROR] ----> Error creating FAISS vectorstore: {error}",
            ),
            self._run_in_thread(
                Chroma.from_documents,
                self.document_chunks,
                self.embeddings,
                failure_message="[ERROR] ----> Error creating Chroma vectorstore: {error}",
            ),
        )

        self.faiss_retriever = faiss_db.as_retriever(
            search_type=faiss_search_type,
            search_kwargs={
                "k": faiss_k_documents
            }
        )
        print("[INFO] ----> FAISS Retriever Created successfully.....\n")

        self.chroma_retriever = chroma_db.as_retriever(
            search_type=chroma_search_type,
            search_kwargs={
                "k": chroma_k_documents,
            }
        )
        print("[INFO] ----> Chroma Retriever Created successfully.....\n")

        return self

    async def build_merger_retriever(self,
                              faiss_embedding_filter_threshold=0.75,
                              chroma_embedding_filter_threshold=0.6):
        """
        Build the ensemble merger retriever with contextual compression asynchronously.

        Each base retriever is wrapped in a compression pipeline (redundancy filter ->
        embedding-similarity filter -> long-context reordering) and the two wrapped
        retrievers are combined in a MergerRetriever.

        Args:
            faiss_embedding_filter_threshold (float): Similarity threshold for FAISS embeddings filter
            chroma_embedding_filter_threshold (float): Similarity threshold for Chroma embeddings filter

        Returns:
            AsyncEnsembleMergerRetriever: self instance for method chaining
        """
        if self.faiss_retriever is None or self.chroma_retriever is None:
            print("[ERROR] ----> FAISS and Chroma retrievers must be created first")
            return self

        print("[INFO] ----> Building the ensemble merger retriever, please wait.....")

        # These two transformers are shared by both pipelines.
        redundant_filter = EmbeddingsRedundantFilter(embeddings=self.embeddings)
        reordering = LongContextReorder()

        def compressed(base_retriever, similarity_threshold):
            # Wrap one base retriever in its own compression pipeline.
            relevant_filter = EmbeddingsFilter(
                embeddings=self.embeddings,
                similarity_threshold=similarity_threshold,
            )
            pipeline = DocumentCompressorPipeline(
                transformers=[
                    redundant_filter,
                    relevant_filter,
                    reordering,
                ]
            )
            return ContextualCompressionRetriever(
                base_compressor=pipeline,
                base_retriever=base_retriever,
            )

        self.merger_retriever = MergerRetriever(
            retrievers=[
                compressed(self.faiss_retriever, faiss_embedding_filter_threshold),
                compressed(self.chroma_retriever, chroma_embedding_filter_threshold),
            ]
        )

        print("[INFO] ----> Ensemble merger retriever built successfully.....")
        return self

    async def generate_answer(self, question, pipeline_type="RetrievalQAWithSourcesChain", chain_type="stuff", verbose=False):
        """
        Generate answer using the RAG pipeline asynchronously.

        Args:
            question (str): Question to answer
            pipeline_type (str): Type of RAG pipeline ("RetrievalQAWithSourcesChain" or "RetrievalQAChain")
            chain_type (str): Type of chain for retrieval QA
            verbose (bool): Whether to print the result

        Returns:
            str | None: Generated answer; ``None`` when the merger retriever or the
            LLM has not been set up; a "Failed to generate answer due to: ..."
            string when the pipeline raises or ``pipeline_type`` is unsupported
        """
        if self.merger_retriever is None or self.llm_model is None:
            print("[ERROR] ----> Merger retriever and LLM must be set up first")
            return None

        print(f"[INFO] ----> Running the {pipeline_type} Pipeline.....\n")

        try:
            # Validate by name before touching any chain class so that an unknown
            # pipeline yields a clear message instead of an unbound-variable error.
            if pipeline_type not in self._PIPELINE_RESULT_KEYS:
                supported = ", ".join(self._PIPELINE_RESULT_KEYS)
                raise ValueError(
                    f"Unsupported pipeline_type {pipeline_type!r}; expected one of: {supported}"
                )

            if pipeline_type == "RetrievalQAWithSourcesChain":
                chain_class = RetrievalQAWithSourcesChain
            else:
                chain_class = RetrievalQA

            rag_pipeline = chain_class.from_chain_type(
                llm=self.llm_model,
                chain_type=chain_type,
                retriever=self.merger_retriever,
                return_source_documents=True
            )
            print(f"[INFO] ----> Generating the answer for:\nQuestion: {question}")

            # Only RuntimeError triggers the synchronous retry; any other error is
            # reported by the outer handler below.
            generated_answer = await self._run_in_thread(
                rag_pipeline.invoke,
                question,
                failure_message=(
                    "[WARNING] ----> Runtime error in async execution: {error}. "
                    "Falling back to synchronous."
                ),
                retry_on=RuntimeError,
            )
            result = generated_answer[self._PIPELINE_RESULT_KEYS[pipeline_type]]

            print("[INFO] ----> Result Answer generated.....\n")

            if verbose:
                print(f"Result:\n{result}")

            return result
        except Exception as e:
            print(f"[ERROR] ----> Error generating answer: {e}")
            return f"Failed to generate answer due to: {str(e)}"

    async def print_documents(self, docs):
        """
        Print documents for inspection.

        Args:
            docs (list): List of documents to print; each item must expose ``page_content``
        """
        print(
            f"\n{'-' * 100}\n".join(
                [
                    f"Document {index + 1}:- \n\n" + d.page_content for index, d in enumerate(docs)
                ]
            )
        )

    @classmethod
    async def create(cls,
                    data_path,
                    embedding_provider="Google",
                    model_name="gemma2-9b-it",
                    chunk_size=512,
                    chunk_overlap=128,
                    faiss_search_type="similarity",
                    faiss_k_documents=3,
                    chroma_search_type="similarity",
                    chroma_k_documents=4,
                    faiss_embedding_filter_threshold=0.75,
                    chroma_embedding_filter_threshold=0.6):
        """
        Factory method to create and setup the retriever in one go.

        Args:
            data_path (str): Path to the PDF document
            embedding_provider (str): Provider for embeddings ("Google" or "OpenAI")
            model_name (str): Model name for LLM
            chunk_size (int): Size of chunks for splitting
            chunk_overlap (int): Overlap between chunks
            faiss_search_type (str): Search type for FAISS retriever
            faiss_k_documents (int): Number of documents to retrieve with FAISS
            chroma_search_type (str): Search type for Chroma retriever
            chroma_k_documents (int): Number of documents to retrieve with Chroma
            faiss_embedding_filter_threshold (float): Similarity threshold for FAISS embeddings
            chroma_embedding_filter_threshold (float): Similarity threshold for Chroma embeddings

        Returns:
            AsyncEnsembleMergerRetriever: Fully configured instance
        """
        instance = cls()

        # The steps are awaited one after another; only the two vectorstore
        # builds inside create_retrievers run concurrently.
        await instance.setup_llm(model_name=model_name)
        await instance.load_and_split_documents(
            data_path=data_path,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        await instance.setup_embeddings(embedding_provider=embedding_provider)

        # These operations depend on the previous ones
        await instance.create_retrievers(
            faiss_search_type=faiss_search_type,
            faiss_k_documents=faiss_k_documents,
            chroma_search_type=chroma_search_type,
            chroma_k_documents=chroma_k_documents
        )

        await instance.build_merger_retriever(
            faiss_embedding_filter_threshold=faiss_embedding_filter_threshold,
            chroma_embedding_filter_threshold=chroma_embedding_filter_threshold
        )

        return instance


# Example usage:
async def main():
    """
    Demonstrate both ways of using AsyncEnsembleMergerRetriever.

    Method 1 configures an instance step by step and asks one question;
    Method 2 builds a second instance through the ``create`` factory and asks
    the same question again.
    """
    # Build the path with os.path.join so the example is not tied to the
    # Windows path separator.
    pdf_path = os.path.join("Data", "ReAct.pdf")

    # Method 1: Step by step
    retriever = AsyncEnsembleMergerRetriever()

    # Set up components asynchronously
    await retriever.setup_llm(model_name="gemma2-9b-it")
    await retriever.load_and_split_documents(
        data_path=pdf_path,
        chunk_size=512,
        chunk_overlap=128
    )

    await retriever.setup_embeddings(embedding_provider="Google")

    await retriever.create_retrievers(
        faiss_search_type="similarity",
        faiss_k_documents=3,
        chroma_search_type="mmr",
        chroma_k_documents=5
    )

    await retriever.build_merger_retriever(
        faiss_embedding_filter_threshold=0.75,
        chroma_embedding_filter_threshold=0.75,
    )

    # Generate answer
    query = "Explain in details about the Chain of thought prompting as mentioned in ReAct Paper."
    answer = await retriever.generate_answer(
        question=query,
        pipeline_type="RetrievalQAWithSourcesChain",
        chain_type="stuff",
        verbose=True
    )

    print("\n")
    print("===" * 100)
    print(f"Final Answer:\n{answer}")

    # Method 2: Using the factory method
    print("\n\nCreating a new retriever using the factory method...")
    retriever2 = await AsyncEnsembleMergerRetriever.create(
        data_path=pdf_path,
        embedding_provider="Google",
        model_name="gemma2-9b-it"
    )

    # verbose=True already prints the result, so the return value is not kept.
    await retriever2.generate_answer(
        question=query,
        verbose=True
    )

def run_async_retriever(entry_point=None):
    """
    Helper function to safely run the async code.
    This handles cases where it might be called from inside another event loop.

    Args:
        entry_point (callable | None): Zero-argument coroutine function to run.
            Defaults to this module's ``main``.

    Returns:
        asyncio.Task | None: ``None`` when the coroutine was run to completion via
        ``asyncio.run``; otherwise the task scheduled on the already-running loop
        (for example in a Jupyter notebook), which the caller can ``await``.
    """
    if entry_point is None:
        entry_point = main

    # Probe for a running loop first instead of catching RuntimeError around
    # asyncio.run(): that would also swallow RuntimeErrors raised by the
    # coroutine itself and would leave an un-awaited coroutine object behind.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is None:
        asyncio.run(entry_point())
        return None

    # Hand the task back so the caller holds a reference and can await it.
    return running_loop.create_task(entry_point())

# if __name__ == "__main__":
#     run_async_retriever()

In [None]:
# Create and use the retriever asynchronously
retriever = await AsyncEnsembleMergerRetriever.create(
    data_path="Data\\ReAct.pdf",
    embedding_provider="Google",
)


[INFO] ----> Loading and splitting the document, please wait....


[INFO] ----> Total Pages in the original document are: 33
[INFO] ----> Total Document Chunks Created are: 293

[INFO] ----> Using the Google-AI Embedding model...
[INFO] ----> Google-AI Embedding model loaded.

[INFO] ----> Creating FAISS and Chroma Vectorstores, please wait.....
[INFO] ----> FAISS Retriever Created successfully.....

[INFO] ----> Chroma Retriever Created successfully.....

[INFO] ----> Building the ensemble merger retriever, please wait.....
[INFO] ----> Ensemble merger retriever built successfully.....


In [19]:
# Switch on LangChain's global debug flag before querying; the chain start events
# and prompt inputs then appear in this cell's output.
import langchain
langchain.debug=True

# `retriever` is the instance built by the previous cell. With the default
# arguments, generate_answer uses the RetrievalQAWithSourcesChain pipeline.
answer = await retriever.generate_answer("Explain with detail about the ReAct")
print("\n\n", answer)

[INFO] ----> Running the RetrievalQAWithSourcesChain Pipeline.....

[INFO] ----> Generating the answer for:
Question: Explain with detail about the ReAct
[32;1m[1;3m[chain/start][0m [1m[chain:RetrievalQAWithSourcesChain] Entering Chain run with input:
[0m{
  "question": "Explain with detail about the ReAct"
}
[32;1m[1;3m[chain/start][0m [1m[chain:RetrievalQAWithSourcesChain > chain:StuffDocumentsChain] Entering Chain run with input:
[0m[inputs]
[32;1m[1;3m[chain/start][0m [1m[chain:RetrievalQAWithSourcesChain > chain:StuffDocumentsChain > chain:LLMChain] Entering Chain run with input:
[0m{
  "question": "Explain with detail about the ReAct",
  "summaries": "Content: when ﬁnetuning is enabled, and in Section 4 how ReAct performance is robust to prompt selections.\nD) Human aligned and controllable: ReAct promises an interpretable sequential decision making\nand reasoning process where humans can easily inspect reasoning and factual correctness. Moreover,\nhumans can also 

 ReAct is a human-aligned and controllable sequential decision-making and reasoning system. 

It offers an interpretable process that allows humans to inspect the reasoning and factual correctness behind the agent's decisions. 

Furthermore, humans can control or correct the agent's behavior in real-time through a technique called "thought editing," as demonstrated in Figure 5.

