In [1]:
from pathlib import Path
from tqdm.asyncio import tqdm

# Data Connector
from llama_index import SimpleDirectoryReader
# Index
from llama_index import VectorStoreIndex


# Llama Index LLM
from llama_index import ServiceContext
from llama_index import get_response_synthesizer
from llama_index import PromptTemplate

# Other LLM
from langchain.llms import OpenAI

# Retriever
from llama_index.indices.vector_store.retrievers import VectorIndexRetriever
from llama_index.retrievers import BM25Retriever

# Embeddings
from sentence_transformers import SentenceTransformer
from langchain.embeddings import SentenceTransformerEmbeddings

# Display
from llama_index.response.notebook_utils import display_source_node


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import nest_asyncio
nest_asyncio.apply()

nest_asyncio.apply() patches the existing event loop in a Jupyter Notebook environment to allow nested usage of asyncio.

It is utilized later in the notebook to ensure that the asyncio event loop functions correctly within a Jupyter Notebook environment, enabling the concurrent execution of multiple asynchronous retrieval tasks without encountering event loop compatibility issues.

# LLM

In [3]:
# Initialize the SentenceTransformerEmbeddings with the loaded model
local_embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

In [4]:
# Initialize the Mistral model from LM studio server
llm = OpenAI(openai_api_key="NULL",temperature=0,openai_api_base="http://192.168.48.33:1234/v1")
# Initialize service context : LLM and Embeddings model for the vector store
service_context = ServiceContext.from_defaults(llm=llm, embed_model=local_embeddings)

  warn_deprecated(


# Import data

In [5]:
# Initialize the data connector/ reader. 
# SimpleDirectoryReader adapt to the document format.
reader = SimpleDirectoryReader(
    input_files=["thesis.pdf"]
)

documents  = reader.load_data()
print(f"Loaded {len(documents)} docs")

Loaded 138 docs


# Load in vector store

In [6]:
# Initialize a simple vector store index 
index = VectorStoreIndex.from_documents(
    documents, service_context=service_context
)

# Smart app

In [7]:
#query_str = "explain me how we used langchain in the methodology?"

In [8]:
query_gen_prompt_str = (
    "You are a helpful assistant that generates multiple search queries based on a "
    "single input query. Generate {num_queries} search queries, one on each line, "
    "related to the following input query:\n"
    "Query: {query}\n"
    "Queries:\n"
)
query_gen_prompt = PromptTemplate(query_gen_prompt_str)

In [9]:
def generate_queries(llm, query_str: str, num_queries: int = 4):
    fmt_prompt = query_gen_prompt.format(
        num_queries=num_queries - 1, query=query_str # remove the original query
    )
    
    response = llm.generate([fmt_prompt])
    
    # Assuming there's only one generation in the response
    if response.generations and len(response.generations[0]) > 0:
        generation_text = response.generations[0][0].text
        queries = generation_text.split("\n")
        return queries
    else:
        return []
    

In [10]:
#queries = generate_queries(llm, query_str, num_queries=4)

### More examples


In [11]:
async def run_queries(queries, retrievers):
    """
    Run queries against retrievers asynchronously.

    :param queries: A list of queries to be processed.
    :param retrievers: A list of retriever objects that will process the queries.
    :return: A dictionary mapping each query and its index to its corresponding result.
    """
    tasks = []
    for query in queries:
        # For each query, iterate over each retriever.
        for i, retriever in enumerate(retrievers):
            # For each retriever, create an asynchronous task to retrieve the query
            # and add it to the tasks list.
            tasks.append(retriever.aretrieve(query))

    task_results = await tqdm.gather(*tasks)

    results_dict = {}
     # Iterate over each pair of query and its result.
    for i, (query, query_result) in enumerate(zip(queries, task_results)):
        # Map each query and its index to its result in the dictionary.
        results_dict[(query, i)] = query_result

    return results_dict

In [12]:
# vector retriever
vector_retriever = index.as_retriever(similarity_top_k=2)

In [13]:
# bm25 retriever
bm25_retriever = BM25Retriever.from_defaults(
    docstore=index.docstore, similarity_top_k=2
)

In [14]:
#results_dict = await run_queries(queries, [vector_retriever, bm25_retriever])

In [15]:
# Llama index function 
def fuse_results(results_dict, similarity_top_k: int = 2):
    """Fuse results."""
    k = 60.0  # `k` is a parameter used to control the impact of outlier rankings.
    fused_scores = {}
    text_to_node = {}

    # compute reciprocal rank scores
    for nodes_with_scores in results_dict.values():
        for rank, node_with_score in enumerate(
            sorted(
                nodes_with_scores, key=lambda x: x.score or 0.0, reverse=True
            )
        ):
            text = node_with_score.node.get_content()
            text_to_node[text] = node_with_score
            if text not in fused_scores:
                fused_scores[text] = 0.0
            fused_scores[text] += 1.0 / (rank + k)

    # sort results
    reranked_results = dict(
        sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    )

    # adjust node scores
    reranked_nodes: List[NodeWithScore] = []
    for text, score in reranked_results.items():
        reranked_nodes.append(text_to_node[text])
        reranked_nodes[-1].score = score

    return reranked_nodes[:similarity_top_k]

In [16]:
#final_results = fuse_results(results_dict)

# Plug into RetrieverQueryEngine

In [17]:
from llama_index import QueryBundle
from llama_index.retrievers import BaseRetriever
from typing import Any, List
from llama_index.schema import NodeWithScore

from llama_index.query_engine import RetrieverQueryEngine

In [18]:
query_str = "explain me how we used langchain in the methodology?"
queries = generate_queries(llm, query_str, num_queries=4)
results_dict = await run_queries(queries, [vector_retriever, bm25_retriever])

100%|██████████| 6/6 [00:00<00:00, 91.20it/s]


In [24]:
class FusionRetriever(BaseRetriever):
    """Ensemble retriever with fusion."""

    def __init__(
        self,
        llm,
        retrievers: List[BaseRetriever],
        similarity_top_k: int = 2,
    ) -> None:
        """Init params."""
        self.llm = llm  # Store the llm instance
        #self.query_str = " "
        self._retrievers = retrievers
        self._similarity_top_k = similarity_top_k
        super().__init__()

    def _retrieve(self, query_bundle) -> List[NodeWithScore]:
        """Retrieve."""
        queries = generate_queries(self.llm, query_str, num_queries=4)  # Use the llm instance
        result =  run_queries(queries, [vector_retriever, bm25_retriever]) 
        final_results = fuse_results(
            results_dict, similarity_top_k=self._similarity_top_k
        )

        return final_results

SyntaxError: 'await' outside async function (2561157583.py, line 20)

In [20]:
llm = OpenAI(openai_api_key="NULL",temperature=0,openai_api_base="http://192.168.48.33:1234/v1")
service_context = ServiceContext.from_defaults(llm=llm, embed_model=local_embeddings)

In [21]:
fusion_retriever = FusionRetriever(
    llm, [vector_retriever, bm25_retriever], similarity_top_k=2
)

response_synthesizer= get_response_synthesizer(service_context,streaming=True) # streaming False for classic answer generation

In [22]:
# Initialize the RetrieverQueryEngine
query_engine = RetrieverQueryEngine.from_args(
    retriever=fusion_retriever,
    response_synthesizer=response_synthesizer,
    service_context=service_context, 
    streaming=True # streaming False for classic answer generation
)

In [23]:
streaming_response = query_engine.query(
    "Tell me about the document",
)

streaming_response.print_response_stream()

TypeError: object of type 'coroutine' has no len()

In [None]:
streaming_response = query_engine.query(
    "What are the conclusions for the future of the tool in the company",
)

streaming_response.print_response_stream()