In [None]:
# !pip install transformers datasets scikit-learn seqeval
# !pip install pandas numpy

In [None]:
from datasets import load_dataset
import transformers

# Show the installed transformers version for reproducibility.
# NOTE(review): later cells pass `eval_strategy` and `processing_class`, keyword
# names that older transformers releases do not accept — confirm the minimum version.
print(transformers.__version__)

In [None]:
# Load the train and validation splits of each dataset
cord_train, cord_val = (
    load_dataset("naver-clova-ix/cord-v2", split=split_name)
    for split_name in ("train", "validation")
)

donut_train, donut_val = (
    load_dataset("katanaml-org/invoices-donut-data-v1", split=split_name)
    for split_name in ("train", "validation")
)

In [None]:
# Label definitions: "O" followed by a B-/I- pair for every entity type.
LABELS = ["O"] + [
    f"{prefix}-{entity}"
    for entity in ("ITEM", "QUANTITY", "PRICE", "TOTAL")
    for prefix in ("B", "I")
]
# Forward and reverse lookups between label strings and integer ids.
label2id = dict(zip(LABELS, range(len(LABELS))))
id2label = dict(enumerate(LABELS))

In [None]:
import json
import pandas as pd
from datasets import load_dataset

def extract_from_cord(example):
    """Extract item tokens and their labels from one CORD example.

    Args:
        example: Mapping with a 'ground_truth' entry holding a JSON string.
            Menu entries are read from ``gt_parse.menu``; each entry's 'nm',
            'cnt' and 'price' fields are used.

    Returns:
        tuple[list[str], list[str]]: Parallel ``(tokens, labels)`` lists. Every
        complete menu entry contributes its stripped name, its quantity and its
        price, labelled 'B-ITEM', 'B-QUANTITY' and 'B-PRICE' in that order.
        Entries that are not dicts or that miss any of the three values are
        skipped.

    Raises:
        json.JSONDecodeError: If 'ground_truth' is not valid JSON.
    """
    tokens = []
    labels = []
    parsed = json.loads(example['ground_truth'])

    menu = (parsed.get('gt_parse') or {}).get('menu') or []
    # If `menu` is a single dict rather than a list, treat it as a one-entry
    # menu; iterating the dict directly would only yield its keys.
    if isinstance(menu, dict):
        menu = [menu]

    for item in menu:
        # The previous check, type(item) == "dict", compared a class with a
        # string and was never true, so every entry was silently dropped.
        if not isinstance(item, dict):
            continue

        item_name = item.get('nm', '')
        quantity = item.get('cnt', 0)
        price = item.get('price', 0)

        # Only text names can be cleaned with strip(); quantity and price must be
        # scalars so str() yields the value itself rather than a container repr.
        if (
            not isinstance(item_name, str)
            or isinstance(quantity, (list, dict))
            or isinstance(price, (list, dict))
        ):
            continue
        item_name = item_name.strip()

        # Skip items with missing data
        if not item_name or not quantity or not price:
            continue

        tokens.extend([item_name, str(quantity), str(price)])
        labels.extend(['B-ITEM', 'B-QUANTITY', 'B-PRICE'])

    return tokens, labels

def extract_from_donut(example):
    """Extract item and total tokens with their labels from one invoice example.

    Args:
        example: Mapping with a 'ground_truth' entry holding a JSON string.
            Line items are read from ``gt_parse.items`` and the total from
            ``gt_parse.summary.total_gross_worth``.

    Returns:
        tuple[list[str], list[str]]: Parallel ``(tokens, labels)`` lists. Every
        complete line item contributes its stripped description, its quantity
        and its price ('item_gross_worth', falling back to 'item_net_price'),
        labelled 'B-ITEM', 'B-QUANTITY' and 'B-PRICE'. When a total is present
        it is appended last with the label 'B-TOTAL'.

    Raises:
        json.JSONDecodeError: If 'ground_truth' is not valid JSON.
    """
    tokens = []
    labels = []
    parsed = json.loads(example['ground_truth'])
    gt_parse = parsed.get('gt_parse') or {}

    # Handle items
    for item in gt_parse.get('items') or []:
        if not isinstance(item, dict):
            continue

        # `or ''` covers a description stored as null, which has no strip().
        item_name = str(item.get('item_desc') or '').strip()
        quantity = item.get('item_qty', 0)
        price = item.get('item_gross_worth') or item.get('item_net_price') or 0

        # Skip items with missing data
        if not item_name or not quantity or not price:
            continue

        tokens.extend([item_name, str(quantity), str(price)])
        labels.extend(['B-ITEM', 'B-QUANTITY', 'B-PRICE'])

    # Handle total: only emit it when the invoice actually carries one, instead
    # of inventing a "0" token labelled as a total.
    total_gross = (gt_parse.get('summary') or {}).get('total_gross_worth')
    if total_gross:
        tokens.append(str(total_gross))
        labels.append('B-TOTAL')

    return tokens, labels

In [None]:
# import pandas as pd 

# tokens, labels = extract_from_cord(example)
# df = pd.DataFrame({'token': tokens, 'label': labels})
# print(df.head())

In [None]:
def _collect_examples(dataset, extractor):
    """Run `extractor` over every example of `dataset`, keeping non-empty results.

    Args:
        dataset: Iterable of raw examples.
        extractor: Callable returning a ``(tokens, labels)`` pair for one example.

    Returns:
        tuple[list, list]: One token list and one label list per kept example.
    """
    kept_tokens, kept_labels = [], []
    for example in dataset:
        tokens, labels = extractor(example)
        # An example without tokens would turn into a row made only of padding
        # whose labels are all -100: it teaches nothing, and a batch consisting
        # solely of such rows has no label left to average the loss over.
        if not tokens:
            continue
        kept_tokens.append(tokens)
        kept_labels.append(labels)
    return kept_tokens, kept_labels


all_tokens_train, all_labels_train = [], []
all_tokens_val, all_labels_val = [], []

# Train (CORD first, then the invoice data)
for dataset, extractor in ((cord_train, extract_from_cord), (donut_train, extract_from_donut)):
    tokens_part, labels_part = _collect_examples(dataset, extractor)
    all_tokens_train.extend(tokens_part)
    all_labels_train.extend(labels_part)

# Validation (same order)
for dataset, extractor in ((cord_val, extract_from_cord), (donut_val, extract_from_donut)):
    tokens_part, labels_part = _collect_examples(dataset, extractor)
    all_tokens_val.extend(tokens_part)
    all_labels_val.extend(labels_part)

In [None]:
from transformers import AutoTokenizer

# Use a multilingual tokenizer
MODEL_NAME = "bert-base-multilingual-cased"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_and_align_labels(tokens_list, labels_list, max_length=512, tok=None, label_map=None):
    """Split every token into sub-words and produce one label id per sub-word.

    Args:
        tokens_list: One list of string tokens per example.
        labels_list: One list of label strings per example, parallel to
            `tokens_list`.
        max_length: Upper bound on the produced sequence length, special tokens
            included. The default equals InvoiceNERDataset's default
            `max_length`, so that dataset never has to cut off the closing
            special token.
        tok: Tokenizer providing `tokenize`, `convert_tokens_to_ids`,
            `cls_token_id` and `sep_token_id`. Defaults to the module-level
            `tokenizer`.
        label_map: Mapping from label string to integer id. Defaults to the
            module-level `label2id`.

    Returns:
        tuple[list[list[int]], list[list[int]]]: ``(input_ids, label_ids)`` with
        one entry per example. Each sequence starts with the tokenizer's CLS id
        and ends with its SEP id; both positions carry the label -100. The first
        sub-word of a token keeps the token's label, later sub-words get the
        matching "I-" label ("O" stays "O").

    Raises:
        KeyError: If a label (or its "I-" counterpart) is missing from `label_map`.
    """
    tok = tokenizer if tok is None else tok
    label_map = label2id if label_map is None else label_map

    tokenized_inputs = []
    aligned_labels = []
    # Sub-word positions left once the two special tokens are accounted for.
    budget = max(max_length - 2, 0)

    for tokens, labels in zip(tokens_list, labels_list):
        input_tokens = []
        input_labels = []

        for token, label in zip(tokens, labels):
            pieces = tok.tokenize(token)
            if not pieces:
                # Nothing survived tokenization. Emitting a label anyway would
                # shift every following label by one position.
                continue
            # "O" has no B-/I- prefix to swap, so it is repeated unchanged.
            continuation = label if label == "O" else "I-" + label[2:]
            input_tokens.extend(pieces)
            input_labels.extend([label] + [continuation] * (len(pieces) - 1))

        input_tokens = input_tokens[:budget]
        input_labels = input_labels[:budget]

        # Convert to input ids and wrap them in the special tokens the model was
        # pre-trained with; -100 keeps those positions out of the loss.
        input_ids = (
            [tok.cls_token_id]
            + list(tok.convert_tokens_to_ids(input_tokens))
            + [tok.sep_token_id]
        )
        label_ids = [-100] + [label_map[name] for name in input_labels] + [-100]

        tokenized_inputs.append(input_ids)
        aligned_labels.append(label_ids)

    return tokenized_inputs, aligned_labels

# Tokenize the training split: one list of input ids and one list of label ids per example
input_ids_train, label_ids_train = tokenize_and_align_labels(all_tokens_train, all_labels_train)

# Tokenize the validation split the same way
input_ids_val, label_ids_val = tokenize_and_align_labels(all_tokens_val, all_labels_val)


In [None]:
import torch
from torch.utils.data import Dataset

class InvoiceNERDataset(Dataset):
    """Torch dataset serving fixed-length token-classification examples.

    Every item is padded or truncated to `max_length`. Padding uses the
    module-level `tokenizer`'s pad token id for the inputs, 0 for the attention
    mask and -100 for the labels.
    """

    def __init__(self, input_ids_list, label_ids_list, max_length=512):
        """Store the per-example id lists and the target sequence length."""
        self.max_length = max_length
        self.input_ids_list = input_ids_list
        self.label_ids_list = label_ids_list

    def __len__(self):
        """Return the number of examples."""
        return len(self.input_ids_list)

    def __getitem__(self, idx):
        """Return example `idx` as a dict of tensors.

        The dict has the keys 'input_ids', 'attention_mask' and 'labels'.
        """
        ids = self.input_ids_list[idx]
        tags = self.label_ids_list[idx]
        mask = [1] * len(ids)
        shortfall = self.max_length - len(ids)

        if shortfall <= 0:
            # Already long enough: cut everything down to the target length.
            limit = self.max_length
            ids, mask, tags = ids[:limit], mask[:limit], tags[:limit]
        else:
            # Too short: extend each sequence by the number of missing positions.
            ids = ids + [tokenizer.pad_token_id] * shortfall
            mask = mask + [0] * shortfall
            tags = tags + [-100] * shortfall

        return dict(
            input_ids=torch.tensor(ids),
            attention_mask=torch.tensor(mask),
            labels=torch.tensor(tags),
        )

In [None]:
# Wrap the tokenized splits as torch datasets (max_length left at its default of 512)
train_dataset = InvoiceNERDataset(input_ids_train, label_ids_train)
val_dataset = InvoiceNERDataset(input_ids_val, label_ids_val)

In [None]:
# Training hyper-parameters, collected in one place so they are easy to tune.
hyperparams = dict(
    output_dir="./bert-receipt-model",
    evaluation_strategy="steps",
    save_strategy="steps",
    save_steps=500,
    eval_steps=500,
    logging_steps=100,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.01,
    learning_rate=5e-5,
    warmup_steps=500,
    load_best_model_at_end=True,
    metric_for_best_model="loss",  # could be switched to f1 later
    greater_is_better=False,       # lower loss is better
    save_total_limit=2,            # keep at most 2 checkpoints
)

In [None]:
from transformers import BertForTokenClassification, Trainer, TrainingArguments, EarlyStoppingCallback

In [None]:
# Model: BERT with a token-classification head sized to LABELS.
# The checkpoint comes from MODEL_NAME (the same constant the tokenizer is loaded
# from) so the model and the tokenizer cannot drift apart.
model = BertForTokenClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(LABELS),
    id2label=id2label,
    label2id=label2id,
)

In [None]:
# Build the training arguments from `hyperparams`. The evaluation schedule is
# stored there under "evaluation_strategy" but passed here as `eval_strategy`;
# every other listed key is forwarded under its own name.
training_args = TrainingArguments(
    eval_strategy=hyperparams["evaluation_strategy"],
    **{
        name: hyperparams[name]
        for name in (
            "output_dir",
            "save_strategy",
            "save_steps",
            "eval_steps",
            "logging_steps",
            "per_device_train_batch_size",
            "per_device_eval_batch_size",
            "num_train_epochs",
            "weight_decay",
            "learning_rate",
            "warmup_steps",
            "load_best_model_at_end",
            "metric_for_best_model",
            "greater_is_better",
            "save_total_limit",
        )
    },
    logging_dir="./logs",
    report_to="none",  # disable wandb
)

In [None]:
# Assemble the Trainer from the model, the training arguments and both datasets.
# The early-stopping callback is configured with a patience of 2.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    processing_class=tokenizer,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
)

In [None]:
# Run fine-tuning with the trainer configured in the previous cell.
trainer.train()

In [None]:
# Save the final model and the tokenizer into the same output directory
trainer.save_model(hyperparams["output_dir"])
tokenizer.save_pretrained(hyperparams["output_dir"])