In [None]:
# import libraries
import os
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from os import listdir
from os.path import join
from natsort import natsorted # needed for sorting filenames of the receipts
from datetime import datetime
from scipy.signal import argrelmin, argrelmax
import re
from PIL import Image, ImageDraw


# Read a local .env file (if there is one) into the process environment first;
# without this call the imported load_dotenv is never used and GOOGLE_SA_KEY
# can only come from the shell that started the kernel.
load_dotenv()

SA_KEY = os.getenv("GOOGLE_SA_KEY")
if not SA_KEY:
    # os.environ only accepts strings; fail with an actionable message instead
    # of the opaque TypeError raised when assigning None.
    raise RuntimeError(
        "GOOGLE_SA_KEY is not set; define it in the environment or in a .env file"
    )
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SA_KEY

In [None]:
# Google's OCR function
def detect_text(path):
    """Run Google Cloud Vision text detection on a local image file.

    Args:
        path: location of the image file; it is opened in binary mode and its
            full content is sent with the request.

    Returns:
        The response object returned by ``client.text_detection``. Callers in
        this notebook read the recognised text from its ``text_annotations``
        attribute.

    Raises:
        Exception: if the response carries a non-empty ``error.message``.
    """
    # kept as a function-local import, so the dependency is only needed when
    # the function is actually called
    from google.cloud import vision

    client = vision.ImageAnnotatorClient()

    with open(path, "rb") as image_file:
        content = image_file.read()

    response = client.text_detection(image=vision.Image(content=content))

    # the API reports failures inside the response rather than by raising
    if response.error.message:
        raise Exception(
            "{}\nFor more info on error messages, check: "
            "https://cloud.google.com/apis/design/errors".format(response.error.message)
            )
    return response

In [None]:
# find all receipts in the specified path and sort them in natural ascending order
path = '../rewe_scanned/'
# Skip hidden entries (e.g. macOS' .DS_Store) by name. Blindly popping the first
# element would drop a real receipt whenever no such file is present.
files = natsorted([name for name in listdir(path) if not name.startswith('.')])
files

In [None]:

def draw_boxes(image, bounds, color):
    """Outline each bounding polygon on the given image.

    Args:
        image: the image object to draw on.
        bounds: iterable of objects whose ``vertices`` hold at least four
            points with ``x`` and ``y`` attributes.
        color: outline colour used for every polygon.

    Returns:
        The same image object that was passed in, after drawing.
    """
    canvas = ImageDraw.Draw(image)

    for box in bounds:
        corners = box.vertices
        # flatten the first four corner points into [x0, y0, x1, y1, ...]
        flat_xy = []
        for corner_no in range(4):
            flat_xy += [corners[corner_no].x, corners[corner_no].y]
        # no fill, only a 3px outline
        canvas.polygon(flat_xy, fill=None, outline=color, width=3)

    return image

In [None]:
# Prefixes removed from the start of an extracted price string (the former
# "dirty-fix" list); they are tried one after another in this order.
_PRICE_NOISE_PREFIXES = ("14 ", "2.0 ", "W. ", "102 ", "10x220 ")


def _annotations_to_frame(annotations):
    """Build one row per annotation: its text plus x/y of its first four vertices."""
    # bl: bottom_left, br: bottom_right, tr: top_right, tl: top_left
    # (column names kept from the original notebook; vertex k fills pair k)
    columns = ["String", "x_bl", "y_bl", "x_br", "y_br", "x_tr", "y_tr", "x_tl", "y_tl"]
    rows = []
    for annotation in annotations:
        vertices = annotation.bounding_poly.vertices
        row = [annotation.description]
        for k in range(4):
            row.extend((vertices[k].x, vertices[k].y))
        rows.append(row)
    # build the frame once instead of growing it cell by cell
    return pd.DataFrame(rows, columns=columns)


def _parse_receipt_date(full_text):
    """Return the first 'DD.MM.YYYY' date found in the text as a ``datetime.date``."""
    # whitespace is tolerated after each dot
    found = re.search(r'\b\d{2}\.\s?\d{2}\.\s?\d{4}\b', full_text)
    if found is None:
        raise ValueError("Date not found")
    # \s also matches tabs/newlines, so strip all whitespace, not only spaces
    compact = re.sub(r'\s', '', found.group(0))
    return datetime.strptime(compact, '%d.%m.%Y').date()


def _label_receipt_lines(mean_y):
    """Label each word with a line id, given ascending mean y positions.

    The overall linear trend is removed from the positions; local maxima of
    the result are used as line starts and local minima as line ends.
    Rows not covered by any start/end pair keep the label ''.
    """
    n_rows = len(mean_y)
    x = np.linspace(1, n_rows, num=n_rows)
    slope = (mean_y.max() - mean_y.min()) / n_rows
    y_flat = mean_y - slope * x - mean_y.min()

    # the first line starts at row 0 (an end point is never a relative
    # extremum); the last detected maximum is dropped because the tail after
    # the last used minimum is labelled separately below
    line_starts = np.append(0, argrelmax(y_flat)[0][:-1])
    line_ends = argrelmin(y_flat)[0]
    if len(line_ends) < len(line_starts):
        raise ValueError(
            "Could not separate receipt lines: found {} line starts but only {} line ends"
            .format(len(line_starts), len(line_ends))
        )

    labels = np.full(n_rows, '', dtype=object)
    for line_no, (first, last) in enumerate(zip(line_starts, line_ends)):
        labels[first:last + 1] = line_no
    # make sure the last line gets labeled as well
    labels[line_ends[len(line_starts) - 1] + 1:] = len(line_starts)
    return labels


def _extract_price(line):
    """Return the text after the last '<letter or %><space>' pair in the line."""
    # searching the reversed string finds the last such pair first
    match = re.search(r' [A-Za-z%]', line[::-1])
    if match is None:
        return line
    # explicit start index: line[-0:] would return the whole string
    return line[len(line) - match.start():]


def process_receipts(path, filename):
    """OCR one receipt image and return its product lines as a table.

    Args:
        path: directory containing the receipt image.
        filename: name of the image file inside ``path``; also stored as
            ``receipt_id`` in the result.

    Returns:
        pandas.DataFrame with the columns ``product_abbr`` (line text with the
        price removed), ``price`` (float), ``receipt_id`` and ``date``
        (``datetime.date``), one row per retained receipt line.

    Raises:
        ValueError: if no text was detected, the 'EUR' / 'SUMME' / 'SUM'
            markers delimiting the product block are missing, the block is
            empty, no date is found, the lines cannot be separated, or a
            price string cannot be converted to float.
    """
    response = detect_text(join(path, filename))

    # The first annotation holds the whole text of the receipt, the following
    # ones the individual words together with their bounding boxes.
    texts = response.text_annotations
    if len(texts) == 0:
        raise ValueError(f"No text detected on receipt {filename!r}")

    df = _annotations_to_frame(texts[1:])

    # mean vertical position of every bounding box
    y_columns = ['y_bl', 'y_br', 'y_tr', 'y_tl']
    df[y_columns] = df[y_columns].astype('int')
    df['mean_y'] = df[y_columns].mean(axis=1)

    # sort by mean height so that words of the same receipt line are adjacent
    df = df.sort_values(by=['mean_y']).reset_index(drop=True)

    # the product block lies between the first 'EUR' and the 'SUMME' (or 'SUM') word
    start_hits = df.index[df['String'] == 'EUR']
    if len(start_hits) == 0:
        raise ValueError(f"No 'EUR' marker found on receipt {filename!r}")
    product_list_start_ind = int(start_hits[0]) + 1

    end_hits = df.index[df['String'] == 'SUMME']
    if len(end_hits) == 0:
        end_hits = df.index[df['String'] == 'SUM']
    if len(end_hits) == 0:
        raise ValueError(f"No 'SUMME' or 'SUM' marker found on receipt {filename!r}")
    product_list_end_ind = int(end_hits[0])

    df_products = df.iloc[product_list_start_ind:product_list_end_ind].reset_index(drop=True)
    if df_products.empty:
        raise ValueError(f"No product lines found on receipt {filename!r}")

    date_dt = _parse_receipt_date(texts[0].description)

    # Label the rows with their receipt line. The labels are assigned as a
    # whole column: chained `df['line'].iloc[...] = i` writes are silently lost
    # under pandas Copy-on-Write and needed a globally disabled warning before.
    line_labels = _label_receipt_lines(df_products['mean_y'].to_numpy())
    df_products = df_products.assign(line=line_labels)

    # within a line, order the words from left to right
    df_products = df_products.sort_values(by=['line', 'x_bl']).reset_index(drop=True)
    df_sorted = df_products.groupby('line')['String'].apply(' '.join).reset_index()

    # Keep only lines containing ' A' or ' B' (tax remark). This is the same
    # effective filter as the former ' B' / ' A *' patterns: the latter was
    # evaluated as a regex in which '*' made the trailing space optional.
    has_tax_remark = df_sorted['String'].str.contains(r' [AB]', regex=True)
    df_sorted = df_sorted[has_tax_remark].reset_index(drop=True)

    # remove the tax remarks at the end of the strings (sequentially, as before)
    for remark in (r' B$', r' A \*$', r' A$'):
        df_sorted['String'] = df_sorted['String'].str.replace(remark, '', regex=True)

    df_sorted['price'] = df_sorted['String'].apply(_extract_price)
    for prefix in _PRICE_NOISE_PREFIXES:
        pattern = f'^{re.escape(prefix)}'
        df_sorted['price'] = df_sorted['price'].apply(lambda x: re.sub(pattern, '', x))

    # remove the price from the product string
    df_sorted['String'] = [
        name.replace(price, '')
        for name, price in zip(df_sorted['String'], df_sorted['price'])
    ]

    # formatting of price column: drop leading '.'/'B' characters, use a
    # decimal point, remove inner spaces, then convert strictly to float
    df_sorted['price'] = (
        df_sorted['price']
        .str.lstrip('.B')
        .str.replace(',', '.', regex=False)
        .str.replace(' ', '', regex=False)
        .astype('float')
    )

    df_sorted = df_sorted.drop(columns='line').rename(columns={'String': 'product_abbr'})

    df_sorted['receipt_id'] = filename
    df_sorted['date'] = date_dt

    return df_sorted

In [None]:
# run the OCR pipeline on every receipt and stack the per-receipt tables
df_list = [process_receipts(path, file) for file in files]

df_all = pd.concat(df_list, ignore_index=True)


In [None]:
# lift pandas' row limit so the combined table is displayed in full
pd.options.display.max_rows = None
df_all

In [None]:
# persist the combined table; with to_csv's defaults the row index is written too
df_all.to_csv(path_or_buf='../data/all_scanned_receipts_cleaned.csv')