In [None]:
cd ..

# BERT base multilingual Fine-Tuning
This notebook explores fine-tuning BERT base for text classification.

In [None]:
import numpy as np
import pandas as pd
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    DataCollatorWithPadding,
    pipeline
)
from datasets import load_dataset
from datasets import Dataset, ClassLabel, DatasetDict
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, top_k_accuracy_score
from sklearn.preprocessing import LabelEncoder
import random


# Configs
Here, we can set some parameters for importing and training.

In [None]:
# --- Model / tokenisation ---------------------------------------------------
model_id        : str   = 'bert-base-multilingual-uncased'  # checkpoint passed to from_pretrained
max_seq_len     : int   = 256                               # max_length used when truncating inputs

# --- Training hyper-parameters (forwarded to TrainingArguments below) --------
output_dir      : str   = f'saved_models/{model_id}'
epochs          : int   = 4
learn_rate      : float = 5e-5
scheduler       : str   = 'linear'
train_bs        : int   = 16      # per-device train batch size
eval_bs         : int   = 32      # per-device eval batch size
ga_steps        : int   = 2       # gradient accumulation steps
decay           : float = 0.01    # weight decay
warmup          : float = 0.1     # warmup ratio
# log_steps     : int   = 10
eval_strategy   : str   = 'epoch'
logging_strategy: str   = 'epoch'
save_strategy   : str   = 'no'
fp16            : bool  = False
load_best       : bool  = False
report_to       : list  = []
log_level       : str   = 'warning'

# --- Reproducibility -----------------------------------------------------------
SEED            : int   = 42

# Seed the global RNGs up front so that anything random that happens before the
# Trainer is constructed (notably the freshly initialised classification head
# created when the model is loaded) is repeatable under Restart & Run All.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data Preprocessing
Load the data locally and convert it into the Hugging Face `datasets` format.
The data should have a text column and a label column that comprises numerical labels.

In [None]:
# Training split; the first CSV column is used as the DataFrame index.
# TODO: change to augmented dataset
df = pd.read_csv("data/raw/nace_train.csv", index_col=0)

In [None]:
# Wrap the DataFrame in a DatasetDict holding a single "train" split.
data = DatasetDict(train=Dataset.from_pandas(df))

In [None]:
# Peek at the first training record to sanity-check the converted dataset.
data['train'][0]

In [None]:
# Learn the set of distinct labels present in the training split.
label_encoder = LabelEncoder().fit(data['train']['label'])

# Two-way mapping between integer class ids and the label names (as strings).
id2label = dict(enumerate(map(str, label_encoder.classes_)))
label2id = {name: idx for idx, name in id2label.items()}

# ClassLabel feature built from the same class list, used when casting the label column.
class_label = ClassLabel(names=label_encoder.classes_.tolist())

In [None]:
# Replace the raw labels by their encoded ids, then cast the column to the
# ClassLabel feature so the dataset can be stratified on it.
data = (
    data
    .map(lambda batch: {'label': label_encoder.transform(batch['label'])}, batched=True)
    .cast_column('label', class_label)
)

In [None]:
# Stratified 95/5 split of the training data; the held-out part is exposed
# under the key "validation" rather than "test".
data = data['train'].train_test_split(
    test_size=0.05,
    seed=SEED,
    stratify_by_column="label",
)
data = DatasetDict(train=data['train'], validation=data['test'])

# Load the Model
Load the model and tokenizer from Hugging Face. If the model is gated or private, you need to set an environment variable called "HF_TOKEN" that contains your Hugging Face token.

In [None]:
# Tokenizer matching the chosen checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_id)

# Sequence-classification model with one output per known label; the id/label
# mappings are stored on the model config.
model = AutoModelForSequenceClassification.from_pretrained(
    model_id,
    id2label=id2label,
    label2id=label2id,
    num_labels=len(id2label),
)
model = model.to(device)

Now we tokenize and pad the data using the pretrained tokenizer.

In [None]:
def tokenize(example):
    """Tokenize a batch of examples, truncating each text to ``max_seq_len`` tokens.

    No padding is applied here on purpose: with a batched ``map`` it would pad
    every example to the longest text of its map batch. Padding is left to the
    ``DataCollatorWithPadding`` handed to the Trainer, which pads each training
    batch only to that batch's longest sequence.

    Args:
        example: A batch from the dataset; its "text" entry is tokenized.

    Returns:
        The tokenizer output for the batch.
    """
    return tokenizer(example["text"], truncation=True, max_length=max_seq_len)


tokenized_data = data.map(tokenize, batched=True)
     

# Training
First, we define a function to compute the metrics that we want to monitor during training.

In [None]:
def compute_metrics(eval_pred):
    """Compute the validation metrics reported during training.

    Args:
        eval_pred: A ``(logits, labels)`` pair; logits are indexed as
            ``(sample, class)`` and labels hold the true class ids.

    Returns:
        dict with accuracy, macro-averaged F1 / precision / recall and
        top-1 / top-2 accuracy.
    """
    scores, targets = eval_pred
    predicted = np.argmax(scores, axis=-1)

    # Explicit list of every class id so top-k scoring also works when some
    # class is missing from the evaluated labels.
    all_classes = np.arange(scores.shape[1])

    report = {'accuracy': accuracy_score(targets, predicted)}

    macro_scorers = (
        ('f1_macro', f1_score),
        ('precision_macro', precision_score),
        ('recall_macro', recall_score),
    )
    for key, scorer in macro_scorers:
        report[key] = scorer(targets, predicted, average='macro', zero_division=0)

    for key, k in (('top_1_accuracy', 1), ('top_2_accuracy', 2)):
        report[key] = top_k_accuracy_score(targets, scores, k=k, labels=all_classes)

    return report

Now, we define the training arguments and the trainer class.

In [None]:
# Training configuration; every value comes from the config cell.
training_args = TrainingArguments(
    output_dir=output_dir,
    num_train_epochs=epochs,
    learning_rate=learn_rate,
    lr_scheduler_type=scheduler,
    per_device_train_batch_size=train_bs,
    per_device_eval_batch_size=eval_bs,
    gradient_accumulation_steps=ga_steps,
    warmup_ratio=warmup,
    weight_decay=decay,
    logging_dir='./logs',
    # logging_steps=log_steps,
    logging_strategy=logging_strategy,
    eval_strategy=eval_strategy,
    save_strategy=save_strategy,
    fp16=fp16,
    load_best_model_at_end=load_best,
    report_to=report_to,
    log_level=log_level,
    # Forward the notebook-wide seed so that changing SEED in the config cell
    # also changes the seed used for training.
    seed=SEED,
)

# Pads each batch to the length of its longest sequence at collation time.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Trainer wired to the tokenized train/validation splits and the metric function.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_data['train'],
    eval_dataset=tokenized_data['validation'],
    compute_metrics=compute_metrics,
    data_collator=data_collator
)

Finally, we can start training the model.

In [None]:
%%time
# Run the fine-tuning loop; the %%time cell magic reports how long the cell took.
trainer.train()

# Evaluation
Now, we can evaluate the model on our test set.

In [None]:
# Inference pipeline that reuses the fine-tuned model and its tokenizer.
pipe = pipeline('text-classification', model=model, tokenizer=tokenizer)

In [None]:
# Held-out test split; the first CSV column is used as the DataFrame index.
df_test = pd.read_csv(
    'data/raw/nace_test.csv',
    index_col=0,
)

In [None]:
# Display the test frame to check its columns before extracting texts and labels.
df_test

In [None]:
# The pipeline reports predictions as the string names stored in id2label, so the
# ground-truth labels are cast to str as well; otherwise numeric labels read from
# the CSV would never compare equal to the predicted strings.
y_test = df_test['label'].astype(str).tolist()
X_test = df_test['text'].tolist()

In [None]:
%%time
result = pipe(X_test)
result_topk = pipe(X_test, top_k=2)

In [None]:
# Keep only the predicted label name of each top-1 result.
y_pred = [prediction['label'] for prediction in result]

In [None]:
# Plain accuracy plus the three macro-averaged scores on the test predictions.
accuracy = accuracy_score(y_test, y_pred)
f1, precision, recall = (
    scorer(y_test, y_pred, average='macro', zero_division=0)
    for scorer in (f1_score, precision_score, recall_score)
)

In [None]:
# Emit the whole report with one call; sep='\n' puts each entry on its own line.
print(
    'Performance on test set \n',
    f'Accuracy score  : {accuracy:.3f}',
    f'F1 score        : {f1:.3f}',
    f'precision score : {precision:.3f}',
    f'recall score    : {recall:.3f}',
    sep='\n',
)

In [None]:
# Score matrix of shape (samples, classes): only the classes returned in the
# top-k results receive their score, every other entry stays 0.
num_samples = len(result_topk)
num_classes = len(label2id)
y_pred_proba = np.zeros((num_samples, num_classes))

for i, sample in enumerate(result_topk):
    for pred in sample:
        # Column index is the class id of the predicted label name.
        class_idx = label2id[pred['label']]
        y_pred_proba[i, class_idx] = pred['score']

In [None]:
# The columns of y_pred_proba are indexed by class id, so score against integer
# ids instead of label names: the names are strings that need not be in sorted
# order (e.g. '10' sorts before '2') and need not share a dtype with y_test.
# A test label that was never seen during training raises a KeyError here.
class_ids = np.arange(len(label2id))
y_test_ids = [label2id[str(label)] for label in y_test]

top1 = top_k_accuracy_score(y_test_ids, y_pred_proba, k=1, labels=class_ids)
top2 = top_k_accuracy_score(y_test_ids, y_pred_proba, k=2, labels=class_ids)

In [None]:
# Report both top-k scores with one call, one entry per line.
print(
    f'Top 1 accuracy  : {top1:.3f}',
    f'Top 2 accuracy  : {top2:.3f}',
    sep='\n',
)