<a href="https://colab.research.google.com/github/elichen/oai-playground/blob/main/Data_Extraction_and_Transformation_in_ELT_Workflows_using_GPT_4o_as_an_OCR_Alternative.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%pip install openai pymupdf --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m328.5/328.5 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m37.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.6/75.6 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.7/15.7 MB[0m [31m20.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.9/77.9 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.3/58.3 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from google.colab import userdata
from openai import OpenAI
client = OpenAI(api_key=userdata.get('openai-key'))

In [3]:
import os
import requests
def download_file(url, save_dir="."):
    os.makedirs(save_dir, exist_ok=True)
    file_name = os.path.basename(url)
    try:
        with open(os.path.join(save_dir, file_name), 'wb') as f:
            f.write(requests.get(url).content)
        print(f"Downloaded {file_name}")
    except:
        print(f"Failed to download {file_name}")

In [4]:
url = "https://github.com/openai/openai-cookbook/raw/main/examples/data/hotel_invoices/receipts_2019_de_hotel"

In [9]:
download_file("https://github.com/openai/openai-cookbook/raw/main/examples/data/hotel_invoices/invoice_schema.json")

Downloaded invoice_schema.json


In [5]:
from urllib.parse import urljoin
def download_github_folder(repo_owner, repo_name, folder_path, save_dir="."):
    api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{folder_path}"

    response = requests.get(api_url)
    if response.status_code != 200:
        print(f"Failed to fetch folder contents. Status code: {response.status_code}")
        return

    contents = response.json()

    for item in contents:
        if item['type'] == 'file':
            download_url = item['download_url']
            download_file(download_url, save_dir)
        elif item['type'] == 'dir':
            sub_folder = os.path.join(save_dir, item['name'])
            download_github_folder(repo_owner, repo_name, item['path'], sub_folder)

repo_owner = "openai"
repo_name = "openai-cookbook"
folder_path = "examples/data/hotel_invoices/receipts_2019_de_hotel"
save_dir = "receipts_2019_de_hotel"

download_github_folder(repo_owner, repo_name, folder_path, save_dir)

Downloaded 20190119_002.pdf
Downloaded 20190202_THE%20MADISON%20HAMBURG.pdf
Downloaded 20190202_THE%20MADISON%20HAMBURG_001.pdf
Downloaded citadines-20190331_Invoice.pdf
Downloaded citadines_08372561.pdf
Downloaded hampton-25789.pdf
Downloaded hampton_20190411.pdf
Downloaded hampton_24361.pdf
Downloaded hampton_28646.pdf
Downloaded madison-489347.pdf
Downloaded madison-490057.pdf
Downloaded madison-490969.pdf
Downloaded madison-492602.pdf
Downloaded madison-493304.pdf
Downloaded madison-496987.pdf
Downloaded madison-502875.pdf
Downloaded madison-5036666.pdf
Downloaded madison_497810.pdf
Downloaded madison_folio_g_cp_efolio5895702.pdf
Downloaded madison_folio_g_cp_efolio5895707.pdf
Downloaded madison_folio_g_cp_efolio5945547.pdf
Downloaded madison_folio_g_cp_efolio5972171.pdf
Downloaded madison_folio_g_cp_efolio5976009.pdf
Downloaded madison_folio_g_cp_efolio5991896.pdf
Downloaded mercure-37816396.pdf
Downloaded motelone-524544306.pdf
Downloaded motelone_20191111.pdf
Downloaded motelone

In [6]:
from openai import OpenAI
import fitz  # PyMuPDF
import io
import os
from PIL import Image
import base64
import json

@staticmethod
def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def pdf_to_base64_images(pdf_path):
    #Handles PDFs with multiple pages
    pdf_document = fitz.open(pdf_path)
    base64_images = []
    temp_image_paths = []

    total_pages = len(pdf_document)

    for page_num in range(total_pages):
        page = pdf_document.load_page(page_num)
        pix = page.get_pixmap()
        img = Image.open(io.BytesIO(pix.tobytes()))
        temp_image_path = f"temp_page_{page_num}.png"
        img.save(temp_image_path, format="PNG")
        temp_image_paths.append(temp_image_path)
        base64_image = encode_image(temp_image_path)
        base64_images.append(base64_image)

    for temp_image_path in temp_image_paths:
        os.remove(temp_image_path)

    return base64_images

In [7]:
def extract_invoice_data(base64_image):
    system_prompt = f"""
    You are an OCR-like data extraction tool that extracts hotel invoice data from PDFs.

    1. Please extract the data in this hotel invoice, grouping data according to theme/sub groups, and then output into JSON.

    2. Please keep the keys and values of the JSON in the original language.

    3. The type of data you might encounter in the invoice includes but is not limited to: hotel information, guest information, invoice information,
    room charges, taxes, and total charges etc.

    4. If the page contains no charge data, please output an empty JSON object and don't make up any data.

    5. If there are blank data fields in the invoice, please include them as "null" values in the JSON object.

    6. If there are tables in the invoice, capture all of the rows and columns in the JSON object.
    Even if a column is blank, include it as a key in the JSON object with a null value.

    7. If a row is blank denote missing fields with "null" values.

    8. Don't interpolate or make up data.

    9. Please maintain the table structure of the charges, i.e. capture all of the rows and columns in the JSON object.

    """

    response = client.chat.completions.create(
        model="gpt-4o",
        response_format={ "type": "json_object" },
        messages=[
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "extract the data in this hotel invoice and output into JSON "},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}", "detail": "high"}}
                ]
            }
        ],
        temperature=0.0,
    )
    return response.choices[0].message.content


In [8]:
def extract_from_multiple_pages(base64_images, original_filename, output_directory):
    entire_invoice = []

    for base64_image in base64_images:
        invoice_json = extract_invoice_data(base64_image)
        invoice_data = json.loads(invoice_json)
        entire_invoice.append(invoice_data)

    # Ensure the output directory exists
    os.makedirs(output_directory, exist_ok=True)

    # Construct the output file path
    output_filename = os.path.join(output_directory, original_filename.replace('.pdf', '_extracted.json'))

    # Save the entire_invoice list as a JSON file
    with open(output_filename, 'w', encoding='utf-8') as f:
        json.dump(entire_invoice, f, ensure_ascii=False, indent=4)
    return output_filename


def main_extract(read_path, write_path):
    for filename in os.listdir(read_path):
        print(filename)
        file_path = os.path.join(read_path, filename)
        if os.path.isfile(file_path):
            base64_images = pdf_to_base64_images(file_path)
            extract_from_multiple_pages(base64_images, filename, write_path)


read_path= "./receipts_2019_de_hotel"
write_path= "./extracted_invoice_json"

main_extract(read_path, write_path)


madison_folio_g_cp_efolio5945547.pdf
madison_folio_g_cp_efolio5895707.pdf
citadines-20190331_Invoice.pdf
motelone_20191111.pdf
madison_folio_g_cp_efolio5895702.pdf
citadines_08372561.pdf
madison-490057.pdf
20190119_002.pdf
hampton_28646.pdf
20190202_THE%20MADISON%20HAMBURG.pdf
madison-492602.pdf
mercure-37816396.pdf
madison_folio_g_cp_efolio5976009.pdf
madison-496987.pdf
madison-490969.pdf
premierinn_GABCI19014325.pdf
madison_folio_g_cp_efolio5991896.pdf
motelone_20191118.pdf
moxy-20191221_007.pdf
20190202_THE%20MADISON%20HAMBURG_001.pdf
madison_497810.pdf
madison-493304.pdf
hampton_20190411.pdf
madison_folio_g_cp_efolio5972171.pdf
motelone-524544306.pdf
hampton_24361.pdf
madison-5036666.pdf
moxy_20191221_006.pdf
hampton-25789.pdf
madison-502875.pdf
madison-489347.pdf


In [11]:
def transform_invoice_data(json_raw, json_schema):
    system_prompt = f"""
    You are a data transformation tool that takes in JSON data and a reference JSON schema, and outputs JSON data according to the schema.
    Not all of the data in the input JSON will fit the schema, so you may need to omit some data or add null values to the output JSON.
    Translate all data into English if not already in English.
    Ensure values are formatted as specified in the schema (e.g. dates as YYYY-MM-DD).
    Here is the schema:
    {json_schema}

    """

    response = client.chat.completions.create(
        model="gpt-4o",
        response_format={ "type": "json_object" },
        messages=[
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": f"Transform the following raw JSON data according to the provided schema. Ensure all data is in English and formatted as specified by values in the schema. Here is the raw JSON: {json_raw}"}
                ]
            }
        ],
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)



def main_transform(extracted_invoice_json_path, json_schema_path, save_path):
    # Load the JSON schema
    with open(json_schema_path, 'r', encoding='utf-8') as f:
        json_schema = json.load(f)

    # Ensure the save directory exists
    os.makedirs(save_path, exist_ok=True)

    # Process each JSON file in the extracted invoices directory
    for filename in os.listdir(extracted_invoice_json_path):
        if filename.endswith(".json"):
            file_path = os.path.join(extracted_invoice_json_path, filename)

            # Load the extracted JSON
            with open(file_path, 'r', encoding='utf-8') as f:
                json_raw = json.load(f)

            # Transform the JSON data
            transformed_json = transform_invoice_data(json_raw, json_schema)

            # Save the transformed JSON to the save directory
            transformed_filename = f"transformed_{filename}"
            transformed_file_path = os.path.join(save_path, transformed_filename)
            with open(transformed_file_path, 'w', encoding='utf-8') as f:
                json.dump(transformed_json, f, ensure_ascii=False, indent=2)


extracted_invoice_json_path = "./extracted_invoice_json"
json_schema_path = "./invoice_schema.json"
save_path = "./transformed_invoice_json"

main_transform(extracted_invoice_json_path, json_schema_path, save_path)

In [12]:
import os
import json
import sqlite3

def ingest_transformed_jsons(json_folder_path, db_path):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Create necessary tables
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS Hotels (
        hotel_id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        street TEXT,
        city TEXT,
        country TEXT,
        postal_code TEXT,
        phone TEXT,
        fax TEXT,
        email TEXT,
        website TEXT
    )
    ''')

    cursor.execute('''
    CREATE TABLE IF NOT EXISTS Invoices (
        invoice_id INTEGER PRIMARY KEY AUTOINCREMENT,
        hotel_id INTEGER,
        invoice_number TEXT,
        reservation_number TEXT,
        date TEXT,
        room_number TEXT,
        check_in_date TEXT,
        check_out_date TEXT,
        currency TEXT,
        total_net REAL,
        total_tax REAL,
        total_gross REAL,
        total_charge REAL,
        total_credit REAL,
        balance_due REAL,
        guest_company TEXT,
        guest_address TEXT,
        guest_name TEXT,
        FOREIGN KEY(hotel_id) REFERENCES Hotels(hotel_id)
    )
    ''')

    cursor.execute('''
    CREATE TABLE IF NOT EXISTS Charges (
        charge_id INTEGER PRIMARY KEY AUTOINCREMENT,
        invoice_id INTEGER,
        date TEXT,
        description TEXT,
        charge REAL,
        credit REAL,
        FOREIGN KEY(invoice_id) REFERENCES Invoices(invoice_id)
    )
    ''')

    cursor.execute('''
    CREATE TABLE IF NOT EXISTS Taxes (
        tax_id INTEGER PRIMARY KEY AUTOINCREMENT,
        invoice_id INTEGER,
        tax_type TEXT,
        tax_rate TEXT,
        net_amount REAL,
        tax_amount REAL,
        gross_amount REAL,
        FOREIGN KEY(invoice_id) REFERENCES Invoices(invoice_id)
    )
    ''')

    # Loop over all JSON files in the specified folder
    for filename in os.listdir(json_folder_path):
        if filename.endswith(".json"):
            file_path = os.path.join(json_folder_path, filename)

            # Load the JSON data
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Insert Hotel Information
            cursor.execute('''
            INSERT INTO Hotels (name, street, city, country, postal_code, phone, fax, email, website)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                data["hotel_information"]["name"],
                data["hotel_information"]["address"]["street"],
                data["hotel_information"]["address"]["city"],
                data["hotel_information"]["address"]["country"],
                data["hotel_information"]["address"]["postal_code"],
                data["hotel_information"]["contact"]["phone"],
                data["hotel_information"]["contact"]["fax"],
                data["hotel_information"]["contact"]["email"],
                data["hotel_information"]["contact"]["website"]
            ))
            hotel_id = cursor.lastrowid

            # Insert Invoice Information
            cursor.execute('''
            INSERT INTO Invoices (hotel_id, invoice_number, reservation_number, date, room_number, check_in_date, check_out_date, currency, total_net, total_tax, total_gross, total_charge, total_credit, balance_due, guest_company, guest_address, guest_name)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                hotel_id,
                data["invoice_information"]["invoice_number"],
                data["invoice_information"]["reservation_number"],
                data["invoice_information"]["date"],
                data["invoice_information"]["room_number"],
                data["invoice_information"]["check_in_date"],
                data["invoice_information"]["check_out_date"],
                data["totals_summary"]["currency"],
                data["totals_summary"]["total_net"],
                data["totals_summary"]["total_tax"],
                data["totals_summary"]["total_gross"],
                data["totals_summary"]["total_charge"],
                data["totals_summary"]["total_credit"],
                data["totals_summary"]["balance_due"],
                data["guest_information"]["company"],
                data["guest_information"]["address"],
                data["guest_information"]["guest_name"]
            ))
            invoice_id = cursor.lastrowid

            # Insert Charges
            for charge in data["charges"]:
                cursor.execute('''
                INSERT INTO Charges (invoice_id, date, description, charge, credit)
                VALUES (?, ?, ?, ?, ?)
                ''', (
                    invoice_id,
                    charge["date"],
                    charge["description"],
                    charge["charge"],
                    charge["credit"]
                ))

            # Insert Taxes
            for tax in data["taxes"]:
                cursor.execute('''
                INSERT INTO Taxes (invoice_id, tax_type, tax_rate, net_amount, tax_amount, gross_amount)
                VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    invoice_id,
                    tax["tax_type"],
                    tax["tax_rate"],
                    tax["net_amount"],
                    tax["tax_amount"],
                    tax["gross_amount"]
                ))

    conn.commit()
    conn.close()



In [13]:

def execute_query(db_path, query, params=()):
    """
    Execute a SQL query and return the results.

    Parameters:
    db_path (str): Path to the SQLite database file.
    query (str): SQL query to be executed.
    params (tuple): Parameters to be passed to the query (default is an empty tuple).

    Returns:
    list: List of rows returned by the query.
    """
    try:
        # Connect to the SQLite database
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Execute the query with parameters
        cursor.execute(query, params)
        results = cursor.fetchall()

        # Commit if it's an INSERT/UPDATE/DELETE query
        if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')):
            conn.commit()

        return results
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
        return []
    finally:
        # Close the connection
        if conn:
            conn.close()


# Example usage
transformed_invoices_path = "./transformed_invoice_json"
db_path = "./hotel_DB.db"
ingest_transformed_jsons(transformed_invoices_path, db_path)

query = '''
    SELECT
        h.name AS hotel_name,
        i.total_gross AS max_spent
    FROM
        Invoices i
    JOIN
        Hotels h ON i.hotel_id = h.hotel_id
    ORDER BY
        i.total_gross DESC
    LIMIT 1;
    '''

results = execute_query(db_path, query)
for row in results:
    print(row)

('Citadines Michel Hamburg', 903.63)
