In [None]:
import os, re, random
import numpy as np
import pandas as pd
import torch

from typing import List, Dict
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score

from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, TrainingArguments,Trainer




In [None]:
# Mount Google Drive (Colab only) so the /content/drive paths configured below resolve.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Seed the Python, NumPy and PyTorch RNGs with one shared value.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Input OCR table and the Drive folder that receives every artefact of this notebook.
CSV_PATH = "/content/drive/MyDrive/WQF7002_GrpAssign/sroie_ocr_output_v2.csv"
OUT_DIR = "/content/drive/MyDrive/WQF7002_GrpAssign"
os.makedirs(OUT_DIR, exist_ok=True)  # no error if the folder already exists

# Output locations: fine-tuned classifier directory and the rule-labelled training CSV.
MODEL_DIR = os.path.join(OUT_DIR, "item_classifier_model")
AUTO_LABELED_CSV = os.path.join(OUT_DIR, "m4_items_auto_labeled.csv")

# Hugging Face checkpoint name used as the starting point for fine-tuning.
BASE_MODEL = "distilbert-base-uncased"

In [None]:
# Target categories of the item classifier; a label's list position is its integer class id.
LABELS = [
    "fresh_food", "processed_food", "sugary_drink", "single_use_plastic",
    "household_chemical", "eco_friendly", "non_essential", "other",
]

# Reverse (id -> name) and forward (name -> id) lookup tables derived from LABELS.
id2label = dict(enumerate(LABELS))
label2id = {name: idx for idx, name in id2label.items()}


In [None]:
# Text normalization
def normalize_text(s: str) -> str:
    """Reduce a receipt line to lower-case words.

    Currency codes (rm/myr/usd), prices and standalone numbers are blanked out,
    every character other than a-z, whitespace, '-', '&' and '/' is replaced by
    a space, and runs of whitespace collapse to a single space.

    Returns "" for non-string input.
    """
    if not isinstance(s, str):
        return ""

    # Applied in order; each pattern's matches are replaced by a single space.
    substitutions = (
        r"\b(rm|myr|usd)\b",                       # currency codes
        r"\b\d{1,3}(?:,\d{3})*(?:\.\d{2})\b",      # prices
        r"\b\d+\b",                                # numbers
        r"[^a-z\s\-\&\/]",                         # anything outside the kept alphabet
        r"\s+",                                    # whitespace runs
    )
    text = s.lower()
    for pattern in substitutions:
        text = re.sub(pattern, " ", text)
    return text.strip()

# Filter out non-item lines
def looks_like_non_item(line: str) -> bool:
    """Heuristically decide whether a receipt line is NOT a purchased item.

    Returns True when the line:
      * is not a string, or is shorter than 2 characters after stripping;
      * contains at most one alphabetic character;
      * contains a receipt-summary keyword as a whole alphabetic word
        (a trailing plural "s" is tolerated, e.g. "thanks");
      * contains "rm" as a whitespace-delimited token.
    Otherwise returns False.

    Keywords are compared against whole words rather than raw substrings so
    that genuine items are not discarded by accident ("vegetable" contains
    "table", "nutella" contains "tel", "cashew" contains "cash").
    """
    if not isinstance(line, str):
        return True

    low = line.strip().lower()
    if len(low) < 2:
        return True

    # lines that are mostly non-letters
    if sum(c.isalpha() for c in low) <= 1:
        return True

    # receipt summary keywords
    blacklist = {
        "total", "subtotal", "tax", "gst", "sst", "vat",
        "cash", "change", "invoice", "receipt", "thank",
        "date", "time", "table", "cashier", "server",
        "rounding", "service", "summary", "amount", "balance",
        "tel", "phone", "address",
    }
    # Split on anything that is not a letter so "gst6%" or "tel:03-123" still
    # expose the keyword as its own word.
    for word in re.findall(r"[a-z]+", low):
        if word in blacklist:
            return True
        if word.endswith("s") and word[:-1] in blacklist:
            return True

    # money-heavy lines often contain RM
    if " rm " in f" {low} ":
        return True

    return False


# Extract candidate item lines from a receipt's OCR text
def extract_candidate_item_lines(ocr_text: str, max_lines: int = 25) -> List[str]:
    """Return up to `max_lines` distinct, normalized item-like lines from `ocr_text`.

    Each non-blank line has a trailing price removed, is skipped when
    `looks_like_non_item` flags it, and is then passed through
    `normalize_text`; results shorter than 2 characters are discarded.
    Duplicates are dropped, keeping the first occurrence. Non-string or blank
    input yields an empty list.
    """
    if not (isinstance(ocr_text, str) and ocr_text.strip()):
        return []

    trailing_price = r"\s+\d{1,3}(?:,\d{3})*(?:\.\d{2})\s*$"

    candidates = []
    for raw_line in ocr_text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue

        # remove trailing price at end of line (common)
        without_price = re.sub(trailing_price, "", stripped)
        if looks_like_non_item(without_price):
            continue

        cleaned = normalize_text(without_price)
        if len(cleaned) >= 2:
            candidates.append(cleaned)

    # dict keys are unique and keep insertion order -> ordered de-duplication
    return list(dict.fromkeys(candidates))[:max_lines]


# Load the OCR output table from CSV_PATH and print its columns and row count as a sanity check
df_ocr = pd.read_csv(CSV_PATH)
print(df_ocr.columns)
print("Receipts:", len(df_ocr))

Index(['image_name', 'ocr_text', 'total_price', 'items'], dtype='object')
Receipts: 626


In [None]:
# Build a long-format table with one row per (receipt, candidate item line).
rows = []
for _, r in df_ocr.iterrows():
    rid = r["image_name"]
    total_price = r.get("total_price", None)
    item_lines = extract_candidate_item_lines(r.get("ocr_text", ""), max_lines=25)

    for line in item_lines:
        rows.append({
            "receipt_id": rid,
            "total_price": total_price,
            "item_line": line
        })

# Passing `columns=` pins the schema: if no line was extracted at all, the
# frame still has its three columns, so later `df_items["item_line"]` lookups
# work on an empty table instead of raising KeyError.
df_items = (
    pd.DataFrame(rows, columns=["receipt_id", "total_price", "item_line"])
    .drop_duplicates(subset=["receipt_id", "item_line"])
    .reset_index(drop=True)
)
print("Extracted item lines:", len(df_items))
df_items.head()


Extracted item lines: 9280


Unnamed: 0,receipt_id,total_price,item_line
0,X51005441408.jpg,71.95,pub & bistro own by cnu trading
1,X51005441408.jpg,71.95,jalan ss /
2,X51005441408.jpg,71.95,damansara utama
3,X51005441408.jpg,71.95,petaling jaya
4,X51005441408.jpg,71.95,inv no pax s


In [None]:
# Keyword rules per category.
# Maps a label from LABELS to the keywords that vote for it; auto_label_item
# iterates this dict to weakly label item lines. "other" has no entry here:
# it is the fallback label when no keyword is found.
CATEGORY_RULES = {
    "fresh_food": [
        "apple","banana","orange","grape","tomato","onion","potato","carrot",
        "lettuce","spinach","egg","chicken","fish"
    ],
    "processed_food": [
        "chips","crisps","candy","chocolate","biscuit","cookie",
        "instant","noodle","sausage","nugget","frozen"
    ],
    "sugary_drink": [
        "coke","cola","pepsi","sprite","fanta",
        "soft drink","soda","energy drink","red bull","milk tea","bubble tea"
    ],
    "single_use_plastic": [
        "plastic","bottle","straw","disposable","cup","fork","spoon","plate","bag"
    ],
    "household_chemical": [
        "detergent","bleach","cleaner","soap","shampoo","toothpaste","disinfect","sanitizer"
    ],
    "eco_friendly": [
        "reusable","refill","bulk","biodegradable","recycled","paper bag"
    ],
    "non_essential": [
        "toy","gadget","perfume","cosmetic","makeup","accessory","decor"
    ],
}


In [None]:
# Auto-label
def auto_label_item(text: str, rules: Dict[str, List[str]] = None):
    """Weakly label an item line by counting the category keywords it contains.

    Keywords are matched as whole words (an optional plural "s"/"es" suffix is
    accepted) instead of raw substrings, so "strawberry" no longer counts as
    "straw", "cabbage" as "bag", or "chocolate" as "cola". Stem-style keywords
    therefore only match their listed form (plus plural).

    Args:
        text: item line to label.
        rules: mapping of label -> keyword list; defaults to CATEGORY_RULES.

    Returns:
        (label, confidence):
          * ("other", 0.0)  for non-string or blank input;
          * ("other", 0.25) when no keyword matches;
          * (label, 0.60)   when the best category has exactly one match;
          * (label, min(0.90, 0.60 + 0.10 * n)) for n >= 2 matches.
        On a tie, the category that appears first in `rules` wins.
    """
    if not isinstance(text, str) or not text.strip():
        return "other", 0.0

    if rules is None:
        rules = CATEGORY_RULES

    t = text.lower()
    best_label = "other"
    best_score = 0

    for label, keywords in rules.items():
        score = sum(
            re.search(rf"\b{re.escape(kw)}(?:e?s)?\b", t) is not None
            for kw in keywords
        )
        # strict '>' keeps the earliest category on ties
        if score > best_score:
            best_score = score
            best_label = label

    if best_score == 0:
        return "other", 0.25
    if best_score == 1:
        return best_label, 0.60
    return best_label, min(0.90, 0.60 + 0.10 * best_score)

# Run the rule-based labeller over every extracted line and attach its verdicts.
labeled = [auto_label_item(item) for item in df_items["item_line"]]
labs = [lab for lab, _ in labeled]
confs = [conf for _, conf in labeled]

df_items["auto_label"] = labs
df_items["auto_confidence"] = confs

df_items[["item_line","auto_label","auto_confidence"]].head(20)

# Keep only rows whose rule confidence reaches the threshold.
CONF_THRESHOLD = 0.50
confident_mask = df_items["auto_confidence"] >= CONF_THRESHOLD
df_train = df_items[confident_mask].copy()

# final label column
df_train["final_label"] = df_train["auto_label"]

print("Training rows after confidence filter:", len(df_train))
print(df_train["final_label"].value_counts())

# Persist the weakly-labelled training set.
df_train.to_csv(AUTO_LABELED_CSV, index=False)
print("Saved auto-labeled dataset:", AUTO_LABELED_CSV)


Training rows after confidence filter: 189
final_label
fresh_food            87
single_use_plastic    36
processed_food        31
sugary_drink          21
household_chemical    10
eco_friendly           4
Name: count, dtype: int64
Saved auto-labeled dataset: /content/drive/MyDrive/WQF7002_GrpAssign/m4_items_auto_labeled.csv


In [None]:
# Train/test split
normalized_texts = df_train["item_line"].astype(str).map(normalize_text).tolist()
label_names = df_train["final_label"].tolist()

# Drop rows whose text is (near-)empty or whose label is not a known category.
kept_pairs = [
    (text, label)
    for text, label in zip(normalized_texts, label_names)
    if len(text) >= 2 and label in LABELS
]
X, y = map(list, zip(*kept_pairs))

# Stratify by label only when more than one class is present.
stratify_on = y if len(set(y)) > 1 else None
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.30,
    random_state=SEED,
    stratify=stratify_on,
)

print("Train:", len(X_train), "Test:", len(X_test))


Train: 132 Test: 57


In [None]:
# Fine-tune DistilBERT: tokenizer and dataset preparation
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)

def to_hf_dataset(texts, labels):
    """Build a tokenized `Dataset` from parallel lists of texts and label names.

    Label names are converted to integer ids through `label2id`; texts are
    tokenized in batches with truncation enabled (padding is left to the
    data collator).
    """
    def _tokenize(batch):
        return tokenizer(batch["text"], truncation=True)

    label_ids = [label2id[name] for name in labels]
    raw = Dataset.from_dict({"text": texts, "label": label_ids})
    return raw.map(_tokenize, batched=True)

train_ds = to_hf_dataset(X_train, y_train)
test_ds  = to_hf_dataset(X_test,  y_test)

# Pads each batch at collation time using the same tokenizer.
collator = DataCollatorWithPadding(tokenizer=tokenizer)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Map:   0%|          | 0/132 [00:00<?, ? examples/s]

Map:   0%|          | 0/57 [00:00<?, ? examples/s]

In [None]:
# Load the base checkpoint with a sequence-classification head sized to LABELS,
# and pass the id<->name maps so they are part of the model configuration.
model = AutoModelForSequenceClassification.from_pretrained(
    BASE_MODEL,
    num_labels=len(LABELS),
    id2label=id2label,
    label2id=label2id
)

def compute_metrics(eval_pred):
    """Compute accuracy and macro-averaged F1 from a (logits, labels) pair.

    Predictions are the argmax of the logits over the last axis.
    `zero_division=0` keeps the F1 of a class with no predicted samples at 0
    (the value scikit-learn already falls back to) without emitting an
    UndefinedMetricWarning on every evaluation.
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)
    return {
        "accuracy": accuracy_score(labels, preds),
        "macro_f1": f1_score(labels, preds, average="macro", zero_division=0)
    }


Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
from transformers import EarlyStoppingCallback

# Training configuration: evaluate and checkpoint once per epoch, then reload
# the checkpoint with the best macro-F1 when training stops.
args = TrainingArguments(
    output_dir=MODEL_DIR,
    eval_strategy="epoch",
    save_strategy="epoch",
    # Checkpoints land on Drive; cap how many are kept so a long run does not
    # fill it. With load_best_model_at_end the best checkpoint is retained.
    save_total_limit=2,
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=20,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="macro_f1",
    greater_is_better=True,
    seed=SEED,
    report_to="none",
)

# NOTE(review): eval_dataset is the held-out test split, so early stopping and
# best-checkpoint selection both see the test data. Metrics later reported on
# test_ds are therefore optimistic; consider carving a validation split out of
# the training data for model selection.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=test_ds,
    # `tokenizer=` is deprecated on Trainer in recent transformers releases;
    # `processing_class=` is its replacement and avoids the FutureWarning.
    processing_class=tokenizer,
    data_collator=collator,
    compute_metrics=compute_metrics,
    # stop once macro-F1 has failed to improve for 2 consecutive evaluations
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

trainer.train()
trainer.evaluate()


  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Accuracy,Macro F1
1,No log,1.51272,0.45614,0.104418
2,No log,1.357352,0.508772,0.179762
3,No log,1.211335,0.666667,0.365464
4,No log,1.07271,0.684211,0.416875
5,No log,0.945808,0.736842,0.471166
6,No log,0.822582,0.824561,0.539511
7,No log,0.723804,0.842105,0.561779
8,No log,0.640419,0.877193,0.594775
9,No log,0.570309,0.877193,0.593755
10,No log,0.519901,0.859649,0.572675




{'eval_loss': 0.6404190063476562,
 'eval_accuracy': 0.8771929824561403,
 'eval_macro_f1': 0.5947747636153433,
 'eval_runtime': 4.3681,
 'eval_samples_per_second': 13.049,
 'eval_steps_per_second': 0.458,
 'epoch': 10.0}

In [None]:
# Evaluate on the held-out split: per-class report plus confusion matrix
pred = trainer.predict(test_ds)
pred_ids = np.argmax(pred.predictions, axis=-1)

true_ids = np.array([label2id[l] for l in y_test])
true_names = [id2label[i] for i in true_ids]
pred_names = [id2label[i] for i in pred_ids]

# zero_division=0 reports 0 for classes that were never predicted (the value
# scikit-learn already falls back to) without raising UndefinedMetricWarning.
print(classification_report(true_names, pred_names, digits=4, zero_division=0))

# Rows are true labels, columns are predicted labels, both in LABELS order.
cm = confusion_matrix(true_names, pred_names, labels=LABELS)
cm_df = pd.DataFrame(cm, index=[f"T:{l}" for l in LABELS], columns=[f"P:{l}" for l in LABELS])
cm_df




                    precision    recall  f1-score   support

      eco_friendly     0.0000    0.0000    0.0000         1
        fresh_food     0.9286    1.0000    0.9630        26
household_chemical     0.0000    0.0000    0.0000         3
    processed_food     0.9000    0.9000    0.9000        10
single_use_plastic     0.7500    0.8182    0.7826        11
      sugary_drink     0.8571    1.0000    0.9231         6

          accuracy                         0.8772        57
         macro avg     0.5726    0.6197    0.5948        57
      weighted avg     0.8164    0.8772    0.8453        57



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Unnamed: 0,P:fresh_food,P:processed_food,P:sugary_drink,P:single_use_plastic,P:household_chemical,P:eco_friendly,P:non_essential,P:other
T:fresh_food,26,0,0,0,0,0,0,0
T:processed_food,1,9,0,0,0,0,0,0
T:sugary_drink,0,0,6,0,0,0,0,0
T:single_use_plastic,1,0,1,9,0,0,0,0
T:household_chemical,0,1,0,2,0,0,0,0
T:eco_friendly,0,0,0,1,0,0,0,0
T:non_essential,0,0,0,0,0,0,0,0
T:other,0,0,0,0,0,0,0,0


In [None]:
# Write the trainer's current model and the tokenizer into MODEL_DIR so that
# both can be reloaded from that single directory.
trainer.save_model(MODEL_DIR)
tokenizer.save_pretrained(MODEL_DIR)
print("Saved model to:", MODEL_DIR)


Saved model to: /content/drive/MyDrive/WQF7002_GrpAssign/item_classifier_model


In [None]:
import shutil, os

# Delete every "checkpoint-*" entry inside MODEL_DIR, leaving the other files in place.
checkpoint_paths = [
    os.path.join(MODEL_DIR, entry)
    for entry in os.listdir(MODEL_DIR)
    if entry.startswith("checkpoint-")
]
for checkpoint_path in checkpoint_paths:
    shutil.rmtree(checkpoint_path)


In [None]:
# Predict item categories for the UI

# (tokenizer, model) pairs keyed by model directory, filled on first use so
# repeated predict_items calls do not reload weights from disk every time.
# Re-running this cell resets the cache (do so after re-training).
_ITEM_CLASSIFIER_CACHE = {}

def _load_item_classifier(model_dir):
    """Return the cached (tokenizer, model) for `model_dir`, loading it once."""
    if model_dir not in _ITEM_CLASSIFIER_CACHE:
        tok = AutoTokenizer.from_pretrained(model_dir)
        mdl = AutoModelForSequenceClassification.from_pretrained(model_dir)
        mdl.eval()
        _ITEM_CLASSIFIER_CACHE[model_dir] = (tok, mdl)
    return _ITEM_CLASSIFIER_CACHE[model_dir]

@torch.inference_mode()
def predict_items(item_lines: List[str], threshold: float = 0.45) -> List[Dict]:
    """Classify raw receipt item lines with the model saved in MODEL_DIR.

    Args:
        item_lines: raw item strings; each is passed through `normalize_text`
            before tokenization.
        threshold: minimum softmax probability for the top class; below it the
            category is reported as "other".

    Returns:
        One dict per input line with keys "line" (raw input), "clean"
        (normalized text), "category" and "confidence" (top-class probability
        rounded to 4 decimals, also when the category fell back to "other").
        An empty input yields an empty list without loading the model.
    """
    item_lines = list(item_lines)
    if not item_lines:
        return []

    tok, mdl = _load_item_classifier(MODEL_DIR)

    cleaned = [normalize_text(x) for x in item_lines]
    enc = tok(cleaned, padding=True, truncation=True, return_tensors="pt")
    probs = torch.softmax(mdl(**enc).logits, dim=-1).cpu().numpy()

    results = []
    for raw, cln, p in zip(item_lines, cleaned, probs):
        best_id = int(np.argmax(p))
        conf = float(np.max(p))
        label = id2label[best_id]
        if conf < threshold:
            label = "other"
        results.append({
            "line": raw,
            "clean": cln,
            "category": label,
            "confidence": round(conf, 4)
        })
    return results

# Smoke check: classify one sample line with the model saved in MODEL_DIR.
print(predict_items(["COKE 1.5L"]))


[{'line': 'COKE 1.5L', 'clean': 'coke l', 'category': 'sugary_drink', 'confidence': 0.6044}]
