In [None]:
import numpy as np
import csv
import difflib
#from fuzzywuzzy import fuzz
import re
import datetime
import os
import heapq
import tqdm
import pickle

## Loading the Data

In [None]:
# Reads in OCR file
def read_ocr(filepath):
    """Parse one OCR result file into a list of ``(coords, text)`` pairs.

    Every data line is expected to hold eight comma-separated coordinate
    values followed by the recognised text. The line is split at most eight
    times, so commas inside the text itself are preserved.

    Returns:
        A list of ``(coords, text)`` tuples where ``coords`` is a list of four
        ``(x, y)`` string pairs and ``text`` is the rest of the line exactly
        as read (including its trailing newline, if any).

    Lines with fewer than nine fields (for example blank lines) are skipped
    instead of raising ``IndexError``.
    """
    out = []
    with open(filepath, 'r') as f:
        # Iterate the file lazily instead of materialising it with readlines().
        for line in f:
            splitted = line.split(',', 8)
            if len(splitted) < 9:
                # Not a complete "8 coordinates + text" record.
                continue
            coords = [(splitted[i], splitted[i + 1]) for i in range(0, 8, 2)]
            out.append((coords, splitted[8]))
    return out

# Reads in the Users.csv
def read_users(filepath):
    """Load the user-entry spreadsheet as a list of rows.

    The file is read with ``csv.DictReader``, so its first line must be a
    header naming at least the columns listed below.

    Returns:
        A list with one entry per CSV row, each of the form
        ``[documentid, amount, date, vendor_name, vendor_address]``; values
        are kept exactly as ``csv.DictReader`` produced them.

    Raises:
        KeyError: if one of the five columns is missing from the header.
    """
    columns = ('documentid', 'amount', 'date', 'vendor_name', 'vendor_address')
    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(filepath, 'r', newline='') as f:
        return [[row[column] for column in columns] for row in csv.DictReader(f)]

# Load the train/test user-entry spreadsheets; users_all lists the train rows
# first, followed by the test rows.
users_train = read_users("new_Users_train.csv")
users_test = read_users("new_Users_test.csv")
users_all = users_train + users_test
# The first field of every row is the document id; OCR files are named
# "<documentid>.csv", which is how they are assigned to a split below.
users_train_filenames = set([u[0] for u in users_train])
users_test_filenames = set([u[0] for u in users_test])
users_all_filenames = users_train_filenames.union(users_test_filenames)
# Sorted OCR file names per split, for both the "ocr" and the "custom_ocr"
# directories.
ocr_filenames_cocr_train = sorted([f for f in os.listdir("custom_ocr") if f.endswith(".csv") and f[:-4] in users_train_filenames])
ocr_filenames_train = sorted([f for f in os.listdir("ocr") if f.endswith(".csv") and f[:-4] in users_train_filenames])
ocr_filenames_test = sorted([f for f in os.listdir("ocr") if f.endswith(".csv") and f[:-4] in users_test_filenames])
ocr_filenames_cocr_test = sorted([f for f in os.listdir("custom_ocr") if f.endswith(".csv") and f[:-4] in users_test_filenames])
# Parsed OCR contents, index-aligned with the file name lists above.
ocr_data_cocr_train = [read_ocr(os.path.join("custom_ocr", f)) for f in ocr_filenames_cocr_train]
ocr_data_train = [read_ocr(os.path.join("ocr", f)) for f in ocr_filenames_train]
ocr_data_test = [read_ocr(os.path.join("ocr", f)) for f in ocr_filenames_test]
ocr_data_cocr_test = [read_ocr(os.path.join("custom_ocr", f)) for f in ocr_filenames_cocr_test]
# Concatenation order: custom-OCR train, OCR train, OCR test, custom-OCR test.
# Both lists must use the same order so that data and file names stay aligned.
combined_ocr_data = ocr_data_cocr_train + ocr_data_train + ocr_data_test + ocr_data_cocr_test
combined_ocr_filenames = ocr_filenames_cocr_train + ocr_filenames_train + ocr_filenames_test + ocr_filenames_cocr_test


## Matching Special Texts
### Date
We use a set of Regular Expressions to recognize potential date text from the OCR data. Each date format is associated with a *confidence* value. For example, we have higher confidence on date formats with English month name (Jan/Feb/...) than we do on numeral separator-free YYYYMMDD format.
### Price
Any number of the form d+.dd (where d is a decimal digit) is matched as a potential price.

In [None]:
#year_regex = re.compile(r"(19[7-9]\d)|(20[0-2]\d)|((9|0|1)\d|2(0-3))")
#year_regex = re.compile(r"(19[7-9]\d)|(20[0-2]\d)|(1\d)")
# Accepts a two-digit year "1d" or a four-digit year "201d" (i.e. 2010-2019 only).
year_regex = re.compile(r"(20)?1\d")
# Accepts a numeric month (a single digit with optional leading zero, or 10/11/12)
# or a three-letter English month abbreviation, case-insensitively.
# NOTE: `0?\d` also accepts "0" and "00", which are not valid months.
month_regex = re.compile(r"(0?\d)|10|11|12|Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec", flags=re.IGNORECASE)
# Lower-cased month abbreviation -> month number (1-12).
months_name_map = {(v.lower()): (i + 1) for (i, v) in enumerate(["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"])}
# Accepts one or two digits whose optional first digit is 0-3 (so 0 and 32-39 also pass).
day_regex = re.compile(r"[0-3]?\d")

# (compiled pattern, confidence) pairs, tried in this order by find_date.
# Every pattern has exactly three capture groups; find_date decides which group
# is the day, the month and the year.
# NOTE: inside a character class `|` is a literal, so `[/|\-|\s]` matches
# '/', '|', '-' or whitespace.
date_candidates = [(re.compile(s, flags=re.IGNORECASE), confidence) for (s, confidence) in [
    (r"(\d{1,4})/(\d{1,2})/(\d{1,4})", 0.9),  # slash-separated numbers
    (r"(\d{1,4})\-(\d{1,2})\-(\d{1,4})", 0.8),  # dash-separated numbers
    (r"(\d{1,4})[/|\-|\s](Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\w{0,6}[/|\-|\s](\d{1,4})", 1.0),  # number, month name, number
    (r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\w{0,6}[/|\-|\s](\d{1,2}),?[/|\-|\s](\d{1,4})", 1.66),  # month name, day, year
    (r"(\d{1,4})\.(\d{1,2})\.(\d{1,4})", 0.6),  # dot-separated numbers
    (r"(\d{4})(\d{2})(\d{2})", 0.3),  # eight digits, 4-2-2 grouping
    (r"(\d{2})(\d{2})(\d{4})", 0.3),  # eight digits, 2-2-4 grouping
]]

def result_ok(s, res):
    """Tell whether the match ``res`` found in ``s`` is a stand-alone number.

    Returns False when the character directly before or directly after the
    matched span is numeric, i.e. when the match is only a slice of a longer
    digit run; returns True otherwise.
    """
    start, stop = res.span()
    if start > 0 and s[start - 1].isnumeric():
        return False
    if stop < len(s) and s[stop].isnumeric():
        return False
    return True

def find_date(inputs: list[str]) -> list[tuple[int, int, int, float, int]]:
    """Extract date candidates from a list of OCR text fragments.

    For every fragment the patterns in ``date_candidates`` are tried in order;
    the first one whose three captured groups can be read as a date wins and
    no further pattern is tried for that fragment.

    Returns:
        A list of ``(year % 100, month, day, confidence, fragment_index)``
        tuples. When nothing is found the single placeholder
        ``(0, 0, 0, 0.0, 0)`` is returned so callers never get an empty list.
    """
    found = []
    for position, fragment in enumerate(inputs):
        for pattern, base_confidence in date_candidates:
            match = pattern.search(fragment)
            if not (match and result_ok(fragment, match)):
                continue
            first, second, third = match.group(1, 2, 3)
            # Candidate readings as (day, month, year, month_first), tried in
            # this order: D-M-Y, Y-M-D, then M-D-Y.
            layouts = (
                (first, second, third, False),
                (third, second, first, False),
                (second, first, third, True),
            )
            chosen = None
            for layout in layouts:
                day_text, month_text, year_text, _month_first = layout
                if (day_regex.fullmatch(day_text)
                        and month_regex.fullmatch(month_text)
                        and year_regex.fullmatch(year_text)):
                    chosen = layout
                    break
            if chosen is None:
                # This pattern matched but cannot be read as a date.
                continue
            day_text, month_text, year_text, month_first = chosen
            # The month-first reading is trusted less.
            score = base_confidence * 0.6 if month_first else base_confidence
            # So is a year that was not written with four digits.
            if len(year_text) != 4:
                score *= 0.75
            if month_text.isnumeric():
                month = int(month_text)
            else:
                month = months_name_map[month_text.lower()]
            found.append((int(year_text) % 100, month, int(day_text), score, position))
            break

    if not found:
        return [(0, 0, 0, 0.0, 0)]
    return found

# One or more digits, a dot, then exactly two digits (e.g. "12.50").
money_regex = re.compile(r"\d+\.\d{2}")
def find_money(inputs: list[str]) -> list[tuple[str, int]]:
    """Collect price-looking substrings from OCR fragments.

    Only the first ``money_regex`` match of each fragment is considered, and
    it is kept only if ``result_ok`` accepts it.

    Returns:
        A list of ``(matched_text, fragment_index)`` pairs in fragment order.
    """
    return [
        (hit.group(), position)
        for position, fragment in enumerate(inputs)
        if (hit := money_regex.search(fragment)) and result_ok(fragment, hit)
    ]



In [None]:
"""
Compute fuzzy distance between two strings.
"""
# A single matcher is reused for every comparison; difflib caches its analysis
# of the second sequence, so it is only replaced when it actually changes.
cached_matcher = difflib.SequenceMatcher(None, "", "")
last_seq2 = ""


def text_distance(ocrTest, userData, hq=False):
    """Return the difflib similarity of ``ocrTest`` and ``userData``.

    The result is a similarity in ``[0.0, 1.0]`` (1.0 means identical), taken
    from ``SequenceMatcher.ratio()`` when ``hq`` is true and from the cheaper
    ``quick_ratio()`` otherwise.
    """
    global last_seq2
    if userData != last_seq2:
        cached_matcher.set_seq2(userData)
        last_seq2 = userData
    cached_matcher.set_seq1(ocrTest)
    if hq:
        return cached_matcher.ratio()
    return cached_matcher.quick_ratio()

## Matching OCR text fragments to user entry text
Once we've figured out which OCR fragment is date, which is price, etc., we start comparing them to their respective field in the user entry CSV. Each data type is compared differently.

### Date
For date, we first parse the OCR fragments into integer year, month, day, and compare that to the spreadsheet with fuzzy matching (to account for typos).

### Amount
We parse all the potential amount numbers we find on the receipt as floats, and try to guess the "total" amount. This is usually the second or third biggest number (the top three are "cash", "amount", and "change"). Each guess is compared fuzzily to the amount string from the user entry CSV.

### Vendor Name
For vendor name, we simply fuzzy match each fragment of OCR text with the user entry name, and take the closest one.

### Address
We guess which text fragments might contain address data by detecting "address words" such as "lot" or "jalan", and city/state names such as "Selangor" or "Perlis". We then remove common "address words" and compare every fragment with the address string in the user entry CSV by counting the number of intersecting words. We score how likely the OCR data matches the entry address by summing up the result of these comparisons weighted by confidence.




In [None]:
def get_date_score(ocr_fragments: list[str], spreadsheet_date: str) -> tuple[int, float]:
    """Score how well the best OCR date candidate matches the spreadsheet date.

    The spreadsheet date is cut so that it starts two characters before its
    first ``-`` and is then compared with every candidate from ``find_date``
    rendered as ``"year-month-day"``.

    Returns:
        ``(fragment_index, score)`` of the highest scoring candidate, where
        the score is the candidate's confidence times the text similarity.
    """
    short_date = spreadsheet_date[(spreadsheet_date.find('-') - 2):]
    scored = []
    for (year, month, day, confidence, position) in find_date(ocr_fragments):
        similarity = text_distance(f"{year}-{month}-{day}", short_date)
        scored.append((position, confidence * similarity))
    return max(scored, key=lambda pair: pair[1])

def read_amount(s: str) -> float:
    """Convert ``s`` to a float, yielding 0.0 when it is not a valid number."""
    try:
        value = float(s)
    except ValueError:
        value = 0.0
    return value

def score_amount(d1: float, d2: float) -> float:
    """Return the high-quality text similarity of two amounts.

    Both amounts are formatted with two decimals and compared as strings.
    """
    # Earlier variant, kept for reference:
    #return text_distance(str(round(d1)), str(round(d2))) + text_distance(f"{(d1 % 1):.02f}", f"{(d2 % 1):.02f}")
    left = f"{d1:.2f}"
    right = f"{d2:.2f}"
    return text_distance(left, right, hq=True)

def get_amount_score(ocr_fragments: list[str], spreadsheet_amount: str) -> tuple[int, float]:
    """Score how well the receipt's prices match the spreadsheet amount.

    All prices found by ``find_money`` are parsed and ranked from largest to
    smallest. Each rank gets a prior confidence, and every price is compared
    with the spreadsheet amount through ``score_amount``.

    Returns:
        ``(fragment_index, score)`` of the best price, or ``(0, 0.0)`` when the
        receipt contains no price at all (the same placeholder convention that
        ``find_date`` uses), instead of raising.
    """
    parsed = sorted([(read_amount(s), idx) for (s, idx) in find_money(ocr_fragments)], key=lambda si: -si[0])
    if not parsed:
        # Nothing to score; indexing parsed[0] or calling max() on an empty
        # list would raise.
        return (0, 0.0)
    l3 = len(parsed) - 3
    # If the largest value equals the sum of the next two, the second and
    # third values get the highest prior and the largest one a lower prior.
    # NOTE(review): this is an exact float comparison; sums such as
    # 71.90 + 0.10 may not compare equal to 72.00 - confirm whether a
    # tolerance is wanted.
    if len(parsed) > 3 and sum(v for (v, _idx) in parsed[1:3]) == parsed[0][0]:
        start_confidences = [0.7, 1.0, 1.0]
    else:
        start_confidences = [0.8, 0.75, 0.70]
    # Ranks 4-6 get 0.6/0.5/0.4, everything after that 0.3; zip() stops at the
    # shorter sequence, so a negative l3 is harmless.
    rank_confidences = start_confidences + [0.6, 0.5, 0.4] + [0.3] * l3
    amounts = [(p, confidence, idx) for ((p, idx), confidence) in zip(parsed, rank_confidences)]
    ps = read_amount(spreadsheet_amount)
    return max(
        [(idx, confidence * score_amount(p, ps)) for (p, confidence, idx) in amounts],
        key=lambda sv: sv[1],
    )


# Lower-cased words whose presence suggests that a fragment is part of an
# address: state names, city names, and generic address words.
PROBABLE_ADDRESS_WORDS = {
    word.lower()
    for word in (
        "Johor", "Kedah", "Kelantan", "Malacca", "Negeri", "Pahang", "Penang", "Perak", "Perlis", "Sabah", "Sarawak", "Selangor", "Terengganu", "Kuala", "Lumpur", "Labuan", "Victoria", "Putrajaya",
        "George", "Town", "Kuala", "Lumpur", "Ipoh", "Kuching", "Johor", "Bahru", "Putrajaya", "Kota", "Kinabalu", "Shah", "Alam", "Malacca", "City", "Alor", "Setar", "Miri", "Petaling", "Jaya", "Kuala", "Terengganu", "Iskandar", "Puteri", "Seberang", "Perai", "Seremban", "Subang", "Jaya", "Pasir", "Gudang", "Kuantan", "Bandaraya",
        "jalan", "jln", "lot", "no.", "no", "flr", "floor",
    )
}
# Generic address words that are removed before counting word overlaps.
COMMON_ADDRESS_WORDS = {"jalan", "jln", "lot", "no.", "no", "flr", "floor"}

# Separators used to split text into words: whitespace, '-', '_', '@' and ','.
address_sep_regex = re.compile(r"\s|\-|_|@|,")

def get_address_score(ocr_fragments: list[str], spreadsheet_address: str) -> float:
    """Score the word overlap between OCR fragments and the spreadsheet address.

    Each fragment is split into a set of words. A fragment containing any of
    ``PROBABLE_ADDRESS_WORDS`` is weighted 1.0, any other fragment 0.3. The
    score is the weighted count of words shared with the spreadsheet address
    (ignoring ``COMMON_ADDRESS_WORDS``) divided by the weighted count of all
    fragment words.

    Returns:
        The ratio described above, or 0.0 when there is nothing to divide by.
    """
    wanted = {token.lower() for token in address_sep_regex.split(spreadsheet_address) if token}
    wanted -= COMMON_ADDRESS_WORDS
    matched_weight = 0.0
    overall_weight = 0
    for fragment in ocr_fragments:
        tokens = {token for token in address_sep_regex.split(fragment) if token}
        confidence = 0.3 if tokens.isdisjoint(PROBABLE_ADDRESS_WORDS) else 1.0
        # The denominator counts every word, including the common ones.
        overall_weight += len(tokens) * confidence
        tokens -= COMMON_ADDRESS_WORDS
        matched_weight += confidence * len(tokens & wanted)

    if overall_weight > 0:
        return matched_weight / overall_weight
    return 0.0

def get_vendor_score(ocr_fragments: list[str], spreadsheet_vendor: str) -> float:
    """Return the best high-quality similarity between the vendor name and any fragment.

    Yields 0.0 when there are no fragments or none of them is similar at all.
    """
    similarities = (
        text_distance(spreadsheet_vendor, fragment, hq=True)
        for fragment in ocr_fragments
    )
    return max(similarities, default=0.0)

Now we compute all those scores (amount, date, name, address) and save them in a table.

In [None]:
NEG_INF = float('-inf')

def compute_row_score(ocr_data, row_data):
    '''
    Compute the four similarity scores between one OCR file and one user row.

    Inputs:
        ocr_data - a list returned by read_ocr() holding (coords, text) pairs
        row_data - one user entry in format [id, price, date, vendor, address]

    Outputs: a tuple (amount_score, date_score, vendor_score, address_score).
            All texts are lower-cased and stripped before being compared.
    '''
    ocr_texts = [text.lower().strip() for (_coords, text) in ocr_data]

    # The date and amount scorers also report which fragment matched best;
    # only the scores are used here.
    _date_fragment, date_score = get_date_score(ocr_texts, row_data[2].lower().strip())
    _amount_fragment, amount_score = get_amount_score(ocr_texts, row_data[1].lower().strip())
    address_score = get_address_score(ocr_texts, row_data[4].lower().strip())
    vendor_score = get_vendor_score(ocr_texts, row_data[3].lower().strip())
    return amount_score, date_score, vendor_score, address_score

def make_table(user_rows, ocr_data):
    """Build the score table for every (OCR file, user row) pair.

    Returns a nested list where ``table[i][j]`` is the score tuple from
    ``compute_row_score`` for OCR file ``i`` and user row ``j``. Progress over
    the OCR files is shown with tqdm.
    """
    table = []
    for one_ocr_data in tqdm.tqdm(ocr_data):
        scores_for_file = []
        for user_row in user_rows:
            scores_for_file.append(compute_row_score(one_ocr_data, user_row))
        table.append(scores_for_file)
    return table


In [None]:
# Uncomment to recompute the score table and refresh the cache file:
#table = make_table(users_all, combined_ocr_data)
#with open("table.pickle", 'wb') as table_file:
#    pickle.dump(table, table_file)

# Load the cached score table. The file is opened in a `with` block so the
# handle is closed deterministically.
# NOTE: pickle.load can execute arbitrary code; only load a table.pickle that
# was produced by this notebook.
with open("table.pickle", 'rb') as table_file:
    table = pickle.load(table_file)
tablenp = np.array(table)

## Weighting the Scores
Our final score is defined as
$$\textrm{FinalScore} = \gamma_1 \cdot \textrm{AmountScore} + \gamma_2 \cdot \textrm{DateScore} + \gamma_3 \cdot \textrm{VendorNameScore} + \gamma_4 \cdot \textrm{AddressScore}$$

We find the optimal $\gamma$ values by basically brute-forcing.

In [None]:
def generate_answer_array(user_rows, ocr_filenames):
    """Return, for each OCR file name, the index of its user row.

    The last four characters of every file name (the ".csv" extension) are
    dropped and the remainder is looked up among the first fields of
    ``user_rows``.

    Raises:
        KeyError: if a file name has no matching user row.
    """
    row_of_document = {}
    for position, row in enumerate(user_rows):
        row_of_document[row[0]] = position
    stems = (name[:-4] for name in ocr_filenames)
    return np.array([row_of_document[stem] for stem in stems])

"""
Turn the OCR x ENTRIES x 4 table into an OCR x ENTRIES table with the given 4-items gamma array.
"""
def apply_gamma(tablenp, gamma):
    """Collapse the last axis of ``tablenp`` with the weights in ``gamma``.

    Returns ``np.dot(tablenp, gamma)``: an OCR x ENTRIES x 4 table becomes an
    OCR x ENTRIES table of weighted scores.
    """
    return np.dot(tablenp, np.array(gamma))


def evaluate_gamma(tablenp, gamma, ans_array):
    """Evaluate the top-1 matching accuracy of a gamma vector.

    Returns:
        1. An array of bool with dimension (NumOcrs,). True means that OCR was
           correctly matched.
        2. The accuracy (in [0.0, 1.0] range).
    """
    weighted = apply_gamma(tablenp, gamma)
    predicted = np.argmax(weighted, axis=1)
    corrects = predicted == ans_array
    ratio = np.count_nonzero(corrects) / len(corrects)
    return corrects, ratio


def evaluate_gamma_top_n(tablenp, gamma, ans_array, n: int):
    """Return the fraction of OCRs whose true entry is among the ``n`` best.

    ``n <= 0`` yields 0.0, and an ``n`` larger than the number of entries
    considers every entry.
    """
    # The table must be passed along with gamma; apply_gamma takes both.
    weighted = apply_gamma(tablenp, gamma)
    ranked = np.argsort(weighted, axis=1)
    # Slice from an explicit start index: `ranked[:, -n:]` would select every
    # column for n == 0.
    first_kept = max(ranked.shape[1] - max(n, 0), 0)
    hits = ranked[:, first_kept:] == np.expand_dims(ans_array, 1)
    included = np.any(hits, axis=1)
    return np.count_nonzero(included) / len(included)


In [None]:
# Slice the full score table into per-split views together with their answer
# arrays. The row slices assume tablenp rows follow the order of
# combined_ocr_data (custom-OCR train, OCR train, OCR test, custom-OCR test)
# and the column slices assume columns follow users_all (train, then test) -
# presumably from make_table(users_all, combined_ocr_data); confirm against the
# pickled table. The "_pco" variants additionally include the custom-OCR rows.
# NOTE: the slices ending in `-len(ocr_data_cocr_test)` become empty if there
# are no custom-OCR test files, because `x[a:-0]` is an empty slice.
table_train = tablenp[len(ocr_data_cocr_train):len(ocr_data_cocr_train)+len(ocr_data_train),:len(users_train)]
ans_train = generate_answer_array(users_train, ocr_filenames_train)
table_train_pco = tablenp[:len(ocr_data_cocr_train)+len(ocr_data_train),:len(users_train)]
ans_train_pco = generate_answer_array(users_train, ocr_filenames_cocr_train + ocr_filenames_train)
table_test = tablenp[len(ocr_data_cocr_train)+len(ocr_data_train):-len(ocr_data_cocr_test),len(users_train):]
ans_test = generate_answer_array(users_test, ocr_filenames_test)
table_test_pco = tablenp[len(ocr_data_cocr_train)+len(ocr_data_train):,len(users_train):]
ans_test_pco = generate_answer_array(users_test, ocr_filenames_test + ocr_filenames_cocr_test)
# "both" = train and test rows, matched against all user rows.
table_both = tablenp[len(ocr_data_cocr_train):-len(ocr_data_cocr_test)]
ans_both = generate_answer_array(users_all, ocr_filenames_train + ocr_filenames_test)
table_both_pco = tablenp
ans_both_pco = generate_answer_array(users_all, combined_ocr_filenames)

In [None]:
# How does each score do on their own?
def test_each():
    """Print the accuracy obtained when only one of the four scores is used.

    The four lines are printed in the order amount, date, vendor, address.
    """
    ans = generate_answer_array(users_all, combined_ocr_filenames)
    for active in range(4):
        # One-hot gamma: weight 1 on the active score, 0 on the others.
        one_hot = [1 if slot == active else 0 for slot in range(4)]
        _corrects, accuracy = evaluate_gamma(tablenp, one_hot, ans)
        print(accuracy)
test_each()

0.3664
0.36
0.3328
0.3568


### Exhaustive Search of $\gamma$
For both visualization and optimization purposes, it is helpful to exhaustively explore a range of $\gamma$'s. We first established reasonable bounds for each $\gamma_i$, and then iteratively estimated the resulting accuracy scores.

In [None]:
def compute_gammas(gamma1, gamma2, gamma3, gamma4, tablenp, ans_arr, out_path='gamma_accuracies_0.1.txt'):
    '''
    Evaluate every combination of the given gamma values, write one
    tab-separated line "g1 g2 g3 g4 accuracy" per combination to a file,
    print each line, and finally print the best accuracy and its line.

    Inputs:
        gamma1 - array of gamma1 values
        gamma2 - array of gamma2 values
        gamma3 - array of gamma3 values
        gamma4 - array of gamma4 values
        tablenp - OCR x ENTRIES x 4 score table passed to evaluate_gamma
        ans_arr - array with the correct entry index for every OCR row
        out_path - file the results are written to (overwritten if present)

    Outputs: nothing is returned; results go to out_path and to stdout.
    '''
    highest = float("-inf")
    highest_gamma = ""
    with open(out_path, 'w') as f:
        for i in gamma1:
            for j in gamma2:
                for k in gamma3:
                    for l in gamma4:
                        _corrects, accuracy = evaluate_gamma(tablenp, [i, j, k, l], ans_arr)
                        line = '\t'.join(str(value) for value in (i, j, k, l, accuracy)) + '\n'
                        f.write(line)
                        print(line)
                        # Strict comparison keeps the first of several
                        # equally good combinations.
                        if accuracy > highest:
                            highest = accuracy
                            highest_gamma = line
    print(highest)
    print(highest_gamma)

                        
# Grid resolution for the exhaustive search below.
step = 0.1

# The exhaustive search is disabled; it writes and prints one line per gamma
# combination.
# NOTE(review): the commented call references `ocr_filenames_cocr`, which is
# not defined in the visible part of this notebook - fix before re-enabling.
#compute_gammas(np.arange(1, 2.5, step), np.arange(4, 6.5, step), np.arange(1, 3.5, step), np.arange(1, 3.5, step), tablenp[:-len(ocr_filenames_test), :-len(users_test)], generate_answer_array(users_all, ocr_filenames_train + ocr_filenames_cocr))
# Accuracy of one hand-picked gamma on the full table (all OCR files, all users).
corrects, accuracy = evaluate_gamma(tablenp, [1.2, 4.1, 2.4, 2.1], generate_answer_array(users_all, combined_ocr_filenames))
print(accuracy)


0.9648


### Random $\gamma$ Exploration  
We explore how our choice of $\gamma$ impacts the final matching accuracy. This is computed by sampling a large number of gamma vectors according to a multivariate normal distribution and calculating the consequential accuracy.

In [None]:
def random_explore_gamma(n: int, sliced_table, ans_arr, seed=None):
    """Evaluate ``n`` randomly drawn gamma vectors.

    Each gamma is the element-wise absolute value of a draw from a
    4-dimensional standard normal distribution. Every new best accuracy is
    printed as soon as it is found.

    Inputs:
        n - number of gamma vectors to sample
        sliced_table - OCR x ENTRIES x 4 score table passed to evaluate_gamma
        ans_arr - array with the correct entry index for every OCR row
        seed - optional seed for the random generator; pass a fixed value to
            make a run reproducible (the default, None, draws fresh entropy)

    Returns:
        ``(gammas, accs)``: the ``(n, 4)`` array of sampled gammas and the
        accuracy of each. If the run is interrupted with Ctrl-C the arrays are
        still returned; accuracies that were not computed yet stay 0.
    """
    rng = np.random.default_rng(seed)
    best = 0.0
    gammas = np.abs(rng.multivariate_normal(np.zeros(4), np.identity(4), size=(n,)))
    accs = np.zeros(n)
    try:
        for (i, gamma) in enumerate(gammas):
            _, acc = evaluate_gamma(sliced_table, gamma, ans_arr)
            accs[i] = acc
            if acc > best:
                best = acc
                print(best)
    except KeyboardInterrupt:
        # Deliberate: allow stopping a long exploration early while keeping
        # the results gathered so far.
        pass
    return gammas, accs

# Explore 100000 random gammas on the train split (including custom-OCR rows).
gammas, accs = random_explore_gamma(100000, table_train_pco, ans_train_pco)
best_idx = np.argmax(accs)
# Print the best gamma rescaled so that its components sum to 1, then its accuracy.
print(gammas[best_idx] / np.sum(gammas[best_idx]))
print(accs[best_idx])

0.8136272545090181
0.9619238476953907
0.9679358717434869
0.9719438877755511
0.9739478957915831
0.9759519038076152
[0.15175324 0.46592608 0.1358618  0.24645888]
0.9759519038076152


In [None]:
# Weights for (amount, date, vendor, address), copied from the normalized best
# gamma printed by the random exploration above.
best_gamma = np.array([0.15175324, 0.46592608, 0.1358618,  0.24645888])

In [None]:
# Accuracy of best_gamma on every split, in this order: train, train + custom
# OCR, test, test + custom OCR, train and test together, everything.
print(evaluate_gamma(table_train, best_gamma, ans_train)[1])
print(evaluate_gamma(table_train_pco, best_gamma, ans_train_pco)[1])
print(evaluate_gamma(table_test, best_gamma, ans_test)[1])
print(evaluate_gamma(table_test_pco, best_gamma, ans_test_pco)[1])
print(evaluate_gamma(table_both, best_gamma, ans_both)[1])
print(evaluate_gamma(table_both_pco, best_gamma, ans_both_pco)[1])

0.9732739420935412
0.9759519038076152
0.9823008849557522
0.9841269841269841
0.9715302491103203
0.9712


In [None]:
def display_errors(tb, gamma, ans, ocr_data, ocr_filenames, user_rows):
    """List every OCR file that ``gamma`` matches to the wrong user row.

    Prints the number of mismatches and returns one tuple per mismatch:
    ``(ocr_index, dates_found, prices_found, ocr_filename, predicted_row,
    expected_row)``.
    """
    predicted = np.argmax(apply_gamma(tb, gamma), axis=1)
    row_of_document = {row[0]: position for (position, row) in enumerate(user_rows)}
    is_correct, _accuracy = evaluate_gamma(tb, gamma, ans)

    report = []
    for idx in np.flatnonzero(np.logical_not(is_correct)):
        texts = [txt for (_, txt) in ocr_data[idx]]
        filename = ocr_filenames[idx]
        report.append((
            idx,
            find_date(texts),
            find_money(texts),
            filename,
            user_rows[predicted[idx]],
            user_rows[row_of_document[filename[:-4]]],
        ))
    print(len(report))
    return report
# display_errors(
#     table_test_pco,
#     best_gamma,
#     ans_test_pco,
#     ocr_data_test + ocr_data_cocr_test,
#     ocr_filenames_test + ocr_filenames_cocr_test,
#     users_test
# )
display_errors(
    table_both_pco,
    best_gamma,
    ans_both_pco,
    combined_ocr_data,
    combined_ocr_filenames,
    users_all
)

18


[(40,
  [(0, 0, 0, 0.0, 0)],
  [('23.90', 12),
   ('47.80', 13),
   ('8.90', 18),
   ('8.90', 19),
   ('3.00', 25),
   ('12.00', 26),
   ('0.20', 31),
   ('0.20', 32),
   ('0.60', 38),
   ('3.00', 39),
   ('71.90', 43),
   ('72.00', 45),
   ('0.10', 47),
   ('67.83', 52),
   ('4.07', 53)],
  '00d0849221655.csv',
  ['00d0547632877',
   '2.0',
   '2018-5-28',
   'MR. D.I.Y. (M) ZSDN BHD',
   'LOT 1851-A & 1851-B, JALAN KPB 6, KAWASAN PERINDUSTRIAN BALAKONG, 43300 SERI KEMBANFAN, SELANGOR'],
  ['00d0849221655',
   '71.90',
   '2018-2-3',
   'MR. D. I. Y. (M) SRDN  BHD',
   'LOT 18551-A & 1851-B, JALAN KPB 6, KAWASAN PERINDUSTRIAN BALAKONG, 43300 SERI KEMBANGAN, SELANGOR']),
 (60,
  [(17, 2, 10, 0.9, 8), (17, 2, 10, 0.9, 10)],
  [('7.00', 14),
   ('7.00', 17),
   ('0.42', 19),
   ('0.02', 21),
   ('7.40', 23),
   ('7.40', 25),
   ('0.00', 27)],
  '00d0123031653.csv',
  ['00d0826414442',
   '7.40',
   '22017-10-2',
   'B & BEST RESTAURNAT',
   'NO.12,JALAN SS4C/5,PETALING JAYA SELANGOR DARU

## Custom OCR Implementation
To match receipts that were not originally scanned by an OCR model, we apply the PaddleOCR package. We take the results and put them in an output directory, formatted according to the original OCR specifications.

In [None]:
#!pip install "tensorboard==2.10.1" --no-deps
#!pip install "paddlepaddle"
#!rm custom_ocr/*.csv
from paddleocr import PaddleOCR
# Build the PaddleOCR engine used below to scan receipt images (English model,
# angle classifier enabled).
ocr = PaddleOCR(use_angle_cls=True, use_space_char=True, det_db_unclip_ratio=0.8, use_dilation=True, lang='en') # need to run only once to download and load model into memory

[2023/01/29 12:22:10] ppocr DEBUG: Namespace(help='==SUPPRESS==', use_gpu=False, use_xpu=False, use_npu=False, ir_optim=True, use_tensorrt=False, min_subgraph_size=15, precision='fp32', gpu_mem=500, image_dir=None, page_num=0, det_algorithm='DB', det_model_dir='/root/.paddleocr/whl/det/en/en_PP-OCRv3_det_infer', det_limit_side_len=960, det_limit_type='max', det_box_type='quad', det_db_thresh=0.3, det_db_box_thresh=0.6, det_db_unclip_ratio=0.8, max_batch_size=10, use_dilation=True, det_db_score_mode='fast', det_east_score_thresh=0.8, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_sast_score_thresh=0.5, det_sast_nms_thresh=0.2, det_pse_thresh=0, det_pse_box_thresh=0.85, det_pse_min_area=16, det_pse_scale=1, scales=[8, 16, 32], alpha=1.0, beta=1.0, fourier_degree=5, rec_algorithm='SVTR_LCNet', rec_model_dir='/root/.paddleocr/whl/rec/en/en_PP-OCRv3_rec_infer', rec_image_inverse=True, rec_image_shape='3, 48, 320', rec_batch_num=6, max_text_length=25, rec_char_dict_path='/root/venv/

In [None]:
def paddle_to_csv(paddle_result):
    """Flatten a PaddleOCR result into rows for the OCR CSV format.

    ``paddle_result`` is iterated as a list of per-image results, each a list
    of ``(coords, (text, confidence))`` items where ``coords`` is a sequence
    of ``(x, y)`` points.

    Returns:
        A list of rows ``[x1, y1, x2, y2, ..., text]``; the recognition
        confidence is dropped.

    Per-image results that are ``None`` or empty are skipped instead of
    raising ``TypeError`` (some PaddleOCR versions reportedly return ``None``
    for an image without any detected text - confirm for the version in use).
    """
    csv_arr = []
    for subimage in paddle_result or []:
        if not subimage:
            continue
        for coords, (cur_text, _confidence) in subimage:
            cur_row = [value for point in coords for value in (point[0], point[1])]
            cur_row.append(cur_text)
            csv_arr.append(cur_row)

    return csv_arr

In [None]:
import csv

# Images in "img" that do not have an OCR result yet.
# NOTE(review): `all_ocr_filenames` is not defined in the visible part of this
# notebook; it is presumably the collection of existing OCR CSV file names -
# confirm and define it before running this cell.
unscanned = [
    f for f in os.listdir("img")
    if os.path.isfile(os.path.join("img", f))
    and f.endswith(".jpg")
    and f[:-4] + ".csv" not in all_ocr_filenames
]

# Run PaddleOCR on each of them and store the result as custom_ocr/<name>.csv.
for (i, f) in enumerate(unscanned):
    cur_paddle = ocr.ocr(os.path.join("img", f))
    cur_out = paddle_to_csv(cur_paddle)
    new_csv = os.path.join("custom_ocr", f[:-4] + ".csv")
    print(f"saving file {i}: {f}")
    # Use a distinct name for the file handle so the loop variable `f` is not
    # shadowed; newline="" is what the csv module expects for writers and
    # avoids doubled line endings on some platforms.
    with open(new_csv, "w", newline="") as csv_file:
        csvWriter = csv.writer(csv_file, delimiter=',')
        csvWriter.writerows(cur_out)
    
    

[2023/01/29 12:22:15] ppocr DEBUG: dt_boxes num : 49, elapse : 0.3086051940917969
[2023/01/29 12:22:16] ppocr DEBUG: cls num  : 49, elapse : 0.2546999454498291
[2023/01/29 12:22:19] ppocr DEBUG: rec_res num  : 49, elapse : 3.5685207843780518
saving file 0: 00d0592118004.jpg
[2023/01/29 12:22:20] ppocr DEBUG: dt_boxes num : 46, elapse : 0.26195406913757324
[2023/01/29 12:22:20] ppocr DEBUG: cls num  : 46, elapse : 0.2524094581604004
[2023/01/29 12:22:24] ppocr DEBUG: rec_res num  : 46, elapse : 4.063932418823242
saving file 1: 00d0937018331.jpg
[2023/01/29 12:22:24] ppocr DEBUG: dt_boxes num : 52, elapse : 0.21347618103027344
[2023/01/29 12:22:24] ppocr DEBUG: cls num  : 52, elapse : 0.2539067268371582
[2023/01/29 12:22:28] ppocr DEBUG: rec_res num  : 52, elapse : 3.53065824508667
saving file 2: 00d0667839698.jpg
[2023/01/29 12:22:28] ppocr DEBUG: dt_boxes num : 32, elapse : 0.34536194801330566
[2023/01/29 12:22:28] ppocr DEBUG: cls num  : 32, elapse : 0.16454124450683594
[2023/01/29 12

## Bipartite Matching
Using graph theory to improve our results: we apply a bipartite matching algorithm to ensure all OCR and user entries are as well matched as mathematically possible, according to our computed similarities. 

In [None]:
#!pip install networkx==3.0
import networkx as nx

def compute_bipartite_matchings(scores, top_k=15):
    '''
    This function computes matchings between OCR files and user-inputted rows
    of data based on a 2D array of scores, using a maximum-weight matching on
    a bipartite graph. To keep the graph small, each OCR file is only
    connected to its top_k highest scoring user rows.

    Inputs:
        scores - a pre-calculated 2D array of scores, where scores[i][j] gives the likelihood
            that OCR file i corresponds to the j-th row of user entered data
        top_k - number of best-scoring user rows kept as edges per OCR file (default 15)
    Output:
        returns a set of 2-element tuples corresponding to the selected edges in the matching.
        Nodes are named "ocr<i>" and "usr<j>"; do not rely on which of the two comes first
        in a tuple.
    '''
    G = nx.Graph()
    if len(scores) == 0:
        # No OCR rows: nothing to match (and scores[0] would not exist).
        return set()

    ocr_nodes = [f"ocr{i}" for i in range(len(scores))]
    user_nodes = [f"usr{j}" for j in range(len(scores[0]))]

    G.add_nodes_from(ocr_nodes, bipartite = 0)
    G.add_nodes_from(user_nodes, bipartite = 1)

    for (i, score_row) in enumerate(scores):
        for (j, sc) in heapq.nlargest(top_k, enumerate(score_row), key=lambda iv: iv[1]):
            # add lower bound for valid score?
            G.add_edge(f"ocr{i}", f"usr{j}", weight = sc)
    print("edges added")
    return nx.max_weight_matching(G)

def compute_bipartite_accuracy(matching, ocr_filenames=None, answer_map=None):
    '''
    Given a collection of 2-element tuples as returned by
    compute_bipartite_matchings, checks the accuracy by comparing the
    filenames and the indices specified in each tuple.

    Inputs:
        matching - edges whose two nodes are named "ocr<i>" and "usr<j>", in either order
        ocr_filenames - OCR file names indexed by i; defaults to the global all_ocr_filenames
        answer_map - dict from document id (file name without ".csv") to the correct user
            row index; defaults to the global ans_map

    Output: the fraction of edges that connect an OCR file to its correct user row.
    Raises ValueError if the matching is empty.
    '''
    # NOTE(review): the two globals used as defaults are not defined in the
    # visible part of this notebook; pass the arguments explicitly if unsure.
    if ocr_filenames is None:
        ocr_filenames = all_ocr_filenames
    if answer_map is None:
        answer_map = ans_map

    correct = 0
    total = 0

    for (node_a, node_b) in matching:
        # An edge of an undirected graph may list the user node first, so
        # orient it by prefix instead of assuming (ocr, usr).
        if node_a.startswith("ocr"):
            ocr_node, usr_node = node_a, node_b
        else:
            ocr_node, usr_node = node_b, node_a
        ocr_idx = int(ocr_node[3:])
        usr_idx = int(usr_node[3:])

        correct += (answer_map[ocr_filenames[ocr_idx][:-4]] == usr_idx)
        total += 1

    if total == 0:
        raise ValueError("matching is empty; accuracy is undefined")
    return correct / total


In [None]:
# NOTE(review): apply_gamma is defined in this notebook as
# apply_gamma(tablenp, gamma); called with best_gamma alone it raises
# TypeError. Pass the intended score table as the first argument - confirm
# which split this cell is meant to evaluate.
matchings = compute_bipartite_matchings(apply_gamma(best_gamma))
compute_bipartite_accuracy(matchings)
#matchings

0.9661921708185054

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=48896dc8-c471-4d04-a99c-128f77d26874' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>