This Notebook gives a demonstration of how to create a transformer based model for processing receipt data in sequence fashion. It leverages the BERT architecture (J Devlin, et al), used for token classiciation, and fine tuned on the dataset constructed from the receipts.

In [51]:
# Imports

!pip install transformers
from transformers import BertTokenizer, BertForTokenClassification
from transformers import AdamW, get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader
import torch
import numpy as np
import pandas as pd
from pprint import pprint as pp
from sklearn.model_selection import train_test_split
import copy
import pdb
from google.colab import drive
drive.mount('/content/drive')


# Global Data Structures

# Convert labels to ordinal values
label2idx = {
    'pad' : 0,
    'store_name' : 1,
    'store_address' : 2,
    'store_city' : 3,
    'store_state' : 4,
    'store_zip' : 5,
    'product_quantity' : 6,
    'product_description' : 7,
    'product_price' : 8,
    'receipt_subtotal' : 9,
    'store_number' : 10,
    'date' : 11,
    'time' : 12,
    'text' : 13
}

# Convert ordinal values back to labels
idx2label = {
    0 : 'pad',
    1 : 'store_name',
    2 : 'store_address',
    3 : 'store_city',
    4 : 'store_state',
    5 : 'store_zip',
    6 : 'product_quantity',
    7 : 'product_description',
    8 : 'product_price',
    9 : 'receipt_subtotal',
    10 : 'store_number',
    11 : 'date',
    12 : 'time',
    13 : 'text'
}

# Hyper Parameters
epochs = 4
batch_size = 8
learning_rate = 1e-5
epsilon = 1e-08

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Here, the daata is read from a CSV file into code. This could be done in a similar fashion from a JSON file, with data formatted similarly to the example given - CSV wass used for convenience. The CSV file must be saved at the appropriate location within google drive, or read from a local device.

In [54]:
# READ DATA
# Read Data into DataFrame. Stored in CSV format for simplicity.
df = pd.read_csv('/content/drive/MyDrive/sample_receipt_data.csv')

# Randomize Order of input data.
randomized = df.sample(frac=1)

# Get data as np array of text, coordinates, & lables respectively.
coordinateList = []
for coordinates in randomized['coordinates'].tolist():
  coordinates = np.array([int(c) for c in coordinates.split()])
  coordinateList.append(coordinates)
coordinateList = np.array(coordinateList)
textList = np.array(randomized['text'].tolist())
labelList = np.array([label2idx[label] for label in randomized['label'].tolist()])

The next pre-processing step is done to order the text values from the receipt

In [56]:
# SORT COORDINATE LISTS IN ORDER, AS THEIR ITEMS WOULD BE READ ON A RECEIPT
# (First TOP to BOTTOM, then LEFT to RIGHT)

# Sort data by TOP-HEIGHT.
sortedIndices = coordinateList[:, 1].argsort()
coordinateList = np.flip( coordinateList[sortedIndices], axis=0 )
textList = np.flip( textList[sortedIndices], axis=0 )
labelList = np.flip( labelList[sortedIndices], axis=0 )

# Sort all data in the same height range from LEFT-TO-RIGHT
topIndex = 0
bottomIndex = 0
while bottomIndex < len(coordinateList):

  # Get bottom-height of the top-most remaining item, and the range of all
  # successive coordinates who's top is above this cutoff
  # (Range is from bottomIndex to topIndex)
  heightCutoff = coordinateList[topIndex][-1]
  while bottomIndex < len(coordinateList) and \
        coordinateList[bottomIndex][1] >= heightCutoff:
    bottomIndex += 1

  # Sort all items in this item's height range by their leftmost values.
  sortedIndices = coordinateList[topIndex:bottomIndex, 0].argsort()
  coordinateList[topIndex:bottomIndex] = coordinateList[topIndex:bottomIndex][sortedIndices]
  textList[topIndex:bottomIndex] = textList[topIndex:bottomIndex][sortedIndices]
  labelList[topIndex:bottomIndex] = labelList[topIndex:bottomIndex][sortedIndices]

  # Adjust the top-most remaining item
  topIndex = bottomIndex

# Print sorted receipt data
print("Sorted Receipt Data\n")
pp(list(zip(coordinateList, textList, labelList)))

Sorted Receipt Data

[(array([-49,  80, -24,  80, -49,  76, -24,  76]), 'Murphy', 1),
 (array([-21,  82,  -5,  82, -21,  78,  -5,  78]), 'USA', 1),
 (array([-2, 81, 17, 81, -2, 77, 17, 77]), '7528', 10),
 (array([-48,  76, -17,  76, -48,  72, -17,  72]), '2745 10th Ave N', 2),
 (array([-49,  71, -23,  71, -49,  68, -23,  68]), 'Palm Springs', 3),
 (array([-20,  70, -15,  70, -20,  65, -15,  65]), 'FL', 4),
 (array([-14,  70,  -2,  70, -14,  65,  -2,  65]), '33461', 5),
 (array([-48,  64, -29,  64, -48,  61, -29,  61]), '12/11/2017', 11),
 (array([-24,  64,  -7,  64, -24,  61,  -7,  61]), '22:40:50', 12),
 (array([-3, 67, 19, 67, -3, 61, 19, 61]), 'Store#7528', 10),
 (array([-42,  22, -38,  22, -42,  17, -38,  17]), '1', 6),
 (array([-37,  23,   4,  23, -37,  17,   4,  17]), '305s KING FF BPACK', 7),
 (array([16, 23, 25, 23, 16, 17, 25, 17]), '$4.69', 8),
 (array([-43,  15, -38,  15, -43,  11, -38,  11]), '1', 6),
 (array([-37,  15,   3,  15, -37,  11,   3,  11]), '305s KING FF BPACK', 

Since we only have one dummy instance of data, it is duplicated here to create placeholder data for demonstration purposes.

In [57]:
# ** Due to lack of data, DUPLICATE receipt sequence values to serve as dummy data
numData = 1000
ReceiptTexts = [copy.deepcopy(textList) for _ in range(numData)]
ReceiptLabels = [copy.deepcopy(labelList) for _ in range(numData)]

In order for the data to be consumed by the model, the text strings must be broken down into token-ized components. The purpose of the below code is to do this while also keeping track of the labels for these tokens, duplicating the label of the original larger string for each sub token.

In [67]:
# DATA PREPROCESSING: TOKENIZE TEXT VALUES

# (For each string, tokenize into a consumable format for the model, and 
# duplicate it's label for each respective token)

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
maxLength = 64

TokenizedReceiptTexts = []
TokenizedReceiptTypeIds = []
TokenizedReceiptAttentionMasks = []
TokenizedReceiptLabels = []
# Iterate over each receipt instance
for i in range(numData):
  nextReceiptText = ReceiptTexts[i]
  nextReceiptLabels = ReceiptLabels[i]

  assert( len(nextReceiptText) == len(nextReceiptLabels) )
  textLen = len(nextReceiptText)

  # Iterate over each text value in given receipt instance
  tokenizedReceiptText = []
  receiptTokenTypeIds = []
  receiptAttentionMasks = []
  tokenizedReceiptLabels = []
  for j in range(textLen):

    textValue = nextReceiptText[j]
    labelValue = nextReceiptLabels[j]

    # Tokenize the text; duplicate its labels accordingly
    try:
      tokenized = tokenizer.encode_plus(textValue, add_special_tokens=False)
      tokenizedText = tokenized['input_ids']
      tokenTypeIds = tokenized['token_type_ids']
      attentionMasks = tokenized['attention_mask']

      #print(textValue)
      #print(tokenizedText)
      #print(tokenTypeIds)
      #print(attentionMasks)
    except:
      pdb.set_trace()

    tokenizedLabels = [copy.deepcopy(labelValue) for _ in range(len(tokenizedText))]

    # Add new tokenized text / labels to new instance (to overwrite the original)
    tokenizedReceiptText.extend(tokenizedText)
    receiptTokenTypeIds.extend(tokenTypeIds)
    receiptAttentionMasks.extend(attentionMasks)
    tokenizedReceiptLabels.extend(tokenizedLabels)
  
  # Truncate to max length (if necessary); save space for 2 special tokens
  if len(tokenizedReceiptText) > maxLength-2:
    tokenizedReceiptText = tokenizedReceiptText[:maxLength-2]
    receiptTokenTypeIds = receiptTokenTypeIds[:maxLength-2]
    receiptAttentionMasks = receiptAttentionMasks[:maxLength-2]
    tokenizedReceiptLabels = tokenizedReceiptLabels[:maxLength-2]

  # Add special tokens
  tokenizedReceiptText.insert(0, 101)
  tokenizedReceiptText.append(102)
  receiptTokenTypeIds.insert(0, 0)
  receiptTokenTypeIds.append(0) 
  receiptAttentionMasks.insert(0, 1)
  receiptAttentionMasks.append(1)
  tokenizedReceiptLabels.insert(0, 0)
  tokenizedReceiptLabels.append(0)

  # Pad to max length (if necessary)
  textLength = len(tokenizedReceiptText)
  padLength = maxLength - textLength
  if padLength > 0:
    tokenizedReceiptText.extend([0 for _ in range(padLength)])
    receiptTokenTypeIds.extend([0 for _ in range(padLength)])
    receiptAttentionMasks.extend([0 for _ in range(padLength)])
    tokenizedReceiptLabels.extend([0 for _ in range(padLength)])

  # Overwrite receipt text with fully tokenized text
  TokenizedReceiptTexts.append(tokenizedReceiptText)
  TokenizedReceiptTypeIds.append(receiptTokenTypeIds)
  TokenizedReceiptAttentionMasks.append(receiptAttentionMasks)
  TokenizedReceiptLabels.append(tokenizedReceiptLabels)

Now, the data is prepared for model consumption by converting them to tensors (vectors with associated gradients for learaning), splitting into a train / test set, and batching into a pytorch dataloader for easy iteration. Of course, all of the data here is still dummy data / for the purposes of demonstration.

In [61]:
# Convert to Tensors
TokenizedReceiptTexts = torch.tensor(TokenizedReceiptTexts)
TokenizedReceiptTypeIds = torch.tensor(TokenizedReceiptTypeIds)
TokenizedReceiptAttentionMasks = torch.tensor(TokenizedReceiptAttentionMasks)
TokenizedReceiptLabels = torch.tensor(TokenizedReceiptLabels)

# Train Test Split
text_train, text_test, \
ttids_train, ttids_test, \
attn_train, attn_test, \
labels_train, labels_test = train_test_split(TokenizedReceiptTexts,
                                             TokenizedReceiptTypeIds,
                                             TokenizedReceiptAttentionMasks,
                                             TokenizedReceiptLabels)

# Prepare Data Loaders
train_dataloader = DataLoader(TensorDataset(text_train, ttids_train, attn_train, labels_train), batch_size=batch_size)

Now it's time to train the model! In this caase, the training data is 

In [None]:
# Train Model (Fine-Tune Bert for this token classification task)
# ** slow to run without GPU **
model = BertForTokenClassification.from_pretrained('bert-base-uncased')
optimizer = AdamW(model.parameters(), lr=learning_rate, eps=epsilon)
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=len(train_dataloader)*epochs)

model.train()
for epoch in range(epochs):

  for step, batch in enumerate(train_dataloader):
    batch_ids, batch_mask, batch_types, batch_lbls = batch
    outputs = model(input_ids=batch_ids, token_type_ids=batch_types,
                                attention_mask=batch_mask, labels=batch_lbls)

    loss = outputs[0]
    loss.backward()
    optimizer.step()
    scheduler.step()
    model.zero_grad()

Now, the model should give predictions based on the training data. Of course, it will need a diverse set of training data to give proper predictions.

In [None]:
# Test Output
model.eval()
outputs = model(input_ids=text_test, token_type_ids=ttids_test, attention_mask=attn_test, labels=labels_test)
predictions = outputs[1]

In [63]:
# Helper Function to Group Model Output
def get_product_ranges_in_sequence(texts, labels, predictions):
  #pdb.set_trace()

  product_labels = []
  current_label_range = []
  product_values = []
  current_value_range = []
  for i in range(len(labels)):
    text = texts[i]
    label = labels[i]

    for j in range(len(label)):
      # Concatenate any contiguous labels that are "product" oriented
      # (Labels corresponding to product quantity, description, or price)
      if label[j] in [6, 7, 8]:
        current_label_range.append(idx2label[int(label[j])])
        current_value_range.append(text[j])

      elif current_label_range:
        product_labels.append(current_label_range)
        product_values.append(current_value_range)
        current_label_range = []
        current_value_range = []

  return product_labels, product_values

In [68]:
# GROUP MODEL OUTPUT FROM TEST PREDICTIONS
product_labels, product_values = get_product_ranges_in_sequence(text_test, labels_test, _)
product_groups = []

# Regroup each receipt instance
for instance in range(len(product_labels)):
  product_group = {}
  s = 0
  e = 0

  # Regroup each product within the receipt
  while e < len(product_labels[instance]):
      if product_labels[instance][e] != product_labels[instance][s]:
        regrouped_text = tokenizer.decode(product_values[instance][s:e])
        product_group[product_labels[instance][s]] = regrouped_text
        s = e
      e += 1

  product_groups.append(product_group)

print("Regrouped Output:\n")
pp(product_groups)

Regrouped Output:

[{'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king ff bpack',
  'product_price': '$ 4. 69',
  'product_quantity': '1'},
 {'product_description': '305s king 