In [None]:
# Cell 1: Importing Required Libraries
# This cell imports all necessary libraries for PDF processing, data extraction, MongoDB handling, and Streamlit.
import pdfplumber
import pandas as pd
from pymongo import MongoClient
import re
from typing import Dict, List, Any
from datetime import datetime
from bson import ObjectId
import numpy as np
import streamlit as st
import os

In [None]:
# Cell 2: Table Data Extraction Class
# Defines a class to extract and process table data from PDFs.
class TableDataExtractor:
    def __init__(self):
        self.numerical_pattern = re.compile(r'[-+]?\d*\.?\d+')

    def extract_value(self, cell: str) -> Any:
        # Cleans and extracts numerical values from a cell.
        if cell is None or str(cell).strip() == '':
            return None
        value = str(cell).strip()
        value = re.sub(r'[\u20b9$,]', '', value)
        try:
            if '.' in value:
                return float(value)
            elif value.replace('-', '').isdigit():
                return int(value)
            return value
        except ValueError:
            return value

    def process_table(self, table: List[List[str]]) -> List[Dict[str, Any]]:
        # Processes the table data and returns a list of dictionaries.
        if not table or len(table) < 2:
            return []
        headers = []
        seen = set()
        for col in table[0]:
            header = str(col).strip().lower().replace(' ', '_') if col else 'unnamed'
            base_header = header
            counter = 1
            while header in seen:
                header = f"{base_header}_{counter}"
                counter += 1
            seen.add(header)
            headers.append(header)
        processed_rows = []
        for row in table[1:]:
            row_dict = {}
            has_data = False
            for idx, cell in enumerate(row):
                if idx < len(headers):
                    value = self.extract_value(cell)
                    if value is not None:
                        row_dict[headers[idx]] = value
                        has_data = True
            if has_data:
                processed_rows.append(row_dict)
        return processed_rows

In [None]:
# Cell 3: PDF Data Extraction Class
# Handles table and text extraction from PDF files.
class PDFDataExtractor:
    def __init__(self):
        self.table_extractor = TableDataExtractor()
        self.financial_keywords = [
            'revenue', 'profit', 'assets', 'liabilities', 'equity',
            'expenses', 'cash', 'tax', 'income', 'balance', 'total',
            'net', 'gross', 'operating', 'current', 'consolidated',
            'statement', 'notes', 'financial'
        ]

    def extract_tables(self, pdf_path: str) -> List[Dict]:
        # Extracts table data from the PDF using pdfplumber.
        tables_data = []
        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages, 1):
                table_settings = [
                    {'vertical_strategy': 'text', 'horizontal_strategy': 'text'},
                    {'vertical_strategy': 'lines', 'horizontal_strategy': 'lines'},
                ]
                for settings in table_settings:
                    try:
                        tables = page.extract_tables(table_settings=settings)
                        for table in tables:
                            if table and len(table) > 1:
                                processed_rows = self.table_extractor.process_table(table)
                                if processed_rows:
                                    table_data = {
                                        '_id': ObjectId(),
                                        'page_number': page_num,
                                        'data': processed_rows,
                                        'extraction_method': str(settings),
                                        'type': 'table',
                                        'extracted_at': datetime.now()
                                    }
                                    tables_data.append(table_data)
                    except Exception as e:
                        print(f"Warning: Error on page {page_num} with settings {settings}: {str(e)}")
        return tables_data

    def extract_text(self, pdf_path: str) -> List[Dict]:
        # Extracts text sections from the PDF and identifies financial content.
        text_data = []
        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages, 1):
                text = page.extract_text()
                if text:
                    sections = self.split_into_sections(text)
                    for section in sections:
                        if self.is_relevant_financial_text(section):
                            embedding = self.tf_idf_embedding(section)
                            para_dict = {
                                '_id': ObjectId(),
                                'page_number': page_num,
                                'content': section.strip(),
                                'embedding': embedding,
                                'type': 'text',
                                'extracted_at': datetime.now()
                            }
                            text_data.append(para_dict)
        return text_data

    def tf_idf_embedding(self, text: str) -> List[float]:
        # Creates a TF-IDF vector for a given text section.
        words = text.split()
        word_set = list(set(words))
        tf_idf_vector = [words.count(word) / len(words) for word in word_set]
        return tf_idf_vector[:300] + [0] * (300 - len(tf_idf_vector))

    def split_into_sections(self, text: str) -> List[str]:
        # Splits text into smaller sections for easier processing.
        sections = re.split(r'\n\s*\n', text)
        final_sections = []
        for section in sections:
            subsections = re.split(r'(?:\d+\.|[A-Z]\.|\u2022|\*)\s+', section)
            final_sections.extend([s.strip() for s in subsections if s.strip()])
        return final_sections

    def is_relevant_financial_text(self, text: str) -> bool:
        # Checks if a text section contains financial keywords or numerical data.
        text_lower = text.lower()
        if any(keyword in text_lower for keyword in self.financial_keywords):
            return True
        if re.search(r'\d+\.?\d*', text):
            return True
        return False

In [None]:
# Cell 4: MongoDB Handler Class
# Handles storing and retrieving data from MongoDB.
class MongoDBHandler:
    def __init__(self, db_name: str = 'financial_db'):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client[db_name]
        self.tables_collection = self.db['tables']
        self.text_collection = self.db['text']
        self.tables_collection.create_index([('page_number', 1)])
        self.text_collection.create_index([('page_number', 1)])

    def store_data(self, tables_data: List[Dict], text_data: List[Dict]):
        # Stores extracted tables and text in MongoDB collections.
        self.tables_collection.delete_many({})
        self.text_collection.delete_many({})
        if tables_data:
            self.tables_collection.insert_many(tables_data, ordered=False)
        if text_data:
            self.text_collection.insert_many(text_data, ordered=False)

    def retrieve_similar_text(self, query: str, top_k: int = 5) -> List[Dict]:
        # Retrieves text documents similar to the query using cosine similarity.
        query_embedding = np.array(PDFDataExtractor().tf_idf_embedding(query))
        cursor = self.text_collection.find()
        results = []
        for doc in cursor:
            doc_embedding = np.array(doc['embedding'])
            similarity = np.dot(query_embedding, doc_embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(doc_embedding)
            )
            results.append((similarity, doc))
        results.sort(reverse=True, key=lambda x: x[0])
        return [doc for _, doc in results[:top_k]]


In [None]:
# Cell 5: Streamlit App
# Builds a user interface for PDF upload, processing, and querying.
st.title("Financial Data QA Bot")
st.sidebar.header("Upload and Query")

uploaded_file = st.sidebar.file_uploader("Upload a P&L PDF", type=["pdf"])
query = st.text_input("Enter your financial query", placeholder="What is the total revenue?")

if uploaded_file:
    pdf_path = os.path.join("temp", uploaded_file.name)
    with open(pdf_path, "wb") as f:
        f.write(uploaded_file.read())

    st.sidebar.success("File uploaded successfully!")

    if st.sidebar.button("Process PDF"):
        pdf_extractor = PDFDataExtractor()
        mongo_handler = MongoDBHandler()
        tables_data = pdf_extractor.extract_tables(pdf_path)
        text_data = pdf_extractor.extract_text(pdf_path)

        mongo_handler.store_data(tables_data, text_data)
        st.write(f"Extracted {len(tables_data)} tables and {len(text_data)} text sections.")

if query:
    mongo_handler = MongoDBHandler()
    results = mongo_handler.retrieve_similar_text(query, top_k=5)
    if results:
        st.write("Relevant Results:")
        for result in results:
            st.write(f"Page {result['page_number']}: {result['content']}")
    else:
        st.warning("No relevant results found.")