# Hierarchical Indices in Document Retrieval

## Overview

This code implements a Hierarchical Indexing system for document retrieval, utilizing two levels of encoding: document-level summaries and detailed chunks. This approach aims to improve the efficiency and relevance of information retrieval by first identifying relevant document sections through summaries, then drilling down to specific details within those sections.

## Motivation

Traditional flat indexing methods can struggle with large documents or corpora, potentially missing context or returning irrelevant information. Hierarchical indexing addresses this by creating a two-tier search system, allowing for more efficient and context-aware retrieval.

## Key Components

1. PDF processing and text chunking
2. Asynchronous document summarization with an Azure OpenAI chat model (e.g., a GPT-4 deployment)
3. Vector store creation for both summaries and detailed chunks using FAISS and Azure OpenAI embeddings
4. Custom hierarchical retrieval function

## Method Details

### Document Preprocessing and Encoding

1. The PDF is loaded with `PyPDFLoader`, which produces one document per page.
2. Each page is summarized asynchronously with a map-reduce summarization chain backed by the chat model.
3. The original documents are also split into smaller, detailed chunks.
4. Two separate vector stores are created:
   - One for document-level summaries
   - One for detailed chunks
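The preprocessing steps above can be sketched without any API calls. This is a minimal illustration, not the notebook's implementation: `Doc` is a stand-in for LangChain's `Document`, `split_text` is a simple fixed-window splitter (not LangChain's `RecursiveCharacterTextSplitter`), and `first_sentence` is a hypothetical placeholder for the LLM summarizer. The point it shows is that summaries and chunks share the same `page` metadata, which is what links the two tiers.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Minimal stand-in for a LangChain Document: text plus metadata."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def split_text(text, chunk_size=1000, chunk_overlap=200):
    """Fixed-window character splitter (simplified; not LangChain's recursive splitter)."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - chunk_overlap
    # Each window starts `step` characters after the previous one, so neighbours overlap
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - chunk_overlap, 1), step)]


def build_hierarchical_docs(pages, summarize, chunk_size=1000, chunk_overlap=200):
    """Return (summaries, chunks); both carry the same 'page' key so they can be joined later."""
    summaries, chunks = [], []
    for page_number, text in enumerate(pages):
        summaries.append(Doc(summarize(text), {"page": page_number, "summary": True}))
        for piece in split_text(text, chunk_size, chunk_overlap):
            chunks.append(Doc(piece, {"page": page_number, "summary": False, "chunk_id": len(chunks)}))
    return summaries, chunks


def first_sentence(text):
    """Hypothetical summarizer used only for this demo: keep the first sentence."""
    return text.split(".")[0].strip() + "."


pages = [
    "The greenhouse effect traps heat. Gases such as CO2 absorb infrared radiation.",
    "Solar and wind power are renewable. They reduce emissions from electricity.",
]
summaries, chunks = build_hierarchical_docs(pages, first_sentence, chunk_size=40, chunk_overlap=10)
print([s.page_content for s in summaries])
print(sorted({c.metadata["page"] for c in chunks}))
```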

### Asynchronous Processing and Rate Limiting

1. The code uses asynchronous programming (asyncio) so that several summarization requests can be in flight at once.
2. It processes pages in small batches and retries rate-limited calls with exponential backoff.
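A minimal, self-contained sketch of the batching-plus-backoff pattern described above, runnable outside the notebook. `RateLimited` and `fake_summarize` are hypothetical stand-ins for `openai.RateLimitError` and a real API call, and the delays are scaled down so the demo finishes quickly. Note that the retry helper takes a zero-argument callable rather than a coroutine, because a coroutine object can only be awaited once.

```python
import asyncio
import random


class RateLimited(Exception):
    """Hypothetical stand-in for openai.RateLimitError."""


async def retry_with_backoff(make_coro, max_retries=5, base_delay=1.0):
    """Await make_coro(), retrying on RateLimited with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            # Build a fresh coroutine each attempt: a coroutine object can only be awaited once
            return await make_coro()
        except RateLimited:
            if attempt == max_retries - 1:
                raise
            # Wait base_delay * 2**attempt, plus up to base_delay of random jitter
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))


async def process_in_batches(items, worker, batch_size=5, pause=1.0, base_delay=1.0):
    """Run worker(item) concurrently inside each batch; batches run one after another."""
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # item=item binds the current item now, not when the lambda is finally called
        coros = [retry_with_backoff(lambda item=item: worker(item), base_delay=base_delay) for item in batch]
        results.extend(await asyncio.gather(*coros))
        await asyncio.sleep(pause)  # short pause between batches
    return results


failures = {"left": 3}  # the fake API rejects its first three calls


async def fake_summarize(text):
    """Hypothetical API call that is rate-limited at first."""
    await asyncio.sleep(0)
    if failures["left"] > 0:
        failures["left"] -= 1
        raise RateLimited("429 Too Many Requests")
    return text.upper()


results = asyncio.run(
    process_in_batches(["a", "b", "c", "d"], fake_summarize, batch_size=2, pause=0.01, base_delay=0.01)
)
print(results)  # ['A', 'B', 'C', 'D']
```

Inside a Jupyter notebook an event loop is already running, so replace `asyncio.run(...)` with a top-level `await process_in_batches(...)`.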

### Hierarchical Retrieval

The `retrieve_hierarchical` function implements the two-tier search:

1. It first searches the summary vector store to identify relevant document sections.
2. For each relevant summary, it then searches the detailed chunk vector store, filtering by the corresponding page number.
3. This approach ensures that detailed information is retrieved only from the most relevant document sections.
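The two-tier search can be illustrated with a toy, dependency-free version. This sketch assumes a bag-of-words cosine similarity as a stand-in for real embeddings, and represents each document as a `(text, metadata)` pair; `retrieve_hierarchical_toy` is a hypothetical name that mirrors the structure of `retrieve_hierarchical` below (top summaries first, then page-filtered chunk search), not its vector-store calls.

```python
import math
from collections import Counter


def embed(text):
    """Toy bag-of-words 'embedding' (stand-in for a real embedding model)."""
    return Counter(word.strip(".,?").lower() for word in text.split())


def cosine(a, b):
    dot = sum(a[w] * b[w] for w in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_k(query, docs, k, keep=lambda meta: True):
    """Rank (text, metadata) pairs by similarity to the query, after applying a metadata filter."""
    q = embed(query)
    candidates = [d for d in docs if keep(d[1])]
    return sorted(candidates, key=lambda d: cosine(q, embed(d[0])), reverse=True)[:k]


def retrieve_hierarchical_toy(query, summaries, chunks, k_summaries=1, k_chunks=2):
    results = []
    # Tier 1: find the most relevant page summaries
    for _, meta in top_k(query, summaries, k_summaries):
        # Tier 2: search only the chunks that belong to that page
        results.extend(top_k(query, chunks, k_chunks, keep=lambda m, page=meta["page"]: m["page"] == page))
    return results


summaries = [
    ("How the greenhouse effect warms the planet", {"page": 0}),
    ("Renewable energy and its benefits", {"page": 1}),
]
chunks = [
    ("Greenhouse gases trap outgoing heat", {"page": 0}),
    ("Carbon dioxide levels keep rising", {"page": 0}),
    ("Wind turbines generate clean electricity", {"page": 1}),
]
hits = retrieve_hierarchical_toy("What is the greenhouse effect?", summaries, chunks)
print([text for text, _ in hits])
```

Only page 0 passes the summary stage here, so the wind-turbine chunk is never scored against the query.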

## Benefits of this Approach

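1. Improved Retrieval Efficiency: By first searching the compact summary index, the system identifies the relevant document sections before looking at detailed chunks, so the detailed search is restricted to those sections. (With the FAISS page filter used in this notebook, the filter is applied to nearest-neighbour candidates from the whole chunk index, so the gain is mainly in focus rather than raw search speed.)
2. Better Context Preservation: The hierarchical approach helps maintain the broader context of retrieved information.
3. Scalability: This method is particularly beneficial for large documents or corpora, where flat searching might be inefficient or miss important context.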
4. Flexibility: The system allows for adjusting the number of summaries and chunks retrieved, enabling fine-tuning for different use cases.

## Implementation Details

1. Asynchronous Programming: Utilizes Python's asyncio for efficient I/O operations and API calls.
2. Rate Limit Handling: Implements batching and exponential backoff to manage API rate limits effectively.
3. Persistent Storage: Saves the generated vector stores locally to avoid unnecessary recomputation.

## Conclusion

Hierarchical indexing represents a sophisticated approach to document retrieval, particularly suitable for large or complex document sets. By leveraging both high-level summaries and detailed chunks, it offers a balance between broad context understanding and specific information retrieval. This method has potential applications in various fields requiring efficient and context-aware information retrieval, such as legal document analysis, academic research, or large-scale content management systems.

<div style="text-align: center;">

<img src="../images/hierarchical_indices.svg" alt="hierarchical_indices" style="width:50%; height:auto;">
</div>

<div style="text-align: center;">

<img src="../images/hierarchical_indices_example.svg" alt="hierarchical_indices_example" style="width:100%; height:auto;">
</div>

In [1]:
!pip install pandas pypdf langchain_openai langchain langchain-community langchain-text-splitters faiss-cpu python-dotenv pyarrow fastparquet huggingface_hub ipywidgets datasets tqdm ragas --quiet

In [13]:
import asyncio
import random
from openai import RateLimitError


async def exponential_backoff(attempt):
    """
    Implements exponential backoff with a jitter.
    
    Args:
        attempt: The current retry attempt number.
        
    Waits for a period of time before retrying the operation.
    The wait time is calculated as (2^attempt) + a random fraction of a second.
    """
    # Calculate the wait time with exponential backoff and jitter
    wait_time = (2 ** attempt) + random.uniform(0, 1)
    print(f"Rate limit hit. Retrying in {wait_time:.2f} seconds...")

    # Asynchronously sleep for the calculated wait time
    await asyncio.sleep(wait_time)


async def retry_with_exponential_backoff(coroutine_factory, max_retries=5):
    """
    Retries an async operation using exponential backoff upon encountering a RateLimitError.
    
    Args:
        coroutine_factory: A zero-argument callable that returns a new coroutine,
            e.g. `lambda: chain.ainvoke(inputs)`. A fresh coroutine is created for
            every attempt because a coroutine object can only be awaited once.
        max_retries: The maximum number of attempts.
        
    Returns:
        The result of the coroutine if successful.
        
    Raises:
        The last encountered RateLimitError if all attempts fail.
    """
    for attempt in range(max_retries):
        try:
            # Create and await a fresh coroutine for this attempt
            return await coroutine_factory()
        except RateLimitError:
            # If the last attempt also fails, re-raise the exception
            if attempt == max_retries - 1:
                raise

            # Wait for an exponential backoff period before retrying
            await exponential_backoff(attempt)

    # Only reached when max_retries < 1
    raise Exception("Max retries reached")

### Import libraries

In [14]:
import asyncio
import getpass
import os
import sys
from tqdm import tqdm
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from langchain_openai import AzureOpenAIEmbeddings
from langchain.chains.summarize.chain import load_summarize_chain
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document


sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) # Add the parent directory to the path since we work with notebooks


# Load environment variables from a .env file
load_dotenv('../.env')

# Prompt for the Azure OpenAI API key if it is not already set
if "AZURE_OPENAI_API_KEY" not in os.environ:
    os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass(
        "Enter your AzureOpenAI API key: "
    )

# Read the Azure OpenAI settings from the environment
api_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT')
api_key = os.getenv('AZURE_OPENAI_API_KEY')
llm_deployment_name = os.getenv('AZURE_OPENAI_MODEL_NAME')
embedding_deployment_name = os.getenv('AZURE_OPENAI_EMBEDDING_MODEL')
api_version = '2024-02-15-preview' # this might change in the future

# The LangChain Azure clients read AZURE_OPENAI_ENDPOINT from the environment
if not api_endpoint:
    raise ValueError("AZURE_OPENAI_ENDPOINT is not set; add it to ../.env")

### Define document path

In [15]:
path = "../data/Understanding_Climate_Change.pdf"

In [16]:
llm = AzureChatOpenAI(
    model=llm_deployment_name,
    azure_deployment=llm_deployment_name,
    api_version=api_version,
)

# Use the embedding deployment configured in .env
embeddings = AzureOpenAIEmbeddings(
    azure_deployment=embedding_deployment_name,
    api_version=api_version,
)

### Function to encode at both the summary and chunk levels, sharing the page metadata

In [17]:
async def encode_pdf_hierarchical(path, chunk_size=1000, chunk_overlap=200, is_string=False):
    """
    Asynchronously encodes a PDF book into a hierarchical vector store using Azure OpenAI embeddings.
    Includes rate limit handling with exponential backoff.
    
    Args:
        path: The path to the PDF file, or the raw text itself when is_string is True.
        chunk_size: The desired size of each text chunk.
        chunk_overlap: The amount of overlap between consecutive chunks.
        is_string: If True, treat `path` as raw text instead of a file path.
        
    Returns:
        A tuple containing two FAISS vector stores:
        1. Document-level summaries
        2. Detailed chunks
    """
    
    # Load PDF documents (PyPDFLoader returns one Document per page)
    if not is_string:
        loader = PyPDFLoader(path)
        documents = await asyncio.to_thread(loader.load)
    else:
        # Split the raw string into sections that play the role of pages
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )
        documents = text_splitter.create_documents([path])
        # Give each section a "page" number so summaries and chunks can be linked
        for page_number, doc in enumerate(documents):
            doc.metadata["page"] = page_number

    # Create document-level summaries
    summary_llm = AzureChatOpenAI(
        model=llm_deployment_name,
        azure_deployment=llm_deployment_name,
        api_version=api_version,
    )
    # summary_llm = AzureChatOpenAI(temperature=0, model_name="gpt-4o-mini", max_tokens=4000)
    summary_chain = load_summarize_chain(summary_llm, chain_type="map_reduce")
    
    async def summarize_doc(doc):
        """
        Summarizes a single document with rate limit handling.
        
        Args:
            doc: The document to be summarized.
            
        Returns:
            A summarized Document object.
        """
        # Retry the summarization with exponential backoff; the lambda builds a fresh coroutine per attempt
        summary_output = await retry_with_exponential_backoff(
            lambda: summary_chain.ainvoke({"input_documents": [doc]})
        )
        summary = summary_output['output_text']
        return Document(
            page_content=summary,
            metadata={"source": path, "page": doc.metadata["page"], "summary": True}
        )

    # Process documents in smaller batches to avoid rate limits
    batch_size = 5  # Adjust this based on your rate limits
    summaries = []
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        batch_summaries = await asyncio.gather(*[summarize_doc(doc) for doc in batch])
        summaries.extend(batch_summaries)
        await asyncio.sleep(1)  # Short pause between batches

    # Split documents into detailed chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
    )
    detailed_chunks = await asyncio.to_thread(text_splitter.split_documents, documents)

    # Update metadata for detailed chunks
    for i, chunk in enumerate(detailed_chunks):
        chunk.metadata.update({
            "chunk_id": i,
            "summary": False,
            "page": int(chunk.metadata.get("page", 0))
        })

    # Create embeddings with the configured Azure deployment
    embeddings = AzureOpenAIEmbeddings(
        azure_deployment=embedding_deployment_name,
        api_version=api_version,
    )

    # Create vector stores asynchronously with rate limit handling
    async def create_vectorstore(docs):
        """
        Creates a vector store from a list of documents with rate limit handling.
        
        Args:
            docs: The list of documents to be embedded.
            
        Returns:
            A FAISS vector store containing the embedded documents.
        """
        return await retry_with_exponential_backoff(
            lambda: asyncio.to_thread(FAISS.from_documents, docs, embeddings)
        )

    # Generate vector stores for summaries and detailed chunks concurrently
    summary_vectorstore, detailed_vectorstore = await asyncio.gather(
        create_vectorstore(summaries),
        create_vectorstore(detailed_chunks)
    )

    return summary_vectorstore, detailed_vectorstore

### Encode the PDF into document-level summaries and detailed chunks, or load the vector stores if they already exist


In [18]:
if os.path.exists("../vector_stores/summary_store") and os.path.exists("../vector_stores/detailed_store"):
    embeddings = AzureOpenAIEmbeddings(
        azure_deployment=embedding_deployment_name,
        api_version=api_version,
    )
    # load_local unpickles the stored docstore, so only load stores you created yourself
    summary_store = FAISS.load_local("../vector_stores/summary_store", embeddings, allow_dangerous_deserialization=True)
    detailed_store = FAISS.load_local("../vector_stores/detailed_store", embeddings, allow_dangerous_deserialization=True)
else:
    summary_store, detailed_store = await encode_pdf_hierarchical(path)
    summary_store.save_local("../vector_stores/summary_store")
    detailed_store.save_local("../vector_stores/detailed_store")


### Retrieve at the summary level first, then search the chunk-level vector store restricted to the pages of the top summaries

In [19]:
def retrieve_hierarchical(query, summary_vectorstore, detailed_vectorstore, k_summaries=3, k_chunks=5):
    """
    Performs a hierarchical retrieval using the query.

    Args:
        query: The search query.
        summary_vectorstore: The vector store containing document summaries.
        detailed_vectorstore: The vector store containing detailed chunks.
        k_summaries: The number of top summaries to retrieve.
        k_chunks: The number of detailed chunks to retrieve per summary.

    Returns:
        A list of relevant detailed chunks.
    """
    
    # Retrieve top summaries
    top_summaries = summary_vectorstore.similarity_search(query, k=k_summaries)
    
    relevant_chunks = []
    for summary in top_summaries:
        # For each summary, retrieve relevant detailed chunks
        page_number = summary.metadata["page"]
        page_filter = lambda metadata: metadata["page"] == page_number
        page_chunks = detailed_vectorstore.similarity_search(
            query, 
            k=k_chunks, 
            filter=page_filter
        )
        relevant_chunks.extend(page_chunks)
    
    return relevant_chunks

### Demonstrate on a use case

In [None]:
query = "What is the greenhouse effect?"
results = retrieve_hierarchical(query, summary_store, detailed_store)

# Print results
for chunk in results:
    print(f"Page: {chunk.metadata['page']}")
    print(f"Content: {chunk.page_content[:100]}...")  # Print first 100 characters
    print("---")

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate


# Assume we already have the retrieve_hierarchical function and the vector stores initialized

def query_with_hierarchical_retrieval(query, summary_vectorstore, detailed_vectorstore, llm):
    # Step 1: Use the hierarchical retrieval function to get relevant chunks
    relevant_chunks = retrieve_hierarchical(query, summary_vectorstore, detailed_vectorstore)
    
    # Combine the relevant chunks into a context string
    context = "\n\n".join([chunk.page_content for chunk in relevant_chunks])
    
    # Step 2: Define a prompt for the LLM to use this context to generate an answer
    prompt_template = """
    Given the following context, answer the query:
    
    Context:
    {context}
    
    Query:
    {query}
    
    Answer:
    """
    
    # Initialize the prompt template
    prompt = PromptTemplate(template=prompt_template, input_variables=["context", "query"])
    
    # Step 3: Run the query through the LLM (prompt -> model -> plain string)
    chain = prompt | llm | StrOutputParser()
    answer = chain.invoke({"context": context, "query": query})
    
    return answer, relevant_chunks


query = "What are the key benefits of sustainable energy solutions?"
answer, _ = query_with_hierarchical_retrieval(query, summary_store, detailed_store, llm)
answer


# Evaluation

## Metrics

### Faithfulness

This measures the factual consistency of the generated answer against the given context. It is calculated from the answer and the retrieved context; the score ranges from 0 to 1, and higher is better.

The generated answer is regarded as faithful if all the claims made in the answer can be inferred from the given context. To calculate this, a set of claims from the generated answer is first identified. Then each of these claims is cross-checked with the given context to determine if it can be inferred from the context.
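The final ratio is simple once the claims have been checked. In ragas both claim extraction and verification are done by an LLM; this sketch assumes those verdicts are already available as a list of booleans and only shows the scoring step. Returning 0.0 for an answer with no claims is an arbitrary choice made here.

```python
def faithfulness_score(claim_supported):
    """Fraction of the answer's claims that can be inferred from the retrieved context.

    claim_supported: one bool per claim extracted from the answer.
    """
    if not claim_supported:
        return 0.0
    return sum(claim_supported) / len(claim_supported)


# Hypothetical verdicts: 3 of the answer's 4 claims are backed by the context
print(faithfulness_score([True, True, False, True]))  # 0.75
```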


### Response Relevancy
The Answer Relevancy metric (called Response Relevancy in newer ragas versions) assesses how pertinent the generated answer is to the given prompt. Answers that are incomplete or contain redundant information receive lower scores, and higher scores indicate better relevancy. This metric is computed using the user_input, the retrieved_contexts and the response.
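In ragas, this score is the mean cosine similarity between the embedding of the original question and the embeddings of several questions that an LLM generates from the answer: an answer that drifts off-topic produces questions that look unlike the original. This sketch assumes the embeddings are already computed and uses hypothetical 2-dimensional vectors.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def answer_relevancy(question_vec, generated_question_vecs):
    """Mean cosine similarity between the user question and questions generated from the answer."""
    return sum(cosine(question_vec, g) for g in generated_question_vecs) / len(generated_question_vecs)


# One generated question matches the original exactly, the other is orthogonal to it
print(answer_relevancy([1.0, 0.0], [[1.0, 0.0], [0.0, 1.0]]))  # 0.5
```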


### Context Recall
Context Recall measures how many of the relevant documents (or pieces of information) were successfully retrieved; higher recall means fewer relevant documents were left out. Because it is about not missing anything important, calculating context recall always requires a reference to compare against.
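The structure of the computation: the reference (ground-truth) answer is broken into claims, and each claim counts as recalled if any retrieved context supports it. ragas uses an LLM for both steps; in this sketch the claims are given, and `is_supported` is a hypothetical, deliberately naive substring check used only for illustration.

```python
def context_recall(reference_claims, retrieved_contexts, is_supported):
    """Share of reference claims that at least one retrieved context supports."""
    supported = [any(is_supported(claim, ctx) for ctx in retrieved_contexts) for claim in reference_claims]
    return sum(supported) / len(supported)


def naive_support(claim, context):
    """Hypothetical check: the claim appears verbatim in the context."""
    return claim.lower() in context.lower()


reference_claims = ["greenhouse gases trap heat", "co2 comes from fossil fuels"]
retrieved = ["Greenhouse gases trap heat in the atmosphere."]
print(context_recall(reference_claims, retrieved, naive_support))  # 0.5
```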

### Context Precision
Context Precision is a metric that measures the proportion of relevant chunks in the retrieved_contexts while rewarding rankings that put relevant chunks first. It averages precision@k over the ranks k at which a relevant chunk appears, where precision@k is the number of relevant chunks within the top k divided by k.
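A worked sketch of that averaging, assuming the per-chunk relevance flags are already known (ragas obtains them with an LLM judge against the reference). With flags $v_k \in \{0, 1\}$ in rank order, the score is $\sum_{k} (\text{precision@}k \cdot v_k)$ divided by the number of relevant chunks.

```python
def context_precision(relevance):
    """Average of precision@k over the ranks k that hold a relevant chunk.

    relevance: 0/1 flags, one per retrieved chunk, in rank order.
    """
    total_relevant = sum(relevance)
    if total_relevant == 0:
        return 0.0
    score, hits = 0.0, 0
    for k, v in enumerate(relevance, start=1):
        hits += v
        score += (hits / k) * v  # precision@k only counts where a relevant chunk sits
    return score / total_relevant


print(context_precision([1, 0, 1]))  # about 0.833 (relevant chunk ranked first)
print(context_precision([0, 1, 1]))  # about 0.583 (same chunks, worse ranking)
```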



### Answer Correctness

The assessment of Answer Correctness involves gauging the accuracy of the generated answer when compared to the ground truth. This evaluation relies on the ground truth and the answer, with scores ranging from 0 to 1. A higher score indicates a closer alignment between the generated answer and the ground truth, signifying better correctness.

Answer correctness encompasses two critical aspects: semantic similarity between the generated answer and the ground truth, as well as factual similarity. These aspects are combined using a weighted scheme to formulate the answer correctness score. Users also have the option to employ a 'threshold' value to round the resulting score to binary, if desired.
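A sketch of how the two aspects can be combined. It assumes the claim-level counts (true positives: claims in both answer and ground truth; false positives: only in the answer; false negatives: only in the ground truth) and the semantic similarity are already computed, which ragas does with an LLM and embeddings. Factual similarity is taken as the F1 score over claims, and the weights of 0.75 (factual) and 0.25 (semantic) are assumed to match ragas's defaults; check the version you have installed.

```python
def answer_correctness(tp, fp, fn, semantic_similarity, weights=(0.75, 0.25), threshold=None):
    """Weighted average of claim-level F1 and semantic similarity (illustrative sketch)."""
    # Factual similarity: F1 over claims, written as TP / (TP + 0.5 * (FP + FN))
    denom = tp + 0.5 * (fp + fn)
    factual_f1 = tp / denom if denom else 0.0
    # Weighted average of the two aspects
    w_fact, w_sem = weights
    score = (w_fact * factual_f1 + w_sem * semantic_similarity) / (w_fact + w_sem)
    # Optional: round the score to a binary verdict
    if threshold is not None:
        score = 1.0 if score >= threshold else 0.0
    return score


# 3 shared claims, 1 extra claim in the answer, 1 missed claim, semantic similarity 0.9
print(answer_correctness(tp=3, fp=1, fn=1, semantic_similarity=0.9))  # about 0.7875
```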



In [11]:
from ragas.metrics import (
    context_precision,
    answer_relevancy,
    faithfulness,
    context_recall,
    answer_correctness,
)

# list of metrics we're going to use
metrics = [
    faithfulness,
    answer_relevancy,
    answer_correctness,
    context_recall,
    context_precision,
]

In [None]:
from datasets import load_from_disk

# Load the test set from the specified path
testset_path = "../data/testset"
testset_df = load_from_disk(testset_path)
testset_df

In [None]:
def populate_responses(dataset, summary_store, detailed_store, llm):
    """
    Populates the 'response' and 'contexts' columns in the dataset using the query_with_hierarchical_retrieval function.
    Also renames the existing 'contexts' column to 'groundtruth_contexts'.

    Args:
        dataset: The dataset containing the questions.
        summary_store: The vector store containing document summaries.
        detailed_store: The vector store containing detailed chunks.
        llm: The language model to generate responses.

    Returns:
        The dataset with the 'response' and 'contexts' columns populated and 'contexts' renamed to 'groundtruth_contexts'.
    """
    responses = []
    retrieved_contexts = []
    
    for question in tqdm(dataset['question'], desc="Processing questions"):
        response, context_chunks = query_with_hierarchical_retrieval(question, summary_store, detailed_store, llm)
        relevant_context = [chunk.page_content for chunk in context_chunks]
        responses.append(response)
        retrieved_contexts.append(relevant_context)
    
    # Rename the existing 'contexts' column to 'groundtruth_contexts'
    dataset = dataset.rename_column('contexts', 'groundtruth_contexts')
    
    # Add the new 'response' and 'contexts' columns
    dataset = dataset.add_column('response', responses)
    dataset = dataset.add_column('contexts', retrieved_contexts)
    
    return dataset

# Populate the 'response' and 'contexts' columns in the testset_df
testset_df = populate_responses(testset_df, summary_store, detailed_store, llm)

In [None]:
testset_df[0]['response']

In [None]:
from ragas import evaluate

result = evaluate(testset_df, metrics=metrics, llm=llm, embeddings=embeddings)

In [None]:
result

In [None]:
result.to_pandas()

In [None]:
result.to_pandas().iloc[0].question

In [None]:
result.to_pandas().iloc[0].contexts