In [1]:
from openai import OpenAI
import fitz  # PyMuPDF
import io
import os
from PIL import Image
import base64
import json
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
# Read the OpenAI key from the environment (load_dotenv() ran in the import cell);
# os.getenv returns None when OPENAI_API_KEY is not set.
api_key = os.getenv("OPENAI_API_KEY")
# Module-level client shared by the extraction and transformation functions below.
client = OpenAI(api_key=api_key)

***Convert Multi-page PDF to Base64-encoded Images***

In [3]:
def encode_image(image_path):
    """
    Read a file and return its contents as a base64 string.

    This is a plain module-level function. It must not be wrapped in
    @staticmethod: outside a class that turns the name into a staticmethod
    object, which is not callable on Python versions before 3.10.

    Parameters:
    image_path (str): Path of the file to encode.

    Returns:
    str: Base64 encoding of the file's bytes, decoded as UTF-8 text.
    """
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def pdf_to_base64_images(pdf_path):
    """
    Render every page of a PDF to PNG and return the pages base64-encoded.

    Pages are rasterised with PyMuPDF and encoded entirely in memory, so no
    temp_page_N.png files are written to the working directory (which could
    collide between runs or be left behind when a page fails to render).

    Parameters:
    pdf_path (str): Path of the PDF to render.

    Returns:
    list: One base64 string (UTF-8 text of PNG bytes) per page, in page order.
    """
    pdf_document = fitz.open(pdf_path)
    try:
        base64_images = []
        # Handles PDFs with multiple pages
        for page_num in range(len(pdf_document)):
            pix = pdf_document.load_page(page_num).get_pixmap()
            png_bytes = pix.tobytes("png")
            base64_images.append(base64.b64encode(png_bytes).decode("utf-8"))
        return base64_images
    finally:
        # Always release the document handle, even if a page fails to render.
        pdf_document.close()

***Extract Structured Invoice Data from Base64 Image Using GPT-4o***

In [4]:
def extract_invoice_data(base64_image):
    """
    Ask the vision model to extract one invoice page into JSON text.

    Sends the page image together with the extraction instructions to the
    module-level OpenAI `client` and requests a JSON-object response.

    Parameters:
    base64_image (str): Base64-encoded PNG of a single invoice page.

    Returns:
    str: The message content returned by the model (requested as a JSON object).
    """
    # Plain string (not an f-string): the prompt has no placeholders.
    system_prompt = """
    You are an OCR-like data extraction tool that extracts retail invoice data from PDFs.
   
Please extract the data in this invoice, grouping it according to logical themes or subgroups such as store information, transaction details, itemized purchases, totals, and payment information. Output the result in a well-structured JSON format.
Please keep the keys and values of the JSON in the original language, including any non-English characters.
The type of data you might encounter in the invoice includes but is not limited to: store name and location, receipt date and time, cashier/salesperson, register info, list of items (with quantity, item name, price, amount), subtotal, tax, total, payment method, and any loyalty/survey messages.
If the page contains no charge data, please output an empty JSON object and do not fabricate or infer any data.
If any fields are blank or missing in the invoice, include them in the JSON as "null" values.
If there is a table-like list of purchased items, capture each row and all columns (e.g., item name, quantity, unit price, total amount) in the JSON format.
Maintain the structure of the itemized list. If a field is blank in any row, represent it as "null" in that row.
Do not interpolate or make up any data not explicitly present in the invoice.
Please ensure that item descriptions, formatting (like size or color), and special characters are preserved exactly as in the original invoice text.


    """

    response = client.chat.completions.create(
        model="gpt-4o",
        response_format={ "type": "json_object" },
        messages=[
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": [
                    # Matches the system prompt: these are retail invoices, not hotel invoices.
                    {"type": "text", "text": "extract the data in this retail invoice and output into JSON "},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}", "detail": "high"}}
                ]
            }
        ],
        temperature=0.0,
    )
    return response.choices[0].message.content

***Extract Invoice Data from Multi-page PDFs and Save as JSON***

In [5]:
def extract_from_multiple_pages(base64_images, original_filename, output_directory):
    """
    Extract every page of one invoice and save the results as a JSON file.

    Parameters:
    base64_images (list): Base64-encoded page images, one per page.
    original_filename (str): Name of the source file; its extension is
        replaced by "_extracted.json" to build the output name.
    output_directory (str): Directory for the output file (created if missing).

    Returns:
    str: Path of the JSON file that was written. The file holds a list with
    one parsed JSON object per page.
    """
    entire_invoice = [
        json.loads(extract_invoice_data(base64_image))
        for base64_image in base64_images
    ]

    # Ensure the output directory exists
    os.makedirs(output_directory, exist_ok=True)

    # Swap the extension with splitext rather than str.replace('.pdf', ...):
    # replace() leaves names such as "scan.PDF" untouched (the JSON would be
    # written under the PDF's own name) and rewrites every ".pdf" inside a name.
    stem, _extension = os.path.splitext(original_filename)
    output_filename = os.path.join(output_directory, f"{stem}_extracted.json")

    # Save the entire_invoice list as a JSON file
    with open(output_filename, 'w', encoding='utf-8') as f:
        json.dump(entire_invoice, f, ensure_ascii=False, indent=4)
    return output_filename


def main_extract(read_path, write_path):
    """
    Run the extraction pipeline over every PDF in a directory.

    Only regular files whose name ends in ".pdf" (case-insensitive) are
    processed, so stray files such as .DS_Store or notes are not handed to the
    PDF renderer. Files are visited in sorted name order for repeatable runs.

    Parameters:
    read_path (str): Directory containing the source PDFs.
    write_path (str): Directory where the extracted JSON files are written.
    """
    for filename in sorted(os.listdir(read_path)):
        file_path = os.path.join(read_path, filename)
        if not os.path.isfile(file_path):
            continue
        if not filename.lower().endswith(".pdf"):
            continue
        base64_images = pdf_to_base64_images(file_path)
        extract_from_multiple_pages(base64_images, filename, write_path)


# Input and output directories are taken from environment variables;
# os.getenv returns None for any variable that is not set.
read_path= os.getenv("read_path")    # Directory where the source PDFs are stored.
write_path= os.getenv("write_path")  # Directory where the extracted JSON output is written.

# Run the extraction over the files in read_path (one model request per rendered page).
main_extract(read_path, write_path)

***Transform Extracted Invoice JSONs to a Standard Schema***

In [7]:
	
def transform_invoice_data(json_raw, json_schema):
    """
    Ask the model to reshape extracted invoice data to a reference schema.

    Parameters:
    json_raw (dict | list | str): Extracted invoice data, either parsed JSON
        or an already-serialised JSON string.
    json_schema (dict | list | str): Reference schema, parsed or serialised.

    Returns:
    dict: The model's response parsed from JSON.
    """
    # Serialise parsed objects to real JSON before building the prompts.
    # Interpolating a dict/list directly embeds its Python repr (single quotes,
    # None/True/False), which is not JSON and misrepresents null/boolean values.
    if isinstance(json_schema, str):
        schema_text = json_schema
    else:
        schema_text = json.dumps(json_schema, ensure_ascii=False, indent=2)
    if isinstance(json_raw, str):
        raw_text = json_raw
    else:
        raw_text = json.dumps(json_raw, ensure_ascii=False)

    system_prompt = f"""
    You are a data transformation tool that takes in JSON data and a reference JSON schema, and outputs JSON data according to the schema.
    Not all of the data in the input JSON will fit the schema, so you may need to omit some data or add null values to the output JSON.
    Translate all data into English if not already in English.
    Ensure values are formatted as specified in the schema (e.g. dates as YYYY-MM-DD).
    Here is the schema:
    {schema_text}

    """

    response = client.chat.completions.create(
        model="gpt-4o",
        response_format={ "type": "json_object" },
        messages=[
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": f"Transform the following raw JSON data according to the provided schema. Ensure all data is in English and formatted as specified by values in the schema. Here is the raw JSON: {raw_text}"}
                ]
            }
        ],
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)



def main_transform(extracted_invoice_json_path, json_schema_path, save_path):
    """
    Transform every extracted invoice JSON in a directory to the target schema.

    Parameters:
    extracted_invoice_json_path (str): Directory holding the extracted JSON files.
    json_schema_path (str): Path of the JSON file containing the reference schema.
    save_path (str): Directory for the results (created if missing); each
        output is named "transformed_<input filename>".
    """
    # The schema is read once and reused for every invoice.
    with open(json_schema_path, 'r', encoding='utf-8') as schema_file:
        schema = json.load(schema_file)

    # Make sure there is somewhere to write to before processing starts.
    os.makedirs(save_path, exist_ok=True)

    for name in os.listdir(extracted_invoice_json_path):
        # Anything that is not a .json file is ignored.
        if not name.endswith(".json"):
            continue

        source_path = os.path.join(extracted_invoice_json_path, name)
        with open(source_path, 'r', encoding='utf-8') as source_file:
            raw_invoice = json.load(source_file)

        transformed = transform_invoice_data(raw_invoice, schema)

        target_path = os.path.join(save_path, f"transformed_{name}")
        with open(target_path, 'w', encoding='utf-8') as target_file:
            json.dump(transformed, target_file, ensure_ascii=False, indent=2)

   
# Locations for the transformation step, read from environment variables
# (os.getenv returns None for any variable that is not set).
# Directory of extracted JSON files to transform:
extracted_invoice_json_path = os.getenv("extracted_invoice_json_path")
# JSON file holding the reference schema:
json_schema_path = os.getenv("json_schema_path")
# Directory where the transformed JSON files are written:
save_path = os.getenv("save_path")

# Transform every .json file found in extracted_invoice_json_path.
main_transform(extracted_invoice_json_path, json_schema_path, save_path)

***Store Data in a Database and Then Query It***

In [8]:
import sqlite3


In [9]:
def ingest_transformed_jsons(json_folder_path, db_path):
    """
    Load transformed invoice JSON files into the STOREINFO table.

    One row is inserted per purchased item, with the invoice's store and
    salesperson fields repeated on each row. Sections that are missing or
    null in a file are treated as empty instead of raising KeyError. All
    inserts are committed together at the end; the connection is closed even
    if a file fails to load, in which case nothing is committed.

    Parameters:
    json_folder_path (str): Directory containing the transformed .json files.
    db_path (str): Path to the SQLite database file.
    """
    insert_sql = '''
    INSERT INTO STOREINFO (store_number, store_name, address, phone, salesperson, item, price)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    '''

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        cursor.execute('''
        CREATE TABLE IF NOT EXISTS STOREINFO (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            store_number INTEGER,
            store_name TEXT ,
            address TEXT,
            phone TEXT,
            salesperson TEXT ,
            item TEXT,
            price REAL 
        )
        ''')

        # Sorted so rows get the same ids on every run over the same folder.
        for filename in sorted(os.listdir(json_folder_path)):
            if not filename.endswith(".json"):
                continue
            file_path = os.path.join(json_folder_path, filename)

            # Load the JSON data
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # `or` also covers sections that are present but null.
            store_info = data.get("store_information") or {}
            transaction = data.get("transaction_details") or {}
            items = data.get("itemized_purchases") or []

            rows = [
                (
                    store_info.get("store_number"),
                    store_info.get("name"),
                    store_info.get("address"),
                    store_info.get("phone"),
                    transaction.get("salesperson"),
                    item.get("item"),
                    item.get("price"),
                )
                for item in items
            ]
            cursor.executemany(insert_sql, rows)

        conn.commit()
    finally:
        conn.close()


In [10]:
def execute_query(db_path, query, params=()):
    """
    Execute a SQL query and return the results.

    SQLite errors (including failure to open the database) are printed and
    reported as an empty result instead of being raised.

    Parameters:
    db_path (str): Path to the SQLite database file.
    query (str): SQL query to be executed.
    params (tuple): Parameters to be passed to the query (default is an empty tuple).

    Returns:
    list: List of rows returned by the query ([] on error or when the
    statement returns no rows).
    """
    # Bound before the try so the finally block is safe when connect() fails;
    # otherwise `conn` would be undefined there and raise UnboundLocalError.
    conn = None
    try:
        # Connect to the SQLite database
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Execute the query with parameters
        cursor.execute(query, params)
        results = cursor.fetchall()

        # Commit whenever the statement opened a transaction. This covers
        # INSERT/UPDATE/DELETE as well as forms a keyword-prefix check misses
        # (REPLACE, "WITH ... INSERT", statements preceded by a comment).
        if conn.in_transaction:
            conn.commit()

        return results
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
        return []
    finally:
        # Close the connection
        if conn is not None:
            conn.close()



In [11]:
# Load every transformed invoice JSON into the SQLite database.
# Both locations come from environment variables (None when unset).
# NOTE(review): ingest_transformed_jsons only issues INSERTs, so re-running this
# cell against the same database appends the same rows again.
transformed_invoices_path = os.getenv("transformed_invoices_path")
db_path = os.getenv("db_path")
ingest_transformed_jsons(transformed_invoices_path, db_path)

# Read back every STOREINFO row to check what was ingested.
query = '''
    SELECT 
        *
    FROM 
        STOREINFO
    '''

# execute_query returns a list of row tuples ([] if the query fails).
results = execute_query(db_path, query)
for row in results:
    print(row)

(1, 104, 'Coca-Cola Store', '3785 Las Vegas Blvd. S. Las Vegas, NV 89109', '800-810-2653', 'PHILIP', 'Mn Santa Socks 3pk', 12.95)
(2, 104, 'Coca-Cola Store', '3785 Las Vegas Blvd. S. Las Vegas, NV 89109', '800-810-2653', 'PHILIP', 'Mn Stripe W-Script Socks', 7.95)
(3, 104, 'Coca-Cola Store', '3785 Las Vegas Blvd. S. Las Vegas, NV 89109', '800-810-2653', 'PHILIP', 'SYM Sub Script Tee-ref', 24.95)
(4, 104, 'Coca-Cola Store', '3785 Las Vegas Blvd. S. Las Vegas, NV 89109', '800-810-2653', 'PHILIP', 'C.Geyser Alp Spring', 16.99)
(5, 104, 'Coca-Cola Store', '3785 Las Vegas Blvd. S. Las Vegas, NV 89109', '800-810-2653', 'PHILIP', 'CA Redemption Value', 0.1)
(6, 104, 'Coca-Cola Store', '3785 Las Vegas Blvd. S. Las Vegas, NV 89109', '800-810-2653', 'PHILIP', 'ITO EN Oi Ocha Green Tea', 16.99)
(7, 104, 'Coca-Cola Store', '3785 Las Vegas Blvd. S. Las Vegas, NV 89109', '800-810-2653', 'PHILIP', 'Fresh 2 Go Pita Hm Chs', 74.05)


***Show Output in Gradio***

In [14]:
import pandas as pd
import gradio as gr
import sqlite3

# Function to fetch data from the database
def fetch_invoice_data(db_path):
    """
    Read the whole STOREINFO table into a DataFrame.

    Parameters:
    db_path (str): Path to the SQLite database file.

    Returns:
    pandas.DataFrame: One row per STOREINFO record, with columns named after
    the table's columns.

    Raises:
    sqlite3.Error: If the query fails (for example, the table does not exist).
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM STOREINFO")
        rows = cursor.fetchall()
        # cursor.description holds one tuple per result column; item 0 is its name.
        columns = [desc[0] for desc in cursor.description]
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return pd.DataFrame(rows, columns=columns)

# DB path: location of the SQLite file, read from the db_path environment
# variable (os.getenv returns None when it is unset).
db_path = os.getenv("db_path")


def display_data():
    """
    Callback for the refresh button: fetch the invoice rows from the database
    at the module-level `db_path` and return them for display.
    """
    invoice_frame = fetch_invoice_data(db_path)
    return invoice_frame

# Gradio Interface
# Layout: two Markdown headers, a row containing the refresh button, and a
# read-only table that is filled when the button is clicked.
with gr.Blocks(title="📋 Invoice Viewer") as demo:
    gr.Markdown("## 🧾 Extracted Invoice Data")
    gr.Markdown("Easily explore your transformed receipt data from the SQLite database.")

    with gr.Row():
        with gr.Column():
            gr.Markdown("🔍 Click below to refresh and view the latest invoices:")
            refresh_btn = gr.Button("🔄 Refresh Invoices")

    # interactive=False: the table is for viewing only.
    output_table = gr.Dataframe(
        label="Invoice Records",
        interactive=False,
        wrap=True
    )

    # Clicking the button calls display_data() (no inputs) and shows the
    # returned DataFrame in output_table.
    refresh_btn.click(fn=display_data, inputs=[], outputs=output_table)

# Start the app; the captured output below shows it serving on a local URL.
demo.launch()


  from .autonotebook import tqdm as notebook_tqdm


* Running on local URL:  http://127.0.0.1:7860
* To create a public link, set `share=True` in `launch()`.


