In [2]:
pip install pdf2image pillow img2table pandas openpyxl pytesseract requests


Note: you may need to restart the kernel to use updated packages.


In [20]:
import os
import json
import re
import pytesseract
from pdf2image import convert_from_path
import pandas as pd
import requests

API_KEY = "AIzaSyBYDxv5LCDXiWSU0uoANX1UmlNpF8WGKBs"  # Replace with your actual API key

def pdf_to_images(pdf_path):
    print("Converting PDF to images...")
    return convert_from_path(pdf_path)

def ocr_image_to_text(image):
    return pytesseract.image_to_string(image)

def call_gemini_api(prompt_text, api_key):
    url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={api_key}"
    headers = {
        "Content-Type": "application/json"
    }
    data = {
        "contents": [
            {
                "parts": [
                    {"text": prompt_text}
                ]
            }
        ]
    }
    response = requests.post(url, json=data, headers=headers)
    response.raise_for_status()
    resp_json = response.json()
    # Debug print full response (optional)
    print("Full API response:")
    print(json.dumps(resp_json, indent=2))
    return resp_json["candidates"][0]["content"]["parts"][0]["text"]

def clean_response_text(text):
    """
    Remove ```json and ``` markdown code block wrappers from the text.
    """
    text = re.sub(r"^```json\s*", "", text.strip())
    text = re.sub(r"\s*```$", "", text.strip())
    return text.strip()

def extract_general_info(text):
    prompt = f"""
Extract invoice_number, invoice_date, supplier_gst_number, bill_to_gst_number, po_number, shipping_address from this text:

{text}

Return JSON with keys and values only.
"""
    response = call_gemini_api(prompt, API_KEY)
    cleaned = clean_response_text(response)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        print("Failed to parse JSON from general info response:")
        print(cleaned)
        return {}

def extract_table_info(table_text):
    prompt = f"""
Extract all line items from this invoice table text and return them as a JSON list.
Each item must include:
- serial_number
- description
- hsn_sac
- quantity
- unit_price
- total_amount

Table Text:
{table_text}

Return only the JSON array.
"""

    response = call_gemini_api(prompt, API_KEY)
    cleaned = clean_response_text(response)

    # Attempt to parse the JSON from Gemini
    try:
        if "```" in cleaned:
            cleaned = re.sub(r"```(?:json)?\s*", "", cleaned)
            cleaned = cleaned.replace("```", "")
        items = json.loads(cleaned)
    except json.JSONDecodeError:
        print("❌ JSON parsing failed in Gemini response.")
        return []

    # Fallback: Inject correct HSN & quantity using regex from table_text
    hsn_matches = re.findall(r'HSN\s*(?:NO\.?)?\s*[:\-]?\s*([\d\s]+)', table_text, re.IGNORECASE)
    qty_matches = re.findall(r'QTY\s*[:\-]?\s*([\d\s]+)', table_text, re.IGNORECASE)

    hsn_list = re.findall(r'\b\d{4}\b', ' '.join(hsn_matches))
    qty_list = re.findall(r'\b\d+\b', ' '.join(qty_matches))

    # Fill in the corrected values
    for idx, item in enumerate(items):
        item['serial_number'] = idx + 1
        item['description'] = item.get('description', f'Item {idx + 1}')

        # Inject HSN/SAC if available
        if idx < len(hsn_list):
            item['hsn_sac'] = hsn_list[idx]
        else:
            item['hsn_sac'] = None

        # Inject Quantity if available
        if idx < len(qty_list):
            item['quantity'] = int(qty_list[idx])
        else:
            item['quantity'] = None

        # Clean numeric formats
        item['unit_price'] = float(str(item.get('unit_price')).replace("Rs.", "").replace(",", "").strip()) if item.get('unit_price') else None
        item['total_amount'] = float(str(item.get('total_amount')).replace("Rs.", "").replace(",", "").strip()) if item.get('total_amount') else None

    return items



def save_outputs(general_info, table_items):
    os.makedirs("output", exist_ok=True)

    # save JSON
    out_json = {
        "general_info": general_info,
        "line_items": table_items
    }
    with open("output/extracted_data.json", "w") as f:
        json.dump(out_json, f, indent=2)

    # save excel if table items exist
    if table_items:
        df = pd.DataFrame(table_items)
        df.to_excel("output/extracted_data.xlsx", index=False)
        print("Table items saved to 'output/extracted_data.xlsx'")
    else:
        print("No table items to save to Excel.")

def main():
    input_pdf = "input/1WhatsApp Image.pdf"

    if not os.path.exists(input_pdf):
        print(f"Input file not found: {input_pdf}")
        return

    images = pdf_to_images(input_pdf)
    full_text = ""
    print("Performing OCR on images...")
    for img in images:
        full_text += ocr_image_to_text(img) + "\n"

    # For simplicity, use full_text for both general and table extraction
    general_text = full_text
    table_text = full_text

    print("Extracting general invoice info...")
    general_info = extract_general_info(general_text)

    print("Extracting table line items...")
    table_items = extract_table_info(table_text)

    save_outputs(general_info, table_items)
    print("Extraction completed. Outputs saved to 'output/' directory.")

if __name__ == "__main__":
    main()


Converting PDF to images...
Performing OCR on images...
Extracting general invoice info...
Full API response:
{
  "candidates": [
    {
      "content": {
        "parts": [
          {
            "text": "```json\n{\n    \"invoice_number\": \"654654\",\n    \"invoice_date\": \"08-03-2021\",\n    \"supplier_gst_number\": \"898989898989\",\n    \"bill_to_gst_number\": \"6969696969696969\",\n    \"po_number\": null,\n    \"shipping_address\": \"Sector-200, Noida, U.P\\nUttar Pradesh\"\n}\n```"
          }
        ],
        "role": "model"
      },
      "finishReason": "STOP",
      "avgLogprobs": -0.034273386001586914
    }
  ],
  "usageMetadata": {
    "promptTokenCount": 482,
    "candidatesTokenCount": 128,
    "totalTokenCount": 610,
    "promptTokensDetails": [
      {
        "modality": "TEXT",
        "tokenCount": 482
      }
    ],
    "candidatesTokensDetails": [
      {
        "modality": "TEXT",
        "tokenCount": 128
      }
    ]
  },
  "modelVersion": "gemini-2.0-f

In [21]:
import re

def extract_general_fields(text):
    fields = {}

    # Patterns
    fields['invoice_number'] = re.search(r'Invoice\s*Number[:\-]?\s*(\S+)', text, re.IGNORECASE)
    fields['invoice_date'] = re.search(r'Date[:\-]?\s*(\d{1,2}[\/\.-]\d{1,2}[\/\.-]\d{2,4})', text, re.IGNORECASE)
    fields['supplier_gst_number'] = re.search(r'Supplier.*?GST.*?[:\-]?\s*([0-9A-Z]{15})', text, re.IGNORECASE)
    fields['bill_to_gst_number'] = re.search(r'Bill.*?GST.*?[:\-]?\s*([0-9A-Z]{15})', text, re.IGNORECASE)
    fields['po_number'] = re.search(r'PO\s*(Number)?[:\-]?\s*(\S+)', text, re.IGNORECASE)
    fields['shipping_address'] = re.search(r'Shipping\s*Address[:\-]?\s*(.*)', text, re.IGNORECASE)

    # Extract match text or None
    for key, val in fields.items():
        if val:
            fields[key] = val.group(1).strip()
        else:
            fields[key] = None

    return fields


In [22]:
import cv2
import numpy as np
import os

def detect_seal_signature(image_path, output_dir='output'):
    img = cv2.imread(image_path, cv2.IMREAD_COLOR)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Threshold to highlight dark signatures/stamps
    _, thresh = cv2.threshold(gray, 100, 255, cv2.THRESH_BINARY_INV)

    # Find contours
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    seal_detected = False
    for i, cnt in enumerate(contours):
        x, y, w, h = cv2.boundingRect(cnt)
        if w > 80 and h > 80:  # Filter out small blobs
            seal_detected = True
            seal_img = img[y:y+h, x:x+w]
            seal_path = os.path.join(output_dir, f'seal_signature_{i+1}.png')
            cv2.imwrite(seal_path, seal_img)

    return seal_detected


In [23]:
from pytesseract import image_to_string, image_to_data

def extract_table_from_image(image_path):
    text = image_to_string(image_path, config='--psm 6')
    lines = text.split('\n')

    items = []
    for line in lines:
        # Match rows like: 1 ITEM NAME 2 2541 26 235.52 6123.52
        match = re.match(r'(\d+)\s+(.+?)\s+(\d{4})\s+(\d+)\s+Rs?.?\s*([\d,]+\.\d+)\s+Rs?.?\s*([\d,]+\.\d+)', line)
        if match:
            items.append({
                'serial_number': int(match.group(1)),
                'description': match.group(2).strip(),
                'hsn_sac': match.group(3),
                'quantity': int(match.group(4)),
                'unit_price': float(match.group(5).replace(",", "")),
                'total_amount': float(match.group(6).replace(",", "")),
            })
    return items


In [24]:
import pandas as pd

def save_to_excel(general_info, table_items, output_dir='output'):
    # Save general info
    general_df = pd.DataFrame([general_info])
    general_df.to_excel(os.path.join(output_dir, "general_info.xlsx"), index=False)

    # Save line items
    table_df = pd.DataFrame(table_items)
    table_df.to_excel(os.path.join(output_dir, "line_items.xlsx"), index=False)


In [27]:
pip install word2number


Collecting word2number
  Downloading word2number-1.1.zip (9.7 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: word2number
  Building wheel for word2number (setup.py): started
  Building wheel for word2number (setup.py): finished with status 'done'
  Created wheel for word2number: filename=word2number-1.1-py3-none-any.whl size=5601 sha256=e5e6f4b370f5fa6ad262f74220184863e9d9b421f89bf6ffe4dd5cc2cd246b5b
  Stored in directory: c:\users\csmuk\appdata\local\pip\cache\wheels\84\ff\26\d3cfbd971e96c5aa3737ecfced81628830d7359b55fbb8ca3b
Successfully built word2number
Installing collected packages: word2number
Successfully installed word2number-1.1
Note: you may need to restart the kernel to use updated packages.


  DEPRECATION: Building 'word2number' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'word2number'. Discussion can be found at https://github.com/pypa/pip/issues/6334


In [26]:
import os
import json
import re
import pytesseract
from pdf2image import convert_from_path
import pandas as pd
from PIL import Image
import cv2
import numpy as np
import requests

API_KEY = "AIzaSyBYDxv5LCDXiWSU0uoANX1UmlNpF8WGKBs"  # Replace with your actual Gemini API key

def pdf_to_images(pdf_path):
    print("Converting PDF to images...")
    return convert_from_path(pdf_path)

def ocr_image_to_text(image):
    return pytesseract.image_to_string(image)

def detect_signature_or_seal(image, output_folder="output"):
    gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edged = cv2.Canny(blurred, 50, 200)
    contours, _ = cv2.findContours(edged, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    for i, cnt in enumerate(contours):
        x, y, w, h = cv2.boundingRect(cnt)
        aspect_ratio = w / float(h)
        if 1.5 < aspect_ratio < 6 and w > 100 and h > 30:
            cropped = image.crop((x, y, x + w, y + h))
            os.makedirs(output_folder, exist_ok=True)
            filename = os.path.join(output_folder, f"seal_or_signature.png")
            cropped.save(filename)
            return True, filename
    return False, None

def call_gemini_api(prompt_text, api_key):
    url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={api_key}"
    headers = { "Content-Type": "application/json" }
    data = {
        "contents": [{ "parts": [{ "text": prompt_text }] }]
    }
    response = requests.post(url, json=data, headers=headers)
    response.raise_for_status()
    return response.json()["candidates"][0]["content"]["parts"][0]["text"]

def clean_response_text(text):
    text = re.sub(r"^```json\s*", "", text.strip())
    text = re.sub(r"\s*```$", "", text.strip())
    return text.strip()

def extract_general_info(text, seal_present):
    prompt = f"""
Extract the following fields from the text:
- invoice_number
- invoice_date
- supplier_gst_number
- bill_to_gst_number
- po_number
- shipping_address

Text:
{text}

Return a JSON with these keys.
Seal/Signature Present: {seal_present}
"""
    response = call_gemini_api(prompt, API_KEY)
    try:
        return json.loads(clean_response_text(response))
    except:
        print("Error parsing JSON in general info.")
        return {}

def extract_table_info(text):
    prompt = f"""
Extract line items from this invoice text and return as a JSON array. Each item must have:
- serial_number
- description
- hsn_sac (4-digit code)
- quantity
- unit_price (number only)
- total_amount (number only)

Text:
{text}

Return only the JSON array.
"""
    response = call_gemini_api(prompt, API_KEY)
    cleaned = clean_response_text(response)
    try:
        items = json.loads(cleaned)
    except:
        print("Gemini JSON parse error")
        return []

    for idx, item in enumerate(items):
        item["serial_number"] = idx + 1
        item["description"] = item.get("description", f"Item {idx+1}")
        item["hsn_sac"] = re.search(r"\b\d{4}\b", str(item.get("hsn_sac", ""))).group(0) if re.search(r"\b\d{4}\b", str(item.get("hsn_sac", ""))) else None
        item["quantity"] = int(re.search(r"\d+", str(item.get("quantity", "0"))).group(0)) if re.search(r"\d+", str(item.get("quantity", ""))) else None
        item["unit_price"] = float(str(item.get("unit_price", "0")).replace("Rs.", "").replace(",", "").strip())
        item["total_amount"] = float(str(item.get("total_amount", "0")).replace("Rs.", "").replace(",", "").strip())
    return items

def save_outputs(general_info, table_items, seal_present, seal_path=None):
    os.makedirs("output", exist_ok=True)
    out_json = {
        "general_info": general_info,
        "seal_and_sign_present": seal_present,
        "seal_image": seal_path if seal_present else None,
        "line_items": table_items
    }
    with open("output/extracted_data.json", "w") as f:
        json.dump(out_json, f, indent=2)

    if table_items:
        df = pd.DataFrame(table_items)
        df.to_excel("output/extracted_data.xlsx", index=False)

def main():
    input_pdf = "input/1WhatsApp Image.pdf"
    if not os.path.exists(input_pdf):
        print("PDF file not found.")
        return

    images = pdf_to_images(input_pdf)
    full_text = ""
    seal_present = False
    seal_path = None

    for img in images:
        full_text += ocr_image_to_text(img) + "\n"
        present, path = detect_signature_or_seal(img)
        if present:
            seal_present = True
            seal_path = path

    general_info = extract_general_info(full_text, seal_present)
    table_info = extract_table_info(full_text)
    save_outputs(general_info, table_info, seal_present, seal_path)

    print("✅ Extraction complete. Results saved in the 'output/' folder.")

if __name__ == "__main__":
    main()


Converting PDF to images...
✅ Extraction complete. Results saved in the 'output/' folder.
