In [None]:
import fitz
def extract_text_from_pdf(pdf_path):
    """Return the concatenated plain text of every page of a PDF.

    Args:
        pdf_path: Path of the PDF file; passed unchanged to ``fitz.open``.

    Returns:
        str: The ``"text"`` extraction of each page, joined in page order
        (empty string when the document has no pages or no text).

    Raises:
        Whatever ``fitz`` raises while opening or reading the document; the
        document is closed before the exception propagates.
    """
    pdf_document = fitz.open(pdf_path)
    page_texts = []

    try:
        for page_num in range(len(pdf_document)):
            page = pdf_document.load_page(page_num)
            text = page.get_text("text")

            # Log the extracted text length for each page for debugging purposes
            print(f"Page {page_num + 1} text length: {len(text)}")

            page_texts.append(text)
    finally:
        # Release the file handle even when a page fails to load or parse.
        pdf_document.close()

    return "".join(page_texts)

In [None]:
import fitz  # PyMuPDF

def detect_image_layer(pdf_path):
    """Report whether any page of a PDF lists at least one image.

    Args:
        pdf_path: Path of the PDF file; passed unchanged to ``fitz.open``.

    Returns:
        bool: True as soon as a page's ``get_images(full=True)`` is non-empty,
        False when no page lists an image.

    Raises:
        Whatever ``fitz`` raises while opening or reading the document; the
        document is closed before the exception propagates.
    """
    pdf_document = fitz.open(pdf_path)

    try:
        # Stop at the first page that lists any image; later pages are not loaded.
        for page_num in range(len(pdf_document)):
            page = pdf_document.load_page(page_num)
            if page.get_images(full=True):
                return True
        return False
    finally:
        # Single close for every exit path: early return, normal end, or exception.
        pdf_document.close()

In [None]:
import re

def extract_relevant_details_text(text):
    """Parse invoice fields out of text extracted from a PDF's text layer.

    Each field is located with a regular expression; a field that cannot be
    found gets its default ("" for most fields, "N/A" or "0.00" for the
    optional ones below). Line items are parsed by ``extract_items``.

    Args:
        text: Full invoice text as one string.

    Returns:
        dict: Field name -> extracted value. "Items" is whatever
        ``extract_items`` returns; "CGST"/"SGST" are lists of
        "<rate>%: ₹<amount>" strings or "N/A"; "Total Items / Qty" is
        "<items> / <qty>"; every other value is a string.
    """
    details = {}

    # Helper function to safely extract data using regex
    def safe_extract(pattern, text, group=1, default=""):
        match = re.search(pattern, text)
        if match:
            # Several character classes include \s, so a capture can end in a
            # space or newline (e.g. "12 Jan 2024 "), which later breaks date
            # parsing. Strip it, as the OCR variant of this helper does.
            return match.group(group).strip()
        return default

    # Extracting relevant details
    details["GSTIN"] = safe_extract(r"GSTIN\s+([A-Z0-9]+)", text)
    details["Mobile"] = safe_extract(r"Mobile\s+(\+?\d[\d\s\-]+)", text)
    details["Email"] = safe_extract(
        r"Email\s+([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", text
    )
    details["Invoice #"] = safe_extract(r"Invoice #:\s+(\S+)", text)
    details["Invoice Date"] = safe_extract(r"Invoice Date:\s+([0-9A-Za-z\s]+)\s+Due Date:", text)
    details["Due Date"] = safe_extract(r"Due Date:\s+([0-9A-Za-z\s]+)\s+Customer Details:", text)

    # More flexible Customer Details extraction
    details["Customer Details"] = safe_extract(r"Customer Details:?\s*(.+?)(?:\n|Ph:)", text)

    # Extract Customer Phone, but handle absence of phone numbers
    details["Customer Phone"] = safe_extract(r"Ph:\s+(\d+)", text, default="N/A")

    details["Place of Supply"] = safe_extract(r"Place of Supply:\s+([0-9A-Za-z\s-]+)", text)

    # Extract items
    details["Items"] = extract_items(text)

    # Tax and summary details
    details["Taxable Amount"] = safe_extract(r"Taxable Amount\n₹([0-9.,]+)", text)
    details["IGST"] = safe_extract(r"IGST\s*[0-9.]+%\n₹([0-9.,]+)", text, default="N/A")

    # Extract multiple CGST and SGST rates and amounts
    cgst_matches = re.findall(r"CGST\s*([0-9.]+)%\n₹([0-9.,]+)", text)
    sgst_matches = re.findall(r"SGST\s*([0-9.]+)%\n₹([0-9.,]+)", text)

    details["CGST"] = [f"{rate}%: ₹{amount}" for rate, amount in cgst_matches] if cgst_matches else "N/A"
    details["SGST"] = [f"{rate}%: ₹{amount}" for rate, amount in sgst_matches] if sgst_matches else "N/A"

    # Handle round-off values, including negative values
    details["Round Off"] = safe_extract(r"Round Off\n([+-]?[0-9.,]+)", text, default="0.00")

    # Extract other key details
    details["Total"] = safe_extract(r"Total\n₹([0-9.,]+)", text)
    details["Total Discount"] = safe_extract(r"Total Discount\n₹([0-9.,]+)", text, default="N/A")

    # The pattern captures both the item count and the quantity; keep both
    # instead of silently dropping the quantity.
    items_qty = re.search(r"Total Items / Qty : ([0-9]+) / ([0-9.]+)", text)
    details["Total Items / Qty"] = (
        f"{items_qty.group(1)} / {items_qty.group(2)}" if items_qty else ""
    )

    # Bank details
    details["Bank"] = safe_extract(r"Bank:\s+(.+)", text, default="N/A")
    details["IFSC Code"] = safe_extract(r"IFSC Code:\s+([A-Za-z0-9]+)", text, default="N/A")
    details["Account #"] = safe_extract(r"Account #:\s+(\d+)", text, default="N/A")
    details["Branch"] = safe_extract(r"Branch:\s+(.+)", text, default="N/A")

    # Total Amount in words
    details["Total Amount (in words)"] = safe_extract(
        r"Total amount \(in words\):\s+(.+)", text
    )

    return details

# Debugging aid: report which extracted fields came back empty
def log_missing_fields(details, pdf_filename):
    """Print the names of all fields in ``details`` whose value is falsy.

    Args:
        details: Mapping of field name -> extracted value.
        pdf_filename: Name shown in the message to identify the invoice.

    Prints nothing when every field has a truthy value.
    """
    absent = []
    for field_name in details:
        if not details[field_name]:
            absent.append(field_name)

    if not absent:
        return

    print(f"Missing fields in {pdf_filename}: {', '.join(absent)}")

# Line-oriented parser for the invoice's item table
def extract_items(text):
    """Parse the line items of an invoice whose fields sit on separate lines.

    The text is narrowed to the part after the item-table header (when the
    header is found) and before the first summary keyword. Each item is then
    read as: a digits-only line (item number), one or more description lines,
    the rate (optionally followed by a line containing "(" that is appended to
    the rate), then quantity, taxable value, tax amount and amount.

    Args:
        text: Full invoice text with one field per line.

    Returns:
        list[dict]: One dict per complete item with the keys "Item #", "Item",
        "Rate / Item", "Qty", "Taxable Value", "Tax Amount" and "Amount".
        Items with too few remaining lines are dropped.
    """
    header = re.search(r"Item\s+Rate / Item\s+Qty\s+Taxable Value\s+Tax Amount\s+Amount", text)
    if header:
        # Keep only what follows the column header
        text = text[header.end():]

    footer = re.search(r"Taxable Amount|CGST|SGST|IGST|Round Off|Total", text)
    if footer:
        # Drop the summary section that follows the last item
        text = text[:footer.start()]

    lines = []
    for raw_line in text.split("\n"):
        cleaned = raw_line.strip()
        if cleaned:
            lines.append(cleaned)

    line_count = len(lines)
    items = []
    pos = 0

    while pos < line_count:
        # Anything that is not a digits-only line cannot start an item
        if not re.match(r"^\d+$", lines[pos]):
            pos += 1
            continue

        record = {"Item #": lines[pos]}
        pos += 1

        # Description runs until the first purely numeric line (the rate)
        description_start = pos
        while pos < line_count and not re.match(r"^[0-9,.]+$", lines[pos]):
            pos += 1
        record["Item"] = " ".join(lines[description_start:pos]).strip()

        # Need the rate line plus four more lines; otherwise leave the cursor
        # where it is and let the outer loop rescan from the rate line.
        if pos + 4 >= line_count:
            continue

        record["Rate / Item"] = lines[pos]

        if "(" in lines[pos + 1]:
            # Split rate: one extra line is consumed, so six lines are needed.
            if pos + 5 >= line_count:
                # Truncated item: discard it and move on by one line.
                pos += 1
                continue
            record["Rate / Item"] += " " + lines[pos + 1]
            trailing = lines[pos + 2:pos + 6]
            pos += 6
        else:
            trailing = lines[pos + 1:pos + 5]
            pos += 5

        record["Qty"] = trailing[0]
        record["Taxable Value"] = trailing[1]
        record["Tax Amount"] = trailing[2]
        record["Amount"] = trailing[3]
        items.append(record)

    return items

In [None]:
import re
import pandas as pd
from datetime import datetime

# Validate GSTIN format
def validate_gstin(gstin):
    """Check that ``gstin`` has the 15-character layout the pattern below describes.

    Layout enforced by the pattern: 2 digits, 5 capital letters, 4 digits,
    1 capital letter, 1 character from 1-9/A-Z, a literal "Z", and 1 final
    digit or capital letter.

    Args:
        gstin: Value to check. Non-string values (None, NaN read back from a
            CSV, ...) are treated as invalid instead of raising TypeError.

    Returns:
        bool: True only when the whole string matches the layout.
    """
    if not isinstance(gstin, str):
        return False

    gstin_pattern = r"^[0-9]{2}[A-Z]{5}[0-9]{4}[A-Z]{1}[1-9A-Z]{1}Z[0-9A-Z]{1}$"
    # fullmatch: with re.match, "$" would also accept a trailing newline.
    return re.fullmatch(gstin_pattern, gstin) is not None

# Validate email format
def validate_email(email):
    """Check that ``email`` looks like ``local@domain.tld``.

    Args:
        email: Value to check. Non-string values (None, NaN read back from a
            CSV, ...) are treated as invalid instead of raising TypeError.

    Returns:
        bool: True only when the whole string matches the pattern below.
    """
    if not isinstance(email, str):
        return False

    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    # fullmatch: with re.match, "$" would also accept a trailing newline.
    return re.fullmatch(email_pattern, email) is not None

# Validate dates
def validate_dates(invoice_date, due_date):
    """Check that both dates parse as "%d %b %Y" and the invoice is not dated after it is due.

    Args:
        invoice_date: Invoice date string such as "12 Jan 2024".
        due_date: Due date string in the same format.

    Returns:
        bool: True when both values parse and ``invoice_date <= due_date``.
        False for unparseable or non-string values (None, NaN, ...).

    Note:
        ``%b`` uses the current locale's abbreviated month names.
    """
    if not isinstance(invoice_date, str) or not isinstance(due_date, str):
        return False

    try:
        # Surrounding whitespace is common in regex-extracted values and would
        # otherwise make strptime reject an otherwise valid date.
        issued_on = datetime.strptime(invoice_date.strip(), "%d %b %Y")
        due_on = datetime.strptime(due_date.strip(), "%d %b %Y")
    except ValueError:
        return False

    return issued_on <= due_on

# Cross-field validation for item amounts and total
# Compares the sum of the item amounts (plus round-off) to the invoice total
def cross_field_validation(row):
    """Check that the item amounts plus round-off add up to the invoice total.

    Args:
        row: Mapping (dict or DataFrame row) with the keys "Items", "Total"
            and "Round Off". "Items" may be a list of item dicts (as produced
            by the extractors) or the string form of such a list (as read back
            from the CSV). Amounts may be numbers or strings with thousands
            separators.

    Returns:
        bool: True when ``sum(item amounts) + round_off`` is within 0.02 of the
        total. False on a mismatch or when any value cannot be parsed; the
        reason is printed in both cases.
    """
    import ast  # local import: only needed to parse stringified item lists

    def _to_float(value):
        # Strings may carry thousands separators ("1,180.00"); numbers pass through.
        return float(value.replace(",", "")) if isinstance(value, str) else value

    try:
        items = row["Items"]
        if isinstance(items, str):
            # literal_eval parses Python literals only, so text that originated
            # in a PDF/CSV can never be executed as code (unlike eval).
            items = ast.literal_eval(items)

        total = _to_float(row["Total"])
        round_off = _to_float(row["Round Off"]) if row["Round Off"] else 0.0

        # Sum up the amounts from all items
        total_item_amounts = sum(_to_float(item["Amount"]) for item in items)

        # Debugging print statements
        print(f"Sum of Item Amounts: {total_item_amounts}, Total with Round Off: {total_item_amounts + round_off}, Invoice Total: {total}")

        # Check if the sum of item amounts plus round-off matches the invoice total
        if abs(total_item_amounts + round_off - total) <= 0.02:  # Allowing for small rounding differences
            return True

        print(f"Cross-field validation failed: sum of item amounts {total_item_amounts:.2f} with round-off {round_off:.2f} vs. invoice total {total:.2f}")
        return False

    except Exception as e:
        # Deliberate best-effort: an unparseable row is reported and counted as inconsistent.
        print(f"Cross-field validation error: {str(e)}")
        return False


# Validate total with round-off and tax values
def validate_total(taxable_amount, cgst, sgst, igst, round_off, total):
    """Check that taxable amount + taxes + round-off equals the invoice total.

    Args:
        taxable_amount: Taxable amount (number or string such as "1,000.00"). Required.
        cgst: CGST component(s): a list of "<rate>%: ₹<amount>" strings or
            numbers, the string form of such a list, a single amount, or a
            blank/"N/A"/None/NaN placeholder (counted as 0).
        sgst: SGST component(s), same accepted forms as ``cgst``.
        igst: IGST amount (number or string) or a blank/"N/A" placeholder.
        round_off: Round-off adjustment (number or string); blank counts as 0.
        total: Invoice total (number or string). Required.

    Returns:
        bool: True when the computed total is within 0.02 of ``total``.
        False on a mismatch or when a value cannot be parsed (the error is printed).
    """
    import ast  # local import: only needed to parse stringified tax lists

    def _is_blank(value):
        # None, NaN, "" and "N/A" all mean "this component is absent".
        if value is None:
            return True
        if isinstance(value, float) and value != value:
            return True
        return isinstance(value, str) and value.strip() in ("", "N/A")

    def _parse_amount(value):
        # Strict conversion: raises ValueError/TypeError for unusable input.
        if isinstance(value, str):
            return float(value.replace(",", "").replace("₹", "").strip())
        return float(value)

    def _optional_amount(value):
        return 0.0 if _is_blank(value) else _parse_amount(value)

    def _tax_entry(entry):
        # Entries look like "9%: ₹18.00"; the amount is whatever follows the
        # currency sign (or the last colon when the sign is missing).
        if isinstance(entry, str):
            marker = "₹" if "₹" in entry else ":"
            return _parse_amount(entry.rsplit(marker, 1)[-1])
        return _parse_amount(entry)

    def _tax_total(value):
        if _is_blank(value):
            return 0.0
        if isinstance(value, str) and value.strip().startswith("["):
            # Stringified list read back from a CSV; literal_eval never executes code.
            value = ast.literal_eval(value.strip())
        if isinstance(value, (list, tuple)):
            return sum(_tax_entry(entry) for entry in value)
        return _tax_entry(value)

    try:
        # A missing taxable amount or total must fail validation, not count as 0.
        if _is_blank(taxable_amount) or _is_blank(total):
            raise ValueError("taxable amount and total are required")

        taxable_amount = _parse_amount(taxable_amount)
        total = _parse_amount(total)
        round_off = _optional_amount(round_off)
        cgst = _tax_total(cgst)
        sgst = _tax_total(sgst)
        igst = _optional_amount(igst)

        # Debugging print statements
        calculated_total = taxable_amount + cgst + sgst + igst + round_off
        print(f"Taxable Amount: {taxable_amount}")
        print(f"CGST: {cgst}, SGST: {sgst}, IGST: {igst}, Round Off: {round_off}")
        print(f"Expected Total: {calculated_total}, Actual Total: {total}")

        # Set a small tolerance for rounding differences (e.g., 0.02)
        tolerance = 0.02

        # Validate total by allowing small differences within the tolerance
        return abs(calculated_total - total) <= tolerance

    except Exception as e:
        # Deliberate best-effort: unparseable input is reported and counted as invalid.
        print(f"Error validating total: {e}")
        return False

# Derive a trust label from the individual validation checks
def determine_trust(row):
    """Summarise the validation results for one invoice as a trust label.

    All four checks (GSTIN, email, date order, cross-field totals) are always
    run, in that order. Only a failed cross-field check lowers the level to
    "Low Trust"; the other failures are listed as notes next to "High Trust".

    Args:
        row: Mapping with the keys "GSTIN", "Email", "Invoice Date" and
            "Due Date", plus whatever ``cross_field_validation`` reads.

    Returns:
        str: "High Trust" / "Low Trust", followed by the comma-separated
        failures in parentheses when there are any.
    """
    gstin_ok = validate_gstin(row["GSTIN"])
    email_ok = validate_email(row["Email"])
    dates_ok = validate_dates(row["Invoice Date"], row["Due Date"])
    totals_ok = cross_field_validation(row)

    outcomes = (
        (gstin_ok, "Invalid GSTIN"),
        (email_ok, "Invalid Email"),
        (dates_ok, "Invalid Date Range (Invoice Date after Due Date)"),
        (totals_ok, "Cross-field Inconsistency"),
    )
    problems = [message for passed, message in outcomes if not passed]

    # Only the cross-field check affects the level itself
    verdict = "High Trust" if totals_ok else "Low Trust"

    if not problems:
        return verdict
    return f"{verdict} ({', '.join(problems)})"

In [None]:
from paddleocr import PaddleOCR
from pdf2image import convert_from_path
import os

# Initialize the OCR engine once, when this cell runs; extract_text_with_ocr
# below reuses this module-level instance for every page it processes.
# NOTE(review): use_angle_cls=True / lang='en' presumably enable the text-angle
# classifier and select the English model — confirm against the PaddleOCR docs.
ocr = PaddleOCR(use_angle_cls=True, lang='en')

def extract_text_with_ocr(pdf_path):
    """OCR every page of a PDF and return the recognised text.

    Each page is rendered to an image with ``convert_from_path``, saved as a
    PNG in a private temporary directory, and passed to the module-level
    ``ocr`` engine. The directory is removed afterwards, including when
    rendering or OCR raises.

    Args:
        pdf_path: Path of the PDF file.

    Returns:
        str: For every entry of an OCR result, the recognised strings each
        followed by a space, then a newline. An entry without detections
        contributes just the newline.
    """
    import tempfile  # local import: only this function needs it

    # Convert PDF pages to images
    images = convert_from_path(pdf_path)
    chunks = []

    # A private directory avoids clobbering page_N.png files in the working
    # directory and guarantees clean-up on every exit path.
    with tempfile.TemporaryDirectory() as work_dir:
        for page_num, image in enumerate(images):
            image_path = os.path.join(work_dir, f"page_{page_num}.png")
            image.save(image_path, "PNG")

            # Run OCR on the saved image
            ocr_result = ocr.ocr(image_path, cls=True)

            # Guard against None for the whole result or for one entry.
            # NOTE(review): PaddleOCR appears to return None for a page with
            # no detected text — confirm for the installed version.
            for line in ocr_result or []:
                for word_info in line or []:
                    # word_info[1][0] is the recognised string of one detection
                    chunks.append(word_info[1][0] + " ")
                chunks.append("\n")

    return "".join(chunks)

In [None]:
def extract_relevant_details_ocr(text):
    """Parse invoice fields out of OCR text, where fields are separated by
    arbitrary whitespace rather than by newlines.

    Matching is case-insensitive and every captured value is stripped. A field
    that cannot be found gets its default ("" for most fields, "N/A" or "0.00"
    for the optional ones below). Line items are parsed by ``extract_items_ocr``.

    Args:
        text: OCR output for the whole invoice as one string.

    Returns:
        dict: Field name -> extracted value. "Items" is a list of item dicts;
        "CGST"/"SGST" are lists of "<rate>%: ₹<amount>" strings or "N/A";
        "Total Items / Qty" is "<items> / <qty>"; every other value is a string.
    """
    details = {}

    # Helper function to safely extract data using regex
    def safe_extract(pattern, text, group=1, default=""):
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(group).strip()
        return default

    # Extracting relevant details with adjusted patterns
    details["GSTIN"] = safe_extract(r"GSTIN\s*([A-Z0-9]+)", text)
    details["Mobile"] = safe_extract(r"Mobile\s*(\+?\d[\d\s\-]+)", text)
    details["Email"] = safe_extract(
        r"Email\s*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", text
    )
    details["Invoice #"] = safe_extract(r"Invoice #[:\s]+(\S+)", text)
    details["Invoice Date"] = safe_extract(r"Invoice Date[:\s]+([0-9A-Za-z\s]+)", text)
    details["Due Date"] = safe_extract(r"Due Date[:\s]+([0-9A-Za-z\s]+)", text)
    details["Customer Details"] = safe_extract(r"Customer Details[:\s]*([A-Za-z\s]+)", text)
    details["Place of Supply"] = safe_extract(r"Place of Supply[:\s]+([0-9A-Za-z\s-]+)", text)

    # Use the OCR-specific item parser (see extract_items_ocr below)
    details["Items"] = extract_items_ocr(text)

    # Tax and summary details
    details["Taxable Amount"] = safe_extract(r"Taxable Amount\s*([0-9,.\s]+)", text)
    details["IGST"] = safe_extract(r"IGST\s*[0-9.]+%\s*([0-9,.\s]+)", text, default="N/A")

    # Extract multiple CGST and SGST rates and amounts
    cgst_matches = re.findall(r"CGST\s*([0-9.]+)%\s*([0-9,.\s]+)", text)
    sgst_matches = re.findall(r"SGST\s*([0-9.]+)%\s*([0-9,.\s]+)", text)

    details["CGST"] = [f"{rate}%: ₹{amount}" for rate, amount in cgst_matches] if cgst_matches else "N/A"
    details["SGST"] = [f"{rate}%: ₹{amount}" for rate, amount in sgst_matches] if sgst_matches else "N/A"

    # Handle round-off values, including negative values
    details["Round Off"] = safe_extract(r"Round Off\s*([+-]?[0-9,.\s]+)", text, default="0.00")

    # Extract other key details
    details["Total"] = safe_extract(r"Total\s*([0-9,.\s]+)", text)
    details["Total Discount"] = safe_extract(r"Total Discount\s*([0-9,.\s]+)", text, default="N/A")

    # The parentheses must be escaped: unescaped, "(in words)" is a capture
    # group, so the pattern only matched text WITHOUT parentheses and group 1
    # was the literal "in words". They are optional in case OCR drops them.
    details["Total Amount (in words)"] = safe_extract(
        r"Total amount \(?in words\)?[:\s]+(.+?)(?:\s+Amount Paid|$)", text
    )

    # The pattern captures both the item count and the quantity; keep both
    # instead of silently dropping the quantity.
    items_qty = re.search(
        r"Total Items / Qty\s*:\s*([0-9]+)\s*/\s*([0-9.]+)", text, re.IGNORECASE
    )
    details["Total Items / Qty"] = (
        f"{items_qty.group(1)} / {items_qty.group(2)}" if items_qty else ""
    )

    # Bank details
    details["Bank"] = safe_extract(r"Bank[:\s]+(.+?)(?:IFSC|Account #|Branch|$)", text)
    details["IFSC Code"] = safe_extract(r"IFSC Code[:\s]+([A-Z0-9]+)", text)
    details["Account #"] = safe_extract(r"Account #[:\s]+(\d+)", text)
    details["Branch"] = safe_extract(r"Branch[:\s]+(.+?)(?:Authorized|$)", text)

    return details


# Item parser for OCR text. It has its own name on purpose: the notebook also
# defines a line-based `extract_items` for the text pipeline, and a second
# definition under that name would silently replace it once this cell runs.
def extract_items_ocr(text):
    """Parse line items from OCR text where each item's fields are whitespace-separated.

    Expected field order per item: item number, description, rate, quantity,
    taxable value, tax amount followed by the rate in parentheses, amount.

    Args:
        text: OCR output for the whole invoice as one string.

    Returns:
        list[dict]: One dict per regex match with the keys "Item #", "Item",
        "Rate / Item", "Qty", "Taxable Value", "Tax Amount" (formatted as
        "<rate>: ₹<tax>") and "Amount". Empty when nothing matches.
    """
    item_pattern = re.compile(
        r"(\d+)\s+([A-Za-z\s\-0-9]+?)\s+([0-9,]+(?:\.[0-9]{2})?)\s+([0-9A-Za-z\s]+)\s+([0-9,]+(?:\.[0-9]{2})?)\s+([0-9,]+(?:\.[0-9]{2})?)\s*\((\d+%?)\)\s+([0-9,]+(?:\.[0-9]{2})?)",
        re.IGNORECASE
    )

    items = []

    # Find all matches using the pattern
    for match in item_pattern.finditer(text):
        items.append({
            "Item #": match.group(1).strip(),
            "Item": match.group(2).strip(),
            "Rate / Item": match.group(3).strip(),
            "Qty": match.group(4).strip(),
            "Taxable Value": match.group(5).strip(),
            # group 7 is the rate in parentheses, group 6 the tax amount
            "Tax Amount": f"{match.group(7)}: ₹{match.group(6).strip()}",
            "Amount": match.group(8).strip(),
        })

    return items

In [None]:
def consolidate_data(initial_details, ocr_details):
    """Merge fields from the text-layer extraction with fields from OCR.

    Priority is given to ``initial_details``. An OCR value is used only when
    the field is absent from ``initial_details``, or when the initial value is
    blank (falsy or "N/A") AND the OCR value is not blank itself, so an empty
    OCR result never replaces an existing placeholder.

    Args:
        initial_details: Field dict from the text-based extraction (not modified).
        ocr_details: Field dict from the OCR-based extraction.

    Returns:
        dict: New dict with the merged fields.
    """
    def _is_blank(value):
        # Empty string/list/None and the "N/A" placeholder all count as missing.
        return not value or value == "N/A"

    # Shallow copy so the caller's dict is left untouched.
    consolidated_details = initial_details.copy()

    for key, value in ocr_details.items():
        if key not in consolidated_details:
            # Keep the key even when the OCR value is blank, so downstream
            # lookups by field name do not fail.
            consolidated_details[key] = value
        elif _is_blank(consolidated_details[key]) and not _is_blank(value):
            consolidated_details[key] = value

    return consolidated_details

In [None]:
def process_invoice(pdf_path):
    """Extract the fields of one invoice PDF and attach a trust level.

    The text layer is always parsed first. OCR is added in two situations:
    the PDF contains images and the text parse left at least one field empty
    (OCR fields are merged in through ``consolidate_data``), or the PDF has no
    extractable text at all (the OCR fields are used on their own).

    Args:
        pdf_path: Path of the invoice PDF.

    Returns:
        dict: The extracted fields plus a "Trust Level" entry.
    """
    # Image detection runs first, then the text-layer pipeline
    scanned = detect_image_layer(pdf_path)
    raw_text = extract_text_from_pdf(pdf_path)
    text_fields = extract_relevant_details_text(raw_text)

    # True when at least one field came back empty/falsy
    has_gaps = any(not value for value in text_fields.values())

    if scanned and has_gaps:
        # Fill the gaps from OCR while keeping the text-layer values
        ocr_fields = extract_relevant_details_ocr(extract_text_with_ocr(pdf_path))
        result = consolidate_data(text_fields, ocr_fields)
    elif raw_text.strip():
        result = text_fields
    else:
        # No text layer at all: rely on OCR alone
        result = extract_relevant_details_ocr(extract_text_with_ocr(pdf_path))

    result["Trust Level"] = determine_trust(result)
    return result

In [None]:
import os
import csv

def process_invoices_in_folder(folder_path, output_csv_path):
    """Run ``process_invoice`` on every PDF in a folder and save the results as CSV.

    Files are handled in sorted name order so the rows of the output file are
    reproducible between runs (``os.listdir`` returns names in arbitrary
    order). A PDF that fails to process is reported and skipped; the remaining
    files are still processed.

    Args:
        folder_path: Directory containing the invoice PDFs (not searched recursively).
        output_csv_path: Destination passed to ``save_extracted_data_to_csv``.
    """
    # Prepare a list to store the results for each processed PDF.
    extracted_data_list = []

    # Loop through all files in the given folder, in a stable order.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.lower().endswith(".pdf"):  # Process only PDF files.
            continue

        pdf_path = os.path.join(folder_path, filename)
        print(f"Processing {filename}...")

        try:
            # Run the process_invoice function on each PDF.
            extracted_details = process_invoice(pdf_path)

            # Add the filename to the extracted details for reference.
            extracted_details["Filename"] = filename

            extracted_data_list.append(extracted_details)

            print(f"Successfully processed {filename}")
        except Exception as e:
            # Deliberate best-effort: one bad PDF must not abort the whole batch.
            print(f"Error processing {filename}: {e}")

    # Write all the extracted data to a CSV file.
    save_extracted_data_to_csv(extracted_data_list, output_csv_path)

def save_extracted_data_to_csv(data_list, output_csv_path):
    """Write the extracted invoice records to a CSV file.

    The header is the union of the keys of all records, in first-seen order,
    so records with different field sets (for example one without
    "Customer Phone") can be written together. Fields missing from a record
    are left empty.

    Args:
        data_list: List of dicts, one per invoice. When empty, nothing is
            written and a message is printed.
        output_csv_path: Path of the CSV file to create or overwrite.
    """
    if not data_list:
        print("No data to save.")
        return

    # Taking the header from the first record only would make DictWriter raise
    # ValueError as soon as a later record has an additional key.
    header = []
    seen = set()
    for record in data_list:
        for key in record:
            if key not in seen:
                seen.add(key)
                header.append(key)

    # Write data to a CSV file.
    with open(output_csv_path, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=header, restval="")
        writer.writeheader()
        writer.writerows(data_list)

    print(f"Data saved to {output_csv_path}")

# Example usage:
folder_path = "path_to_your_pdf_folder"  # Replace with the path to the folder containing PDFs.
output_csv_path = "extracted_invoice_data.csv"  # Replace with the desired output CSV file path.

# Guard the call so "Restart & Run All" does not stop with FileNotFoundError
# while folder_path still holds the placeholder value.
if os.path.isdir(folder_path):
    process_invoices_in_folder(folder_path, output_csv_path)
else:
    print(f"Folder not found: {folder_path!r} - set folder_path to a directory containing PDF invoices.")