In [1]:
# Install system dependencies: the Tesseract OCR engine, its development
# headers, and poppler-utils (presumably the PDF backend pdf2image needs — confirm).
!apt-get install -y tesseract-ocr libtesseract-dev poppler-utils
# Install the Python libraries used by the OCR API cells below.
# NOTE(review): versions are unpinned, and pyngrok is installed here but is not
# imported by any cell visible in this notebook — confirm it is still needed.
!pip install pytesseract pdf2image fastapi uvicorn python-multipart pyngrok nest_asyncio opencv-python-headless dateparser

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
tesseract-ocr is already the newest version (4.1.1-2.1build1).
The following additional packages will be installed:
  libarchive-dev libleptonica-dev
The following NEW packages will be installed:
  libarchive-dev libleptonica-dev libtesseract-dev poppler-utils
0 upgraded, 4 newly installed, 0 to remove and 1 not upgraded.
Need to get 3,929 kB of archives.
After this operation, 16.7 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 libarchive-dev amd64 3.6.0-1ubuntu1.5 [581 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 libleptonica-dev amd64 1.82.0-3build1 [1,562 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy/universe amd64 libtesseract-dev amd64 4.1.1-2.1build1 [1,600 kB]
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 poppler-utils amd64 22.02.0-2ubuntu0.12 [186 kB]
Fetched 3,929 kB in 2s (2,293 kB/

In [2]:
import os, signal, subprocess

def stop_server(port=8000):
    """Kill every *other* process that ``lsof`` reports on ``port``.

    Args:
        port: TCP port to free. Defaults to 8000.

    The current process is never signalled. ``lsof -t -i:PORT`` lists every
    process with a socket on the port, and the API in this notebook runs on a
    thread inside the kernel itself, so killing every listed PID would
    SIGKILL the notebook kernel. In that case a note is printed instead.

    Errors (for example ``lsof`` not being installed) are printed, not raised.
    """
    try:
        # `lsof -t` prints one bare PID per line.
        result = subprocess.run(["lsof", "-t", f"-i:{port}"], capture_output=True, text=True)
        own_pid = os.getpid()
        # split() without arguments ignores blank lines and stray whitespace.
        listed_pids = {int(token) for token in result.stdout.split() if token.isdigit()}
        targets = sorted(listed_pids - {own_pid})

        if own_pid in listed_pids:
            print(f"Port {port} is held by this notebook process (PID {own_pid}); "
                  "restart the runtime to release it.")

        if targets:
            for pid in targets:
                try:
                    os.kill(pid, signal.SIGKILL)
                except ProcessLookupError:
                    # The process exited between lsof and kill; keep going.
                    pass
            print(f"üõë Server on port {port} has been stopped.")
        elif own_pid not in listed_pids:
            print("‚ÑπÔ∏è No server was running on that port.")
    except Exception as e:
        print(f"Error: {e}")

# Free port 8000, the port the API cell below binds to.
stop_server(8000)

‚ÑπÔ∏è No server was running on that port.


In [None]:
import os, re, cv2, sys, asyncio, subprocess, threading, time, signal
import numpy as np
import pytesseract
import dateparser
import nest_asyncio
import uvicorn
import requests
from PIL import Image, ImageOps
from pdf2image import convert_from_path
from fastapi import FastAPI, File, UploadFile
from google.colab import files

# ---------------------------------------------------------
# 1. SYSTEM INITIALIZATION
# ---------------------------------------------------------
def initialize_system():
    """Free port 8000 and install the OCR stack's dependencies.

    Steps, in order:
      1. SIGKILL every *other* process that ``lsof`` reports on port 8000.
      2. ``apt-get update``, then install ``tesseract-ocr`` and ``poppler-utils``.
      3. ``pip install`` the Python packages into the running interpreter.

    The current process is never signalled: once the API thread has been
    started from this kernel, ``lsof`` lists the kernel's own PID, and
    killing it would take the whole notebook down when the cell is re-run.

    Best-effort: a failing step prints a warning instead of raising.
    """
    own_pid = os.getpid()
    try:
        result = subprocess.run(["lsof", "-t", "-i:8000"], capture_output=True, text=True)
        listed = result.stdout.split()
    except OSError as exc:
        # lsof itself is missing or not executable.
        print(f"Warning: could not run lsof: {exc}")
        listed = []

    for token in listed:
        if not token.isdigit():
            continue
        pid = int(token)
        if pid == own_pid:
            print("Port 8000 is already held by this notebook process; "
                  "restart the runtime to replace the running API.")
            continue
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # Already gone, or not ours to kill; cleanup stays best-effort.
            pass

    install_commands = [
        ["apt-get", "update"],
        ["apt-get", "install", "-y", "tesseract-ocr", "poppler-utils"],
        [sys.executable, "-m", "pip", "install", "pytesseract", "pdf2image",
         "fastapi", "uvicorn", "python-multipart", "nest_asyncio",
         "opencv-python-headless", "dateparser"],
    ]
    for command in install_commands:
        try:
            completed = subprocess.run(command, capture_output=True, text=True)
        except OSError as exc:
            print(f"Warning: could not run {command[0]}: {exc}")
            continue
        # Output is captured, so a failure would otherwise be invisible.
        if completed.returncode != 0:
            print(f"Warning: `{' '.join(command[:3])}` exited with code {completed.returncode}")

# Run the setup step (port cleanup plus dependency installs) defined above.
print("‚öôÔ∏è Initializing environment...")
initialize_system()

# ---------------------------------------------------------
# 2. ENHANCED EXTRACTION ENGINE
# ---------------------------------------------------------
class FinanceEngine:
    """Extract vendor, invoice date and total from OCR'd invoice text.

    Args:
        text: OCR output for the whole document.
        buyer_names: Names that identify the buyer rather than the seller.
            When the label-based vendor lookup lands on one of these
            (case-insensitive exact match), the legal-entity fallback is used
            instead. Defaults to the single name the original code checked.
    """

    # A money amount with exactly two decimals. Accepts plain digits
    # ("21599.00"), thousands grouping ("21,599.00") and lakh grouping
    # ("1,23,456.00"). The look-behind stops a match from starting in the
    # middle of a longer number; the look-ahead rejects dotted dates such as
    # "12.04.2023".
    _AMOUNT = r'(?<![\d,])\d[\d,]*\.\d{2}(?!\.?\d)'

    def __init__(self, text, buyer_names=("sudheer",)):
        self.text = text
        # Strip runs of |, \, / and _ (artifacts often caused by QR codes or
        # logos). Note: extract() reads self.text, not this cleaned copy.
        self.clean_text = re.sub(r'[|\\/_]{2,}', '', text)
        self.buyer_names = {name.strip().lower() for name in buyer_names}

    @staticmethod
    def _to_float(amount):
        """Convert a matched amount such as '1,23,456.00' to a float."""
        return float(amount.replace(',', ''))

    def extract(self):
        """Return ``{"vendor": str, "date": str, "total": float}``.

        ``vendor`` is "Unknown" and ``date`` is the string "None" when nothing
        is found; ``total`` is 0.0 when no amount is found. Dates are
        formatted as YYYY-MM-DD.
        """
        # 1. Vendor: the first non-trivial text after a seller/recipient label.
        vendor = "Unknown"
        vendor_match = re.search(
            r'(?:Sold By|Bill From|Bill\s*To\s*/\s*Ship\s*To)[:\s]+(.*?)(?:\n|GSTIN|Plot|Address)',
            self.text, re.IGNORECASE | re.DOTALL)
        if vendor_match:
            lines = [l.strip() for l in vendor_match.group(1).split('\n') if len(l.strip()) > 2]
            if lines:
                vendor = lines[0]

        # Fallback: the label pointed at the buyer, or at nothing, so look for
        # a legal-entity style name instead.
        if vendor.lower() in self.buyer_names | {"unknown"}:
            legal_match = re.search(
                r'([A-Z][A-Z\s&]+(?:PVT\.?\s*LTD|PRIVATE LIMITED|CORPORATION|RETAIL))',
                self.text, re.IGNORECASE)
            if legal_match:
                vendor = legal_match.group(1).strip()

        # 2. Date: first "Invoice Date"/"Order Date" value.
        date_val = "None"
        date_pattern = r'(?:Invoice Date|Order Date)\s*[:\.]?\s*(\d{1,2}[.\-/\s](?:\d{1,2}|[A-Za-z]{3,})\s*[.\-/\s]\d{2,4})'
        date_match = re.search(date_pattern, self.text, re.IGNORECASE)
        if date_match:
            # The invoices handled here are Indian (GSTIN, INR) and write
            # day-first dates. Without an explicit order, an ambiguous value
            # like 04-12-2023 is read month-first.
            parsed = dateparser.parse(date_match.group(1).replace('.', '-'),
                                      settings={'DATE_ORDER': 'DMY'})
            if parsed:
                date_val = parsed.strftime("%Y-%m-%d")

        # 3. Total: for every "TOTAL" label take the first amount that follows
        # it (DOTALL lets that cross line breaks), then keep the one belonging
        # to the last label, since the grand total is usually last.
        total = 0.0
        total_blocks = re.findall(r'TOTAL[:\s]+.*?(' + self._AMOUNT + r')',
                                  self.text, re.IGNORECASE | re.DOTALL)
        if total_blocks:
            total = self._to_float(total_blocks[-1])
        else:
            # Secondary strategy: amounts directly followed by an
            # amount-in-words / currency marker; keep the largest.
            fallback = re.findall(r'(' + self._AMOUNT + r')\s*(?:Amount in Words|One|Rs|INR)',
                                  self.text, re.IGNORECASE)
            if fallback:
                total = max(self._to_float(x) for x in fallback)

        return {"vendor": vendor, "date": date_val, "total": total}

# ---------------------------------------------------------
# 3. BACKGROUND API
# ---------------------------------------------------------
app = FastAPI()

@app.post("/extract")
async def extract_api(file: UploadFile = File(...)):
    """OCR an uploaded invoice (PDF or image) and return the extracted fields.

    Args:
        file: The multipart upload. A ``.pdf`` extension (any case) is
            rendered page by page; anything else is opened as a single image.

    Returns:
        The dict produced by ``FinanceEngine.extract()``, or
        ``{"error": message}`` if any step raised.
    """
    import tempfile  # local import: only this handler needs it

    # Only the extension of the client-supplied name is trusted. Building the
    # path from the raw filename would let "../" segments escape the working
    # directory and make same-named uploads overwrite each other.
    suffix = os.path.splitext(os.path.basename(file.filename or ""))[1].lower()
    temp_path = None
    try:
        content = await file.read()
        with tempfile.NamedTemporaryFile(prefix="process_", suffix=suffix, delete=False) as tmp:
            tmp.write(content)
            temp_path = tmp.name

        if suffix == ".pdf":
            images = convert_from_path(temp_path)
        else:
            # Copy the pixels out so the file handle is closed before cleanup.
            with Image.open(temp_path) as opened:
                images = [opened.copy()]

        page_texts = []
        for img in images:
            # Grayscale + hard threshold at 150 to clear up the "Total" column noise.
            img = ImageOps.grayscale(img)
            img = img.point(lambda x: 0 if x < 150 else 255, '1')
            page_texts.append(pytesseract.image_to_string(img, config='--psm 3') + "\n")

        return FinanceEngine("".join(page_texts)).extract()
    except Exception as e:
        return {"error": str(e)}
    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)

def start_api():
    """Serve ``app`` with uvicorn on 127.0.0.1:8000; blocks until the server exits.

    ``nest_asyncio.apply()`` runs first (presumably so uvicorn can start
    alongside the notebook's own event loop — confirm).
    """
    nest_asyncio.apply()
    server_options = {"host": "127.0.0.1", "port": 8000, "log_level": "error"}
    uvicorn.run(app, **server_options)

# Run the API on a daemon thread so the cell can continue past this point.
threading.Thread(target=start_api, daemon=True).start()

import socket  # local to this launch step

# Wait until the port accepts connections instead of assuming a fixed sleep
# was long enough; a slow start or a failed bind no longer reports "LIVE".
_server_ready = False
for _ in range(60):  # roughly 15 s at 0.25 s per refused attempt
    try:
        with socket.create_connection(("127.0.0.1", 8000), timeout=0.5):
            _server_ready = True
            break
    except OSError:
        time.sleep(0.25)

if _server_ready:
    print("üöÄ OCR Server is LIVE.")
else:
    print("OCR server is not accepting connections on 127.0.0.1:8000; requests will fail.")

# ---------------------------------------------------------
# 4. RUNNER
# ---------------------------------------------------------
def run_ocr_loop():
    """Repeatedly prompt for uploads, send each file to the local API and print the result.

    The loop ends when an upload comes back empty or on KeyboardInterrupt.
    Every uploaded file is deleted from the working directory after it has
    been processed, including when the request fails.

    Failures are reported per file instead of being shown as a successful
    result with empty fields: a non-200 status, an ``{"error": ...}`` payload
    (which the API returns with status 200), or a connection / decoding error.
    """
    print("\n" + "="*40 + "\nüîÑ INVOICE ANALYZER READY\n" + "="*40)
    try:
        while True:
            uploaded = files.upload()
            if not uploaded: break
            for filename in uploaded.keys():
                try:
                    with open(filename, "rb") as f:
                        response = requests.post("http://127.0.0.1:8000/extract", files={"file": (filename, f)})
                    if response.status_code != 200:
                        print(f"\n[error] {filename}: server returned HTTP {response.status_code}")
                        continue
                    data = response.json()
                    if isinstance(data, dict) and "error" in data:
                        print(f"\n[error] {filename}: {data['error']}")
                        continue
                    print(f"\n‚úÖ {filename}:")
                    print(f"   üè¢ Vendor: {data.get('vendor')}")
                    print(f"   üìÖ Date:   {data.get('date')}")
                    print(f"   üí∞ Total:  ‚Çπ{data.get('total')}")
                except (OSError, ValueError) as exc:
                    # requests' exceptions derive from OSError; a body that is
                    # not valid JSON raises a ValueError subclass.
                    print(f"\n[error] {filename}: {exc}")
                finally:
                    # Also runs on `continue`, so the upload never lingers.
                    if os.path.exists(filename): os.remove(filename)
    except KeyboardInterrupt:
        print("\nüõë Stopped.")

# Start the interactive upload loop; it returns on an empty upload or KeyboardInterrupt.
run_ocr_loop()

‚öôÔ∏è Initializing environment...
üöÄ OCR Server is LIVE.

üîÑ INVOICE ANALYZER READY


Saving M-invoice-1.pdf to M-invoice-1.pdf

‚úÖ M-invoice-1.pdf:
   üè¢ Vendor: Puma Sports India Pvt. Ltd
   üìÖ Date:   2025-07-09
   üí∞ Total:  ‚Çπ299.0


Saving invoice-2.pdf to invoice-2.pdf

‚úÖ invoice-2.pdf:
   üè¢ Vendor: KAY KAY OVERSEAS CORPORATION
   üìÖ Date:   2023-10-21
   üí∞ Total:  ‚Çπ243.92


Saving invoice-1.pdf to invoice-1.pdf

‚úÖ invoice-1.pdf:
   üè¢ Vendor: Appario Retail Private Ltd
   üìÖ Date:   2023-04-12
   üí∞ Total:  ‚Çπ259.16



üõë Stopped.


In [1]:
import os, re, cv2, sys, asyncio, subprocess, threading, time, signal
import numpy as np
import pytesseract
import dateparser
import nest_asyncio
import uvicorn
import requests
from PIL import Image, ImageOps
from pdf2image import convert_from_path
from fastapi import FastAPI, File, UploadFile
from google.colab import files

# ---------------------------------------------------------
# 1. SYSTEM INITIALIZATION
# ---------------------------------------------------------
def initialize_system():
    """Free port 8000 and install the OCR stack's dependencies.

    Steps, in order:
      1. SIGKILL every *other* process that ``lsof`` reports on port 8000.
      2. ``apt-get update``, then install ``tesseract-ocr`` and ``poppler-utils``.
      3. ``pip install`` the Python packages into the running interpreter.

    The current process is never signalled: once the API thread has been
    started from this kernel, ``lsof`` lists the kernel's own PID, and
    killing it would take the whole notebook down when the cell is re-run.

    Best-effort: a failing step prints a warning instead of raising.
    """
    own_pid = os.getpid()
    try:
        result = subprocess.run(["lsof", "-t", "-i:8000"], capture_output=True, text=True)
        listed = result.stdout.split()
    except OSError as exc:
        # lsof itself is missing or not executable.
        print(f"Warning: could not run lsof: {exc}")
        listed = []

    for token in listed:
        if not token.isdigit():
            continue
        pid = int(token)
        if pid == own_pid:
            print("Port 8000 is already held by this notebook process; "
                  "restart the runtime to replace the running API.")
            continue
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # Already gone, or not ours to kill; cleanup stays best-effort.
            pass

    install_commands = [
        ["apt-get", "update"],
        ["apt-get", "install", "-y", "tesseract-ocr", "poppler-utils"],
        [sys.executable, "-m", "pip", "install", "pytesseract", "pdf2image",
         "fastapi", "uvicorn", "python-multipart", "nest_asyncio",
         "opencv-python-headless", "dateparser"],
    ]
    for command in install_commands:
        try:
            completed = subprocess.run(command, capture_output=True, text=True)
        except OSError as exc:
            print(f"Warning: could not run {command[0]}: {exc}")
            continue
        # Output is captured, so a failure would otherwise be invisible.
        if completed.returncode != 0:
            print(f"Warning: `{' '.join(command[:3])}` exited with code {completed.returncode}")

# Run the setup step (port cleanup plus dependency installs) defined above.
print("‚öôÔ∏è Initializing environment...")
initialize_system()

# ---------------------------------------------------------
# 2. TARGETED EXTRACTION ENGINE
# ---------------------------------------------------------
class FinanceEngine:
    """Extract vendor, invoice date and total from OCR'd invoice text.

    Args:
        text: OCR output for the whole document.
    """

    # A money amount with exactly two decimals. Accepts plain digits
    # ("21599.00"), thousands grouping ("21,599.00") and lakh grouping
    # ("1,23,456.00"). The look-behind stops a match from starting in the
    # middle of a longer number; the look-ahead rejects dotted dates such as
    # "12.04.2023".
    _AMOUNT = r'(?<![\d,])\d[\d,]*\.\d{2}(?!\.?\d)'

    def __init__(self, text):
        self.text = text
        # Replace runs of |, \, / and _ with a space (common OCR noise).
        # Note: extract() reads self.text, not this cleaned copy.
        self.clean_text = re.sub(r'[|\\/_]+', ' ', text)

    @staticmethod
    def _to_float(amount):
        """Convert a matched amount such as '1,23,456.00' to a float."""
        return float(amount.replace(',', ''))

    def extract(self):
        """Return ``{"vendor": str, "date": str, "total": float}``.

        ``vendor`` is "Unknown" and ``date`` is the string "None" when nothing
        is found; ``total`` is 0.0 when the text contains no amount at all.
        Dates are formatted as YYYY-MM-DD.
        """
        # 1. Vendor: first real line after 'Sold By' or 'Bill From'.
        vendor = "Unknown"
        vendor_match = re.search(r'(?:Sold By|Bill From)[:\s]+(.*?)(?:\n|GSTIN|Plot|Address)',
                                 self.text, re.IGNORECASE | re.DOTALL)
        if vendor_match:
            lines = [l.strip() for l in vendor_match.group(1).split('\n') if len(l.strip()) > 3]
            if lines:
                vendor = lines[0]

        # 2. Date: handles dots, slashes, dashes and text months.
        date_val = "None"
        date_pattern = r'(?:Invoice|Order)\s*Date\s*[:\.]?\s*(\d{1,2}[.\-/\s](?:\d{1,2}|[A-Za-z]{3,})\s*[.\-/\s]\d{2,4})'
        date_match = re.search(date_pattern, self.text, re.IGNORECASE)
        if date_match:
            # The invoices handled here are Indian (GSTIN, INR) and write
            # day-first dates. Without an explicit order, an ambiguous value
            # like 04-12-2023 is read month-first.
            parsed = dateparser.parse(date_match.group(1).replace('.', '-'),
                                      settings={'DATE_ORDER': 'DMY'})
            if parsed:
                date_val = parsed.strftime("%Y-%m-%d")

        # 3. Total amount.
        total = 0.0

        # Strategy A: take the last line containing "TOTAL" and use its
        # right-most amount, which skips tax columns earlier on the same row.
        total_rows = re.findall(r'TOTAL.*', self.text, re.IGNORECASE)
        if total_rows:
            amounts = re.findall(self._AMOUNT, total_rows[-1])
            if amounts:
                total = self._to_float(amounts[-1])

        # Strategy B: the amount sitting directly before "Amount in Words".
        # `\s*` already spans line breaks; allowing a bare newline as an
        # alternative anchor would accept the first amount that merely ends
        # a line anywhere in the document.
        if total <= 0.0:
            words_anchor = re.search(r'(' + self._AMOUNT + r')\s*Amount in Words',
                                     self.text, re.IGNORECASE)
            if words_anchor:
                total = self._to_float(words_anchor.group(1))

        # Strategy C: final fallback, the largest amount in the document.
        if total <= 0.0:
            all_amounts = re.findall(self._AMOUNT, self.text)
            if all_amounts:
                total = max(self._to_float(x) for x in all_amounts)

        return {"vendor": vendor, "date": date_val, "total": total}

# ---------------------------------------------------------
# 3. BACKGROUND API
# ---------------------------------------------------------
app = FastAPI()

@app.post("/extract")
async def extract_api(file: UploadFile = File(...)):
    """OCR an uploaded invoice (PDF or image) and return the extracted fields.

    Args:
        file: The multipart upload. A ``.pdf`` extension (any case) is
            rendered page by page; anything else is opened as a single image.

    Returns:
        The dict produced by ``FinanceEngine.extract()``, or
        ``{"error": message}`` if any step raised.
    """
    import tempfile  # local import: only this handler needs it

    # Only the extension of the client-supplied name is trusted. Building the
    # path from the raw filename would let "../" segments escape the working
    # directory and make same-named uploads overwrite each other. Lower-casing
    # the extension also makes "INVOICE.PDF" take the PDF path.
    suffix = os.path.splitext(os.path.basename(file.filename or ""))[1].lower()
    temp_path = None
    try:
        content = await file.read()
        with tempfile.NamedTemporaryFile(prefix="process_", suffix=suffix, delete=False) as tmp:
            tmp.write(content)
            temp_path = tmp.name

        if suffix == ".pdf":
            images = convert_from_path(temp_path)
        else:
            # Copy the pixels out so the file handle is closed before cleanup.
            with Image.open(temp_path) as opened:
                images = [opened.copy()]

        page_texts = []
        for img in images:
            # Grayscale and contrast boost help distinguish the numbers in the table.
            img = ImageOps.grayscale(img)
            img = ImageOps.autocontrast(img)
            page_texts.append(pytesseract.image_to_string(img, config='--psm 3') + "\n")

        return FinanceEngine("".join(page_texts)).extract()
    except Exception as e:
        return {"error": str(e)}
    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)

def start_api():
    """Serve ``app`` with uvicorn on 127.0.0.1:8000; blocks until the server exits.

    ``nest_asyncio.apply()`` runs first (presumably so uvicorn can start
    alongside the notebook's own event loop — confirm).
    """
    nest_asyncio.apply()
    server_options = {"host": "127.0.0.1", "port": 8000, "log_level": "error"}
    uvicorn.run(app, **server_options)

# Run the API on a daemon thread so the cell can continue past this point.
threading.Thread(target=start_api, daemon=True).start()

import socket  # local to this launch step

# Wait until the port accepts connections instead of assuming a fixed sleep
# was long enough; a slow start or a failed bind no longer reports "LIVE".
_server_ready = False
for _ in range(60):  # roughly 15 s at 0.25 s per refused attempt
    try:
        with socket.create_connection(("127.0.0.1", 8000), timeout=0.5):
            _server_ready = True
            break
    except OSError:
        time.sleep(0.25)

if _server_ready:
    print("üöÄ OCR Server is LIVE.")
else:
    print("OCR server is not accepting connections on 127.0.0.1:8000; requests will fail.")

# ---------------------------------------------------------
# 4. RUNNER
# ---------------------------------------------------------
def run_ocr_loop():
    """Repeatedly prompt for uploads, send each file to the local API and print the result.

    The loop ends when an upload comes back empty or on KeyboardInterrupt.
    Every uploaded file is deleted from the working directory after it has
    been processed, including when the request fails.

    Failures are reported per file instead of being shown as a successful
    result with empty fields: a non-200 status, an ``{"error": ...}`` payload
    (which the API returns with status 200), or a connection / decoding error.
    """
    print("\n" + "="*40 + "\nüîÑ INVOICE ANALYZER (VERSION 2.0)\n" + "="*40)
    try:
        while True:
            uploaded = files.upload()
            if not uploaded: break
            for filename in uploaded.keys():
                try:
                    with open(filename, "rb") as f:
                        response = requests.post("http://127.0.0.1:8000/extract", files={"file": (filename, f)})
                    if response.status_code != 200:
                        print(f"\n[error] {filename}: server returned HTTP {response.status_code}")
                        continue
                    data = response.json()
                    if isinstance(data, dict) and "error" in data:
                        print(f"\n[error] {filename}: {data['error']}")
                        continue
                    print(f"\n‚úÖ {filename}:")
                    print(f"   üè¢ Vendor: {data.get('vendor')}")
                    print(f"   üìÖ Date:   {data.get('date')}")
                    print(f"   üí∞ Total:  ‚Çπ{data.get('total')}")
                except (OSError, ValueError) as exc:
                    # requests' exceptions derive from OSError; a body that is
                    # not valid JSON raises a ValueError subclass.
                    print(f"\n[error] {filename}: {exc}")
                finally:
                    # Also runs on `continue`, so the upload never lingers.
                    if os.path.exists(filename): os.remove(filename)
    except KeyboardInterrupt:
        print("\nüõë Stopped.")

# Start the interactive upload loop; it returns on an empty upload or KeyboardInterrupt.
run_ocr_loop()

‚öôÔ∏è Initializing environment...
üöÄ OCR Server is LIVE.

üîÑ INVOICE ANALYZER (VERSION 2.0)


Saving invoice-2.pdf to invoice-2.pdf

‚úÖ invoice-2.pdf:
   üè¢ Vendor: KAY KAY OVERSEAS CORPORATION
   üìÖ Date:   2023-10-21
   üí∞ Total:  ‚Çπ21599.0



üõë Stopped.
