In [None]:
import os
import asyncio
import sys
from dotenv import load_dotenv
from langchain.docstore.document import Document
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains.summarize.chain import load_summarize_chain
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_cohere import CohereEmbeddings
from google.api_core import exceptions as google_exceptions
from typing import Coroutine, Any, Tuple, Type
import random
from cohere import TooManyRequestsError

load_dotenv()

base_embeddings = CohereEmbeddings(
    model="embed-english-light-v3.0"
)

llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash",
    temperature=0,
    timeout=None,
    max_retries=2,
    # other params...
)

In [7]:
path = "data/Understanding_Climate_Change.pdf"

In [5]:
async def exponential_backoff(attempt):
    """
    Implements exponential backoff with a jitter.
    """
    wait_time = (2 ** attempt) + random.uniform(0, 1)
    print(f"API limit hit or temporary error. Retrying in {wait_time:.2f} seconds...")
    await asyncio.sleep(wait_time)

async def retry_with_exponential_backoff(
    coroutine: Coroutine[Any, Any, Any], 
    errors_to_catch: Tuple[Type[Exception], ...] | Type[Exception],
    max_retries: int = 5
):
    """
    Retries a coroutine with exponential backoff upon encountering specific exceptions.
    
    Args:
        coroutine: The coroutine to be executed.
        errors_to_catch: A single exception class or a tuple of exception classes to catch.
        max_retries: The maximum number of retry attempts.
        
    Returns:
        The result of the coroutine if successful.
        
    Raises:
        The last encountered exception if all retry attempts fail.
    """
    for attempt in range(max_retries):
        try:
            return await coroutine
        except errors_to_catch as e:
            print(f"Attempt {attempt + 1}/{max_retries} failed with error: {e}")
            
            if attempt == max_retries - 1:
                print("Max retries reached. Raising the last exception.")
                raise e

            await exponential_backoff(attempt)

    raise Exception("Max retries reached without success")


In [6]:
async def encode_pdf_hierarchical(path, chunk_size=1000, chunk_overlap=200, is_string=False):
    """
    Asynchronously encodes a PDF book into a hierarchical vector store using OpenAI embeddings.
    Includes rate limit handling with exponential backoff.
    
    Args:
        path: The path to the PDF file.
        chunk_size: The desired size of each text chunk.
        chunk_overlap: The amount of overlap between consecutive chunks.
        
    Returns:
        A tuple containing two FAISS vector stores:
        1. Document-level summaries
        2. Detailed chunks
    """
    
    # Load PDF documents
    if not is_string:
        loader = PyPDFLoader(path)
        documents = await asyncio.to_thread(loader.load)
    else:
        text_splitter = RecursiveCharacterTextSplitter(
            # Set a really small chunk size, just to show.
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )
        documents = text_splitter.create_documents([path])


    # Create document-level summaries
    summary_llm = llm
    summary_chain = load_summarize_chain(summary_llm, chain_type="map_reduce")
    
    async def summarize_doc(doc):
        """
        Summarizes a single document with rate limit handling.
        
        Args:
            doc: The document to be summarized.
            
        Returns:
            A summarized Document object.
        """
        # Retry the summarization with exponential backoff
        summary_output = await retry_with_exponential_backoff(
            summary_chain.ainvoke([doc]),
            errors_to_catch=google_exceptions.ResourceExhausted
        )
        summary = summary_output['output_text']
        return Document(
            page_content=summary,
            metadata={"source": path, "page": doc.metadata["page"], "summary": True}
        )

    # Process documents in smaller batches to avoid rate limits
    batch_size = 5  # Adjust this based on your rate limits
    summaries = []
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        batch_summaries = await asyncio.gather(*[summarize_doc(doc) for doc in batch])
        summaries.extend(batch_summaries)
        await asyncio.sleep(1)  # Short pause between batches

    # Split documents into detailed chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
    )
    detailed_chunks = await asyncio.to_thread(text_splitter.split_documents, documents)

    # Update metadata for detailed chunks
    for i, chunk in enumerate(detailed_chunks):
        chunk.metadata.update({
            "chunk_id": i,
            "summary": False,
            "page": int(chunk.metadata.get("page", 0))
        })

    # Create embeddings
    embeddings = base_embeddings

    # Create vector stores asynchronously with rate limit handling
    async def create_vectorstore(docs):
        """
        Creates a vector store from a list of documents with rate limit handling.
        
        Args:
            docs: The list of documents to be embedded.
            
        Returns:
            A FAISS vector store containing the embedded documents.
        """
        return await retry_with_exponential_backoff(
            asyncio.to_thread(FAISS.from_documents, docs, embeddings),
            errors_to_catch=TooManyRequestsError
        )

    # Generate vector stores for summaries and detailed chunks concurrently
    summary_vectorstore, detailed_vectorstore = await asyncio.gather(
        create_vectorstore(summaries),
        create_vectorstore(detailed_chunks)
    )

    return summary_vectorstore, detailed_vectorstore

In [8]:
summary_store, detailed_store = await encode_pdf_hierarchical(path)
summary_store.save_local("../vector_stores/summary_store")
detailed_store.save_local("../vector_stores/detailed_store")

Retrying langchain_google_genai.chat_models._achat_with_retry.<locals>._achat_with_retry in 2.0 seconds as it raised ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/usage?tab=rate-limit.
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 10
Please retry in 1.997415708s. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-2.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 10
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  sec

In [None]:
summary_store = FAISS.load_local("../vector_stores/summary_store", base_embeddings, allow_dangerous_deserialization=True)
detailed_store = FAISS.load_local("../vector_stores/detailed_store", base_embeddings, allow_dangerous_deserialization=True)

In [9]:
def retrieve_hierarchical(query, summary_vectorstore, detailed_vectorstore, k_summaries=3, k_chunks=5):
    """
    Performs a hierarchical retrieval using the query.

    Args:
        query: The search query.
        summary_vectorstore: The vector store containing document summaries.
        detailed_vectorstore: The vector store containing detailed chunks.
        k_summaries: The number of top summaries to retrieve.
        k_chunks: The number of detailed chunks to retrieve per summary.

    Returns:
        A list of relevant detailed chunks.
    """
    
    # Retrieve top summaries
    top_summaries = summary_vectorstore.similarity_search(query, k=k_summaries)
    
    relevant_chunks = []
    for summary in top_summaries:
        # For each summary, retrieve relevant detailed chunks
        page_number = summary.metadata["page"]
        page_filter = lambda metadata: metadata["page"] == page_number
        page_chunks = detailed_vectorstore.similarity_search(
            query, 
            k=k_chunks, 
            filter=page_filter
        )
        relevant_chunks.extend(page_chunks)
    
    return relevant_chunks

In [10]:
query = "What is the greenhouse effect?"
results = retrieve_hierarchical(query, summary_store, detailed_store)

# Print results
for chunk in results:
    print(f"Page: {chunk.metadata['page']}")
    print(f"Content: {chunk.page_content}...")  # Print first 100 characters
    print("---")

Page: 3
Content: The Arctic is warming at more than twice the global average rate, leading to significant ice 
loss. Antarctic ice sheets are also losing mass, contributing to sea level rise. This melting 
affects global ocean currents and weather patterns. 
Glacial Retreat 
Glaciers around the world are retreating, affecting water supplies for millions of people. 
Regions dependent on glacial meltwater, such as the Himalayas and the Andes, face 
particular risks. Glacial melt also impacts hydropower generation and agriculture. 
Coastal Erosion 
Rising sea levels and increased storm surges are accelerating coastal erosion, threatening 
homes, infrastructure, and ecosystems. Low-lying islands and coastal regions are especially 
vulnerable. Coastal communities must invest in adaptation measures like sea walls and 
managed retreats. 
Extreme Weather Events 
Climate change is linked to an increase in the frequency and severity of extreme weather...
---
Page: 3
Content: managed retreats. 
E