In [1]:
import os
from typing import List, Tuple, Dict, Any
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain.schema import AIMessage, HumanMessage, BaseRetriever
from langchain_core.documents import Document
from dotenv import load_dotenv
from pydantic import Field
import re
import pytesseract
from pdf2image import convert_from_path
import pandas as pd
import pdfplumber

In [2]:
# Read variables from the .env file, then fail fast if the OpenAI
# credential is still missing or blank.
load_dotenv()
openai_api_key = os.environ.get("OPENAI_API_KEY")
if openai_api_key is None or openai_api_key == "":
    raise ValueError("OpenAI API key not found. Set it in the .env file.")

In [3]:
# Configure Tesseract OCR path.
# The Windows default install location stays the fallback, but a TESSERACT_CMD
# environment variable takes precedence so the notebook can run on machines
# where the tesseract binary lives somewhere else.
pytesseract.pytesseract.tesseract_cmd = os.getenv(
    "TESSERACT_CMD", r"C:\Program Files\Tesseract-OCR\tesseract.exe"
)

In [5]:
# Data processing functions
# Function to extract text from PDF
def extract_text_from_pdf(file_path: str) -> List[Dict[str, Any]]:
    """
    Load a PDF and break its text into overlapping chunks.

    Returns one dict per chunk with the keys "type" (always "text"),
    "content" (the chunk text) and "metadata" (the chunk's metadata).
    Any failure is printed and results in an empty list.
    """
    try:
        pages = PyPDFLoader(file_path).load()
        # Separator list handed to the splitter: blank line, newline, sentence
        # end, followed by a handful of financial-report keywords.
        split_points = ["\n\n", "\n", ". ", "Revenue", "Profit", "Expenses", "Net Income", "Table", "Figure", "Summary"]
        splitter = RecursiveCharacterTextSplitter(
            separators=split_points,
            chunk_size=500,
            chunk_overlap=50,
        )
        records = []
        for piece in splitter.split_documents(pages):
            records.append({"type": "text", "content": piece.page_content, "metadata": piece.metadata})
        return records
    except Exception as e:
        print(f"Error extracting text from {file_path}: {e}")
        return []

# Function to extract tables from PDF
def extract_tables_with_pdfplumber(file_path: str) -> List[Dict[str, Any]]:
    """
    Extract tables from a PDF using PDFPlumber and return them as structured chunks.

    Args:
        file_path: Path of the PDF to scan.

    Returns:
        One dict per table that contains text, with the keys "type" ("table"),
        "content" (the table rows rendered as aligned plain text) and
        "metadata" (source path, 1-based page number, and type). If extraction
        fails, the error is printed and an empty list is returned.
    """
    try:
        table_chunks = []
        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages, start=1):
                for table in page.extract_tables():
                    # Missing cells become empty strings so the literal text
                    # "None" never ends up in the embedded content.
                    rows = [["" if cell is None else str(cell) for cell in row] for row in (table or [])]
                    # Tables without any text would only add noise to the index.
                    if not any(cell.strip() for row in rows for cell in row):
                        continue
                    # header=False: the DataFrame's column labels are just
                    # positional integers, not part of the table.
                    table_text = pd.DataFrame(rows).to_string(index=False, header=False)
                    metadata = {"source": file_path, "page": page_num, "type": "table"}
                    table_chunks.append({"type": "table", "content": table_text, "metadata": metadata})
        return table_chunks
    except Exception as e:
        print(f"Error extracting tables from {file_path}: {e}")
        return []
# Function to extract text from images using OCR
def extract_text_from_images(file_path: str) -> List[Dict[str, Any]]:
    """
    OCR every page image produced from a PDF with Tesseract.

    Pages whose OCR result is empty after stripping whitespace are skipped.
    Each remaining page yields a dict with "type" ("image"), "content" (the
    recognized text) and "metadata" (source path, 1-based page number, type).
    Any failure is printed and results in an empty list.
    """
    try:
        page_images = convert_from_path(file_path)
        ocr_chunks = []
        page_no = 0
        for page_image in page_images:
            page_no += 1
            recognized = pytesseract.image_to_string(page_image).strip()
            if not recognized:
                continue
            ocr_chunks.append({
                "type": "image",
                "content": recognized,
                "metadata": {"source": file_path, "page": page_no, "type": "image"},
            })
        return ocr_chunks
    except Exception as e:
        print(f"Error extracting text from images in {file_path}: {e}")
        return []
    
# Main function to process a single PDF
def process_pdf(file_path: str) -> List[Dict[str, Any]]:
    """
    Run every extractor on one PDF and concatenate their chunks.

    The order of the result is: text chunks, then table chunks, then OCR
    chunks. A progress line naming the file is printed first.
    """
    print(f"Processing: {file_path}")
    extractors = (
        extract_text_from_pdf,
        extract_tables_with_pdfplumber,
        extract_text_from_images,
    )
    collected = []
    for extract in extractors:
        collected += extract(file_path)
    return collected

# Function to process multiple PDFs in a directory
def process_multiple_pdfs(directory: str) -> List[Dict[str, Any]]:
    """
    Process all PDF files in a directory and return embedding-ready chunks.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        The chunks of every PDF, concatenated. Files are matched on a
        case-insensitive ".pdf" extension and handled in sorted path order so
        repeated runs produce the chunks in the same order.
    """
    files = sorted(
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.lower().endswith(".pdf")
    )
    all_chunks = []
    for file_path in files:
        # Skip sub-directories that merely happen to be named "*.pdf".
        if not os.path.isfile(file_path):
            continue
        all_chunks.extend(process_pdf(file_path))
    return all_chunks

# Configure file directory
# NOTE(review): absolute, machine-specific path; the notebook only runs
# elsewhere after this is pointed at a local folder containing the PDFs.
file_directory = r"C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case"
# Runs text, table and OCR extraction for every PDF found in the folder.
chunks = process_multiple_pdfs(file_directory)
print(f"Total chunks extracted: {len(chunks)}")

Processing: C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case\BMW_2021.pdf
Processing: C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case\BMW_2022.pdf
Processing: C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case\BMW_2023.pdf
Processing: C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case\Ford_2021.pdf
Processing: C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case\Ford_2022.pdf
Processing: C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case\Ford_2023.pdf
Processing: C:\Users\abhishekk\OneDrive - Universitetet i Agder\Welcome\Language\Langchain Project\Rag_Chatbot\data_4_case\news.pdf
Processing: C:\Users\abhishekk\OneDrive - Univers

In [6]:
# Function to create and save a FAISS vector store
def create_faiss_vector_store(chunks: List[Dict[str, Any]], faiss_index_path: str = "faiss_index") -> FAISS:
    """
    Create and save a FAISS vector store using OpenAI embeddings.

    Args:
        chunks: Dicts carrying at least the keys "content" (text to embed)
            and "metadata" (stored alongside the text).
        faiss_index_path: Folder the index is written to.

    Returns:
        The in-memory FAISS store that was built and saved.

    Raises:
        ValueError: If ``chunks`` is empty, since there is nothing to index.
    """
    if not chunks:
        # Fail with a clear message instead of an obscure error inside FAISS.
        raise ValueError("No chunks to index; cannot build a FAISS vector store.")
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
    contents = [chunk["content"] for chunk in chunks]
    metadatas = [chunk["metadata"] for chunk in chunks]
    faiss_store = FAISS.from_texts(contents, embeddings, metadatas=metadatas)
    # Persist the index so it can be reloaded later without re-embedding.
    faiss_store.save_local(faiss_index_path)
    return faiss_store

# Embed the extracted chunks with OpenAI and build the FAISS store that the retriever is created from.
vector_store = create_faiss_vector_store(chunks, faiss_index_path="pdf_faiss_index")

In [14]:
# Step 1: Classify Query
def classify_query_with_year(query: str) -> Tuple[str, List[str]]:
    """
    Classify the query as basic, comparison, or aggregation and extract year(s).

    Keywords are matched as whole words or phrases, optionally followed by
    "s", "d" or "ed". This keeps "compared" and "differences" matching while
    "SUVs" is no longer read as "vs" and "summary" is no longer read as "sum".
    Comparison keywords take precedence over aggregation keywords.

    Args:
        query: The user's question.

    Returns:
        A ``(query_type, years)`` tuple. ``query_type`` is "comparison",
        "aggregation" or "basic". ``years`` holds the distinct four-digit
        19xx/20xx numbers found in the query, in order of first appearance.
    """
    comparison_keywords = ["compare", "difference", "vs", "how does", "comparison between"]
    aggregation_keywords = ["total", "sum", "aggregate", "combined", "overall"]

    def has_keyword(text: str, keywords: List[str]) -> bool:
        # Whole-word match with an optional simple inflection suffix.
        alternatives = "|".join(re.escape(keyword) for keyword in keywords)
        return re.search(rf"\b(?:{alternatives})(?:s|d|ed)?\b", text) is not None

    # Only plausible years count, so quantities such as "5000" are ignored.
    # dict.fromkeys removes repeats while keeping first-seen order.
    years = list(dict.fromkeys(re.findall(r"\b(?:19|20)\d{2}\b", query)))
    query_lower = query.lower()

    if has_keyword(query_lower, comparison_keywords):
        query_type = "comparison"
    elif has_keyword(query_lower, aggregation_keywords):
        query_type = "aggregation"
    else:
        query_type = "basic"

    return query_type, years

# Step 2: Helper: Entity Extraction for Comparisons
def extract_entities_from_query(query: str, known_entities: Tuple[str, ...] = ("BMW", "Tesla", "Ford")) -> List[str]:
    """
    Extract entities from a query for comparison tasks.

    Names are matched case-insensitively as whole words, with an optional
    trailing "s", so "Tesla's" and "Fords" are recognized while words that
    merely contain a name ("affordable", "Oxford") are not.

    Args:
        query: The user's question.
        known_entities: Company names to look for. The default is the set the
            original implementation hard-coded.

    Returns:
        The matched names, spelled and ordered as in ``known_entities``.
    """
    query_lower = query.lower()
    return [
        entity
        for entity in known_entities
        if re.search(rf"\b{re.escape(entity.lower())}s?\b", query_lower)
    ]
# Step 3: Retrieve Relevant Chunks
def retrieve_chunks(query: str, retriever: BaseRetriever, query_type: str, years: List[str]) -> List[Document]:
    """
    Retrieve document chunks dynamically based on query type.

    * "basic" / "aggregation": a single retrieval on the query with the
      years appended.
    * "comparison": one retrieval per (entity, year) combination. If the
      query names no known entity or no year, that dimension is left out of
      the sub-query instead of suppressing retrieval altogether; with neither,
      the plain query is used. Documents returned by more than one sub-query
      are kept only once.

    Args:
        query: The user's question.
        retriever: Retriever the (sub-)queries are sent to.
        query_type: "basic", "comparison" or "aggregation".
        years: Years extracted from the query (may be empty).

    Returns:
        The retrieved documents.

    Raises:
        ValueError: If ``query_type`` is not one of the supported values.
    """
    if query_type in ("basic", "aggregation"):
        if years:
            query = f"{query} {' '.join(years)}"
        return retriever.invoke(query)

    if query_type == "comparison":
        entities = extract_entities_from_query(query)
        # A blank slot stands in for a missing dimension so the loops below
        # still issue at least one retrieval.
        entity_slots = entities or [""]
        year_slots = years or [""]
        chunks = []
        seen = set()
        for entity in entity_slots:
            for year in year_slots:
                sub_query = " ".join(part for part in (entity, query, year) if part)
                for doc in retriever.invoke(sub_query):
                    # Same text and same metadata means the same chunk.
                    key = (doc.page_content, str(doc.metadata))
                    if key not in seen:
                        seen.add(key)
                        chunks.append(doc)
        return chunks

    raise ValueError("Unsupported query type.")
    
# Step 4: find relevant documents
def advanced_retrieve(query: str, retriever: BaseRetriever) -> List[Document]:
    """
    Classify the query, then fetch the chunks appropriate for that class.

    The query type and the years found by classify_query_with_year are passed
    straight to retrieve_chunks, whose result is returned unchanged.
    """
    kind, found_years = classify_query_with_year(query)
    return retrieve_chunks(query, retriever, kind, found_years)

# Step 5: Define Advanced Conversational Retriever   
class AdvancedConversationalRetriever(BaseRetriever):
    """
    Retriever wrapper that routes every query through ``advanced_retrieve``.

    Both the synchronous and the asynchronous entry points apply the same
    classification-driven retrieval on top of the wrapped retriever.
    """

    # Underlying retriever that performs the actual search.
    retriever: BaseRetriever = Field(...)  # Define retriever as a field

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """
        Override the retriever's _get_relevant_documents to use advanced logic.
        """
        return advanced_retrieve(query, self.retriever)

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """
        Asynchronous version of _get_relevant_documents.

        Runs the same synchronous pipeline in the event loop's default
        executor, so async callers get identical results and the event loop
        is not blocked by the retrieval calls.
        """
        import asyncio  # local import: asyncio is not among the notebook's imports

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, advanced_retrieve, query, self.retriever)


In [12]:
# Initialize Retriever
def build_retriever(vector_store: FAISS, top_k: int = 15) -> BaseRetriever:
    """
    Wrap the vector store in a similarity-search retriever.

    ``top_k`` is forwarded as the "k" search argument.
    """
    search_options = {"k": top_k}
    similarity_retriever = vector_store.as_retriever(
        search_type="similarity",
        search_kwargs=search_options,
    )
    return similarity_retriever

# Base similarity retriever over the FAISS store (build_retriever's default top_k applies).
retriever = build_retriever(vector_store)
# Initialize the LLM
# temperature=0 is used for the answering model; up to 5 retries are allowed.
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=5
)

# Initialize the chat prompt: a system instruction plus a human message
# template with the {context} and {question} placeholders.
chat_prompt = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template(
        "You are a helpful assistant. Use the following context to answer the user's question. "
        "If the answer is not in the context, say 'I don't have an exact answer, can I help with something else?'"
    ),
    HumanMessagePromptTemplate.from_template(
        "Context:\n{context}\n\nQuestion:\n{question}"
    )
])
# Initialize the advanced retriever, which wraps the base retriever with the
# query-classification logic defined in AdvancedConversationalRetriever.
advanced_retriever = AdvancedConversationalRetriever(retriever=retriever)
# Define the retrieval chain; chat_prompt is supplied as the prompt of the
# combine-docs step. The chat loop reads the "answer" key of its response.
retrieval_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=advanced_retriever,
    return_source_documents=True,
    combine_docs_chain_kwargs={"prompt": chat_prompt}
)

In [15]:
# Final Step: Process User Query
# Interactive loop: each question is answered by the retrieval chain and the
# exchange is appended to chat_history, which is passed back in on the next
# turn. Typing "exit" (any casing, surrounding whitespace ignored) ends it.
chat_history = [] # List to store chat history
while True:
    # Strip so that " exit " still quits and whitespace-only input is detected.
    query = input("Enter your question please: ").strip()
    if not query:
        # Nothing was asked; don't spend an LLM call or pollute the history.
        continue
    print("User:", query)
    if query.lower() == "exit":
        print("Goodbye!")
        break

    response = retrieval_chain.invoke({"question": query, "chat_history": chat_history})
    print(f"Bot: {response['answer']}")

    chat_history.append(HumanMessage(content=query))
    chat_history.append(AIMessage(content=response["answer"]))

User:  What were Tesla's profit numbers for 2023
Bot: In 2023, Tesla's net income was $14,974 million, and the comprehensive income was $15,192 million.
User: compare tesla profit fro 2023 and 2022
Bot: In 2023, Tesla's net income attributable to common stockholders was $15.00 billion, which represents a favorable change of $2.44 billion compared to the net income of $12.56 billion in 2022.
User: exit
Goodbye!
