In [None]:
import os
import ast
import random
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from datasets import load_dataset, DatasetDict, Sequence, ClassLabel, Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    TrainingArguments,
    Trainer,
    DataCollatorForTokenClassification,
)
from seqeval.metrics import classification_report, precision_score, recall_score, f1_score
from torch import cuda

# Setup device
device = 'cuda' if cuda.is_available() else 'cpu'
print(f"Running on device: {device}")

In [None]:
# Labels
label_list = ["O", "B-DEP", "I-DEP", "B-ARR", "I-ARR"]

# Load Dataset
path = "C:/Users/Seed/Documents/Master/Epitech/Travel-Order-Resolver/ai/nlp/dataset/tokens/token.csv"
dataset = load_dataset('csv', data_files={'train': path}, delimiter=';')
dataset["train"] = dataset["train"].remove_columns("spacy_ner_tags")

In [None]:
# Validate and filter data
def is_valid_row(row):
    try:
        tokens = ast.literal_eval(row['tokens'])
        ner_tags = ast.literal_eval(row['ner_tags'])
        return (
            isinstance(tokens, list) and all(isinstance(t, str) for t in tokens)
            and isinstance(ner_tags, list) and all(isinstance(tag, int) for tag in ner_tags)
        )
    except (ValueError, SyntaxError):
        return False

dataset = dataset.filter(is_valid_row)
dataset = dataset.map(lambda line: {'tokens': ast.literal_eval(line['tokens']), 'ner_tags': ast.literal_eval(line['ner_tags'])})
dataset = dataset.cast_column("ner_tags", Sequence(feature=ClassLabel(num_classes=len(label_list), names=label_list)))

In [None]:
# Shuffle and split dataset
train_data = dataset['train'].shuffle(seed=42).train_test_split(test_size=0.7)['train']
train_test_valid = train_data.train_test_split(test_size=0.25)
test_valid = train_test_valid['test'].train_test_split(test_size=0.7)
dataset = DatasetDict({
    'train': train_test_valid['train'],
    'test': test_valid['test'],
    'valid': test_valid['train'],
})

# Initialize tokenizer and model
model_checkpoint = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [None]:
def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, padding=True, is_split_into_words=True)
    labels = []
    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

dataset = dataset.map(tokenize_and_align_labels, batched=True)

# Prepare model
id2label = {i: label for i, label in enumerate(label_list)}
label2id = {label: i for i, label in id2label.items()}

model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    num_labels=len(label_list),
    id2label=id2label,
    label2id=label2id,
)

# Training arguments
batch_size = 16
epochs = 3
training_args = TrainingArguments(
    output_dir="models/distilbert-finetuned-token-classification",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=epochs,
    weight_decay=0.01,
)

# Data collator and Trainer
data_collator = DataCollatorForTokenClassification(tokenizer)

In [None]:
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)
    y_pred = [[label_list[p] for p, l in zip(pred, lab) if l != -100] for pred, lab in zip(predictions, labels)]
    y_true = [[label_list[l] for p, l in zip(pred, lab) if l != -100] for pred, lab in zip(predictions, labels)]
    results = classification_report(y_true, y_pred, output_dict=True)
    return {
        "precision": results["macro avg"]["precision"],
        "recall": results["macro avg"]["recall"],
        "f1": results["macro avg"]["f1-score"],
        "accuracy": results["accuracy"],
    }

trainer = Trainer(
    model,
    training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["valid"],
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# Train and evaluate
trainer.train()
trainer.save_model("models/distilbert-finetuned-token-classification")
train_metrics = trainer.evaluate(dataset["train"])
validation_metrics = trainer.evaluate(dataset["valid"])

In [None]:
# Test predictions
predictions, labels, _ = trainer.predict(dataset["test"])
predictions = np.argmax(predictions, axis=2)

y_pred = [[label_list[p] for p, l in zip(pred, lab) if l != -100] for pred, lab in zip(predictions, labels)]
y_true = [[label_list[l] for p, l in zip(pred, lab) if l != -100] for pred, lab in zip(predictions, labels)]

# Confusion Matrix
flat_y_true = [item for sublist in y_true for item in sublist]
flat_y_pred = [item for sublist in y_pred for item in sublist]

cm = confusion_matrix(flat_y_true, flat_y_pred, labels=label_list)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=label_list, yticklabels=label_list)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()