In [None]:
# Import image processing and OCR tools
from PIL import Image
from pytesseract import image_to_string
from pdf2image import convert_from_path
from transformers import BertTokenizer, BertForTokenClassification, Trainer, TrainingArguments, DistilBertTokenizer
import torch
from sklearn.model_selection import train_test_split
import pandas as pd
import os
from natsort import natsorted  # For naturally sorted lists (e.g., invoice1, invoice2, invoice10)

# Converts a PDF file into a list of image objects (one per page)
def pdf_to_image(pdf_file):
    """Render a PDF through pdf2image's ``convert_from_path``.

    Args:
        pdf_file: Path of the PDF; forwarded unchanged.

    Returns:
        The value returned by ``convert_from_path`` (iterated page by page
        by the callers in this file).
    """
    page_images = convert_from_path(pdf_file)
    return page_images

# Performs OCR (optical character recognition) on a single image file
def image_to_text(file):
    """Run OCR on one image via pytesseract's ``image_to_string``.

    Args:
        file: The image to recognise; forwarded unchanged.

    Returns:
        The text produced by ``image_to_string``.
    """
    recognised_text = image_to_string(file)
    return recognised_text

# Processes a full PDF file: converts to images → performs OCR → combines the text from all pages
def get_text_from_any_pdf(pdf_file):
    """Extract the OCR text of a whole PDF.

    The PDF is rendered with ``pdf_to_image`` and every resulting page is
    passed to ``image_to_text``.

    Args:
        pdf_file: Path of the PDF to process.

    Returns:
        The per-page OCR results concatenated in page order, with no
        separator inserted between pages.
    """
    # join() builds the result in one pass instead of repeated `+=`
    return "".join(image_to_text(page) for page in pdf_to_image(pdf_file))

# Invoice PDFs to run through OCR
pdfs = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]

# Spreadsheet holding the ground-truth invoice numbers
invoice_numbers_path = "invoices.xlsx"
df = pd.read_excel(invoice_numbers_path)

# One list of invoice-number strings per spreadsheet row: the first column is
# skipped and empty cells are dropped before converting to str.
# NOTE(review): presumably row i belongs to pdfs[i] -- confirm against the sheet.
invoice_numbers = [
    [str(value) for value in row[1:].dropna().tolist() if value is not None]
    for _, row in df.iterrows()
]


# OCR every PDF; texts[i] is the text of pdfs[i]
texts = list(map(get_text_from_any_pdf, pdfs))


### Data cleaning

In [None]:
from transformers import LongformerTokenizerFast, LongformerForTokenClassification
from datasets import Dataset, DatasetDict
import torch
from sklearn.model_selection import train_test_split

# Hold out 20% of the documents. PDFs, labels and OCR texts are passed to one
# call so the three lists are split identically.
(
    train_pdfs,
    test_pdfs,
    train_labels,
    test_labels,
    train_texts,
    test_texts,
) = train_test_split(pdfs, invoice_numbers, texts, test_size=0.2, random_state=42)

# Tag ids used for token classification:
#   0 = O (outside), 1 = B (beginning of entity), 2 = I (inside of entity)
# Longformer tokenizer plus a token-classification model sized for those 3 tags
tokenizer = LongformerTokenizerFast.from_pretrained(
    "allenai/longformer-base-4096",
    add_prefix_space=True,
)
model = LongformerForTokenClassification.from_pretrained(
    "allenai/longformer-base-4096",
    num_labels=3,
)

# Convert a list of tokens and known labels (e.g., invoice numbers) into NER-style tags
def label_tokens(tokens, labels):
    """Tag a token list with B/I/O ids for the given entity strings.

    Each label is split on whitespace and searched for as a contiguous run
    of equal tokens. Only the first occurrence of each label is tagged;
    later occurrences keep their current tag.

    Args:
        tokens: List of word strings to tag.
        labels: Iterable of entity strings (e.g. invoice numbers).

    Returns:
        A list of ints with the same length as ``tokens``:
        0 = O (outside), 1 = B (first token of a match), 2 = I (rest of it).
    """
    ner_tags = [0] * len(tokens)  # everything starts as "O"
    for label in labels:
        label_parts = label.split()
        if not label_parts:
            # An empty or whitespace-only label would equal the empty slice
            # at position 0 and wrongly tag (or index past) the first token.
            continue
        width = len(label_parts)
        for i in range(len(tokens) - width + 1):
            if tokens[i:i + width] == label_parts:
                ner_tags[i] = 1  # "B"
                ner_tags[i + 1:i + width] = [2] * (width - 1)  # "I"
                break  # only the first occurrence is tagged
    return ner_tags

# Split the token sequence into overlapping segments suitable for Longformer input length
def split_into_segments(text, labels, max_length=2648, overlap=256):
    """Cut a document into overlapping windows of words with their NER tags.

    The text is split on whitespace, tagged once with ``label_tokens``, and
    then sliced into windows of at most ``max_length`` words. Consecutive
    windows start ``max_length - overlap`` words apart.

    NOTE(review): lengths here are counted in whitespace-separated words,
    while the tokenizer calls in this file truncate at 2648 sub-word tokens,
    so the tail of a full window may be cut off later -- confirm this is
    acceptable.

    Args:
        text: Raw document text.
        labels: Entity strings handed to ``label_tokens``.
        max_length: Maximum number of words per window.
        overlap: Number of words shared by consecutive windows.

    Returns:
        A list of ``{"tokens": [...], "ner_tags": [...]}`` dicts; empty when
        the text contains no words.

    Raises:
        ValueError: If ``overlap`` is not smaller than ``max_length``.
    """
    stride = max_length - overlap
    if stride <= 0:
        # A negative step would make range() empty and silently drop the
        # whole document; a zero step would fail with an unrelated message.
        raise ValueError(
            f"overlap ({overlap}) must be smaller than max_length ({max_length})"
        )

    tokens = text.split()
    ner_tags = label_tokens(tokens, labels)
    segments = []
    for start_idx in range(0, len(tokens), stride):
        end_idx = min(start_idx + max_length, len(tokens))
        segments.append({
            "tokens": tokens[start_idx:end_idx],
            "ner_tags": ner_tags[start_idx:end_idx],
        })
        if end_idx == len(tokens):
            # The window already reaches the end; a further window would only
            # repeat the overlap region.
            break
    return segments

# Overlapping word windows (with tags) for every training document
train_data = [
    segment
    for text, labels in zip(train_texts, train_labels)
    for segment in split_into_segments(text, labels)
]

# Same windowing for the held-out documents
test_data = [
    segment
    for text, labels in zip(test_texts, test_labels)
    for segment in split_into_segments(text, labels)
]

# Wrap both segment lists as Hugging Face datasets
train_dataset = Dataset.from_list(train_data)
test_dataset = Dataset.from_list(test_data)

# Tokenize the input tokens and align the NER labels accordingly
def tokenize_and_align_labels(examples):
    """Tokenize a batch of word lists and align word-level tags to sub-tokens.

    Uses the module-level ``tokenizer``. Every sequence is padded or
    truncated to 2648 sub-word tokens.

    Args:
        examples: Batch mapping with ``"tokens"`` (a list of word lists) and
            ``"ner_tags"`` (one list of tag ids per word list).

    Returns:
        The tokenizer output with an added ``"labels"`` entry: one list per
        example holding the word's tag on its first sub-token and -100 on
        special tokens, padding and remaining sub-tokens.
    """
    tokenized_inputs = tokenizer(
        examples["tokens"],
        truncation=True,
        padding="max_length",
        max_length=2648,
        is_split_into_words=True,
        add_special_tokens=True
    )

    labels = []
    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)  # special or padding token
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])  # first sub-token of a word
            else:
                label_ids.append(-100)  # continuation sub-token
            previous_word_idx = word_idx
        labels.append(label_ids)

    # Stored as plain lists, matching the later definition of this function
    # in this file; no tensor is needed for Dataset.map.
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

# Tokenize both splits in batches, aligning the word-level tags to sub-tokens
train_tokenized, test_tokenized = (
    split.map(tokenize_and_align_labels, batched=True)
    for split in (train_dataset, test_dataset)
)

# Bundle the tokenized splits for training/evaluation
dataset = DatasetDict({"train": train_tokenized, "test": test_tokenized})


In [None]:
from transformers import LongformerForTokenClassification, Trainer, TrainingArguments
from datasets import Dataset, DatasetDict
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import numpy as np
import wandb

# Initialize Weights & Biases for experiment tracking
wandb.init(project="Training Plots")

# Split the training segments into training (80%) and validation (20%) sets.
# The seed is fixed so that repeated runs use the same validation set, in line
# with the random_state=42 used for the document-level train/test split.
train_valid_split = train_dataset.train_test_split(test_size=0.2, seed=42)

# Combine train, validation, and test into a single DatasetDict
dataset = DatasetDict({
    'train': train_valid_split['train'],
    'validation': train_valid_split['test'],
    'test': test_dataset
})

# Evaluation metrics for token classification (ignoring special tokens)
def compute_metrics(p):
    """Compute token-level accuracy and weighted precision/recall/F1.

    Args:
        p: A ``(predictions, labels)`` pair. ``predictions`` holds per-class
            scores on axis 2; ``labels`` holds tag ids, with -100 marking
            positions to ignore.

    Returns:
        Dict with the keys ``accuracy``, ``precision``, ``recall`` and ``f1``.
    """
    scores, gold = p
    predicted = np.argmax(scores, axis=2)
    gold = np.asarray(gold)

    # Keep only positions that carry a real label (-100 marks ignored ones).
    # Boolean indexing flattens in row-major order, so both arrays stay aligned.
    keep = gold != -100
    y_true = gold[keep]
    y_pred = predicted[keep]

    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='weighted'
    )
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }

# Tokenize inputs and align token-level labels with word-level annotations
def tokenize_and_align_labels(examples):
    """Tokenize a batch of word lists and spread word tags over sub-tokens.

    Uses the module-level ``tokenizer``; sequences are padded or truncated to
    2648 sub-word tokens.

    Args:
        examples: Batch mapping with ``"tokens"`` (word lists) and
            ``"ner_tags"`` (one tag-id list per word list).

    Returns:
        The tokenizer output with a ``"labels"`` list per example: the word's
        tag on its first sub-token, -100 everywhere else.
    """
    encoded = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        add_special_tokens=True,
        truncation=True,
        padding="max_length",
        max_length=2648,
    )

    aligned = []
    for batch_idx, word_tags in enumerate(examples["ner_tags"]):
        row = []
        last_word = None
        for word in encoded.word_ids(batch_index=batch_idx):
            # Only the first sub-token of a real word carries the tag; special
            # tokens, padding and continuation pieces are ignored (-100).
            starts_new_word = word is not None and word != last_word
            row.append(word_tags[word] if starts_new_word else -100)
            last_word = word
        aligned.append(row)

    encoded["labels"] = aligned
    return encoded

# Run tokenization + label alignment over each split, in this order
train_tokenized, validation_tokenized, test_tokenized = (
    dataset[split_name].map(tokenize_and_align_labels, batched=True)
    for split_name in ("train", "validation", "test")
)

# Fresh Longformer token-classification model with 3 output labels (O, B, I)
model = LongformerForTokenClassification.from_pretrained(
    "allenai/longformer-base-4096", num_labels=3
)

# Hugging Face Trainer configuration, grouped by concern
training_args = TrainingArguments(
    # Optimisation
    learning_rate=5e-5,
    num_train_epochs=3,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    # Evaluation runs at the end of every epoch
    evaluation_strategy="epoch",
    # Checkpoints: written to ./results every 10,000 steps, newest 2 kept
    output_dir="./results",
    save_steps=10_000,
    save_total_limit=2,
    # Logging: every 10 steps, to ./logs and Weights & Biases
    logging_dir='./logs',
    logging_steps=10,
    report_to="wandb",
)

# Trainer wired to the tokenized train/validation splits and the metric function
trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
    train_dataset=train_tokenized,
    eval_dataset=validation_tokenized,
    compute_metrics=compute_metrics,
)

# Run training
trainer.train()


In [None]:
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    classification_report,
    confusion_matrix
)
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# Predict on the tokenized test split with the trained model
results = trainer.predict(test_tokenized)

# Gold tag ids, and predicted tag ids taken as the argmax over the class axis
labels = results.label_ids
predictions = np.argmax(results.predictions, axis=2)

# Per example, keep only positions whose gold label is not the ignore id (-100)
true_labels = []
true_predictions = []
for pred_group, label_group in zip(predictions, labels):
    kept = [
        (pred, label)
        for pred, label in zip(pred_group, label_group)
        if label != -100
    ]
    true_predictions.append([pred for pred, _ in kept])
    true_labels.append([label for _, label in kept])

# Flat views over all examples for the metric functions
true_labels_flat = [label for group in true_labels for label in group]
true_predictions_flat = [pred for group in true_predictions for pred in group]

# Function to compute classification metrics and confusion matrix
def compute_metrics_per_class(true_labels_flat, true_predictions_flat):
    """Print a per-class classification report and build the confusion matrix.

    Class names follow the tag ids assigned when the tokens were labelled:
    0 = O, 1 = B (beginning of entity), 2 = I (inside of entity).

    Args:
        true_labels_flat: Flat sequence of gold tag ids.
        true_predictions_flat: Flat sequence of predicted tag ids, aligned
            with ``true_labels_flat``.

    Returns:
        Tuple ``(conf_matrix, report)``: the confusion matrix over the classes
        0, 1, 2 (in that order) and the report text that was printed.
    """
    class_ids = [0, 1, 2]
    class_names = ["O", "B-invoice_number", "I-invoice_number"]

    # Passing `labels` pins the id -> name mapping and keeps the report and
    # the matrix at three classes even when a class is absent from the data.
    report = classification_report(
        true_labels_flat,
        true_predictions_flat,
        labels=class_ids,
        target_names=class_names
    )
    print("Classification Report:")
    print(report)

    conf_matrix = confusion_matrix(
        true_labels_flat, true_predictions_flat, labels=class_ids
    )
    return conf_matrix, report

# Compute metrics and confusion matrix
conf_matrix, report = compute_metrics_per_class(true_labels_flat, true_predictions_flat)

# Weighted precision/recall/F1 are computed once and reused for all prints
precision, recall, f1, _ = precision_recall_fscore_support(
    true_labels_flat, true_predictions_flat, average='weighted'
)

# Print basic metrics
print("Evaluation Results:")
print(f"Accuracy: {accuracy_score(true_labels_flat, true_predictions_flat)}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

# Tick labels in tag-id order as assigned during labelling: 0 = O, 1 = B, 2 = I.
# NOTE(review): assumes conf_matrix rows/columns are the classes 0, 1, 2 in
# ascending order -- confirm all three classes are covered by the matrix.
class_names = ["O", "B-invoice_number", "I-invoice_number"]

# Plot the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names
)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()


# Identify misclassified tokens and their surrounding context.
# j indexes the positions left after dropping -100 labels.
# NOTE(review): this assumes position j corresponds to word j of the segment,
# which holds only if no word was lost to truncation -- confirm.
misclassifications = []
for i, (pred, true) in enumerate(zip(true_predictions, true_labels)):
    # Fetch the segment's words once per document instead of once per error
    doc_tokens = dataset["test"][i]['tokens']
    for j, (p, t) in enumerate(zip(pred, true)):
        if p != t:
            # Surrounding words (±5) for context; slicing clamps at the end
            context = " ".join(doc_tokens[max(0, j - 5):j + 6])
            misclassifications.append((i, j, p, t, context))

# Print misclassified tokens with the context window collected above
for doc_idx, token_idx, pred, true, context in misclassifications:
    print(
        f"Document {doc_idx}, Token {token_idx}: "
        f"Predicted {pred}, Actual {true}, "
        f"Context: {context}"
    )
