# Setup

## Load Dataset

In [1]:
# Directory (relative to the working directory) under which the dataset folder lives.
datasets_root = 'datasets'
# Name of the dataset sub-directory inside `datasets_root`.
dataset_id = 'FUNSD_polygon_augmented'

In [2]:
from datasets import Dataset
from pathlib import Path
import json
from transformers import LayoutLMv3FeatureExtractor, AutoTokenizer, LayoutLMv3Processor
from datasets import Features, Sequence, ClassLabel, Value, Array2D
from PIL import Image
from functools import partial

### Load dataset
Build a `Dataset` from a generator function that yields, for each sample, the path of its image and the path of its annotation file.

In [3]:
def generate_samples(folder):
    """Yield one path record per PNG image found in ``<folder>/images``.

    Args:
        folder: Split directory that contains an ``images`` and an
            ``annotations`` sub-directory.

    Yields:
        dict: ``{'image_path': str, 'annotation_path': str}`` where the
        annotation path is ``<folder>/annotations/<image stem>.json``.
        The annotation file is not checked for existence here.
    """
    split_dir = Path(folder)
    images_path = split_dir / 'images'
    annotations_path = split_dir / 'annotations'

    # glob() returns entries in filesystem order, which differs between
    # machines and runs; sorting keeps the dataset row order reproducible.
    for image_path in sorted(images_path.glob('*.png')):
        annotation_path = annotations_path / f'{image_path.stem}.json'
        yield {
            'image_path': str(image_path),
            'annotation_path': str(annotation_path)
        }

def get_features_type():
    """Return the schema of the raw, path-only dataset: two string columns."""
    path_columns = ("image_path", "annotation_path")
    return Features({column: Value("string") for column in path_columns})

def get_dataset_folder(split='train'):
    """Return the directory of the requested split.

    The path is built from the module-level ``datasets_root`` and
    ``dataset_id`` as ``<datasets_root>/<dataset_id>/dataset/<split dir>``.

    Args:
        split: ``'train'`` (-> ``training_data``) or ``'test'``
            (-> ``testing_data``).

    Raises:
        ValueError: If ``split`` is neither ``'train'`` nor ``'test'``.
    """
    # Reject unknown splits up front so the happy path below stays flat.
    if split not in ('train', 'test'):
        raise ValueError('split must be either "train" or "test"')

    split_dir = 'training_data' if split == 'train' else 'testing_data'
    return Path(datasets_root) / dataset_id / 'dataset' / split_dir



# Resolve the on-disk folder of each split, then wrap each folder in a
# Dataset whose rows hold only the image/annotation file paths.
train_folder = get_dataset_folder(split='train')
test_folder = get_dataset_folder(split='test')

train_dataset = Dataset.from_generator(
    partial(generate_samples, train_folder),
    features=get_features_type(),
)
test_dataset = Dataset.from_generator(
    partial(generate_samples, test_folder),
    features=get_features_type(),
)

In [4]:
def get_unique_labels(folder):
    """Collect the distinct ``label`` values used in a split's annotations.

    Args:
        folder: Split directory containing an ``annotations`` sub-directory
            of ``*.json`` files; each file must have a top-level ``form``
            list whose items carry a ``label`` key.

    Returns:
        list: The unique labels in ascending sorted order.
    """
    annotations_path = Path(folder) / 'annotations'
    unique_labels = set()

    for annotation_file in annotations_path.glob('*.json'):
        # Read JSON as UTF-8 explicitly instead of relying on the platform's
        # default encoding, which can mis-decode non-ASCII text.
        with open(annotation_file, 'r', encoding='utf-8') as f:
            entries = json.load(f)['form']
        unique_labels.update(entry['label'] for entry in entries)

    return sorted(unique_labels)

# Sorted label names found in the training annotations, plus lookup tables
# in both directions (class id = position in the sorted list).
labels = get_unique_labels(train_folder)
id_to_label = dict(enumerate(labels))
label_to_id = {label: idx for idx, label in id_to_label.items()}

### Preprocess dataset
Load the images and annotations, then encode them with a LayoutLMv3 processor that is paired with the LiLT tokenizer.

In [None]:
# Processor = LayoutLMv3 image pre-processing (OCR disabled, because the words
# are read from the annotation files) + the tokenizer of the LiLT checkpoint.
model_id = "SCUT-DLVCLab/lilt-roberta-en-base"
tokenizer = AutoTokenizer.from_pretrained(model_id)
feature_extractor = LayoutLMv3FeatureExtractor(apply_ocr=False)
processor = LayoutLMv3Processor(feature_extractor, tokenizer)

# Schema of the encoded dataset. `polygon` is a fixed 512 x 8 integer matrix:
# one row per sequence position, eight values per row.
features = Features(
    {
        "input_ids": Sequence(feature=Value(dtype="int64")),
        "attention_mask": Sequence(feature=Value(dtype="int64")),
        "polygon": Array2D(shape=(512, 8), dtype="int64"),
        "labels": Sequence(ClassLabel(names=labels)),
    }
)

# Preprocess function
def process(sample, processor=None, max_length=512):
    """Encode one sample into fixed-length, token-aligned model inputs.

    Reads the words, 8-value polygons and labels from the sample's
    annotation JSON (top-level ``form`` list with ``text``, ``polygon`` and
    ``label`` keys), maps labels to ids with the module-level
    ``label_to_id`` and runs ``processor`` on them.

    Padding and truncation are left entirely to the tokenizer. Appending
    literal ``"[PAD]"`` strings as extra words would make the tokenizer
    encode them as real text (attention mask 1) and let them compete with
    genuine words for the ``max_length`` positions.

    Args:
        sample: Mapping with ``image_path`` and ``annotation_path``.
        processor: Callable processor (built in this notebook as a
            ``LayoutLMv3Processor``). Required.
        max_length: Sequence length to pad/truncate to. Must equal the
            first dimension of the ``polygon`` feature (512 in this
            notebook's schema).

    Returns:
        The processor's encoding with ``pixel_values`` and ``bbox`` removed
        and a ``polygon`` entry added that holds one 8-value row per token
        position.

    Raises:
        ValueError: If ``processor`` is ``None``.
    """
    if processor is None:
        raise ValueError("process() requires a `processor`")

    # Explicit UTF-8 so decoding does not depend on the platform default.
    with open(sample["annotation_path"], "r", encoding="utf-8") as f:
        annotations = json.load(f)['form']

    # Word-level inputs, all of the same length (one entry per annotation).
    tokens = [ann["text"] for ann in annotations]
    polygons = [ann["polygon"] for ann in annotations]
    word_labels = [label_to_id[ann["label"]] for ann in annotations]

    # The processor requires an image even though pixel_values is discarded
    # below; the context manager releases the file handle once converted.
    with Image.open(sample["image_path"]) as image_file:
        image = image_file.convert("RGB")

    encoding = processor(
        image,
        tokens,
        word_labels=word_labels,
        boxes=polygons,
        padding="max_length",
        truncation=True,
        max_length=max_length,
    )

    # `bbox` comes back with one entry per *token*: sub-word tokens repeat
    # their word's polygon. Special and padding tokens are expected to carry
    # the tokenizer's own shorter placeholder box (see the LayoutLMv3
    # tokenizer documentation), so anything that is not an 8-value polygon is
    # replaced by an all-zero row. This keeps `polygon` aligned with
    # `input_ids`/`labels`, unlike the raw word-level list.
    encoding['polygon'] = [
        list(box) if len(box) == 8 else [0] * 8
        for box in encoding['bbox']
    ]

    del encoding["pixel_values"]
    del encoding['bbox']

    return encoding

# Encode both splits with identical settings: drop the two path columns,
# enforce the `features` schema and hand back torch tensors.
# Note: this rebinds `train_dataset`/`test_dataset` to the encoded versions,
# so the path-only datasets must be rebuilt before running this cell again.
preprocess_sample = partial(process, processor=processor)
map_options = {
    "remove_columns": ["image_path", "annotation_path"],
    "features": features,
}

train_dataset = train_dataset.map(preprocess_sample, **map_options).with_format("torch")
test_dataset = test_dataset.map(preprocess_sample, **map_options).with_format("torch")

In [None]:
from datasets import DatasetDict

# Bundle both encoded splits and persist them under
# <datasets_root>/<dataset_id>/huggingface_dataset.
huggingface_dataset_dir = Path(datasets_root) / dataset_id / 'huggingface_dataset'
dataset = DatasetDict({'train': train_dataset, 'test': test_dataset})
dataset.save_to_disk(huggingface_dataset_dir)

# Training

## Load HF dataset

In [7]:
from datasets import DatasetDict

# Reload the encoded splits written by the preprocessing section.
huggingface_dataset_path = '/'.join([datasets_root, dataset_id, 'huggingface_dataset'])
dataset = DatasetDict.load_from_disk(huggingface_dataset_path)
dataset

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask', 'polygon', 'labels'],
        num_rows: 14900
    })
    test: Dataset({
        features: ['input_ids', 'attention_mask', 'polygon', 'labels'],
        num_rows: 5000
    })
})

## Create model

In [8]:
from model.lilt import LiltForSequenceClassification
from transformers import AutoConfig

# Start from the published LiLT configuration and attach this dataset's
# label set (`labels`, `label_to_id`, `id_to_label` come from the setup section).
lilt_checkpoint = 'SCUT-DLVCLab/lilt-roberta-en-base'
num_labels = len(labels)
config = AutoConfig.from_pretrained(
    lilt_checkpoint,
    num_labels=num_labels,
    label2id=label_to_id,
    id2label=id_to_label,
)

# NOTE(review): the model is constructed from `config` only; whether any
# pretrained weights get loaded depends on the local `model.lilt`
# implementation - confirm.
# NOTE(review): the class is named *ForSequenceClassification* while
# `compute_metrics` takes an argmax over axis 2 (per-token logits) - confirm
# the head produces one prediction per token.
model = LiltForSequenceClassification(config)

## Prepare metrics

In [9]:
import evaluate
import numpy as np

# Metric used by `compute_metrics` below.
metric = evaluate.load("seqeval")

# Label names ordered by class id; used to turn predicted/reference ids back
# into the strings handed to the metric.
class_labels = [config.id2label[class_id] for class_id in range(num_labels)]


def compute_metrics(p):
    """Score token-level predictions with the module-level seqeval ``metric``.

    Args:
        p: ``(predictions, label_ids)`` pair. ``predictions`` holds per-token
            class scores (the class axis is axis 2); ``label_ids`` holds the
            reference class ids, with ``-100`` marking positions to ignore.

    Returns:
        Whatever ``metric.compute`` returns for the given sequences.
    """
    logits, label_ids = p
    predicted_ids = np.argmax(logits, axis=2)

    # Keep one label sequence per example. Concatenating every example into a
    # single sequence would let the metric join entity chunks across example
    # boundaries.
    predicted_sequences = []
    reference_sequences = []
    for predicted_row, label_row in zip(predicted_ids, label_ids):
        kept = [
            (predicted_idx, label_idx)
            for predicted_idx, label_idx in zip(predicted_row, label_row)
            if label_idx != -100  # skip special, padding and non-first sub-word positions
        ]
        predicted_sequences.append([class_labels[predicted_idx] for predicted_idx, _ in kept])
        reference_sequences.append([class_labels[label_idx] for _, label_idx in kept])

    return metric.compute(predictions=predicted_sequences, references=reference_sequences)


## Prepare trainer & train

In [10]:
from transformers import Trainer, TrainingArguments

# Name used for the output directory and as prefix of the logging directory.
repository_id = "lilt-polygon"

# Logging and checkpointing share the same step interval.
steps_between_reports = 200

training_args = TrainingArguments(
    output_dir=repository_id,
    # optimisation
    learning_rate=5e-5,
    max_steps=2500,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    fp16=True,
    # logging, evaluation and checkpointing
    logging_dir=f"{repository_id}/logs",
    logging_strategy="steps",
    logging_steps=steps_between_reports,
    evaluation_strategy="steps",
    save_strategy="steps",
    save_steps=steps_between_reports,
    save_total_limit=2,
    # keep the checkpoint with the best overall F1 once training finishes
    load_best_model_at_end=True,
    metric_for_best_model="overall_f1",
)

# Train on the `train` split, evaluate on the `test` split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    compute_metrics=compute_metrics,
)

In [11]:
# NOTE(review): the output saved with this cell is a RuntimeError
# ("mat1 and mat2 shapes cannot be multiplied"), i.e. this call did not
# complete in the recorded run. Presumably the layout projection inside the
# local LiLT model does not match the 8-value polygon input - confirm in
# `model/lilt` before re-running.
trainer.train()

RuntimeError: mat1 and mat2 shapes cannot be multiplied (4096x640 and 768x192)