# 0) Install and import dependencies

In [100]:
#%pip install langchain openai torch transformers scikit-learn jsonschema pandas numpy

In [101]:
import glob
import json
import os
import time

import numpy as np
import pandas as pd
import torch
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from langchain.llms import OpenAI
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from transformers import BertModel, BertTokenizer

In [102]:
# Regeneration switches. Each flag gates one expensive step further down; while a flag is
# False the notebook reuses the file that step previously wrote to the working directory.
UPDATERECEIPTS = False  # re-query the LLM for every receipt text and rewrite ./entities.json
UPDATEVENDOREMBEDDATABASE = False  # re-embed the vendor CSVs into ./embeddedVendorDatabase.csv
UPDATEPRODUCTEMBEDDATABASE = False  # re-embed the product CSVs into ./embeddedProductDatabase.csv

# 1) Use ChatGPT to convert receipt text into structured JSON

* Make sure it generates correct data (use asserts to test all of this)
* Make sure edge cases are handled (ex: blank fields, fields not in correct datatype, dollar sign in total, phone number larger than 10 digits)
* Prevent the language model from returning invalid JSON

## OpenAI API Key

In [103]:
# Never commit the key itself: it is read from the OPENAI_API_KEY environment variable.
# The value is None when the variable is unset; it is only needed when UPDATERECEIPTS is True.
# NOTE(review): a literal key used to be stored on this line — treat it as leaked and revoke it.
openai_api_key = os.environ.get("OPENAI_API_KEY")

## ChatGPT Prompt

In [104]:
# Extraction prompt sent to the LLM. The raw receipt text is appended directly after this
# string (prompt + receipt), so the model sees: target format, rules, one worked example,
# then the receipt to convert.
# NOTE(review): the worked example fills "receiptTime" with the transaction-ID line and shows
# the phone number as "(xxx) xxx-xxxx" rather than the number printed on the example receipt —
# confirm this is what the model should imitate.
# NOTE(review): rule 3 reads 'end with },' — if the model takes the comma literally the reply
# is not a bare JSON object; confirm the wording.
prompt = '''Please analyze the provided receipt and extract relevant information to fill in the following structured format:
{
  "ReceiptInfo": {
    "merchant": "(string value)",
    "address": "(string value)", (split into street address, city, and state)
    "city": "(string value)",
    "state": "(string value)",
    "phoneNumber": "(string value)",
    "tax": "(float value)", (in dollars)
    "total": "(float value)", (in dollars)
    "receiptDate": "(string value)",
    "receiptTime": "(string value)", (if available)
    "ITEMS": [
      {
        "description": "(string value)",
        "quantity": "(integer value)",
        "unitPrice": "(float value)",
        "totalPrice": "(float value)",
        "discountAmount": "(float value)" if any
      }, ...
    ]
  }
}
Remember to check for any discounts or special offers applied to the items and reflect these in the item details. Make sure to end the json object and make sure it's in json format.
1. tax, total, unitPrice, totalPrice, discountAmount in float value, and quantity in integer value
2. ignore all <UNKNOWN> in the text
3. Your response should start with { and end with },
4. make sure close all ReceiptInfo and use , to separate different ReceiptInfo

example: """Marley's Shop
123 Long Rd
Kailua, HI 67530
(808) 555-1234
CASHIER: JOHN
REGISTER #: 6
04/12/2023
Transaction ID: 5769009
PRICE   QTY  TOTAL
APPLES (1 lb)
2.99 2 5.98  1001
-1.00  999
Choco Dream Cookies
7.59 1 7.59   1001
SUBTOTAL
13.57
SALES TAX 8.5%
1.15
TOTAL
-14.72
VISA CARD            14.72
CARD#: **1234
REFERENCE#: 6789
THANK YOU FOR SHOPPING WITH US!
"""

from example should get:
{
  "ReceiptInfo": {
    "merchant": "Marley's Shop",
    "address": "123 Long Rd",
    "city": "Kailua",
    "state": "HI",
    "phoneNumber": "(xxx) xxx-xxxx",
    "tax": 1.15,
    "total": 14.72,
    "receiptDate": "04/12/2023",
    "receiptTime": "Transaction ID: 5769009",
    "ITEMS": [
      {
        "description": "APPLES (1 lb)",
        "quantity": 2,
        "unitPrice": 2.99,
        "totalPrice": 5.98,
        "discountAmount": 1.00
      },
      {
        "description": "Choco Dream Cookies",
        "quantity": 1,
        "unitPrice": 7.59,
        "totalPrice": 7.59,
        "discountAmount": 0
      }
    ]
  }
}
'''

## Read in all receipt texts, convert to list of JSON, and output to file

In [105]:
def read_text_files(folder_path):
    """Read every ``.txt`` file directly inside ``folder_path``.

    Args:
        folder_path: Directory to scan (sub-directories are not descended into).

    Returns:
        A list with the full text of each ``.txt`` file, ordered by file name,
        or ``None`` (after printing a message) when ``folder_path`` is not a directory.
    """
    if not os.path.isdir(folder_path):
        print("Invalid folder path.")
        return None

    text_list = []

    # os.listdir gives no ordering guarantee; sorting keeps the receipt order
    # (and therefore the order of the generated JSON entries) the same on every run.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.txt'):
            continue

        file_path = os.path.join(folder_path, filename)
        if not os.path.isfile(file_path):
            # e.g. a directory whose name happens to end in ".txt"
            continue

        with open(file_path, 'r') as file:
            text_list.append(file.read())

    return text_list

In [106]:
def _fields(json_type, *names):
    """Map each name to its own ``{"type": json_type}`` property definition."""
    return {name: {"type": json_type} for name in names}


# JSON Schema for one LLM reply. It only constrains the *types* of the listed properties:
# no "required" lists are declared, so a reply that omits a property still validates,
# and "receiptTime" is not described here at all.
schema = {
    "type": "object",
    "properties": {
        "ReceiptInfo": {
            "type": "object",
            "properties": {
                **_fields("string", "merchant", "address", "city", "state", "phoneNumber"),
                **_fields("number", "tax", "total"),
                **_fields("string", "receiptDate"),
                "ITEMS": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            **_fields("string", "description"),
                            **_fields("number", "quantity", "unitPrice", "totalPrice", "discountAmount"),
                        },
                    },
                },
            },
        },
    },
}

def process_and_validate_json(response, schema):
    """Extract the first JSON value from an LLM reply and validate it.

    Text before the first ``{`` and anything after the end of the JSON value
    that starts there (a trailing comma, an explanation, ...) is ignored.

    Args:
        response: Raw text returned by the language model.
        schema: JSON Schema the decoded value must satisfy.

    Returns:
        The JSON text (from the first ``{`` to the end of that value) when it
        decodes and validates; ``None`` when there is no ``{``, the text does
        not decode, or validation fails (a message is printed in the last two cases).
    """
    brace_index = response.find('{')
    if brace_index == -1:
        return None

    try:
        # raw_decode stops at the end of the first complete JSON value instead of
        # requiring the whole remainder of the string to be valid JSON.
        parsed, end_index = json.JSONDecoder().raw_decode(response, brace_index)
        validate(instance=parsed, schema=schema)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return None
    except ValidationError as e:
        print(f"JSON validation error: {e}")
        return None

    return response[brace_index:end_index]

In [107]:
def generate_response(input_text):
    """Send ``input_text`` to the completion model and return its validated JSON text.

    A new ``OpenAI`` client is created on every call, configured from the
    module-level ``openai_api_key``. The reply is passed through
    ``process_and_validate_json`` together with the module-level ``schema``,
    and that function's result is returned unchanged.
    """
    completion_model = OpenAI(
        model="gpt-3.5-turbo-instruct",
        temperature=0,
        openai_api_key=openai_api_key,
        max_tokens=1056,
    )
    return process_and_validate_json(completion_model(input_text), schema)

In [108]:
# Convert every receipt text into JSON with the LLM and store the results in ./entities.json.
if UPDATERECEIPTS:
    folder_path = './receipts/text'
    file_path = './entities.json'

    receipts = read_text_files(folder_path)

    receipts_json = []
    errorReceipts = []  # positions (within `receipts`) of texts that produced no valid JSON
    files_processed = 0

    # read_text_files returns None for a missing folder; in that case nothing is
    # written, so an existing entities.json is left untouched.
    if receipts is not None:
        for receipt_index, receipt in enumerate(receipts):
            files_processed += 1
            response_json = generate_response(prompt + receipt)
            if response_json is None:
                # generate_response yields None for undecodable / invalid replies;
                # record the receipt and keep going instead of failing in json.loads.
                errorReceipts.append(receipt_index)
                continue
            receipts_json.append(json.loads(response_json))

        if errorReceipts:
            print(f"Skipped {len(errorReceipts)} of {files_processed} receipts (no valid JSON): {errorReceipts}")

        with open(file_path, 'w') as file:
            json.dump(receipts_json, file, indent=4)

# 2) Create vector databases

## Load BertTokenizer and BertModel

In [109]:
# Embedding model shared by everything below: generate_embeddings reads the module-level
# `tokenizer` and `model`, so this cell must run before any embedding is computed.
model_name = "BAAI/bge-large-en"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)

## Convert word to embeddings dataframe

In [110]:
def generate_embeddings(word):
    """Embed one piece of text with the module-level ``tokenizer`` and ``model``.

    Args:
        word: Text to embed (a single word or a longer string).

    Returns:
        The mean of the model's last hidden states over the token axis
        (``last_hidden_state.mean(dim=1)``), computed without gradient tracking.
    """
    # truncation=True caps the input at the tokenizer's configured maximum length,
    # so a long merchant+description string is shortened rather than handed to the
    # model whole. Short inputs are tokenized exactly as before.
    # NOTE(review): assumes the tokenizer config declares the model's real limit — confirm.
    inputs = tokenizer(word, return_tensors="pt", truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
    embeddings = outputs.last_hidden_state.mean(dim=1)  # Mean pooling of token embeddings
    return embeddings
    
# Function to convert words in a DataFrame column to embeddings
def convert_to_embeddings_df(df):
    """Embed every value of the first column of ``df``.

    Each value is passed to ``generate_embeddings`` and the result is wrapped in
    its own DataFrame; the per-value frames are then stacked with ``pd.concat``.
    Only column 0 of ``df`` is read, and the row labels of the result are those
    of the individual per-value frames (they are not renumbered).
    """
    first_column = df.iloc[:, 0]
    per_value_frames = [pd.DataFrame(generate_embeddings(value)) for value in first_column]
    return pd.concat(per_value_frames)

## Create vector database for vendors and output to CSV

In [111]:
def getVendorEmbeddedDatabase():
    """Embed every vendor CSV and save the combined table.

    Each ``*.csv`` directly inside ``./vendor database/`` is read, its first
    column is embedded with ``convert_to_embeddings_df``, and every resulting row
    is labelled with the category taken from the file name (the part before the
    first ``_``). The combined table is written to ``./embeddedVendorDatabase.csv``.

    Returns:
        The combined DataFrame (embedding columns plus ``Category``); an empty
        DataFrame when the folder holds no CSV files.
    """
    folder_path = './vendor database/'
    category_frames = []

    # Sorted so the row order of the saved database does not depend on the
    # order in which the file system happens to list the files.
    for file in sorted(glob.glob(os.path.join(folder_path, '*.csv'))):
        # os.path handles both "/" and "\\" separators, unlike split('/').
        file_stem = os.path.splitext(os.path.basename(file))[0]
        category_name = file_stem.split('_')[0]

        newCategory = pd.read_csv(file, encoding='latin-1')
        newColumn = convert_to_embeddings_df(newCategory)
        newColumn['Category'] = category_name
        category_frames.append(newColumn)

    # Concatenate once at the end rather than growing the frame inside the loop.
    if category_frames:
        vendorDatabase = pd.concat(category_frames, ignore_index=True, axis=0)
    else:
        vendorDatabase = pd.DataFrame()

    vendorDatabase.to_csv("./embeddedVendorDatabase.csv")
    return vendorDatabase

# Rebuild ./embeddedVendorDatabase.csv only when the flag is set; otherwise the existing file is reused.
if UPDATEVENDOREMBEDDATABASE:
    getVendorEmbeddedDatabase()

In [112]:
def getProductEmbeddedDatabase():
    """Embed every product CSV and save the combined table.

    ``./product database/`` is walked recursively. For each ``.csv`` file the
    first column is embedded with ``convert_to_embeddings_df`` and every row is
    labelled with the category taken from the name of the folder that contains
    the file (the part before the first ``_``). The combined table is written to
    ``./embeddedProductDatabase.csv``.

    Returns:
        The combined DataFrame (embedding columns plus ``Category``); an empty
        DataFrame when no CSV files are found.
    """
    # Directory path containing subfolders with product CSV files
    root_folder = './product database/'
    category_frames = []

    for root, dirs, files in os.walk(root_folder):
        # Sorting in place makes os.walk visit sub-folders (and files) in a stable order.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith('.csv'):
                continue

            csv_file_path = os.path.join(root, file)
            # Name of the containing folder, independent of the path separator.
            category = os.path.basename(os.path.dirname(csv_file_path))
            category_name = category.split('_')[0]

            newCategory = pd.read_csv(csv_file_path, encoding='latin-1')
            newColumn = convert_to_embeddings_df(newCategory)
            newColumn['Category'] = category_name
            category_frames.append(newColumn)

    # Concatenate once at the end rather than growing the frame inside the loop.
    if category_frames:
        productDatabase = pd.concat(category_frames, ignore_index=True, axis=0)
    else:
        productDatabase = pd.DataFrame()

    productDatabase.to_csv("./embeddedProductDatabase.csv")
    return productDatabase

# Rebuild ./embeddedProductDatabase.csv only when the flag is set; otherwise the existing file is reused.
if UPDATEPRODUCTEMBEDDATABASE:
    getProductEmbeddedDatabase()

## Split vector database into X, y

In [113]:
def getEmbeddedDatabase(filePath):
    """Load a saved embedding database and split it into features and labels.

    The CSV is expected to contain an ``Unnamed: 0`` column (the index column
    produced when the database was saved with its index) and a ``Category``
    column; a missing column raises ``KeyError``.

    Returns:
        ``(X, y)`` where ``X`` is every column except ``Unnamed: 0`` and
        ``Category``, and ``y`` is the ``Category`` column.
    """
    table = pd.read_csv(filePath).drop(columns='Unnamed: 0')

    labels = table['Category']
    features = table.drop(columns='Category')

    return features, labels

In [115]:
def getReceiptTestData():
    """Turn ``./entities.json`` into ``entities_database.csv``.

    For every entry that has a ``ReceiptInfo`` object, one row is written with:

    * ``Merchants`` - the merchant name with "UNKNOWN" / "unknown" markers removed
      (an absent or null merchant becomes an empty string);
    * ``Descriptions`` - the list of item descriptions with purely numeric words
      removed (an absent or null description becomes ``'No Description'``).

    Entries without a ``ReceiptInfo`` object are skipped. Absent or null fields
    no longer raise, because the validation schema does not make any field mandatory.
    """
    # Read and parse the JSON file
    with open('./entities.json', 'r') as file:
        data = json.load(file)

    merchants = []
    descriptions = []

    for entry in data:
        info = entry.get("ReceiptInfo") if isinstance(entry, dict) else None
        if not isinstance(info, dict):
            continue

        merchant = info.get("merchant")
        merchant = "" if merchant is None else str(merchant)
        items = info.get("ITEMS") or []

        # Drop words made only of digits (quantities, SKU numbers, ...) from each description
        cleaned_descriptions = []
        for item in items:
            description = item.get('description', 'No Description')
            if description is None:
                description = 'No Description'
            words = str(description).split()
            cleaned_descriptions.append(' '.join(word for word in words if not word.isdigit()))

        # Removing "UNKNOWN" first turns "<UNKNOWN>" into "<>", which the last replace
        # strips; "<unknown>" is handled the same way.
        merchant = merchant.replace("UNKNOWN", "").replace("unknown", "").replace("<>", "")

        merchants.append(merchant)
        descriptions.append(cleaned_descriptions)

    # Create a DataFrame and save as CSV (without the index column)
    entities_df = pd.DataFrame({
        'Merchants': merchants,
        'Descriptions': descriptions
    })
    entities_df.to_csv('entities_database.csv', index=False)

In [116]:
def KNN(X_train, y_train, X_test, n_neighbors=20):
    """Classify ``X_test`` with a k-nearest-neighbours model fitted on the training data.

    Args:
        X_train: Training feature rows.
        y_train: Training labels, one per row of ``X_train``.
        X_test: Feature rows to classify.
        n_neighbors: Number of neighbours to consult (default 20, as before).

    Returns:
        The result of ``KNeighborsClassifier.predict(X_test)``.
    """
    # Clamp k to the training-set size so a small reference database still works
    # instead of failing because fewer than `n_neighbors` samples exist.
    k = max(1, min(n_neighbors, len(X_train)))

    clf = KNeighborsClassifier(n_neighbors=k)
    clf.fit(X_train, y_train)

    return clf.predict(X_test)
     

In [136]:
# Vendor category names. NOTE(review): nothing in this notebook reads this list — the
# predicted labels come from the 'Category' column of the vendor database; confirm the two agree.
categories = ["Grocery/Supermarkets", "Restaurants/Food Services", "Clothing/Apparel", "Health/Beauty", "Electronics/Appliances", "Home/Garden", "Entertainment/Leisure"]

def getVendorCategory():#listOfItems, Title):
    """Predict a vendor category for every receipt in ``./entities.json``.

    Steps: regenerate ``entities_database.csv`` from the receipts, load the vendor
    embedding database as training data, embed "<merchant> <descriptions>" for each
    receipt, and classify the embeddings with ``KNN``.

    Returns:
        DataFrame with one row per receipt and the columns ``Merchants`` (cleaned
        merchant name) and ``KNN Prediction`` (predicted category).
    """
    getReceiptTestData()

    if UPDATEVENDOREMBEDDATABASE:
        getVendorEmbeddedDatabase()

    X_train, y_train = getEmbeddedDatabase('./embeddedVendorDatabase.csv')

    # Read everything as text and keep blank cells as '' (not NaN): a receipt whose
    # merchant was blanked out would otherwise yield a NaN that cannot be concatenated
    # or tokenized.
    testindDB = pd.read_csv("entities_database.csv", dtype=str, keep_default_na=False)
    merchants = testindDB['Merchants'].to_frame()

    # 'Descriptions' holds the text of the saved Python list (e.g. "['A', 'B']"); it is
    # appended to the merchant name as-is, exactly as before.
    receipt_text = merchants['Merchants'].str.cat(testindDB['Descriptions'], sep=' ').to_frame()
    receiptEmbeddings = convert_to_embeddings_df(receipt_text)
    X_test = receiptEmbeddings.values

    results = pd.DataFrame(KNN(X_train, y_train, X_test), columns=['KNN Prediction'])
    result_df = pd.concat([merchants, results], axis=1)
    return result_df

# Run the vendor classification and save the per-receipt predictions next to the notebook.
vendorPrediction = getVendorCategory()
vendorPrediction.to_csv("./VendorCategoryPredictions.csv")



# 3) Same thing as 2 but you have to define the categories for the ingredients


* Use title of item plus something else (ex: category of vendor)

In [139]:
# Product categories are not listed yet; this rebinds the module-level name to an empty list.
categories = []

def getProductCategory():#jsonObject, listOfItems, Title):
    """Predict a product category for every receipt in ``./entities.json``.

    Mirrors ``getVendorCategory`` but trains on the product embedding database:
    regenerate ``entities_database.csv``, embed "<merchant> <descriptions>" for
    each receipt, and classify the embeddings with ``KNN``.

    Returns:
        DataFrame with one row per receipt and the columns ``Merchants`` (cleaned
        merchant name) and ``KNN Prediction`` (predicted category).
    """
    getReceiptTestData()

    if UPDATEPRODUCTEMBEDDATABASE:
        getProductEmbeddedDatabase()

    X_train, y_train = getEmbeddedDatabase('./embeddedProductDatabase.csv')

    # Read everything as text and keep blank cells as '' (not NaN) so every row can be
    # concatenated and tokenized.
    testindDB = pd.read_csv("entities_database.csv", dtype=str, keep_default_na=False)
    merchants = testindDB['Merchants'].to_frame()

    # Combine merchant and descriptions row by row. (Joining the whole Descriptions
    # column into one string gave every receipt the text of all receipts.)
    items = merchants['Merchants'].str.cat(testindDB['Descriptions'], sep=' ').to_frame('Items')

    # convert_to_embeddings_df embeds the FIRST column of the frame it receives, so the
    # combined text is passed on its own rather than as an extra trailing column.
    receiptEmbeddings = convert_to_embeddings_df(items)
    X_test = receiptEmbeddings.values

    results = pd.DataFrame(KNN(X_train, y_train, X_test), columns=['KNN Prediction'])
    result_df = pd.concat([merchants, results], axis=1)
    return result_df

# Run the product classification and save the per-receipt predictions next to the notebook.
productPrediction = getProductCategory()
productPrediction.to_csv("./ProductCategoryPredictions.csv")



# 4) Create tests in python

* Functions that just test one test and shows that tests passed/failed
* At the end shows how many passed and how many failed

- Example:

     - handleVendor.py
     - all test functions live in testHandleCategory.py (covering every function in handleVendor.py), with asserts at the end of each test function

     - Fixtures in test file: testing all of the things that are needed for the code to run

- For part 2, test for:
    - If there are 7 categories, every predicted category must be exactly one of those 7 categories
    - Edge cases (ex: error in formatting, must be string in list of possible categories)