# Text and Tables RAG on Infosys 

In [1]:
# ! pip install langchain langchain-chroma "unstructured[all-docs]" pydantic lxml langchainhub

In [2]:
path = "./annual_reports/"

In [3]:
from typing import Any
from pydantic import BaseModel
from unstructured.partition.pdf import partition_pdf
from tqdm import tqdm
import fitz  # PyMuPDF

# Get total pages first
doc = fitz.open(path + "Hindunilvr.pdf")
total_pages = len(doc)
doc.close()

# Create a callback function for the progress bar with a better format
pbar = tqdm(
    total=total_pages,
    desc="Processing PDF",
    bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} pages [{elapsed}<{remaining}, {rate_fmt}]'
)

def progress_callback(page_num: int, total: int):
    pbar.update(1)

try:
    # Get elements
    raw_pdf_elements = partition_pdf(
        filename=path + "Hindunilvr.pdf",
        # Unstructured first finds embedded image blocks
        extract_images_in_pdf=False,
        # Use layout model (YOLOX) to get bounding boxes (for tables) and find titles
        # Titles are any sub-section of the document
        infer_table_structure=True,
        # Post processing to aggregate text once we have the title
        chunking_strategy="by_title",
        # Chunking params to aggregate text blocks
        # Attempt to create a new chunk 3800 chars
        # Attempt to keep chunks > 2000 chars
        max_characters=4000,
        new_after_n_chars=3800,
        combine_text_under_n_chars=2000,
        image_output_dir_path=path,
        progress_bar=progress_callback,
        include_page_breaks=True
    )
finally:
    pbar.close()

  from .autonotebook import tqdm as notebook_tqdm
Processing PDF:   0%|          | 0/229 pages [31:21<?, ?it/s]


In [4]:

# Create a dictionary to store counts of each type
category_counts = {}

for element in raw_pdf_elements:
    category = str(type(element))
    if category in category_counts:
        category_counts[category] += 1
    else:
        category_counts[category] = 1

# Unique_categories will have unique elements
unique_categories = set(category_counts.keys())
category_counts

{"<class 'unstructured.documents.elements.CompositeElement'>": 519,
 "<class 'unstructured.documents.elements.TableChunk'>": 7,
 "<class 'unstructured.documents.elements.Table'>": 1}

In [5]:
class Element(BaseModel):
    type: str
    text: Any


# Categorize by type
categorized_elements = []
for element in raw_pdf_elements:
    if "unstructured.documents.elements.Table" in str(type(element)):
        categorized_elements.append(Element(type="table", text=str(element)))
    elif "unstructured.documents.elements.CompositeElement" in str(type(element)):
        categorized_elements.append(Element(type="text", text=str(element)))

# Tables
table_elements = [e for e in categorized_elements if e.type == "table"]
print(len(table_elements))

# Text
text_elements = [e for e in categorized_elements if e.type == "text"]
print(len(text_elements))

8
519


# Multivector retrievar

### Summaries

In [7]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
import os
os.environ["GROQ_API_KEY"] = "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"


In [8]:
# Prompt
prompt_text = """You are an financial assistant tasked with summarizing tables and text. 
Give a concise summary of the table or text. Table or text chunk: {element} """
prompt = ChatPromptTemplate.from_template(prompt_text)

# Summary chain
model = ChatGroq(temperature=0, model="llama3-groq-8b-8192-tool-use-preview")
summarize_chain = {"element": lambda x: x} | prompt | model | StrOutputParser()

In [9]:
# Apply to tables
tables = [i.text for i in table_elements]
import asyncio
import time
from collections import deque
from typing import List
import math
from langchain_groq import ChatGroq

class GroqBatcher:
    def __init__(self, 
                 batch_size: int = 5,
                 requests_per_minute: int = 25,  # Conservative limit under 30
                 tokens_per_minute: int = 14000, # Conservative under 15000
                 tokens_per_day: int = 450000,   # Conservative under 500000
                 base_delay: float = 2.0):
        self.batch_size = batch_size
        self.requests_per_minute = requests_per_minute
        self.tokens_per_minute = tokens_per_minute
        self.tokens_per_day = tokens_per_day
        self.base_delay = base_delay
        self.request_times = deque()
        self.token_usage = deque()
        self.daily_token_usage = 0
        self.reset_time = time.time()

    async def process_batch(self, items: List[str], summarize_chain) -> List[str]:
        results = []
        
        for i in range(0, len(items), self.batch_size):
            batch = items[i:i + self.batch_size]
            batch_results = await self._process_items(batch, summarize_chain)
            results.extend(batch_results)
            
            # Add delay between batches
            await asyncio.sleep(self.base_delay)
            
        return results

    async def _process_items(self, batch: List[str], summarize_chain) -> List[str]:
        results = []
        retry_count = 0
        
        for item in batch:
            try:
                now = time.time()
                
                # Reset daily counts if 24 hours passed
                if now - self.reset_time > 86400:
                    self.daily_token_usage = 0
                    self.reset_time = now
                
                # Clean up old records
                while self.request_times and now - self.request_times[0] > 60:
                    self.request_times.popleft()
                    self.token_usage.popleft()

                # Check rate limits
                if len(self.request_times) >= self.requests_per_minute:
                    wait_time = 60 - (now - self.request_times[0])
                    await asyncio.sleep(max(0, wait_time))

                if sum(self.token_usage) >= self.tokens_per_minute:
                    wait_time = 60 - (now - self.request_times[0])
                    await asyncio.sleep(max(0, wait_time))

                # Process item
                result = await summarize_chain.ainvoke(item)
                
                # Update tracking
                self.request_times.append(time.time())
                tokens_used = len(item.split()) + len(str(result).split())
                self.token_usage.append(tokens_used)
                self.daily_token_usage += tokens_used
                
                results.append(result)
                retry_count = 0  # Reset retry count on success

            except Exception as e:
                if "rate_limit_exceeded" in str(e):
                    retry_count += 1
                    delay = self.base_delay * math.exp(retry_count)
                    await asyncio.sleep(delay)
                    continue
                raise e

        return results

# Usage
async def process_tables(tables: List[str], summarize_chain) -> List[str]:
    batcher = GroqBatcher(
        batch_size=5,
        requests_per_minute=25
    )
    return await batcher.process_batch(tables, summarize_chain)

# Execute
import nest_asyncio
nest_asyncio.apply()

# Replace your original code with:
table_summaries = asyncio.get_event_loop().run_until_complete(
    process_tables(tables, summarize_chain)
)

In [11]:
# import asyncio
# import time
# from collections import deque
# from typing import List
# import math
# from langchain_groq import ChatGroq

# class TextBatchProcessor:
#     def __init__(self, 
#                  batch_size: int = 5,
#                  requests_per_minute: int = 25,  # Under 30 limit
#                  tokens_per_minute: int = 14000, # Under 15000 limit
#                  tokens_per_day: int = 450000,   # Under 500000 limit
#                  max_concurrency: int = 5,
#                  base_delay: float = 2.0):
#         self.batch_size = batch_size
#         self.requests_per_minute = requests_per_minute
#         self.tokens_per_minute = tokens_per_minute
#         self.tokens_per_day = tokens_per_day
#         self.max_concurrency = max_concurrency
#         self.base_delay = base_delay
#         self.request_times = deque()
#         self.token_usage = deque()
#         self.daily_token_usage = 0
#         self.reset_time = time.time()

#     async def process_texts(self, texts: List[str], summarize_chain) -> List[str]:
#         summaries = []
#         semaphore = asyncio.Semaphore(self.max_concurrency)
        
#         async def process_with_rate_limit(text: str) -> str:
#             async with semaphore:
#                 try:
#                     now = time.time()
                    
#                     # Reset daily tracking if needed
#                     if now - self.reset_time > 86400:
#                         self.daily_token_usage = 0
#                         self.reset_time = now
                    
#                     # Clean up old records
#                     while self.request_times and now - self.request_times[0] > 60:
#                         self.request_times.popleft()
#                         self.token_usage.popleft()

#                     # Check rate limits
#                     if len(self.request_times) >= self.requests_per_minute:
#                         wait_time = 60 - (now - self.request_times[0])
#                         await asyncio.sleep(max(0, wait_time))

#                     if sum(self.token_usage) >= self.tokens_per_minute:
#                         wait_time = 60 - (now - self.request_times[0])
#                         await asyncio.sleep(max(0, wait_time))

#                     # Process text
#                     summary = await summarize_chain.ainvoke(text)
                    
#                     # Update tracking
#                     self.request_times.append(time.time())
#                     tokens_used = len(text.split()) + len(str(summary).split())
#                     self.token_usage.append(tokens_used)
#                     self.daily_token_usage += tokens_used
                    
#                     return summary

#                 except Exception as e:
#                     if "rate_limit_exceeded" in str(e):
#                         # Parse retry delay handling both seconds and milliseconds
#                         retry_str = str(e).split("try again in ")[-1].split(" ")[0]
#                         if retry_str.endswith('ms'):
#                             retry_after = float(retry_str.rstrip('ms')) / 1000
#                         else:
#                             retry_after = float(retry_str.rstrip('s'))
#                         await asyncio.sleep(retry_after + 0.5)
#                         return await process_with_rate_limit(text)
#                     raise e
#         for i in range(0, len(texts), self.batch_size):
#             batch = texts[i:i + self.batch_size]
#             batch_tasks = [process_with_rate_limit(text) for text in batch]
#             batch_results = await asyncio.gather(*batch_tasks)
#             summaries.extend(batch_results)
#             await asyncio.sleep(self.base_delay)

#         return summaries

# # Usage
# async def process_text_summaries(texts: List[str], summarize_chain) -> List[str]:
#     processor = TextBatchProcessor()
#     return await processor.process_texts(texts, summarize_chain)

# # Initialize and run
# import nest_asyncio
# nest_asyncio.apply()

# # Replace original code with:
# texts = [i.text for i in text_elements]
# text_summaries = asyncio.get_event_loop().run_until_complete(
#     process_text_summaries(texts, summarize_chain)
# )

In [12]:
texts = [i.text for i in text_elements]
text_summaries = texts


### Add to Vectorstore

In [None]:
! pip install langchain_huggingface

In [20]:
import gc
import torch
gc.collect()
torch.cuda.empty_cache()

In [17]:
import uuid

from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
import chromadb
from chromadb.utils import embedding_functions

embeddings = HuggingFaceEmbeddings(
    model_name="all-mpnet-base-v2",
    model_kwargs={'trust_remote_code': True}
)
persistent_client = chromadb.PersistentClient("./database")
collection = persistent_client.get_or_create_collection("rag_on_infosys_report")

# The vectorstore to use to index the child chunks
vectorstore = Chroma(client=persistent_client, collection_name="summaries", embedding_function=embeddings)

# The storage layer for the parent documents
store = InMemoryStore()
id_key = "doc_id"

# The retriever (empty to start)
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    docstore=store,
    id_key=id_key,
)

# Add texts
doc_ids = [str(uuid.uuid4()) for _ in texts]
summary_texts = [
    Document(page_content=s, metadata={id_key: doc_ids[i]})
    for i, s in enumerate(text_summaries)
]
retriever.vectorstore.add_documents(summary_texts)
retriever.docstore.mset(list(zip(doc_ids, texts)))

# Add tables
table_ids = [str(uuid.uuid4()) for _ in tables]
summary_tables = [
    Document(page_content=s, metadata={id_key: table_ids[i]})
    for i, s in enumerate(table_summaries)
]
retriever.vectorstore.add_documents(summary_tables)
retriever.docstore.mset(list(zip(table_ids, tables)))

### RAG

In [18]:
from langchain_core.runnables import RunnablePassthrough

# Prompt template
template = """You are a financial agent whose role is to get key insights from the annual report of a company realted to the User's Question. You must be very detailed in your response and must stick to the resources provided (Context here are the resources provided), now answer the question based only on the following context, which can include text and tables:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# LLM
model = ChatGroq(temperature=0, model="llama-3.3-70b-versatile")

# RAG pipeline
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

In [19]:
result = chain.invoke("""You are a financial expert and market analyst. Analyze the annual financial report 
            of the company and provide a precise summary of the company's performance and investment potential.
            
            Retrieved Context:
            {context}
            
            If the context is limited or missing, focus on providing general analysis based on the available information.""")
print(result)

Given the context provided, it appears to be a jumbled collection of characters, symbols, and some numerical values, which does not resemble a standard annual financial report of a company. The text lacks coherence, and there are no clear financial statements, such as balance sheets, income statements, or cash flow statements, that are typically found in an annual report.

However, as a financial expert and market analyst, I can provide a general analysis based on the assumption that the context might contain some hidden or encoded financial information. 

1. **Lack of Transparency**: The primary concern with the provided context is its lack of transparency. Investors and analysts rely heavily on clear, concise, and detailed financial information to assess a company's performance and potential. The current state of the context does not allow for such an assessment.

2. **Risk Assessment**: Without clear financial data, it's challenging to assess the company's risk profile. Typically, i