## Income Statement validation

In [1]:
import os
import re
import json
import fitz  # PyMuPDF
import time
import pandas as pd
import ollama
from PyPDF2 import PdfReader, PdfWriter
from docling.document_converter import DocumentConverter


In [2]:


#  Folder where PDFs are located
pdf_folder_path = "C:\\Users\\ramum\\pdfs"

#  Step 1: Find income-statement-related pages
def find_income_statement_pages(pdf_path, keywords=["profit after tax", "total income", "total expenses"]):
    doc = fitz.open(pdf_path)
    matched_pages = []
    for page_num in range(len(doc)):
        text = doc.load_page(page_num).get_text().lower()
        hits = sum(1 for keyword in keywords if keyword in text)
        if hits >= 3:
            matched_pages.append(page_num)
    return matched_pages

#  Step 2: Extract selected pages into a unique filtered PDF
def extract_pages_to_temp_pdf(input_pdf, selected_pages):
    reader = PdfReader(input_pdf)
    writer = PdfWriter()
    for page_index in selected_pages:
        writer.add_page(reader.pages[page_index])
    temp_filename = os.path.splitext(os.path.basename(input_pdf))[0] + "_filtered_income.pdf"
    print(selected_pages)
    temp_pdf_path = os.path.join(os.path.dirname(input_pdf), temp_filename)
    with open(temp_pdf_path, "wb") as f:
        writer.write(f)
    return temp_pdf_path

#  Extract a number from markdown column by index
def extract_number_from_column(line, column_index):
    try:
        parts = [cell.strip() for cell in line.split("|") if cell.strip()]
        return float(parts[column_index].replace(",", ""))
    except (IndexError, ValueError):
        return 0.0

#  Extract markdown tables with income statement keywords
def extract_income_tables_from_markdown(markdown_text):
    tables, current, inside = [], [], False
    for line in markdown_text.splitlines():
        line = line.strip()
        if line.startswith("|") and line.endswith("|"):
            inside = True
            current.append(line)
        elif inside and not line:
            inside = False
            joined = " ".join(current).lower()
            if any(k in joined for k in ["profit after tax", "total income", "total expenses"]):
                tables.append(current)
            current = []
    return tables

#  Parse income statement values
def parse_income_statement_tables(tables, submitted_net_income=None):
    parsed = []
    for table in tables:
        headers = [h.strip() for h in table[0].split("|") if h.strip()]
        latest_index = next((i for i, h in enumerate(headers) if re.match(r"Q\d\s*FY\d+|FY\d+Q\d", h)), 0)

        entry = {
            "quarter": headers[latest_index] if latest_index < len(headers) else "LatestQuarter",
            "revenues": 0.0,
            "expenses": 0.0,
            "netIncome": 0.0,
            "grossProfit": 0.0,
            "profitMarginPercent": 0.0,
            "submittedNetIncome": submitted_net_income,
            "calculatedNetIncome": 0.0,
            "isValid": None
        }

        for line in table:
            if "Total Income" in line and "operations" not in line:
                entry["revenues"] = extract_number_from_column(line, latest_index)
            elif "Total Expenses" in line:
                entry["expenses"] = extract_number_from_column(line, latest_index)
            elif "Profit After Tax" in line and "Margin" not in line:
                entry["netIncome"] = extract_number_from_column(line, latest_index)

        entry["grossProfit"] = entry["revenues"] - entry["expenses"]
        if entry["revenues"] > 0:
            entry["profitMarginPercent"] = round((entry["netIncome"] / entry["revenues"]) * 100, 2)

        if submitted_net_income is not None:
            entry["calculatedNetIncome"] = entry["netIncome"]
            entry["isValid"] = submitted_net_income == entry["netIncome"]

        if entry["netIncome"] > 0:
            parsed.append(entry)
    return parsed

#  Run Docling and validate
def extract_and_validate_income_statements(pdf_path, submitted_net_income=None):
    start = time.time()
    matched_pages = find_income_statement_pages(pdf_path)

    if not matched_pages:
        print(f" No income-statement pages found in {pdf_path}")
        return []

    filtered_pdf_path = extract_pages_to_temp_pdf(pdf_path, matched_pages)

    converter = DocumentConverter()
    result = converter.convert(filtered_pdf_path)
    markdown = result.document.export_to_markdown()

    # Print number of pages in the filtered PDF
    print(f" Pages in filtered PDF: {len(result.document.pages)}")

    tables = extract_income_tables_from_markdown(markdown)
    parsed_data = parse_income_statement_tables(tables, submitted_net_income)

    for entry in parsed_data:
        entry["fileName"] = os.path.basename(pdf_path)
        entry["filteredPDF"] = os.path.basename(filtered_pdf_path)
        entry["pageCount"] = len(result.document.pages)   # Add to output JSON
        entry["processingTimeSeconds"] = round(time.time() - start, 2)

    return parsed_data


#  Handle a list of validation requests
def validate_uploaded_pdfs(validation_requests):
    results = []
    for req in validation_requests:
        file_name = req["fileName"]
        submitted_income = req["submittedNetIncome"]
        full_path = os.path.join(pdf_folder_path, file_name)
        print(f" Processing: {file_name}")
        parsed = extract_and_validate_income_statements(full_path, submitted_net_income=submitted_income)
        results.extend(parsed)
    return results

# Inputs
validation_requests = [
    {"fileName": "Q3FY25 Earnings Presentation V16.pdf", "submittedNetIncome": 3834},
    {"fileName": "INVESTOR_PRESENTATION_MAR25.pdf", "submittedNetIncome": 2650}
]

# Entry point
if __name__ == "__main__":
    output = validate_uploaded_pdfs(validation_requests)
    print("\n Final JSON Result:\n")
    print(json.dumps(output, indent=2))
#7 minutes 4 pages


 Processing: Q3FY25 Earnings Presentation V16.pdf
[6, 13]




 Pages in filtered PDF: 2
 Processing: INVESTOR_PRESENTATION_MAR25.pdf
[6, 13]
 Pages in filtered PDF: 2

 Final JSON Result:

[
  {
    "quarter": "Q3 FY25",
    "revenues": 4807.0,
    "expenses": 1084.0,
    "netIncome": 3834.0,
    "grossProfit": 3723.0,
    "profitMarginPercent": 79.76,
    "submittedNetIncome": 3834,
    "calculatedNetIncome": 3834.0,
    "isValid": true,
    "fileName": "Q3FY25 Earnings Presentation V16.pdf",
    "filteredPDF": "Q3FY25 Earnings Presentation V16_filtered_income.pdf",
    "pageCount": 2,
    "processingTimeSeconds": 222.54
  },
  {
    "quarter": "Q3 FY25",
    "revenues": 4289.0,
    "expenses": 1241.0,
    "netIncome": 2291.0,
    "grossProfit": 3048.0,
    "profitMarginPercent": 53.42,
    "submittedNetIncome": 3834,
    "calculatedNetIncome": 2291.0,
    "isValid": false,
    "fileName": "Q3FY25 Earnings Presentation V16.pdf",
    "filteredPDF": "Q3FY25 Earnings Presentation V16_filtered_income.pdf",
    "pageCount": 2,
    "processingTimeSeco

## Invoice Statement validation

In [3]:
# Folder path for PDFs
pdf_folder_path = "C:\\Users\\ramum\\pdfs\\Invoice_pdf"

#  Extract numeric value from string (supports ₹, commas, decimals)
def extract_number(line):
    match = re.search(r"(?:Rs\.?|₹)?\s*([\d,]+\.\d{2}|\d{1,3}(?:,\d{3})+)", line)
    if match:
        try:
            return float(match.group(1).replace(",", ""))
        except:
            return 0.0
    return 0.0

# Main logic to extract financial values from markdown
def extract_invoice_totals_from_markdown(markdown):
    totals = {
        "taxableAmount": 0.0,
        "taxAmount": 0.0,
        "totalAmount": 0.0,
        "effectiveTaxRate": 0.0,
        "lineItemSum": 0.0,
        "lineItemDiscrepancy": 0.0,
        "quantity": 0.0
    }

    probable_total_keywords = [
        "net amount", "grand total", "total amount", "total", "invoice total", "amount due", "balance due"
    ]
    probable_tax_keywords = ["tax amount", "gst", "cgst", "sgst", "igst"]
    probable_taxable_keywords = ["taxable amount", "taxable value"]
    probable_quantity_keywords = ["qty", "quantity", "units"]

    all_numbers = []

    for line in markdown.splitlines():
        lower = line.lower().strip()
        clean = line.strip()

        # Extract Taxable
        if any(k in lower for k in probable_taxable_keywords):
            totals["taxableAmount"] = extract_number(clean)

        # Extract Tax
        elif any(k in lower for k in probable_tax_keywords):
            totals["taxAmount"] = extract_number(clean)

        # Extract Total
        elif any(k in lower for k in probable_total_keywords):
            if "subtotal" not in lower:
                total = extract_number(clean)
                if total > totals["totalAmount"]:
                    totals["totalAmount"] = total

        # Extract Quantity
        elif any(k in lower for k in probable_quantity_keywords):
            qty_match = re.search(r"[\d,]+\.\d+|\d+", line)
            if qty_match:
                try:
                    totals["quantity"] = float(qty_match.group(0).replace(",", ""))
                except:
                    totals["quantity"] = 0.0

        # Extract row total (line items)
        matches = re.findall(r"(?:Rs\.?|₹)?\s*([\d,]+\.\d{2}|\d{1,3}(?:,\d{3})+)", clean)
        if len(matches) >= 3:
            try:
                totals["lineItemSum"] += float(matches[-1].replace(",", ""))
            except:
                pass

        for val in matches:
            try:
                all_numbers.append(float(val.replace(",", "")))
            except:
                pass

    # Fallback: if no totalAmount found via keywords, use max number
    if totals["totalAmount"] == 0 and all_numbers:
        totals["totalAmount"] = max(all_numbers)

    # Compute derived values
    if totals["taxableAmount"] > 0:
        totals["effectiveTaxRate"] = round((totals["taxAmount"] / totals["taxableAmount"]) * 100, 2)

    totals["lineItemDiscrepancy"] = round(totals["lineItemSum"] - totals["totalAmount"], 2)
    return totals

#  Compare against submitted amount
def validate_invoice_totals(calculated, submitted_amount):
    return {
        "submittedAmount": submitted_amount,
        "calculatedTaxable": calculated["taxableAmount"],
        "calculatedTax": calculated["taxAmount"],
        "calculatedTotal": calculated["totalAmount"],
        "effectiveTaxRatePercent": calculated["effectiveTaxRate"],
        "lineItemSum": calculated["lineItemSum"],
        "lineItemDiscrepancy": calculated["lineItemDiscrepancy"],
        "quantity": calculated["quantity"],
        "matchWithSubmission": abs(submitted_amount - calculated["totalAmount"]) < 1,
    }

#  List of invoices with expected Net Amount
invoice_files = [
    {"fileName": "invoice_1.pdf", "submittedAmount": 1250.0},
    {"fileName": "invoice_2.pdf", "submittedAmount": 2499.0},
    {"fileName": "invoice_3.pdf", "submittedAmount": 623.0},
    {"fileName": "medical_invoice.pdf", "submittedAmount": 4467.0}
]

#  Run validation
results = []
for invoice in invoice_files:
    file_name = invoice["fileName"]
    submitted_amount = invoice["submittedAmount"]
    full_path = os.path.join(pdf_folder_path, file_name)

    print(f" Validating: {file_name}")

    try:
        converter = DocumentConverter()
        result = converter.convert(full_path)
        markdown = result.document.export_to_markdown()

        extracted_totals = extract_invoice_totals_from_markdown(markdown)
        validation = validate_invoice_totals(extracted_totals, submitted_amount)
        validation["fileName"] = file_name
        results.append(validation)

    except Exception as e:
        results.append({
            "fileName": file_name,
            "error": str(e)
        })

#  Output
print("\n## Invoice Validation Results")
print(json.dumps(results, indent=2))


 Validating: invoice_1.pdf
 Validating: invoice_2.pdf
 Validating: invoice_3.pdf
 Validating: medical_invoice.pdf

## Invoice Validation Results
[
  {
    "submittedAmount": 1250.0,
    "calculatedTaxable": 150.0,
    "calculatedTax": 30.0,
    "calculatedTotal": 180.0,
    "effectiveTaxRatePercent": 20.0,
    "lineItemSum": 0.0,
    "lineItemDiscrepancy": -180.0,
    "quantity": 0.0,
    "matchWithSubmission": false,
    "fileName": "invoice_1.pdf"
  },
  {
    "submittedAmount": 2499.0,
    "calculatedTaxable": 160.0,
    "calculatedTax": 0.5,
    "calculatedTotal": 159.5,
    "effectiveTaxRatePercent": 0.31,
    "lineItemSum": 0.0,
    "lineItemDiscrepancy": -159.5,
    "quantity": 0.0,
    "matchWithSubmission": false,
    "fileName": "invoice_2.pdf"
  },
  {
    "submittedAmount": 623.0,
    "calculatedTaxable": 500.0,
    "calculatedTax": 123.0,
    "calculatedTotal": 623.0,
    "effectiveTaxRatePercent": 24.6,
    "lineItemSum": 0.0,
    "lineItemDiscrepancy": -623.0,
    "quant

In [4]:
# Step 1: Extract income-related pages from a PDF and create temp
def extract_income_statement_pages(pdf_path, keywords=None):
    if keywords is None:
        keywords = ["income statement", "consolidated statements of income", "net income", "gross profit"]
    
    doc = fitz.open(pdf_path)
    matched_pages = []
    for page_num in range(len(doc)):
        text = doc.load_page(page_num).get_text().lower()
        match_count = sum(1 for k in keywords if k in text)
        if match_count >= 2:
            matched_pages.append(page_num)
    return matched_pages

# Step 2: Save matched pages into new temp PDF
def create_temp_pdf_with_matched_pages(original_pdf_path, matched_pages):
    reader = PdfReader(original_pdf_path)
    writer = PdfWriter()
    for page_num in matched_pages:
        writer.add_page(reader.pages[page_num])
    
    temp_path = os.path.join(os.path.dirname(original_pdf_path), "_filtered_income_statement.pdf")
    with open(temp_path, "wb") as f:
        writer.write(f)
    return temp_path

# Step 3: Run Docling only on filtered PDF
def extract_income_statement_from_pdf(pdf_path):
    start = time.time()
    matched_pages = extract_income_statement_pages(pdf_path)

    if not matched_pages:
        print("❌ No income statement pages found.")
        return ""

    filtered_pdf_path = create_temp_pdf_with_matched_pages(pdf_path, matched_pages)

    converter = DocumentConverter()
    result = converter.convert(filtered_pdf_path)
    markdown = result.document.export_to_markdown()

    print(f" Extracted in {round(time.time() - start, 2)} seconds")
    return markdown

# Run
if __name__ == "__main__":
    input_file = "C:/Users/ramum/pdfs/Coca-Cola 2025 Q1 Earnings Release_Full Release_4.29.25.pdf"
    markdown_output = extract_income_statement_from_pdf(input_file)
    print(markdown_output)
    



 Extracted in 48.45 seconds
## THE COCA-COLA COMPANY AND SUBSIDIARIES

## Consolidated Statements of Income

(In millions except per share data)

|                                                                  | Three Months Ended   | Three Months Ended   | Three Months Ended   |
|------------------------------------------------------------------|----------------------|----------------------|----------------------|
|                                                                  | March 28, 2025       | March 29, 2024       | % Change             |
| Net Operating Revenues                                           | $ 11,129             | $ 11,300             | (2)                  |
| Cost of goods sold                                               | 4,163                | 4,235                | (2)                  |
| Gross Profit                                                     | 6,966                | 7,065                | (1)                  |
| Selling, general and adm

##  Coca-Cola Income Statement Analysis Pipeline (Markdown + LLM)
This Python script automates the process of extracting financial tables (income statements) from Coca-Cola’s PDF earnings report, cleans the table data into JSON and DataFrame formats, and sends it to a local LLM (Ollama/Mistral) for analysis.
________________________________________
## Features
• Extracts income statement pages from a PDF using keyword matching
• Converts PDF content to structured markdown using Docling
• Parses financial tables into clean JSON and DataFrame format
• Uses Ollama (e.g., Mistral) to generate natural language insights for financial review


In [5]:
# Step 1: Identify income-statement pages
def extract_income_statement_pages(pdf_path, keywords=None):
    if keywords is None:
        keywords = ["income statement", "consolidated statements of income", "net income", "gross profit"]
    doc = fitz.open(pdf_path)
    matched_pages = []
    for page_num in range(len(doc)):
        text = doc.load_page(page_num).get_text().lower()
        match_count = sum(1 for k in keywords if k in text)
        if match_count >= 2:
            matched_pages.append(page_num)
    return matched_pages

# Step 2: Extract matched pages to a filtered PDF
def create_temp_pdf_with_matched_pages(original_pdf_path, matched_pages):
    reader = PdfReader(original_pdf_path)
    writer = PdfWriter()
    for page_num in matched_pages:
        writer.add_page(reader.pages[page_num])
    temp_path = os.path.join(os.path.dirname(original_pdf_path), "_filtered_income_statement.pdf")
    with open(temp_path, "wb") as f:
        writer.write(f)
    return temp_path

# Step 3: Run Docling to get markdown from filtered PDF
def extract_income_statement_from_pdf(pdf_path):
    start = time.time()
    matched_pages = extract_income_statement_pages(pdf_path)
    if not matched_pages:
        print("No income statement pages found.")
        return ""

    filtered_pdf_path = create_temp_pdf_with_matched_pages(pdf_path, matched_pages)
    converter = DocumentConverter()
    result = converter.convert(filtered_pdf_path)
    markdown = result.document.export_to_markdown()
    print(f" Extracted in {round(time.time() - start, 2)} seconds")
    return markdown

# Step 4: Extract all valid markdown tables
def extract_markdown_tables(markdown):
    lines = markdown.splitlines()
    tables = []
    current_table = []
    inside_table = False

    for line in lines:
        line = line.strip()

        if line.startswith("|") and "---" in line:
            if current_table and current_table[-1].startswith("|"):
                current_table.append(line)
                inside_table = True
                continue

        if inside_table:
            if line.startswith("|"):
                current_table.append(line)
            else:
                if len(current_table) >= 3:
                    tables.append(current_table)
                current_table = []
                inside_table = False

        elif line.startswith("|") and not inside_table:
            current_table = [line]

    if inside_table and len(current_table) >= 3:
        tables.append(current_table)

    return tables

# Step 5: Convert markdown table to JSON with fixed headers
def markdown_table_to_json(table_lines):
    if len(table_lines) < 4:
        return []

    headers = ["Metric", "Q1 2025", "Q1 2024", "% Change"]
    json_data = []

    for line in table_lines[3:]:  # Skip junk header rows + separator
        row = [cell.strip().replace("$", "").replace(",", "") for cell in line.split("|") if cell.strip()]
        if len(row) == 4:
            json_data.append(dict(zip(headers, row)))
        else:
            print(f" Skipped malformed row: {row}")
    return json_data

# Step 6: Send cleaned data to Ollama model for LLM analysis
def send_to_llm(df):
    prompt = f"""You are a financial analyst.

Here is Coca-Cola's quarterly comparison:

{df.to_string(index=False)}

Please analyze:
- Revenue and profit trends
- Operating efficiency
- Notable changes or red flags
- Investment or leadership recommendations

Present 3–5 insights clearly in bullet points.
"""
    response = ollama.chat(model="mistral", messages=[
        {"role": "user", "content": prompt}
    ])
    #VLM model or text model
    print("\n LLM Insights (Mistral):\n")
    print(response['message']['content'])

#  Main runner
if __name__ == "__main__":
    input_file = "C:/Users/ramum/pdfs/Coca-Cola 2025 Q1 Earnings Release_Full Release_4.29.25.pdf"
    markdown_output = extract_income_statement_from_pdf(input_file)

    tables = extract_markdown_tables(markdown_output)
    if not tables:
        print(" No valid tables found.")
    else:
        best_table = max(tables, key=len)
        income_json = markdown_table_to_json(best_table)

        print("\n Extracted Table as JSON:\n")
        print(json.dumps(income_json, indent=2))

        # Convert to DataFrame
        df = pd.DataFrame(income_json)
        print("\n Cleaned Financial Table:\n")
        print(df)

        # Send to Ollama LLM
        send_to_llm(df)


 Extracted in 48.66 seconds

 Extracted Table as JSON:

[
  {
    "Metric": "Net Operating Revenues",
    "Q1 2025": " 11129",
    "Q1 2024": " 11300",
    "% Change": "(2)"
  },
  {
    "Metric": "Cost of goods sold",
    "Q1 2025": "4163",
    "Q1 2024": "4235",
    "% Change": "(2)"
  },
  {
    "Metric": "Gross Profit",
    "Q1 2025": "6966",
    "Q1 2024": "7065",
    "% Change": "(1)"
  },
  {
    "Metric": "Selling general and administrative expenses",
    "Q1 2025": "3234",
    "Q1 2024": "3351",
    "% Change": "(4)"
  },
  {
    "Metric": "Other operating charges",
    "Q1 2025": "73",
    "Q1 2024": "1573",
    "% Change": "(95)"
  },
  {
    "Metric": "Operating Income",
    "Q1 2025": "3659",
    "Q1 2024": "2141",
    "% Change": "71"
  },
  {
    "Metric": "Interest income",
    "Q1 2025": "180",
    "Q1 2024": "246",
    "% Change": "(27)"
  },
  {
    "Metric": "Interest expense",
    "Q1 2025": "387",
    "Q1 2024": "382",
    "% Change": "1"
  },
  {
    "Metric": "E