## Parse the total sum from recipt OCR data using xgboost classifier
XGBoost model is trained using SROIE2019 Dataset
Implemented as a binary classifier of tokens detected by OCR (Paddle OCR) pipeline to reprepsent total amount by recpit (1) or not (0)
Model rely on spatial features derived from bbox: coordinates, continous and discrete representation of coordinates (row/column) in recipt, relative features (is the same row as "TOTAL" token), and textual features: is token a digit, digit value etc.

In [7]:
import os
import cv2
import json
import re
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.utils.class_weight import compute_class_weight
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder

In [8]:
# CONSTANTS
# data load
TRAIN_FOLDER = "/home/gleb_siz/ml_training/data/SROIE2019/train"
TEST_FOLDER = "/home/gleb_siz/ml_training/data/SROIE2019/test"
FEATURES = [
## file level
    "file_aspect_ratio",
    "x_max",
    "token_width",
    "token_heigh",
    "aspect_ratio",
    "row",
    "col",
    "row_rank",
    "col_rank",
    "has_total_keyword_in_row",
    "tokens_in_col",
    "tokens_in_row",
    "text_length",
    "is_digit",
    'font_size',
    'row_dist_from_total',
    "value",
    "rows_in_col",
    "cols_in_row",
    "has_total_below",

    # "n_tokens",
    # "width",
    # "heigh",
    # "tokens_top",
    # "tokens_bottom",
    # "tokens_left",
    # "tokens_right",  
    # "y_max",
    # "section",
    # "section_rank",
    # "tokens_in_section",
    ]

total_keywords = [
    "total", 
    # "amount", 
    "sum", 
    "suma", 
    "suma pln",
    "sprzedaz",
    # "subtotal", 
    # "balance", 
    # "due", 
    # "payment", 
    # "payable", 
    # "importe", 
    # "together", 
    # "totale"
]
pattern = re.compile(r'\b(' + '|'.join(total_keywords) + r')\b', re.IGNORECASE)
PRICE_PATTERN = re.compile(
    r'^[A-Za-z $€¥£]*[:=]?\s*\$?\s*[RM]*[+-]?([\d]*[\d.,]+)[ RMDHS]*$'
)
# Combined regex for numeric and short textual dates
date_pattern = re.compile(
    r'('
    r'\b\d{4}[-/\.]\d{1,2}[-/\.]\d{1,2}\b|'        # YYYY-MM-DD or YYYY/MM/DD
    r'\b\d{1,2}[-/\.]\d{1,2}[-/\.]\d{2,4}\b|'      # DD/MM/YYYY or DD.MM.YY
    r'\b\d{1,2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec|'
    r'January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{2,4}\b'
    r')'
)

In [None]:
# UTILS

def match_price(text):
    match = re.search(PRICE_PATTERN, text)
    if not match:
        return False
    else:
        return True

def extract_price(text):
    match = re.search(PRICE_PATTERN, text)
    if not match:
        return None
    num = match.group(1).replace(',', '.')        # normalize commas
    parts = num.split('.')                        # split by dot
    if len(parts) == 1:
        return float(parts[0])                    # just a plain integer
    integer_part = ''.join(parts[:-1])            # join everything except last
    decimal_part = parts[-1]
    try:
        return float(f"{integer_part}.{decimal_part}")
    except ValueError:
        return None


def load_ocr(path):
    ocr_output_df = pd.DataFrame()
    for f in os.listdir(f"{path}/box"):
        file = f"{path}/box/{f}"
        with open(file) as fl:
            try:
                lines = fl.readlines()
                df = pd.DataFrame(lines, columns=['raw'])
                df['file'] = f
                ocr_output_df = pd.concat([ocr_output_df, df])
            except Exception as e:
                print("Failed to process:", file)
    ocr_output_df[['x1', 'y1', 'x2', 'y2', 'x3', 'y3', 'x4', 'y4']] = ocr_output_df.apply(lambda x: pd.Series(x['raw'].split(',')[:8]), axis=1)
    ocr_output_df['text'] = ocr_output_df.apply(lambda x: ','.join(x['raw'].split(',')[8:]).replace('\n', ''), axis=1)
    ocr_output_df['file'] = ocr_output_df['file'].apply(lambda x: x.split('.')[0])
    ocr_output_df = ocr_output_df.replace('\n', '')
    ocr_output_df = ocr_output_df.dropna()
    for col in ['x1', 'y1', 'x2', 'y2', 'x3', 'y3', 'x4', 'y4']:
        ocr_output_df[col] = ocr_output_df[col].astype(int)
    ocr_output_df['x_max'] = ocr_output_df.apply(lambda x: max(x['x1'], x['x2'], x['x3'], x['x4']), axis=1)
    ocr_output_df['y_max'] = ocr_output_df.apply(lambda x: max(x['y1'], x['y2'], x['y3'], x['y4']), axis=1)
    ocr_output_df['x_min'] = ocr_output_df.apply(lambda x: min(x['x1'], x['x2'], x['x3'], x['x4']), axis=1)
    ocr_output_df['y_min'] = ocr_output_df.apply(lambda x: min(x['y1'], x['y2'], x['y3'], x['y4']), axis=1)
    return ocr_output_df


def load_img_data(path):
    img_data = []
    for f in os.listdir(f"{path}/img"):
        file = f"{path}/img/{f}"
        img = cv2.imread(file)
        heigh, width, _ = img.shape
        img_data.append((f, width, heigh))
    img_df = pd.DataFrame(img_data, columns=['file', 'width', 'heigh'])
    img_df['file'] = img_df['file'].apply(lambda x: x.split('.')[0])
    return img_df


def load_entity_data(path):
    labels_df = pd.DataFrame()
    labels = []
    for f in os.listdir(f"{path}/entities"):
        file = f"{path}/entities/{f}"
        with open(file) as fl:
            item = json.loads(fl.read())
            for k, v in item.items():
                labels.append({'label': k, 'text': v, 'file': f.split('.')[0]})
            labels_df = pd.DataFrame(labels)
    return labels_df

def match_labels(train, labels):
    matched_labels = []

    for _, row in train.iterrows():
        file = row['file']
        text = str(row['text'])
        subset = labels[labels['file'] == file]

        matched_label = None
        for _, lab in subset.iterrows():
            lab_text = str(lab['text'])
            # Check substring both ways for robustness
            if lab_text in text or text in lab_text:
                matched_label = lab['label']
                break

        matched_labels.append(matched_label if matched_label else 'other')

    train['label'] = matched_labels
    return train


def add_features(df):
    
    df['width'] = df.groupby(['file'])['x_max'].transform("max")
    df['heigh'] = df.groupby(['file'])['y_max'].transform("max")
    df['file_aspect_ratio'] = df.apply(lambda x: x['width'] / (x['heigh'] + 0.00001), axis=1)
    # df['n_tokens'] = df.groupby('file')['text'].transform('count')

    df['x_max'] = (df['x_max'] / df['width']).round(2)
    df['y_max'] = (df['y_max'] / df['heigh']).round(2)
    df['x_min'] = (df['x_min'] / df['width']).round(2)
    df['y_min'] = (df['y_min'] / df['heigh']).round(2)
    df['token_width'] = df['x_max'] - df['x_min']
    df['token_heigh'] = df['y_max'] - df['y_min']
    df['avg_font'] = df.groupby('file')['token_heigh'].transform("mean")
    df['font_size'] = df['token_heigh'] / df['avg_font']
    df['aspect_ratio'] = df.apply(lambda x: x['token_width'] / (x['token_heigh'] + 0.00001), axis=1)
    df['y_center'] = (df['y_max'] + df['y_min']) / 2
    df['x_center'] = (df['x_max'] + df['x_min']) / 2
    df['x_center_file'] = df.groupby(['file'])['x_center'].transform("mean").round(2)
    df['y_center_file'] = df.groupby(['file'])['y_center'].transform("mean").round(2)
    df['row'] = df['y_center'].round(2)
    df['col'] = df['x_center'].round(1)
    df['row_rank'] = df.groupby('file')['row'].rank(method='dense', ascending=True)
    df['col_rank'] = df.groupby('file')['col'].rank(method='dense', ascending=True)
    df['tokens_in_col'] = df.groupby(['file', 'col'])['text'].transform('count')
    df['tokens_in_row'] = df.groupby(['file', 'row'])['text'].transform('count') 
    df['rows_in_col'] = df.groupby(['file', 'col'])['row_rank'].transform('max')
    df['cols_in_row'] = df.groupby(['file', 'row'])['col_rank'].transform('max') 
    df['has_total_keyword'] = df['text'].apply(lambda t: bool(pattern.search(t)))
    df['has_total_keyword_in_row'] = df.groupby(['file', 'row'])['has_total_keyword'].transform("max")

    totals = df[df['has_total_keyword']][['file', 'row']]
    df = df.merge(totals, on=['file'], how='left', suffixes=('', '_total'))
    df['row_dist_from_total']= np.abs(df['row'] - df['row_total'])
    df = df.sort_values(['file', 'text', 'x_center', 'y_center', 'row_dist_from_total']).groupby(['file', 'text', 'x_center', 'y_center'], as_index=False).first()
    df = df.drop('row_total', axis=1)

    df = df.sort_values(['file', 'row'])
    df['has_total_below'] = (
        df.groupby('file')['has_total_keyword']
        .transform(lambda x: x.iloc[::-1].cummax().iloc[::-1])
    )
    df['has_total_below'] = df.groupby('file')['has_total_below'].shift(-1).fillna(False)

    df['text_length'] = df['text'].apply(lambda x: len(x))
    df['is_digit'] = df['text'].apply(match_price)
    df['value'] = df['text'].apply(extract_price)

    df['contains_date'] = df['text'].apply(lambda x: bool(date_pattern.search(x)))
    df['tokens'] = df.groupby(['file', 'row'])['text'].transform(''.join)
    
    df = df.reset_index()

    # df['section'] = pd.cut(df['x_center'], bins=[0.0, 0.25, 0.5, 0.75, 1.0], labels=[1, 2, 3, 4], include_lowest=True)
    # df['section_rank'] = df.groupby('file')['section'].rank(method='dense', ascending=True)
    # df['section'] = LabelEncoder().fit_transform(df['section'])
    # df['tokens_in_section'] = df.groupby(['file', 'section'])['text'].transform('count') 
    # df['top'] = df['row'] < 0.5
    # df['bottom'] = df['row'] >= 0.5
    # df['left'] = df['col'] < 0.5
    # df['right'] = df['col'] >= 0.5
    # df['tokens_top'] = df.groupby(['file'])['top'].transform('sum')
    # df['tokens_bottom'] = df.groupby(['file'])['bottom'].transform('sum')
    # df['tokens_left'] = df.groupby(['file'])['left'].transform('sum')
    # df['tokens_right'] = df.groupby(['file'])['right'].transform('sum')
    
    return df


def fix_total_label(df, threshold=0.015):
    # word total in a row with total label by least diff
    total_keywords = df.copy()
    totals = df.copy()
    total_keywords = total_keywords[total_keywords['has_total_keyword']][['file', 'text', 'has_total_keyword', 'y_center', 'y_max']]
    total_keywords['label'] = 'total'
    totals = totals[totals['label']=='total']

    totals = totals.merge(total_keywords, on=['file'], suffixes=['', '_total'])
    totals['diff_wtotal'] = totals['y_center'] - totals['y_center_total']
    totals = totals.loc[lambda x: x['diff_wtotal'].abs() <= threshold]

    totals = totals.sort_values(['file', 'label', 'y_min'], ascending=[True, True, False])
    totals = totals.groupby(['file', 'label'], as_index=False).first()
    totals = totals[['file', 'text', 'x_min', 'y_min', 'x_max', 'y_max', 'label']]

    df = df.merge(totals, on=['file', 'text', 'x_min', 'y_min', 'x_max', 'y_max',], how='left', suffixes=('', '_true'))
    df['label_true'] = df['label_true'].fillna('other')
    df.loc[lambda x: x['label'] == 'total', 'label'] = df.loc[lambda x: x['label'] == 'total', 'label_true']
    df['label'] = df['label'].apply(lambda x: 1 if x == 'total' else 0)
    
    # ad-hoc fix for own data input
    df.loc[lambda x: (x['file'] == '5807493927290997517')
           & (x['text'] == '"4,58"')
           & (x['y_center'] >= 0.330 ), "label"
           ] = 1

    return df


def plot_labels(df, image_source="/home/gleb_siz/ml_training/data/SROIE2019/train/img"):
    file = f"{image_source}/{df['file'].iloc[0]}.jpg"
    img = cv2.imread(file)
    for i, row in df.iterrows():
        x1, y1, x2, y2 = row.x1, row.y1, row.x3, row.y3
        if row.label ==1:
            color = (255, 0, 0)
        else:
            color = (0, 255, 0)
        cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
    img = cv2.resize(img, (800, 1000))
    cv2.imshow("OCR Tokens", img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [None]:
ocr_df = load_ocr(TRAIN_FOLDER)
img_df = load_img_data(TRAIN_FOLDER)
labels_df = load_entity_data(TRAIN_FOLDER)

In [None]:
# TRAIN DATA
df = ocr_df.copy()
train_df = match_labels(df, labels_df)
train_df = add_features(train_df)
features_df = fix_total_label(train_df)
with_total_files = features_df.loc[lambda x: x['label'] == 1]['file'].unique()
features_df = features_df[features_df['file'].isin(with_total_files)]
y = features_df['label']
X = features_df[FEATURES]

# TEST DATA
test_ocr_df = load_ocr(TEST_FOLDER)
test_img_df = load_img_data(TEST_FOLDER)
test_labels_df = load_entity_data(TEST_FOLDER)

df = test_ocr_df.copy()
test_df = match_labels(df, test_labels_df)
test_df = add_features(test_df)
test_features_df = fix_total_label(test_df)
with_total_files = test_features_df.loc[lambda x: x['label'] == 1]['file'].unique()
test_features_df = test_features_df[test_features_df['file'].isin(with_total_files)]
y_test = test_features_df['label']
X_test = test_features_df[FEATURES]

In [None]:
# y_pred = model.predict(X_test)
test_output = X_test
test_output['label'] = y_pred
test_output['text'] = test_df['text']
test_output['file'] = test_df['file']
test_output[['neg', 'pos']] = y_pred_proba

In [None]:
test_output.loc[lambda x: ~x['file'].isin(with_total_files), :].sort_values(['file', 'pos'], ascending=[True, False]).groupby('file', as_index=False).first()[
    ['file', 'text', 'label', 'pos']
]

In [None]:
# # ==============================================
# # 6️⃣ Train XGBoost
# # ==============================================

le = LabelEncoder()

y_train = le.fit_transform(y)
y_test = le.fit_transform(y_test)
classes = np.unique(y_train)
weights = compute_class_weight( 'balanced', classes=classes, y=y_train)
class_weight_dict = dict(zip(classes, weights))

# Assign weight to each sample
sample_weights = np.array([class_weight_dict[label] for label in y_train])

model = xgb.XGBClassifier(
    # objective="multi:softmax",
    objective="binary:logistic",
    # scale_pos_weight=300,
    # num_class=len(le.classes_),
    n_estimators=500,
    learning_rate=0.1,
    max_depth=5,
    subsample=0.9,
    colsample_bytree=0.8,
    reg_lambda=1.0,
    # reg_alpha=0.1,
    # min_child_weight=2,
    use_label_encoder=False,
    # eval_metric="mlogloss",
    early_stopping_rounds=50,
)
_, X_val, _, y_val = train_test_split(
    X_test, y_test, test_size=0.2, stratify=y_test, random_state=42
)
model.fit(X, 
          y_train, 
           eval_set=[(X_val, y_val),
                    ],
          sample_weight=sample_weights,
          verbose=5
          )

In [None]:
# Save model
from datetime import datetime 
best = "~/ml_training/model/best.ubj"
datestamp = datetime.now().strftime("%Y%m%d%H%M")
archive = "~/ml_training/model/archive/best_{datestamp}.ubj"
model.save_model(best)
model.save_model(archive)

In [None]:
# # ==============================================
# # 7️⃣ Evaluate
# # ==============================================

model.load_model(best)
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
y_pred_proba = model.predict_proba(X_test)  # shape: (n_samples, n_classes)
roc_auc = roc_auc_score(y_test, y_pred, 
                        # multi_class='ovr'
                        )  # or 'ovo'
print("ROC-AUC:", roc_auc)

In [None]:
# Feature importance plot
xgb.plot_importance(model, importance_type='gain')

In [None]:
# useful layout graphs
import matplotlib.pyplot as plt
df = features_df[features_df['label'] == 1]
plt.scatter(df['x_min'], df['y_min'], c=df['label'].astype('category').cat.codes)
plt.gca().invert_yaxis()  # receipts have origin at top
plt.show()

In [None]:
# Real data testing
from paddleocr import PaddleOCR
from paddlex.inference.pipelines.ocr.result import OCRResult
import cv2
from PIL import Image

# custom preprocessing
ocr = PaddleOCR(
    lang="pl",
    use_doc_orientation_classify=True, 
    use_doc_unwarping=False, 
    use_textline_orientation=False) # supports many langs
outputs = pd.DataFrame()
path = '/home/gleb_siz/ml_training/data/validation/img'
files = os.listdir(path)
for file in files:
    results = ocr.predict(f"{path}/{file}")
    df = pd.DataFrame(results[0]['rec_boxes'], columns=['x_min', 'y_min', 'x_max', 'y_max'])
    df['file'] = file
    df['text'] = results[0]['rec_texts']
    df['score'] = results[0]['rec_scores']
    df[df['score']>0.50].sort_values('text')
    df = df[['x_min', 'y_min', 'x_max', 'y_max', 'text', 'file']]
    for col in ['x_min', 'y_min', 'x_max', 'y_max',]:
        df[col] = df[col].astype(int)
    outputs = pd.concat([outputs, df])
outputs = outputs.reset_index()


In [None]:
outputs_df = outputs.copy()
X_sample = add_features(outputs_df)
# X_sample = X_sample[X_sample['is_digit']]
y_sample = model.predict(X_sample[FEATURES])
probabilities = model.predict_proba(X_sample[FEATURES])
X_sample['pred'] = y_sample
X_sample = pd.concat([X_sample, pd.DataFrame(probabilities, columns=["neg", "pos"])], axis = 1)
X_sample.sort_values(['file', 'pos'], ascending=[True, False]).groupby('file', as_index=False).first()[['file', 'text', 'value', 'pred', 'pos']]

In [None]:
# file	text	value	pred	pos
# 0	5807493927290997513.jpg	9,99	9.99	0	0.045834
# 1	5807493927290997514.jpg	45,97	45.97	1	0.999933
# 2	5807493927290997515.jpg	118,50	118.50	1	0.997495
# 3	5807493927290997516.jpg	8,29	8.29	1	0.985464
# 4	5807493927290997517.jpg	4,58	4.58	0	0.103868
# 5	5807493927290997518.jpg	PLN 49,98	49.98	1	0.998811
# 6	5807493927290997519.jpg	49,98	49.98	0	0.314266
# 7	5807493927290997524.jpg	24,96	24.96	1	0.801430


In [None]:
# SHAP analysis useful for feature actuall contribution into output, 
# more robust feature importance
import shap

# assume model and X are your trained model and feature DataFrame
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_sample[FEATURES])
shap.summary_plot(shap_values, X_sample[FEATURES])

In [10]:
# E2E run
import os
import pandas as pd
import xgboost as xgb
from paddleocr import PaddleOCR
from paddlex.inference.pipelines.ocr.result import OCRResult
import cv2
from PIL import Image
path = '/home/gleb_siz/ml_training/data/validation/img'
model_path = '/home/gleb_siz/ml_training/model/best.ubj'
# custom preprocessing
ocr = PaddleOCR(
    lang="pl",
    use_doc_orientation_classify=True, 
    use_doc_unwarping=False, 
    use_textline_orientation=False) # supports many langs
model = xgb.XGBClassifier()
model.load_model(model_path)
outputs = pd.DataFrame()

files = os.listdir(path)
for file in files:
    results = ocr.predict(f"{path}/{file}")
    df = pd.DataFrame(results[0]['rec_boxes'], columns=['x_min', 'y_min', 'x_max', 'y_max'])
    df['file'] = file
    df['text'] = results[0]['rec_texts']
    df['score'] = results[0]['rec_scores']
    df[df['score']>0.50].sort_values('text')
    df = df[['x_min', 'y_min', 'x_max', 'y_max', 'text', 'file']]
    for col in ['x_min', 'y_min', 'x_max', 'y_max',]:
        df[col] = df[col].astype(int)
    outputs = pd.concat([outputs, df])
outputs = outputs.reset_index()
outputs_df = outputs
X_sample = add_features(outputs_df)
y_sample = model.predict(X_sample[FEATURES])
X_sample['pred'] = y_sample
X_sample[X_sample['pred']==1][['file', 'text', 'value']]

[32mCreating model: ('PP-LCNet_x1_0_doc_ori', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/gleb_siz/.paddlex/official_models/PP-LCNet_x1_0_doc_ori`.[0m
[32mCreating model: ('PP-OCRv5_server_det', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/gleb_siz/.paddlex/official_models/PP-OCRv5_server_det`.[0m
[32mCreating model: ('latin_PP-OCRv5_mobile_rec', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/gleb_siz/.paddlex/official_models/latin_PP-OCRv5_mobile_rec`.[0m
  df['has_total_below'] = df.groupby('file')['has_total_below'].shift(-1).fillna(False)


Unnamed: 0,file,text,value
18,5807493927290997513.jpg,999,9.99
75,5807493927290997514.jpg,2598,25.98
83,5807493927290997514.jpg,4597,45.97
136,5807493927290997515.jpg,11850,118.5
163,5807493927290997516.jpg,499,4.99
172,5807493927290997516.jpg,829,8.29
199,5807493927290997517.jpg,458,4.58
265,5807493927290997518.jpg,"PLN 49,98",49.98
300,5807493927290997519.jpg,4998,49.98
344,5807493927290997524.jpg,2496,24.96


In [12]:
X_sample = add_features(outputs_df)
y_sample = model.predict(X_sample[FEATURES])
X_sample['pred'] = y_sample
X_sample[X_sample['pred']==1][['file', 'text', 'value']]

  df['has_total_below'] = df.groupby('file')['has_total_below'].shift(-1).fillna(False)


Unnamed: 0,file,text,value
18,5807493927290997513.jpg,999,9.99
73,5807493927290997514.jpg,1999,19.99
75,5807493927290997514.jpg,2598,25.98
81,5807493927290997514.jpg,566,5.66
83,5807493927290997514.jpg,4597,45.97
136,5807493927290997515.jpg,11850,118.5
163,5807493927290997516.jpg,499,4.99
172,5807493927290997516.jpg,829,8.29
199,5807493927290997517.jpg,458,4.58
265,5807493927290997518.jpg,"PLN 49,98",49.98
