In [42]:
%pip install -q \
    fuzzywuzzy \
    easyocr \
    pandas \
    requests

debug = False


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.1.2[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [43]:
import requests

def writeToEndpoint(endpoint, json):
    url = f'http://localhost:3000/api/{endpoint}'
    response = requests.post(url, json=json)
    if response.status_code == 201:
        if debug:
            print(f'Succes writing {endpoint}. Response:\n{response.text}')
    else:
        # if debug:
        print(f'\nError writing {endpoint}. \nRequest:{json}\nResponse:\n{response.text}')
    return response


In [79]:
# seed reference items using api

# import from csv
import pandas as pd

# import reference items
referenceitems_df = pd.read_csv('data/referenceitems.csv')

# write reference items to endpoint
for index, row in referenceitems_df.iterrows():
    response = writeToEndpoint('referenceItem', {
        "name": row['name'],
        "quantity": row['quantity'],
        "unitOfMeasure": row['unitofmeasure'],
        "price": row['price'],
        "pricePerWeight": row['priceperweight'],
        "referenceUrl": row['referenceurl']
    })
    new_referenceitem_id = response.json()['id']
    # update the id in the dataframe to remote id
    referenceitems_df.at[index, 'id'] = int(new_referenceitem_id)
referenceitems_df['id'] = referenceitems_df['id'].astype(int)
referenceitems_df.set_index('id', inplace=True)
referenceitems_df

# import from known products to panda dataframe
products_df = pd.read_csv('data/products_from_2022.csv')
products_df.reset_index(drop=True, inplace=True)
products_df['id'] = products_df.index
products_df.set_index('id', inplace=True)
products_df

# map known products to remote reference item ids
temp_referenceItem_map = referenceitems_df.copy()
temp_referenceItem_map['id'] = temp_referenceItem_map.index
temp_referenceItem_map
products_df['referenceItemId'] = products_df['referenceItem'].map(temp_referenceItem_map.set_index('name')['id'].to_dict())
try:
    products_df['referenceItemId'] = products_df['referenceItemId'].astype(int)
except:
    print('Failed to convert referenceItemId to int')
    print(products_df[products_df['referenceItemId'].isnull()])
products_df.drop(columns=['referenceItem'], inplace=True)
products_df

# write products to endpoint
for index, row in products_df.iterrows():
    response = writeToEndpoint('product', {
        "name": row['name'],
        "weight": row['weight'],
        "unitOfMeasure": row['unitOfMeasure'],
        "referenceItemId": row['referenceItemId']
    })
    new_product_id = response.json()['id']
    # update the id in the dataframe to remote id
    products_df.at[index, 'id'] = int(new_product_id)
products_df['id'] = products_df['id'].astype(int)
products_df.set_index('id', inplace=True)
products_df

Unnamed: 0_level_0,name,weight,unitOfMeasure,referenceItemId
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,Annie's Homegrown Gluten-Free Rice Pasta,170,g,199
2,Blue Diamond Nut Thins Crackers Almond,120,g,192
3,Bob's Red Mill Cornstarch,510,g,191
4,Bob's Red Mill Gluten Free Old Fashioned Rolle...,907,g,203
5,Bob's Red Mill Gluten Free Quick Cooking Rolle...,794,g,203
...,...,...,...,...
91,schar hot dog buns,4,count,213
92,schar gf buns,4,count,213
93,Schar pizza crust,2,count,221
94,loaf white soft gf,480,g,212


In [87]:
products_df.loc[7].index

Index(['name', 'weight', 'unitOfMeasure', 'referenceItemId'], dtype='object')

In [89]:
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
import re
import easyocr
import pandas as pd
import shutil
import numpy as np
import os
import json

reader = easyocr.Reader(['en']) # this needs to run only once to load the model into memory

def ocrImage(image_id, image_path):
    receipt_strings = reader.readtext(image_path)

    # guard clause
    if len(receipt_strings) < 1:
        return # no text found

    # transformation
    receipt_strings_df = pd.DataFrame(receipt_strings, columns=['boundingBox', 'text', 'confidence'])
    receipt_strings_df.drop(columns=['confidence'], inplace=True)
    receipt_strings_df['imageFileId'] = image_id
    # receipt_strings_df['imageFileId'] = receipt_strings_df['imageFileId'].astype(int)

    # push to API
    responses = []
    for index, row in receipt_strings_df.iterrows():
        # print(row['text'])
        # print(row['boundingBox'])
        # print(row['imageFileId'])
        boundingBox_str = [[int(value) for value in sublist] for sublist in row['boundingBox']]
        response = writeToEndpoint('receiptText', {
            'text': str(row['text']), 
            'boundingBox': str(boundingBox_str), 
            'imageFileId': int(row['imageFileId'])
        })
        responses.append(response.json())
    return responses

choices_dict = products_df[['name']].to_dict()['name']

def parseReceiptTextsForEligibleExpenses(receipt_texts, receipt_id):
    # receipt = fetch_receipt(receipt_id)
    # receipt_texts = receipt['data']['receipt']['receiptTexts']
    threshold = 80
    
    def clean_text(text):
        text = re.sub(r'^\d{1,10}', '', text)
        if len(text) <= 2:
            return ''
        if not any(char.isalnum() for char in text):
            return ''
        return text
    
    def getPriceFromReceiptText(receipt_text_id, receipt_texts=receipt_texts):
        # 1. get bounding box of receipt text id
        receipt_text = next(filter(lambda x: x['id'] == receipt_text_id, receipt_texts))
        bounding_box = json.loads(receipt_text['boundingBox']) # "[[165, 910], [657, 910], [657, 959], [165, 959]]"
        # geometries
        # 1 get height of current text (x3 for search bound)
        # x_values = [coord[0] for coord in bounding_box]
        y_values = [coord[1] for coord in bounding_box]
        # width  = max(x_values) - min(x_values)
        height = max(y_values) - min(y_values)
        buffer = 0.25*2 # 0.25 for each side
        search_box_height = 3 * height
        search_box_y = min(y_values) - buffer * height
        # for simplicity search whole width, so no need for x values
        # 2 get all receiptTexts within search bound
        def filter_candidates(y_min, y_max, receipt_texts=receipt_texts):
            candidate_receiptTexts = []
            for receipt_text_i in receipt_texts:
                bounding_box_i = json.loads(receipt_text_i['boundingBox'])
                y_values_i = [coord[1] for coord in bounding_box_i]
                if all(y_min <= y <= y_max for y in y_values_i):
                    candidate_receiptTexts.append(receipt_text_i)
            return candidate_receiptTexts
        candidate_receiptTexts = filter_candidates(search_box_y, search_box_y + search_box_height)
        def find_price_from_candidate_receiptTexts(candidate_receiptTexts):
            for candidate_receiptText in candidate_receiptTexts:
                text = candidate_receiptText['text']
                price = re.findall(r'\d+\.\d{2}', text)
                if len(price) > 0:
                    return float(price[0])
            return None
        price = find_price_from_candidate_receiptTexts(candidate_receiptTexts)
        return price

    for receipt_text in receipt_texts:
        receipt_text_id = receipt_text['id']
        receipt_text = receipt_text['text']
        clean_receipt_text = clean_text(receipt_text)
        if clean_text(clean_receipt_text) == '':
            continue
        best_match, best_score, best_match_id = process.extractOne(clean_receipt_text, choices_dict, scorer=fuzz.token_set_ratio)

        if best_score >= threshold:
            # avoid strict substring matches
            if len(clean_receipt_text)/len(best_match) < 0.4:
                continue
            # found a decent match
            # get corresponding referenceItem 
            product_id = best_match_id
            # get price
            item_price_each = getPriceFromReceiptText(receipt_text_id) or 0
            # create eligible expense
            # print(f'Creating expense for {receipt_text} with reference item id {reference_item_id}')
            writeToEndpoint('expense',{
                    'receiptTextId': receipt_text_id,
                    'productId': product_id,
                    'receiptId': receipt_id,
                    'priceEach': item_price_each, # TODO: find price
                    'quantity': 1, # TODO: find quantity
                }
            )

def upload_image(image_path, receipt_id):
    # fake upload, i.e. copy to frontend folder
    local_destination_folder = "../receipts-app/public/uploads/"
    remote_destination_folder = "/uploads/"
    shutil.copy(image_path, local_destination_folder)
    # image_url = local_destination_folder + image_path.split("/")[-1]
    image_url = remote_destination_folder + image_path.split("/")[-1]
    response = writeToEndpoint('imageFile', {
        "url": image_url, 
        "receiptId": receipt_id,
        })
    return response
# test
# upload_image('./receipts/IMG_4553.jpg')

# TODO: batch multiple images in array
def main_single_receipt(image_path):
    receipt_id = writeToEndpoint('receipt', {}).json()['id']
    image_id = upload_image(image_path, receipt_id).json()['id']
    receiptTexts = ocrImage(image_id, image_path) # for notebooks only, using local file
    print(f'Parsed receipt {receipt_id}: {receiptTexts}', end='\r')
    if receiptTexts is None:
        return None
    parseReceiptTextsForEligibleExpenses(receiptTexts, receipt_id)

# main_single_receipt('./receipts/IMG_4553.jpg')


In [90]:
main_single_receipt('./receipts/img_4576.jpg')

Parsed receipt 37: [{'id': 2671, 'text': 'DANDELION FOODS', 'boundingBox': '[[272, 146], [505, 146], [505, 195], [272, 195]]', 'imageFileId': 37, 'createdAt': '2024-08-09T03:07:45.097Z', 'updatedAt': '2024-08-09T03:07:45.097Z'}, {'id': 2672, 'text': '451 OTTAWA STREET', 'boundingBox': '[[260, 188], [515, 188], [515, 232], [260, 232]]', 'imageFileId': 37, 'createdAt': '2024-08-09T03:07:45.104Z', 'updatedAt': '2024-08-09T03:07:45.104Z'}, {'id': 2673, 'text': 'ALMONTE, ON, KOA 1A0', 'boundingBox': '[[244, 225], [547, 225], [547, 274], [244, 274]]', 'imageFileId': 37, 'createdAt': '2024-08-09T03:07:45.112Z', 'updatedAt': '2024-08-09T03:07:45.112Z'}, {'id': 2674, 'text': 'TEL', 'boundingBox': '[[245, 273], [295, 273], [295, 310], [245, 310]]', 'imageFileId': 37, 'createdAt': '2024-08-09T03:07:45.119Z', 'updatedAt': '2024-08-09T03:07:45.119Z'}, {'id': 2675, 'text': '# (613) 256-4545', 'boundingBox': '[[303, 263], [547, 263], [547, 312], [303, 312]]', 'imageFileId': 37, 'createdAt': '2024-08-

In [None]:
def list_files(directory, extension):
    return list(f for f in os.listdir(directory) if f.endswith('.' + extension))

all_files = list_files('./receipts', 'jpg')

for file in all_files:
    main_single_receipt(f'./receipts/{file}')