In [1]:
import logging
from dotenv import load_dotenv
import os
from fastembed import SparseTextEmbedding, TextEmbedding
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import SparseVector
from typing import List, Union
from sentence_transformers import CrossEncoder

from typing import List
import pprint
from colorama import Fore, Back, Style

from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI 
from llama_index.agent.openai import OpenAIAgent

# Load environment variables
load_dotenv()
Qdrant_API_KEY = os.getenv('Qdrant_API_KEY')
Qdrant_URL = os.getenv('Qdrant_URL')
Collection_Name = os.getenv('Collection_Name')

from pydantic import BaseModel


In [102]:
#  Search Strategy Interface
class SearchStrategy:
    def search(self, query: str, metadata_filter: None) -> List[str]:
        raise NotImplementedError

In [36]:


class SemanticSearch(SearchStrategy):
    def query_semantic_search(self, query: str, metadata_filter: models.Filter) -> List[str]:
        # Load the dense embedding model
        embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

        # Initialize the Qdrant client
        qdrant_client = QdrantClient(
            url=Qdrant_URL,
            api_key=Qdrant_API_KEY,
            timeout=30
        )

        # Embed the query using the dense embedding model
        dense_query = embedding_model.encode([query]).tolist()[0]

        # Perform the semantic search
        results = qdrant_client.search(
            collection_name=Collection_Name,
            query_vector=dense_query,
            query_filter=metadata_filter,
            limit=4,
        )

        return results

In [None]:
from sentence_transformers import CrossEncoder

# Define the reranker models
class SentenceTransformerRerank:
    def __init__(self, model, top_n):
        self.model = CrossEncoder(model)
        self.top_n = top_n

    def rerank(self, query, documents):
        # Compute the similarity scores between the query and each document
        scores = self.model.predict([(query, doc) for doc in documents])

        # Sort the documents based on their similarity scores
        ranked_documents = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)

        # Select the top documents
        top_documents = [doc for doc, score in ranked_documents[:self.top_n]]

        return top_documents

# Dictionary of reranker models
RERANKERS = {
    "WithoutReranker": None,
    "CrossEncoder": SentenceTransformerRerank(model='cross-encoder/ms-marco-MiniLM-L-6-v2', top_n=2),
    "bge-reranker-base": SentenceTransformerRerank(model="BAAI/bge-reranker-base", top_n=2),
    "bge-reranker-large": SentenceTransformerRerank(model="BAAI/bge-reranker-large", top_n=2)
}

# ReRankingAgent function
def ReRankingAgent(query, documents, reranking_model: str):
    # Get the reranker model based on user preference
    reranker = RERANKERS.get(reranking_model)

    if reranker is None:
        # If no reranker is specified, return the documents as is
        return documents

    # Perform reranking
    top_documents = reranker.rerank(query, documents)

    return top_documents



In [109]:
class HybridSearch(SearchStrategy):
    def query_hybrid_search(self, query: str) -> List[str]:

        embedding_model = TextEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
        sparse_embedding_model = SparseTextEmbedding(model_name="Qdrant/bm42-all-minilm-l6-v2-attentions")
        qdrant_client = QdrantClient(
            url=Qdrant_URL,
            api_key=Qdrant_API_KEY,
            timeout=30
        )

        # Embed the query using the dense embedding model
        dense_query = list(embedding_model.embed([query]))[0].tolist()

        # Embed the query using the sparse embedding model
        sparse_query = list(sparse_embedding_model.embed([query]))[0]

        results = qdrant_client.query_points(
            collection_name=Collection_Name,
            prefetch=[
                models.Prefetch(
                    query=models.SparseVector(indices=sparse_query.indices.tolist(), values=sparse_query.values.tolist()),
                    using="sparse",
                    limit=4,
                ),
                models.Prefetch(
                    query=dense_query,
                    using="dense",
                    limit=4,
                ),
            ],
            query=models.FusionQuery(fusion=models.Fusion.RRF), #Reciprocal Rerank Fusion
        )
        
        # Extract the text from the payload of each scored point
        documents = [point.payload['text'] for point in results.points]

        return documents

In [110]:
# Factory Function to Get the Appropriate Search Strategy
def get_search_strategy(search_type: str) -> SearchStrategy:
    if search_type == 'semantic':
        return SemanticSearch()
    elif search_type == 'hybrid':
        return HybridSearch()
    else:
        raise ValueError("Invalid search type")

In [111]:
def retriever(search_type: str, query: str, reranking_model: str):
        """
        Perform the search and retrieval process based on the specified search type, query, metadata filter, and reranking model.
        """
        print("Starting the search and retrieval process")
        search_strategy = get_search_strategy(search_type)
        documents = search_strategy.query_hybrid_search(query)
        print("Search and retrieval process completed")
        reranked_documents = ReRankingAgent(query, documents, reranking_model)
        print("Reranking of the retrieved documents is complete")

        return reranked_documents

In [None]:
docs = retriever('hybrid', 'what is self-RAG?', reranking_model='CrossEncoder')
docs

In [93]:
from qdrant_client.http.models import Filter
from pydantic import BaseModel, validator
from qdrant_client.http.models import Filter
import json
from typing import Any

# RetrieverAgent function
def RetrieverAgent(state: dict) -> OpenAIAgent:
    '''        
    class MyModel(BaseModel):
       
        search_type: str
        query: str
        metadata_filter: Any
        reranking_model: str

        class Config:
            arbitrary_types_allowed = True
    '''

    def retriever(search_type: str, query: str, metadata_filter: Any, reranking_model: str):
        """
        Perform the search and retrieval process based on the specified search type, query, metadata filter, and reranking model.
        """
        print("Starting the search and retrieval process")
        search_strategy = get_search_strategy(search_type)
        documents = search_strategy.search(query, metadata_filter, reranking_model)
        print("Search and retrieval process completed")
        reranked_documents = ReRankingAgent(query, documents, reranking_model)
        print("Reranking of the retrieved documents is complete")

        return reranked_documents

    def done() -> None:
        """
        Signal that the retrieval process is complete and update the state.
        """
        logging.info("Retrieval process is complete and updating the state")
        state["current_speaker"] = None
        state["just_finished"] = True

    tools = [
        
        FunctionTool.from_defaults(fn=retriever),
        FunctionTool.from_defaults(fn=done),
    ]

    system_prompt = (f"""
    You are a helpful assistant that is performing search and retrieval tasks for a retrieval-augmented generation (RAG) system.
    Your task is to retrieve documents based on the user's query, search type, metadata filter, and reranking model.
    To do this, you need to know the search type, query, metadata filter, and reranking model.
    You can ask the user to supply these details.
    If the user supplies the necessary information, then call the tool "retriever" using the provided details to perform the search and retrieval process.
    But remember for search() function do not use search_type argument and for ReRankingAgent function don't use metadata_filter and search arguments 
    The current user state is:
    {pprint.pformat(state, indent=4)}
    When you have completed the retrieval process, call the tool "done" to signal that you are done.
    If the user asks to do anything other than retrieve documents, call the tool "done" to signal that some other agent should help.
    """)

    return OpenAIAgent.from_tools(
        tools,
        llm=OpenAI(model="gpt-3.5-turbo"),
        system_prompt=system_prompt,
    )



In [94]:
state = {}
agent = RetrieverAgent(state = state)
response = agent.chat("I want to query what is self-RAG? with hybrid search type and SELF-RAG.pdf as add metadata filter, following with CrossEncoder Reranking model")

Starting the search and retrieval process
Starting the search and retrieval process
Starting the search and retrieval process
Starting the search and retrieval process


In [95]:
response

AgentChatResponse(response='It seems there is still an issue with the retrieval process. Let me try to retrieve the documents again with the correct arguments.', sources=[ToolOutput(content='Error: SearchStrategy.search() takes 3 positional arguments but 4 were given', tool_name='retriever', raw_input={'kwargs': {'search_type': 'hybrid', 'query': 'what is self-RAG?', 'metadata_filter': 'SELF-RAG.pdf', 'reranking_model': 'CrossEncoder'}}, raw_output=TypeError('SearchStrategy.search() takes 3 positional arguments but 4 were given'), is_error=False), ToolOutput(content='Error: SearchStrategy.search() takes 3 positional arguments but 4 were given', tool_name='retriever', raw_input={'kwargs': {'search_type': 'hybrid', 'query': 'what is self-RAG?', 'metadata_filter': 'SELF-RAG.pdf', 'reranking_model': 'CrossEncoder'}}, raw_output=TypeError('SearchStrategy.search() takes 3 positional arguments but 4 were given'), is_error=False), ToolOutput(content='None', tool_name='done', raw_input={'args':

In [None]:
"I encountered an error while trying to retrieve the documents.
 Let me attempt the retrieval process again by specifying the search type, query, metadata filter, 
 and reranking model all at once.
"

In [1]:
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core import PromptTemplate
from llama_index.core import Settings
from llama_index.core.query_engine import CustomQueryEngine
from retriever_agent import Retriever
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv
from llama_index.core.response_synthesizers import BaseSynthesizer
import os

load_dotenv()

  return self.fget.__get__(instance, owner)()


True

In [7]:

def prompt_template():
    """
    Define the prompt template for generating explanations based on the context and query.
    """
    prompt_str = """
    You are an AI assistant specializing in explaining complex topics related to Retrieval-Augmented Generation(RAG).
    Your task is to provide a clear, concise, and informative explanation based on the following context and query.

    Context:
    {context_str}

    Query: {query_str}

    Please follow these guidelines in your response:
    1. Start with a brief overview of the concept mentioned in the query.
    2. Provide at least one concrete example or use case to illustrate the concept.
    3. If there are any limitations or challenges associated with this concept, briefly mention them.
    4. Conclude with a sentence about the potential future impact or applications of this concept.

    Your explanation should be informative yet accessible, suitable for someone with a basic understanding of RAG.
    If the query asks for information not present in the context, please state that you don't have enough information to provide a complete answer,
    and only respond based on the given context.

    Response:
    """
    prompt_tmpl = PromptTemplate(prompt_str)
    return prompt_tmpl

def prompt_generation(state):
    """
    Generate the prompt for the given search type, query, and reranking model.
    """
    state = state
    retriever_agent = Retriever(state)
    reranked_documents = retriever_agent.retriever()

    context = "\n\n".join(reranked_documents)
    query = state.get('query')
    prompt_templ = prompt_template().format(context_str=context, query_str=query)

    return prompt_templ

In [8]:
state = {   'chunk_overlap': None,
    'chunk_size': None,
    'current_speaker': 'Concierge',
    'embedding_model': None,
    'input_dir': None,
    'just_finished': False,
    'query': 'what is self-RAG?',
    'reranking_model': 'crossencoder',
    'search_type': 'hybrid',
    'session_token': None}

prompt = prompt_generation(state)

Starting the search and retrieval process


Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Search and retrieval process completed
Reranking of the retrieved documents is complete


In [10]:
prompt

"\n    You are an AI assistant specializing in explaining complex topics related to Retrieval-Augmented Generation(RAG).\n    Your task is to provide a clear, concise, and informative explanation based on the following context and query.\n\n    Context:\n    ragnarök a reusable rag framework and baselines for trec 2024 retrievalaugmented generation track conference17 july 2017 washington dc usa figure 4 webui dark mode showcasing the ragnarök system arena for the user query on why have used car prices increased from trec2024 researchy with two different blinded pipelines the output tab displays the answers in humanreadable form figure 5 the responses tab for the example in figure 4 note that the responses tab reformats the final answers into the json format expected by the io definitions of the trec 2024 rag track\n\npreprint selfrag learning to retrieve  generate and critique through selfreflection akari asai zeqiu wu yizhong wang avirup sil hannaneh hajishirzi university of washingto

In [11]:
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core import PromptTemplate
from llama_index.core import Settings
from llama_index.core.query_engine import CustomQueryEngine
from retriever_agent import RetrieverAgent
from llama_index.llms.openai import OpenAI
from llama_index.agent.openai import OpenAIAgent
from dotenv import load_dotenv
from llama_index.core.response_synthesizers import BaseSynthesizer
from llama_index.core.tools import FunctionTool
import os
import pprint


load_dotenv()

def prompt_template():
    """
    Define the prompt template for generating explanations based on the context and query.
    """
    prompt_str = """
    You are an AI assistant specializing in explaining complex topics related to Retrieval-Augmented Generation(RAG).
    Your task is to provide a clear, concise, and informative explanation based on the following context and query.

    Context:
    {context_str}

    Query: {query_str}

    Please follow these guidelines in your response:
    1. Start with a brief overview of the concept mentioned in the query.
    2. Provide at least one concrete example or use case to illustrate the concept.
    3. If there are any limitations or challenges associated with this concept, briefly mention them.
    4. Conclude with a sentence about the potential future impact or applications of this concept.

    Your explanation should be informative yet accessible, suitable for someone with a basic understanding of RAG.
    If the query asks for information not present in the context, please state that you don't have enough information to provide a complete answer,
    and only respond based on the given context.

    Response:
    """
    prompt_tmpl = PromptTemplate(prompt_str)
    return prompt_tmpl

def prompt_generation(state):
    """
    Generate the prompt for the given search type, query, and reranking model.
    """
    state = state
    retriever_agent = Retriever(state)
    reranked_documents = retriever_agent.retriever()

    context = "\n\n".join(reranked_documents)
    query = state.get('query')
    prompt_templ = prompt_template().format(context_str=context, query_str=query)

    return prompt_templ

class RAGStringQueryEngine(CustomQueryEngine):
    llm: OpenAI
    response_synthesizer: BaseSynthesizer

    def custom_query(self, prompt: str) -> str:
        """
        Generate a response for the given prompt using the LLM and response synthesizer.
        """
        response = self.llm.complete(prompt)
        summary = self.response_synthesizer.get_response(query_str=str(response), text_chunks=str(prompt))

        return str(summary)
    
def create_query_engine(prompt: str):
    """
    Create a query engine for generating responses based on the given prompt.
    """
    llm = OpenAI(model="gpt-3.5-turbo")
    response_synthesizer = TreeSummarize(llm=llm)

    query_engine = RAGStringQueryEngine(
        llm=llm,
        response_synthesizer=response_synthesizer,
    )
    response = query_engine.query(prompt)
    return response.response




In [27]:

def GenerationAgent(state: dict) -> OpenAIAgent:
    """
    Define the GenerationAgent for generating explanations based on the user's query, search type, and reranking model.
    """

    def generation(state):
        """
        Generate an explanation based on the given search type, query, and reranking model.
        """
        prompt = prompt_generation(state)
        print("Passing the ReRanked documents to the LLM")
        response = create_query_engine(prompt)
        print("Retrieved the summarized respopnse from LLMs")
        return str(response)

    def done() -> None:
        """
        Signal that the retrieval process is complete and update the state.
        """
        print("Retrieval process is complete and updating the state")
        state["current_speaker"] = None
        state["just_finished"] = True

    tools = [
        FunctionTool.from_defaults(fn=generation),
        FunctionTool.from_defaults(fn=done),
    ]

    system_prompt = f"""
    You are a helpful assistant that is performing search and retrieval tasks for a retrieval-augmented generation (RAG) system.
    Your task is to retrieve documents based on the user's query, search type, and reranking model.
    To do this, you need to know the search type, query, and reranking model.
    You can ask the user to supply these details.
    If the user supplies the necessary information, then call the tool "generation" using the provided details to perform the search and retrieval process.
    The current user state is:
    {pprint.pformat(state, indent=4)}
    When you have completed the retrieval process, call the tool "done" to signal that you are done.
    If the user asks to do anything other than retrieve documents, call the tool "done" to signal that some other agent should help.
    """

    return OpenAIAgent.from_tools(
        tools,
        llm=OpenAI(model="gpt-3.5-turbo"),
        system_prompt=system_prompt,
    )

In [28]:
 state = {   'chunk_overlap': None,
    'chunk_size': None,
    'current_speaker': None,
    'embedding_model': None,
    'input_dir': None,
    'just_finished': False,
    'query': 'what is self-RAG?',
    'reranking_model': None,
    'search_type': 'hybrid',
    'session_token': None}

In [29]:
agent = GenerationAgent(state=state)
response = agent.chat("I want to query what is a Ragnarök framework? Also can you use hybrid search along with crossencoder reranking model")
print(response)

Starting the search and retrieval process


Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Search and retrieval process completed
Reranking of the retrieved documents is complete
Passing the ReRanked documents to the LLM
Retrieved the summarized respopnse from LLMs
Retrieval process is complete and updating the state
If you need any more assistance or have any other queries, feel free to let me know!


In [21]:
response

AgentChatResponse(response='I have completed the retrieval process.', sources=[ToolOutput(content='None', tool_name='generation', raw_input={'args': (), 'kwargs': {'state': {'query': 'what is self-RAG?', 'search_type': 'hybrid', 'reranking_model': 'crossencoder'}}}, raw_output=None, is_error=False)], source_nodes=[], is_dummy_stream=False, metadata=None)

In [8]:

from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI 
from llama_index.agent.openai import OpenAIAgent
from pydantic import BaseModel

In [17]:
def generation(search_type: str, query: str, reranking_model: str):
        """
        Generate an explanation based on the given search type, query, and reranking model.
        """
        #prompt_gen = prompt_template_generation()
        prompt = prompt_generation(search_type, query, reranking_model)
        print("Starting the search and retrieval process")
        response = create_query_engine(prompt)

        return response

In [18]:
response = generation(search_type, query, reranking_model)

Starting the search and retrieval process


Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Search and retrieval process completed
Reranking of the retrieved documents is complete
Starting the search and retrieval process


In [19]:
response

'Self-RAG, or Self-Reflective Retrieval-Augmented Generation, is a framework that aims to enhance the quality and factuality of responses generated by large language models (LLMs) by incorporating retrieval of relevant knowledge and self-reflection during the generation process. This approach allows the model to adaptively retrieve passages on-demand and reflect on both the retrieved information and its own generated content using special tokens called reflection tokens. An example application of Self-RAG could be in open-domain question answering, where it dynamically retrieves information to generate accurate and coherent responses. However, a potential challenge of Self-RAG is the complexity of training and fine-tuning the model to balance retrieval, generation, and self-reflection processes, as well as ensuring the quality and relevance of retrieved passages for real-world applications. Despite these challenges, Self-RAG holds promise for enhancing various knowledge-intensive tasks

In [25]:
def GenerationAgent(state: dict) -> OpenAIAgent:
    """
    Define the GenerationAgent for generating explanations based on the user's query, search type, and reranking model.
    """
    def generation(search_type: str, query: str, reranking_model: str):
        """
        Generate an explanation based on the given search type, query, and reranking model.
        """
        #prompt_gen = prompt_template_generation()
        prompt = prompt_generation(search_type, query, reranking_model)
        print("Starting the search and retrieval process")
        response = create_query_engine(prompt)
        print("Retrieved the respopnse from LLMs")
        return print(response)

    def done() -> None:
        """
        Signal that the retrieval process is complete and update the state.
        """
        print("Retrieval process is complete and updating the state")
        state["current_speaker"] = None
        state["just_finished"] = True

    tools = [
        FunctionTool.from_defaults(fn=generation),
        FunctionTool.from_defaults(fn=done),
    ]

    system_prompt = f"""
    You are a helpful assistant that is performing search and retrieval tasks for a retrieval-augmented generation (RAG) system.
    Your task is to retrieve documents based on the user's query, search type, and reranking model.
    To do this, you need to know the search type, query, and reranking model.
    You can ask the user to supply these details.
    If the user supplies the necessary information, then call the tool "generation" using the provided details to perform the search and retrieval process.
    The current user state is:
    {pprint.pformat(state, indent=4)}
    When you have completed the retrieval process, call the tool "done" to signal that you are done.
    If the user asks to do anything other than retrieve documents, call the tool "done" to signal that some other agent should help.
    """

    return OpenAIAgent.from_tools(
        tools,
        llm=OpenAI(model="gpt-3.5-turbo"),
        system_prompt=system_prompt,
    )

In [1]:
from document_pre_processing_agent import DocumentPreprocessingAgent
from indexing_agent import QdrantIndexingAgent
from generation_agent import GenerationAgent

  return self.fget.__get__(instance, owner)()


In [2]:
def get_agent(agent_name, state):
    agents = {
        "Data_pre_processing": DocumentPreprocessingAgent,
        "Indexing": QdrantIndexingAgent,
        "Generation": GenerationAgent,
        #"Concierge": continuation_agent_factory,
        # Add other agents here
    }
    return agents.get(agent_name, None)(state)

In [3]:
state = {}
agent = get_agent("Generation", state=state)
agent

<llama_index.agent.openai.base.OpenAIAgent at 0x21494f37a10>

In [4]:
#response = agent.chat(user_msg_str, chat_history=current_history)
response = agent.chat("I want to query what is self-RAG? with hybrid search and following with CrossEncoder Reranking model")
response   


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


AgentChatResponse(response='I encountered an error while trying to finalize the process. I will attempt to resolve this issue.', sources=[ToolOutput(content="Error: 'OpenAIAgent' object has no attribute 'retriever'", tool_name='generation', raw_input={'kwargs': {'search_type': 'Hybrid', 'query': 'what is self-RAG?', 'reranking_model': 'CrossEncoder'}}, raw_output=AttributeError("'OpenAIAgent' object has no attribute 'retriever'"), is_error=False), ToolOutput(content="Error: 'OpenAIAgent' object has no attribute 'retriever'", tool_name='generation', raw_input={'kwargs': {'search_type': 'Hybrid', 'query': 'what is self-RAG?', 'reranking_model': 'CrossEncoder'}}, raw_output=AttributeError("'OpenAIAgent' object has no attribute 'retriever'"), is_error=False), ToolOutput(content="Error: name 'logging' is not defined", tool_name='done', raw_input={'kwargs': {}}, raw_output=NameError("name 'logging' is not defined"), is_error=False), ToolOutput(content="Error: 'OpenAIAgent' object has no attr

In [11]:
import pprint

In [26]:

state = {}
agent = GenerationAgent(state=state)
response = agent.chat("Can you use hybrid search to query what is self-RAG? and with crossencoder Reranking model")
print(response)


Starting the search and retrieval process


Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Search and retrieval process completed
Reranking of the retrieved documents is complete
Starting the search and retrieval process
Retrieved the respopnse from LLMs
Self-RAG, or Self-Reflective Retrieval-Augmented Generation, is a framework that aims to enhance the quality and factuality of large language models by incorporating retrieval of relevant knowledge and self-reflection during the generation process. This approach allows the model to adaptively retrieve passages on-demand, generate responses based on this retrieved knowledge, and reflect on both the retrieved information and its own generated content using special tokens called reflection tokens. The goal is to improve the accuracy and overall quality of the model's responses, particularly in tasks like open-domain question answering, by iteratively enhancing factual accuracy and response quality through self-reflection.
I couldn't find any relevant information on "self-RAG" using the hybrid search with the crossencoder rerank

In [24]:
print(response)

If you need any more assistance or have any other queries, feel free to let me know!


In [4]:
from dotenv import load_dotenv
load_dotenv()

from enum import Enum
from typing import List
import pprint
from colorama import Fore, Back, Style

from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI 
from llama_index.agent.openai import OpenAIAgent
from document_pre_processing_agent import DocumentPreprocessingAgent
from indexing_agent import QdrantIndexingAgent
from generation_agent import GenerationAgent

In [5]:

class Speaker(str, Enum):
    Data_pre_processing = "data_pre_processing"
    Indexing = "indexing"
    Retriever = "retriever"
    ReRanking = "reranking"
    Generation = "generation"
    Concierge = "Concierge"
    ORCHESTRATOR = "orchestrator"

In [6]:
def concierge_agent_factory(state: dict) -> OpenAIAgent:
    def dummy_tool() -> bool:
        """A tool that does nothing."""
        print("Doing nothing.")

    tools = [
        FunctionTool.from_defaults(fn=dummy_tool)
    ]

    system_prompt = (f"""
        You are a helpful assistant that is helping a user navigate the process of querying and indexing their documents using this customizable RAG application.
        Your job is to ask the user questions to figure out what they want to do, and give them the available scenario's.
        That includes:
        * pre-processing and indexing the documents/files into Qdrant vector database using user preferred chunking strategies and embedding models.
        * generating a response to the user query using user preferred search type and reranking model.

        The current state of the user is:
        {pprint.pformat(state, indent=4)}
    """)

    return OpenAIAgent.from_tools(
        tools,
        llm=OpenAI(model="gpt-3.5-turbo"),
        system_prompt=system_prompt,
    )

In [7]:
def continuation_agent_factory(state: dict) -> OpenAIAgent:
    def dummy_tool() -> bool:
        """A tool that does nothing."""
        print("Doing nothing.")

    tools = [
        FunctionTool.from_defaults(fn=dummy_tool)
    ]

    system_prompt = (f"""
        The current state of the user is:
        {pprint.pformat(state, indent=4)}
    """)

    return OpenAIAgent.from_tools(
        tools,
        llm=OpenAI(model="gpt-3.5-turbo", temperature=0.4),
        system_prompt=system_prompt,
    )

In [8]:

def orchestration_agent_factory(state: dict) -> OpenAIAgent:
    def has_input_dir() -> bool:
        """Useful for checking if the user has specified an input file directory."""
        print("Orchestrator checking if input file directory is specified")
        return (state["input_dir"] is not None)

    def has_chunk_size() -> bool:
        """Useful for checking if the user has specified a chunk size."""
        print("Orchestrator checking if chunk size is specified")
        return (state["chunk_size"] is not None)

    def has_chunk_overlap() -> bool:
        """Useful for checking if the user has specified a chunk overlap."""
        print("Orchestrator checking if chunk overlap is specified")
        return (state["chunk_overlap"] is not None)

    def has_embedding_model() -> bool:
        """Useful for checking if the user has specified an embedding model."""
        print("Orchestrator checking if embedding model is specified")
        return (state["embedding_model"] is not None)

    def has_reranking_model() -> bool:
        """Useful for checking if the user has specified a reranking model."""
        print("Orchestrator checking if reranking model is specified")
        return (state["reranking_model"] is not None)

    def has_search_type() -> bool:
        """Useful for checking if the user has specified a search type."""
        print("Orchestrator checking if search type is specified")
        return (state["search_type"] is not None)    

    def has_query() -> bool:
        """Useful for checking if the user has specified query."""
        print("Orchestrator checking if query is specified")
        return (state["query"] is not None)  

    tools = [
        FunctionTool.from_defaults(fn=has_input_dir),
        FunctionTool.from_defaults(fn=has_chunk_size),
        FunctionTool.from_defaults(fn=has_chunk_overlap),
        FunctionTool.from_defaults(fn=has_embedding_model),
        FunctionTool.from_defaults(fn=has_reranking_model),
        FunctionTool.from_defaults(fn=has_search_type),
        FunctionTool.from_defaults(fn=has_query),
    ]

    system_prompt =  (f"""
    You are the orchestration agent.
    Your job is to decide which agent to run based on the current state of the user and what they've asked to do. Agents are identified by short strings.
    What you do is return the name of the agent to run next. You do not do anything else.

    The current state of the user is:
    {pprint.pformat(state, indent=4)}

    If a current_speaker is already selected in the state, simply output that value.

    If there is no current_speaker value, look at the chat history and the current state and you MUST return one of these strings identifying an agent to run:
    * "{Speaker.Data_pre_processing.value}" - if the user wants to pre-process the documents into nodes
        * If they want to pre-process the documents, but they haven't specified an input file, chunk size, or chunk overlap, return "{Speaker.Concierge.value}" instead
    * "{Speaker.Indexing.value}" - if the user wants to embed and index the nodes into a vector database
         * If they want to embed and index the nodes, but there is no preprocessed data, return "{Speaker.Data_pre_processing.value}" instead
        * If they want to embed and index the nodes, but they haven't specified an embedding model, return "{Speaker.Concierge.value}" instead
    * "{Speaker.Generation.value}" - if the user wants to query the documents (requires query, search type, and reranking model)
        * If they want to query the documents, but they haven't specified the query, search type, or reranking model, return "{Speaker.Concierge.value}" instead
    * "{Speaker.Concierge.value}" - if the user wants to do something else, or hasn't said what they want to do, or you can't figure out what they want to do. Choose this by default.

    Output one of these strings and ONLY these strings, without quotes.
    NEVER respond with anything other than one of the above strings. DO NOT be helpful or conversational.
    """)

    return OpenAIAgent.from_tools(
        tools,
        llm=OpenAI(model="gpt-3.5-turbo", temperature=0.4),
        system_prompt=system_prompt,
    )


In [9]:
def get_initial_state() -> dict:
    return {
        "session_token": None,
        "input_dir": None,
        "chunk_size": None,
        "chunk_overlap": None,
        "embedding_model": None,
        "reranking_model": None,
        "search_type": None,
        "query": None,
        "current_speaker": None,
        "just_finished": False,
    }


In [10]:

def get_agent(agent_name, state):
    agents = {
        "Data_pre_processing": DocumentPreprocessingAgent,
        "Indexing": QdrantIndexingAgent,
        "Generation": GenerationAgent,
        "Concierge": continuation_agent_factory,
        # Add other agents here
    }
    return agents.get(agent_name, None)(state)

In [11]:

def run() -> None:
    state = get_initial_state()

    root_memory = ChatMemoryBuffer.from_defaults(token_limit=8000)

    first_run = True
    is_retry = False

    while True:
        if first_run:
            # if this is the first run, start the conversation
            user_msg_str = "Hello there!"
            first_run = False
        elif is_retry == True:
            user_msg_str = "That's not right, try again. Pick one agent."
            is_retry = False
        elif state["just_finished"] == True:
            print("Asking the continuation agent to decide what to do next")
            user_msg_str = str(continuation_agent_factory(state).chat("""
                Look at the chat history to date and figure out what the user was originally trying to do.
                They might have had to do some sub-tasks to complete that task, but what we want is the original thing they started out trying to do.
                Formulate a sentence as if written by the user that asks to continue that task.
                If it seems like the user really completed their task, output "no_further_task" only.
            """, chat_history=current_history))
            print(f"Continuation agent said {user_msg_str}")
            if user_msg_str == "no_further_task":
                user_msg_str = input(">> ").strip()
            state["just_finished"] = False
        else:
            # any other time, get user input
            user_msg_str = input("> ").strip()

        current_history = root_memory.get()

        # who should speak next?
        if (state["current_speaker"]):
            print(f"There's already a speaker: {state['current_speaker']}")
            next_speaker = state["current_speaker"]
        else:
            print("No current speaker, asking orchestration agent to decide")
            orchestration_response = orchestration_agent_factory(state).chat(user_msg_str, chat_history=current_history)
            next_speaker = str(orchestration_response).strip()

        print(f"Next speaker: {next_speaker}")

        agent_class = get_agent(next_speaker, state)
        if agent_class:
            current_speaker = agent_class
            state["current_speaker"] = next_speaker
        else:
            print("Orchestration agent failed to return a valid speaker; ask it to try again")
            is_retry = True
            continue

        #pretty_state = pprint.pformat(state, indent=4)
        
        #print(f"State: {pretty_state}")

        # chat with the current speaker
        response = current_speaker.chat(user_msg_str, chat_history=current_history)
        print(Fore.MAGENTA + str(response) + Style.RESET_ALL)

        # update chat history
        new_history = current_speaker.memory.get_all()
        root_memory.set(new_history)


In [12]:
run()

No current speaker, asking orchestration agent to decide


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


Next speaker: Concierge
State: {   'chunk_overlap': None,
    'chunk_size': None,
    'current_speaker': 'Concierge',
    'embedding_model': None,
    'input_dir': None,
    'just_finished': False,
    'query': None,
    'reranking_model': None,
    'search_type': None,
    'session_token': None}


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


[35mHello! How can I assist you today?[0m


In [1]:
import os
import json
import re
from llama_index.core.schema import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import SimpleDirectoryReader

In [2]:
import logging
from dotenv import load_dotenv
import os
import json
from fastembed import SparseTextEmbedding, TextEmbedding
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct, SparseVector
from tqdm import tqdm

In [3]:
load_dotenv()

True

In [4]:
Qdrant_API_KEY = os.getenv('Qdrant_API_KEY')
Qdrant_URL = os.getenv('Qdrant_URL')
Collection_Name = os.getenv('collection_name')
qdrant_client = QdrantClient(
                            url=Qdrant_URL,
                            api_key=Qdrant_API_KEY)

In [5]:
Collection_Name

'Agentic-Automation-RAG'

In [6]:
def load_nodes():
        metadata = []
        documents = []
        payload_file = r'C:\Users\pavan\Desktop\Generative AI\RAG-Automation-Using-Llamaindex-Agents-and-Qdrant\data\nodes.json'

        try:
            with open(payload_file, 'r') as file:
                nodes = json.load(file)

            for node in nodes:
                metadata.append(node['metadata'])
                documents.append(node['text'])

            print(f"Loaded {len(nodes)} the nodes from JSON file")

        except Exception as e:
            logging.error(f"Error loading nodes from JSON file: {e}")
            raise

        return documents, metadata

In [9]:
def client_collection():
    """
    Create a collection in Qdrant vector database.
    """
    
    if not qdrant_client.collection_exists(collection_name=Collection_Name): 
        qdrant_client.create_collection(
            collection_name= Collection_Name,
            vectors_config={
                    'dense': models.VectorParams(
                        size=384,
                        distance = models.Distance.COSINE,
                    )
            },
            sparse_vectors_config={
                "sparse": models.SparseVectorParams(
                            index=models.SparseIndexParams(
                            on_disk=False,              
                        ),
                    )
                }
        )
        
    print(f"Created collection '{Collection_Name}' in Qdrant vector database.")


In [10]:
client_collection()

Created collection 'Agentic-Automation-RAG' in Qdrant vector database.


In [11]:
def create_sparse_vector(sparse_embedding_model, text):
        """
        Create a sparse vector from the text using SPLADE.
        """
        sparse_embedding_model = sparse_embedding_model
        # Generate the sparse vector using SPLADE model
        embeddings = list(sparse_embedding_model.embed([text]))[0]

        # Check if embeddings has indices and values attributes
        if hasattr(embeddings, 'indices') and hasattr(embeddings, 'values'):
            sparse_vector = models.SparseVector(
                indices=embeddings.indices.tolist(),
                values=embeddings.values.tolist()
            )
            return sparse_vector
        else:
            raise ValueError("The embeddings object does not have 'indices' and 'values' attributes.")

In [12]:
def insert_documents(embedding_model, documents, metadata):
        points = []
        embedding_model = TextEmbedding(model_name=embedding_model)
        sparse_embedding_model = SparseTextEmbedding(model_name="Qdrant/bm42-all-minilm-l6-v2-attentions")
        for i, (doc, metadata) in enumerate(tqdm(zip(documents, metadata), total=len(documents))):
            # Generate both dense and sparse embeddings
            dense_embedding = list(embedding_model.embed([doc]))[0]
            sparse_vector = create_sparse_vector(sparse_embedding_model, doc)

            # Create PointStruct
            point = models.PointStruct(
                id=i,
                vector={
                    'dense': dense_embedding.tolist(),
                    'sparse': sparse_vector,
                },
                payload={
                    'text': doc,
                    **metadata  # Include all metadata
                }
            )
            points.append(point)

        # Upsert points
        qdrant_client.upsert(
            collection_name=Collection_Name,
            points=points
        )

        print(f"Upserted {len(points)} points with dense and sparse vectors into Qdrant vector database.")

In [13]:
def indexing(embedding_model) -> None:
        """
        Index the documents into the Qdrant vector database.
        """
        print("Starting to load the nodes from JSON file")
        documents, metadata = load_nodes()
        client_collection()
        print("Creation of the Qdrant Collection is Done")
        insert_documents(embedding_model, documents, metadata)
        print("Indexing of the nodes is complete")
    
    

In [14]:
indexing(embedding_model = "sentence-transformers/all-MiniLM-L6-v2")

Starting to load the nodes from JSON file
Loaded 71 the nodes from JSON file
Created collection 'Agentic-Automation-RAG' in Qdrant vector database.
Creation of the Qdrant Collection is Done


Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

100%|██████████| 71/71 [00:16<00:00,  4.25it/s]


Upserted 71 points with dense and sparse vectors into Qdrant vector database.
Indexing of the nodes is complete


In [9]:
qdrant_client.set_model("sentence-transformers/all-MiniLM-L6-v2")
# comment this line to use dense vectors only
qdrant_client.set_sparse_model("prithivida/Splade_PP_en_v1")

Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

In [10]:
qdrant_client.recreate_collection(
    collection_name="Hybrid_RAG_Collection",
    vectors_config=qdrant_client.get_fastembed_vector_params(),
    # comment this line to use dense vectors only
    sparse_vectors_config=qdrant_client.get_fastembed_sparse_vector_params(),  
)

  qdrant_client.recreate_collection(
INFO:httpx:HTTP Request: DELETE https://c77ac75e-3a41-4acc-98d2-c9c3eb11b5ea.us-east4-0.gcp.cloud.qdrant.io:6333/collections/Hybrid_RAG_Collection "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: PUT https://c77ac75e-3a41-4acc-98d2-c9c3eb11b5ea.us-east4-0.gcp.cloud.qdrant.io:6333/collections/Hybrid_RAG_Collection "HTTP/1.1 200 OK"


True

In [11]:
import json

input_file = r"C:\Users\pavan\Desktop\Generative AI\RAG-Automation-Using-Llamaindex-Agents-and-Qdrant\data\nodes.json"
metadata = []
documents = []

with open(input_file, 'r') as file:
        nodes = json.load(file)

for node in nodes:
    metadata.append(node['metadata'])
    documents.append(node['text'])

In [12]:
len(documents)

71

In [13]:
from tqdm import tqdm

qdrant_client.add(
    collection_name="Hybrid_RAG_Collection",
    documents=documents,
    metadata=metadata,
    ids=tqdm(range(len(documents))),
)

  0%|          | 0/71 [00:00<?, ?it/s]INFO:httpx:HTTP Request: GET https://c77ac75e-3a41-4acc-98d2-c9c3eb11b5ea.us-east4-0.gcp.cloud.qdrant.io:6333/collections/Hybrid_RAG_Collection "HTTP/1.1 200 OK"


In [5]:
from dotenv import load_dotenv
import os

In [6]:
load_dotenv()
Qdrant_API_KEY = os.getenv('Qdrant_API_KEY')
Qdrant_URL = os.getenv('Qdrant_URL')
Collection_Name = os.getenv('Collection_Name')


In [1]:
from qdrant_client import QdrantClient
from qdrant_client.http import models
from sentence_transformers import SentenceTransformer

def query_semantic_search(query, metadata_filter=None, limit=4):
    # Load the dense embedding model
    embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    # Initialize the Qdrant client
    qdrant_client = QdrantClient(
        url=Qdrant_URL,
        api_key=Qdrant_API_KEY,
        timeout=30
    )

    # Embed the query using the dense embedding model
    dense_query = embedding_model.encode([query]).tolist()[0]

    # Perform the semantic search
    results = qdrant_client.search(
        collection_name=Collection_Name,
        query_vector=dense_query,
        query_filter=metadata_filter,
        limit=limit,
    )

    return results


In [None]:
query = query_semantic_search(query = "can you explain what is SELF-RAG means?")

In [None]:
indexing(embedding_model = 'sentence-transformers/all-MiniLM-L6-v2')

In [None]:
def DocumentPreprocessingAgent(state: dict) -> OpenAIAgent:

    

    
    

    def done() -> None:
        """When you inserted the vetors into the Qdrant Cluster, call this tool."""
        print("Indexing of the nodes is complete")
        state["current_speaker"] = None
        state["just_finished"] = True

    tools = [
        FunctionTool.from_defaults(fn=indexing),
        FunctionTool.from_defaults(fn=done),
    ]

    system_prompt = (f"""
    You are a helpful assistant that is indexing documents for a retrieval-augmented generation (RAG) system.
    Your task is to index the documents into a Qdrant cluster.
    To do this, you need to know the embedding model to use.
    You can ask the user to supply this.
    If the user supplies the embedding model, call the tool "indexing" with this parameter to index the documents into the Qdrant cluster.
    The current user state is:
    {pprint.pformat(state, indent=4)}
    When you have indexed the documents into the Qdrant cluster, call the tool "done" to signal that you are done.
    If the user asks to do anything other than index the documents, call the tool "done" to signal some other agent should help.
    """)

    return OpenAIAgent.from_tools(
        tools,
        llm=OpenAI(model="gpt-3.5-turbo"),
        system_prompt=system_prompt,
    )
