<a href="https://colab.research.google.com/github/667029/KVP10k/blob/main/LayoutLMv3_KVP10k_9april_clean.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

_______

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
#!jupyter nbconvert --ClearOutputPreprocessor.enabled=True \
#  --inplace "/content/drive/MyDrive/Colab Notebooks/LayoutLMv3_KVP10k_9april.ipynb"

In [None]:
!pip install -q transformers datasets seqeval

In [None]:
!pip install evaluate


In [None]:
import os              #navigere mapper og filer, hente filbaner
from PIL import Image  #åpne, vise og manipulere bilder
import json            #lese/skrive til JSON-filer
from transformers import LayoutLMv3Processor
import torch
from google.colab import drive

In [None]:
# Mount Google Drive so the files under /content/drive become reachable
drive.mount('/content/drive')

In [None]:
# Root folder of the KVP10k dataset on Drive; list its contents as a sanity check
base_path = "/content/drive/MyDrive/DAT255/KVP10k-dataset/kvp10k/"
print(os.listdir(base_path))

In [None]:
# apply_ocr=False is important: OCR has already been run on the images, so the
# words and bounding boxes are passed to the processor explicitly.
processor = LayoutLMv3Processor.from_pretrained("microsoft/layoutlmv3-base", apply_ocr=False)

In [None]:
# Mapping from textual BIO labels to the integer ids the model uses
label_map = {
    "O": 0,
    "B-KEY": 1,
    "I-KEY": 2,
    "B-VALUE": 3,
    "I-VALUE": 4,
}

# Scale bounding boxes to the 0-1000 range (as LayoutLMv3 requires)
def normalize_bbox(bbox, width, height):
  """Scale a box to the 0-1000 coordinate grid.

  Args:
    bbox: ``[x0, y0, x1, y1]`` in the same units as ``width``/``height``.
    width: Page width used to scale the x coordinates.
    height: Page height used to scale the y coordinates.

  Returns:
    A list of four ints, each clamped to ``[0, 1000]``.
  """
  def _scale(coord, extent):
    # Clamp so a box that pokes slightly outside the page (negative or
    # larger than the page size) still yields a value inside 0-1000.
    return min(1000, max(0, int(1000 * (coord / extent))))

  return [
      _scale(bbox[0], width),
      _scale(bbox[1], height),
      _scale(bbox[2], width),
      _scale(bbox[3], height),
  ]


def assign_label_for_box(box, boxes, label_type):
  """Return ``(index, tag)`` pairs for the token boxes that overlap ``box``.

  A token box counts as overlapping when ``box_overlap`` reports a positive
  area. The overlapping token with the lowest index gets ``B-<label_type>``,
  every further one gets ``I-<label_type>``.
  """
  # enumerate walks the boxes in ascending index order, so the hits come out
  # already sorted.
  hits = [pos for pos, candidate in enumerate(boxes) if box_overlap(box, candidate) > 0]

  begin_tag = f"B-{label_type}"
  inside_tag = f"I-{label_type}"
  return [
      (pos, begin_tag if rank == 0 else inside_tag)
      for rank, pos in enumerate(hits)
  ]


# Checks whether the OCR box overlaps the ground-truth (key/value) box.
# When they overlap, they are treated as belonging together.
def box_overlap(box1, box2):
  """Return the intersection area of two ``[x0, y0, x1, y1]`` boxes (0 if none)."""
  left, top = max(box1[0], box2[0]), max(box1[1], box2[1])
  right, bottom = min(box1[2], box2[2]), min(box1[3], box2[3])
  # A negative extent means the boxes are disjoint on that axis.
  overlap_w = max(0, right - left)
  overlap_h = max(0, bottom - top)
  return overlap_w * overlap_h


# Generates BIO labels from the ground truth (gts).
# Every OCR token (word + bbox) is matched against the key/value boxes:
# --> token overlaps a key box:   B-KEY or I-KEY
# --> token overlaps a value box: B-VALUE or I-VALUE
# --> otherwise: O
def iob_from_kvps(words, boxes, kvps):
  """Return one BIO tag string per word, derived from the key/value boxes.

  Args:
    words: OCR words; only the count is used, to size the label list.
    boxes: One box per word, compared against the key/value boxes.
    kvps: Key-value pair dicts; a ``"key"``/``"value"`` entry is used only
      when it contains a ``"bbox"``.

  Returns:
    A list of tag strings, ``"O"`` for tokens no box overlaps. Later
    assignments overwrite earlier ones, so within a pair the value box wins
    over the key box, and later pairs win over earlier ones.
  """
  labels = ["O"] * len(words)

  # Key is handled before value for each pair, matching the overwrite order
  # described above.
  for kvp in kvps:
    for field, label_type in (("key", "KEY"), ("value", "VALUE")):
      if field not in kvp or "bbox" not in kvp[field]:
        continue
      for idx, tag in assign_label_for_box(kvp[field]["bbox"], boxes, label_type):
        labels[idx] = tag

  return labels

In [None]:
def load_example(doc_id, base_path):
  """Load one document from a split folder and encode it with the processor.

  Reads ``images/<doc_id>.png``, ``ocrs/<doc_id>.json`` and
  ``gts/<doc_id>.json`` below ``base_path``. Only the first OCR page is used.
  BIO labels are derived by comparing the raw OCR word boxes with the
  ground-truth key/value boxes; the boxes handed to the processor are the
  0-1000 normalised ones.

  Returns:
    ``(encoding, words, boxes, string_labels, image)`` where ``encoding`` is
    the processor output, ``boxes`` are the normalised word boxes and
    ``string_labels`` the per-word BIO tags.
  """
  png_name = f"{doc_id}.png"
  json_name = f"{doc_id}.json"

  image = Image.open(os.path.join(base_path, "images", png_name)).convert("RGB")

  with open(os.path.join(base_path, "ocrs", json_name), "r", encoding="utf-8") as ocr_file:
    ocr_data = json.load(ocr_file)

  with open(os.path.join(base_path, "gts", json_name), "r", encoding="utf-8") as gt_file:
    gt_data = json.load(gt_file)

  first_page = ocr_data["pages"][0]
  page_w = first_page["width"]
  page_h = first_page["height"]
  ocr_words = first_page["words"]

  words = [entry["text"] for entry in ocr_words]
  raw_boxes = [entry["bbox"] for entry in ocr_words]
  boxes = [normalize_bbox(raw, page_w, page_h) for raw in raw_boxes]

  # Labels are computed on the raw boxes, not the normalised ones.
  string_labels = iob_from_kvps(words, raw_boxes, gt_data["kvps_list"])
  labels = [label_map[tag] for tag in string_labels]

  # Pad to the maximum length, truncate sequences with too many tokens, and
  # return PyTorch tensors.
  encoding = processor(
      image,
      words,
      boxes=boxes,
      word_labels=labels,
      padding="max_length",
      truncation=True,
      return_tensors="pt",
  )

  return encoding, words, boxes, string_labels, image

In [None]:
from datasets import Dataset, DatasetDict, Image as HFImage
from tqdm import tqdm
import os

def load_split(split_name, base_path, limit=None):
    """Build a Hugging Face ``Dataset`` from one split folder of the corpus.

    Document ids are taken from the ``*.json`` file names under
    ``<base_path>/<split_name>/gts``, sorted, and each document is encoded
    with ``load_example``.

    Args:
        split_name: Sub-folder of ``base_path`` holding the split.
        base_path: Root folder of the dataset.
        limit: If truthy, only the first ``limit`` ids (in sorted order) are
            loaded; ``None`` or ``0`` loads every document.

    Returns:
        A ``Dataset`` with one row per successfully loaded document, with the
        ``image`` column cast to a decoded image feature.
    """
    split_path = os.path.join(base_path, split_name)
    gts_dir = os.path.join(split_path, "gts")
    suffix = ".json"

    # Strip only the trailing extension: str.replace would also delete a
    # ".json" that occurs in the middle of a file name and corrupt the id.
    doc_ids = sorted(
        fname[: -len(suffix)]
        for fname in os.listdir(gts_dir)
        if fname.endswith(suffix)
    )

    if limit:
        doc_ids = doc_ids[:limit]

    examples = []

    for doc_id in tqdm(doc_ids, desc=f"Laster {split_name}"):
        try:
            encoding, words, boxes, string_labels, image = load_example(doc_id, split_path)

            # The processor returns tensors with a leading batch dimension of
            # one; squeeze(0) removes it so each row holds a single example.
            example = {
                "id": doc_id,
                "input_ids": encoding["input_ids"].squeeze(0),
                "attention_mask": encoding["attention_mask"].squeeze(0),
                "bbox": encoding["bbox"].squeeze(0),
                "labels": encoding["labels"].squeeze(0),
                "pixel_values": encoding["pixel_values"].squeeze(0),
                "image": image,
                "tokens": words,
                "bboxes": boxes,
                "ner_tags": [label_map[l] for l in string_labels],
            }

            examples.append(example)

        except Exception as e:
            # Best effort: a document that fails to load is reported and
            # skipped rather than aborting the whole split.
            print(f"Feil i {doc_id}: {e}")

    return Dataset.from_list(examples).cast_column("image", HFImage(decode=True))


In [None]:
from datasets import load_from_disk

# Path the processed dataset was saved to
dataset = load_from_disk("/content/drive/MyDrive/KVP10k_processed_ready/dataset")

# Fetch the individual splits
train_dataset = dataset["train"]
eval_dataset = dataset["eval"]
test_dataset = dataset["test"]

# Have the splits return torch tensors when indexed
train_dataset.set_format("torch")
eval_dataset.set_format("torch")
test_dataset.set_format("torch")

print("Train size:", len(train_dataset))
print("Eval size:", len(eval_dataset))
print("Test size:", len(test_dataset))

# Example of the batch format
print(train_dataset[0].keys())

In [None]:
from copy import deepcopy

# Load a limited number of documents for development/testing
# NOTE(review): this cell rebinds `dataset`, `train_dataset` and
# `eval_dataset`, which the load_from_disk cell also assigns — run one or the
# other.
raw_train_dataset = load_split("train", base_path, limit=1000)
raw_test_dataset = load_split("test", base_path, limit=100)

# Combine in Hugging Face format
dataset = DatasetDict({
    "train": deepcopy(raw_train_dataset),
    "test": deepcopy(raw_test_dataset),
})

# Select which columns are converted to PyTorch tensors
dataset.set_format(
    type="torch",
    columns=["input_ids", "attention_mask", "bbox", "labels", "pixel_values"],
)

# Split the training set into train + eval (80/20 via test_size=0.2)
split = dataset["train"].train_test_split(test_size=0.2, seed=42)
train_dataset = split["train"]
eval_dataset = split["test"]

print("\n")
print(f"Fullt datasett: {dataset}")
print(f"Train: {train_dataset}")
print(f"Eval: {eval_dataset}")


In [None]:
# Inspect the column schema of the training split
train_dataset.features

In [None]:
# Print the shape of every column in the first training example
example = train_dataset[0]
for k,v in example.items():
    print(k,v.shape)

In [None]:
# Decode the first training example's token ids back to text
processor.tokenizer.decode(train_dataset[0]["input_ids"])

In [None]:
# Print each token of the first training example next to its label id.
# The loop variable is named token_id rather than `id` so the builtin id()
# is not overwritten in the notebook's global namespace.
for token_id, label in zip(train_dataset[0]["input_ids"], train_dataset[0]["labels"]):
  print(processor.tokenizer.decode([token_id]), label.item())

In [None]:
from evaluate import load
# Load the seqeval metric that compute_metrics uses
metric = load("seqeval")

In [None]:
import numpy as np

# BIO tag names; a tag's position in the list is its class id
label_list = ["O", "B-KEY", "I-KEY", "B-VALUE", "I-VALUE"]
id2label = dict(enumerate(label_list))
label2id = {name: idx for idx, name in id2label.items()}


def compute_metrics(p):
    """Compute seqeval precision/recall/F1/accuracy for one evaluation pass.

    Args:
        p: A ``(predictions, labels)`` pair. ``predictions`` holds per-class
            scores and is arg-maxed over axis 2; ``labels`` holds class ids,
            with -100 marking positions to ignore.

    Returns:
        A dict with the ``precision``, ``recall``, ``f1`` and ``accuracy``
        values taken from the metric's ``overall_*`` results.
    """
    scores, gold_ids = p
    pred_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for pred_row, gold_row in zip(pred_ids, gold_ids):
        # Keep only positions with a real label, then map ids to tag strings.
        kept = [(pred, gold) for pred, gold in zip(pred_row, gold_row) if gold != -100]
        true_predictions.append([id2label[pred] for pred, _ in kept])
        true_labels.append([id2label[gold] for _, gold in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    summary = {}
    summary["precision"] = results["overall_precision"]
    summary["recall"] = results["overall_recall"]
    summary["f1"] = results["overall_f1"]
    summary["accuracy"] = results["overall_accuracy"]
    return summary

In [None]:
from transformers import LayoutLMv3ForTokenClassification

# Load the pretrained base checkpoint as a token classifier, passing the label
# mappings so the model config carries the KEY/VALUE tag names
model = LayoutLMv3ForTokenClassification.from_pretrained("microsoft/layoutlmv3-base",
                                                         id2label=id2label,
                                                         label2id=label2id)

In [None]:
from transformers import TrainingArguments

# Training configuration for the Trainer.
# - Evaluation runs every 125 steps and checkpoints are written every 250
#   steps, keeping at most 2 checkpoints on disk.
# - The best checkpoint is selected by the "f1" value from compute_metrics and
#   reloaded at the end of training.
# - fp16 mixed precision is enabled only when a CUDA device is present, so this
#   cell does not fail on a CPU-only runtime (`torch` is imported at the top of
#   the notebook).
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version.
train_args = TrainingArguments(
    output_dir="test",
    num_train_epochs=8,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    learning_rate=5e-6,
    weight_decay=0.01,
    evaluation_strategy="steps",
    eval_steps=125,
    save_steps=250,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    logging_dir="./logs",
    logging_steps=50,
    report_to="tensorboard",
    lr_scheduler_type="cosine",
    warmup_steps=100,
    fp16=torch.cuda.is_available(),
)



In [None]:
from transformers import Trainer, EarlyStoppingCallback
from transformers.data.data_collator import default_data_collator

# Wire the model, datasets and metric function into the Hugging Face Trainer.
# load_example encodes with padding="max_length", so examples built that way
# already share one length and the default collator only has to stack them.
# Early stopping is configured with a patience of 3.
trainer = Trainer (
    model=model,
    args=train_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=processor.tokenizer,
    data_collator=default_data_collator,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

In [None]:
# Run fine-tuning with the configuration from train_args
trainer.train()

In [None]:
# Evaluate on eval_dataset; the metrics come from compute_metrics
trainer.evaluate()

In [None]:
# Choose a folder on Drive (or a local one if you want to copy it later)
output_dir = "/content/drive/MyDrive/layoutlmv3_kvp10k_model"

# Save model and tokenizer
trainer.save_model(output_dir)
processor.save_pretrained(output_dir)  # this saves both the tokenizer and the feature extractor

In [None]:
from transformers import AutoModelForTokenClassification

# Reload the fine-tuned weights for inference from the Drive directory that
# trainer.save_model() wrote to, rather than from a numbered checkpoint under
# the local "test" output_dir: with save_total_limit=2 a given checkpoint-N may
# already have been rotated away, and the Colab VM disk does not survive a
# runtime restart.
model = AutoModelForTokenClassification.from_pretrained("/content/drive/MyDrive/layoutlmv3_kvp10k_model")


In [None]:
# Pick index 2 from the test data twice: `example` holds the model-ready
# tensors, `example_raw` comes from the raw split and still exposes the image,
# tokens and word-level tags used for the visualisation further down.
example = dataset["test"][2]
print(example.keys())
example_raw = raw_test_dataset[2]

In [None]:
# Print the shape of each tensor in the selected test example
for k, v in example.items():
  print(k, v.shape)

In [None]:
# Unpack the tensors; unsqueeze(0) adds a leading batch dimension of size 1
inputs = {
    "input_ids": example["input_ids"].unsqueeze(0),
    "attention_mask": example["attention_mask"].unsqueeze(0),
    "bbox": example["bbox"].unsqueeze(0),
    "pixel_values": example["pixel_values"].unsqueeze(0)
}

In [None]:
# Use the GPU when one is available, otherwise the CPU, and move both the
# model and every input tensor to that device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
inputs = {k: v.to(device) for k, v in inputs.items()}

In [None]:
# Inference: a single forward pass with gradient tracking disabled
with torch.no_grad():
    outputs = model(**inputs)

In [None]:
# Take the highest-scoring class id per position over the last axis, then
# squeeze away the size-1 batch dimension and convert to a plain list
logits = outputs.logits
predictions = logits.argmax(-1).squeeze().tolist()
labels = example["labels"]

In [None]:
# Show the raw predicted class ids
print(predictions)

In [None]:
# Print decoded token, gold label and predicted label side by side, skipping
# positions whose label is -100
print(f"{'Token ID':10} {'Label':10} {'Pred':10}")
print("=" * 30)
for token_id, label_id, pred_id in zip(example["input_ids"], labels, predictions):
    if label_id == -100:
        continue
    token = processor.tokenizer.decode([token_id])
    print(f"{token:10} {id2label[label_id.item()]:10} {id2label[pred_id]:10}")

In [None]:
def unnormalize_box(bbox, width, height):
    """Map a 0-1000 normalised ``[x0, y0, x1, y1]`` box onto a width x height image."""
    x0, y0, x1, y1 = bbox[0], bbox[1], bbox[2], bbox[3]
    # x coordinates scale with the width, y coordinates with the height.
    left = width * (x0 / 1000)
    top = height * (y0 / 1000)
    right = width * (x1 / 1000)
    bottom = height * (y1 / 1000)
    return [left, top, right, bottom]

In [None]:
# Page image (PIL) plus the processed tensors of the selected test example
image = example_raw["image"].copy().convert("RGB")
token_boxes = example["bbox"]  # already fully processed
labels = example["labels"]
input_ids = example["input_ids"]

# Un-normalise the boxes to pixel coordinates, keeping only the positions
# whose label is not -100
width, height = image.size
true_boxes = [
    unnormalize_box(token_box, width, height)
    for token_box, gold in zip(token_boxes, labels)
    if gold != -100
]

# Convert class ids to tag strings and drop the -100 positions
true_predictions = [
    id2label[int(pred_id)]
    for pred_id, gold in zip(predictions, labels)
    if gold != -100
]
true_labels = [
    id2label[int(gold)]
    for _, gold in zip(predictions, labels)
    if gold != -100
]

In [None]:
def iob_to_label(label):
    """Strip the ``B-``/``I-`` prefix and lower-case the rest; anything else is ``"other"``."""
    prefix, entity = label[:2], label[2:]
    return entity.lower() if prefix in ("B-", "I-") else "other"

# Colour used for the outline and caption of each label type when drawing
label2color = {
    "key": "blue",
    "value": "green",
    "other": "gray"
}

Model predictions

In [None]:
from PIL import ImageDraw, ImageFont
from IPython.display import display

# Draw every predicted label on the page image: a coloured rectangle per token
# box plus the label name, offset from the box's top-left corner
draw = ImageDraw.Draw(image)
font = ImageFont.load_default()

for pred_label, box in zip(true_predictions, true_boxes):
    label = iob_to_label(pred_label)
    draw.rectangle(box, outline=label2color.get(label, "red"), width=2)
    draw.text((box[0] + 10, box[1] - 10), label, fill=label2color.get(label, "red"), font=font)

display(image)


_______
Ground truth

In [None]:
# Draw the ground-truth word-level tags on a separate image so it can be
# compared with the prediction image.
# NOTE: relies on `width`, `height` and `font` defined in earlier cells.
image_true = example_raw["image"].convert("RGB")
draw_true = ImageDraw.Draw(image_true)

for word, box, label_id in zip(example_raw['tokens'], example_raw['bboxes'], example_raw['ner_tags']):
    label = iob_to_label(id2label[label_id]).lower()
    # The stored boxes are 0-1000 normalised; map them back to pixel coordinates
    box = unnormalize_box(box, width, height)
    draw_true.rectangle(box, outline=label2color.get(label, "gray"), width=2)
    draw_true.text((box[0] + 10, box[1] - 10), label, fill=label2color.get(label, "gray"), font=font)

display(image_true)
