## Fine-Tuning deepset/gbert-base for German Hate Speech Classification
This notebook fine-tunes `deepset/gbert-base` on a German hate speech dataset using PyTorch, BERT and adding an additional fully connected layer.

Hyperparameter tuning is done with Optuna, using the Tree-structured Parzen Estimator (TPE) sampler, a Bayesian optimization method.

The script automatically logs the model metrics to the wandb project as well as to "logged_model_metrics.xlsx"



In [None]:
!pip install optuna

Collecting optuna
  Downloading optuna-4.1.0-py3-none-any.whl.metadata (16 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.14.0-py3-none-any.whl.metadata (7.4 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Collecting Mako (from alembic>=1.5.0->optuna)
  Downloading Mako-1.3.8-py3-none-any.whl.metadata (2.9 kB)
Downloading optuna-4.1.0-py3-none-any.whl (364 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m364.4/364.4 kB[0m [31m14.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading alembic-1.14.0-py3-none-any.whl (233 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.5/233.5 kB[0m [31m23.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Downloading Mako-1.3.8-py3-none-any.whl (78 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.6/78.6 kB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: M

In [None]:
# Import Libraries
import wandb
import optuna
from optuna.pruners import MedianPruner
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score
    , precision_recall_fscore_support
    , matthews_corrcoef
    , classification_report
    , f1_score
    , log_loss
    , precision_score
    , recall_score
    , fbeta_score
    , confusion_matrix
    , ConfusionMatrixDisplay
)
from transformers import AutoTokenizer, BertModel, Trainer, TrainingArguments
import logging
import os
from google.colab import runtime, userdata, drive
import datetime
import threading
import time

## Set up logging and connections

In [None]:
# Configure root logging for the whole notebook.
logging.basicConfig(
    level=logging.INFO,  # emit INFO and above
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True  # replace any handlers that are already attached to the root logger
)

# Logger used by every cell below.
logger = logging.getLogger(__name__)

# Release cached, currently unused GPU memory held by PyTorch
# (presumably to start from a clean GPU state when the notebook is re-run in the same session).
torch.cuda.empty_cache()


In [None]:
# Authenticate with Weights & Biases.
# The API key is read from the Colab user secret 'WandbKey' instead of being hardcoded in the notebook.
wandbkey = userdata.get('WandbKey')

wandb.login(key=wandbkey)

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33mmaxnienh[0m ([33mmaxnienh-xx[0m). Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


True

In [None]:
# Select the compute device: CUDA when a GPU is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
if device.type == "cuda":
    # Only query the GPU name when a CUDA device was actually selected
    logger.info(f"GPU Name: {torch.cuda.get_device_name(0)}")


2025-01-05 00:46:05,217 - INFO - Using device: cuda
2025-01-05 00:46:05,240 - INFO - GPU Name: NVIDIA L4


In [None]:
# Mount Google Drive at /content/drive; the dataset is read from there and results/models are written there.
drive.mount('/content/drive')

Mounted at /content/drive


## Define custom BERT model class

In [None]:
class BERT_WithExtraLayer(nn.Module):
    """
    BERT encoder followed by a small feed-forward classification head.

    Data flow: BERT pooled output -> Linear(hidden_size, hidden_dim) -> ReLU -> Dropout
    -> Linear(hidden_dim, num_labels).
    """

    def __init__(self, bert_model_name, num_labels=2, hidden_dim=256, dropout_rate=0.1, class_weights=None):
        """
        Initializes a BERT-based model with an additional fully connected layer.

        :param bert_model_name: Name of the pre-trained BERT model to be used.
        :param num_labels: Number of output classes for classification.
        :param hidden_dim: Size of the hidden layer in the additional fully connected network.
        :param dropout_rate: Dropout rate for regularization.
        :param class_weights: Optional tensor containing class weights for handling class imbalance.
        """
        super().__init__()

        # Load pre-trained BERT model
        self.bert = BertModel.from_pretrained(bert_model_name)
        self.hidden_size = self.bert.config.hidden_size

        # Additional fully connected layer
        self.extra_fc = nn.Linear(self.hidden_size, hidden_dim)
        self.relu = nn.ReLU()

        # Final classification layer
        self.classifier = nn.Linear(hidden_dim, num_labels)

        # Dropout layer
        self.dropout = nn.Dropout(dropout_rate)

        # Kept as a plain attribute (not a buffer) so the state_dict layout stays unchanged.
        self.class_weights = class_weights

    def forward(self, input_ids, attention_mask=None, labels=None):
        """
        Forward pass through the model.

        :param input_ids: Tensor of tokenized input IDs.
        :param attention_mask: Tensor indicating which tokens should be attended to.
        :param labels: Optional tensor of target labels for loss calculation.
        :return: ``(loss, logits)`` when ``labels`` is given, otherwise only ``logits``.
        """
        # Forward pass through BERT
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output  # Shape: (batch_size, hidden_size)

        # Forward pass through extra layer
        x = self.extra_fc(pooled_output)
        x = self.relu(x)
        x = self.dropout(x)

        # Classification layer
        logits = self.classifier(x)  # Shape: (batch_size, num_labels)

        if labels is None:
            return logits

        # Use the weights given to THIS instance. Reading a notebook-level global here would
        # ignore the constructor argument and break the class as soon as it is used elsewhere.
        weight = self.class_weights
        if weight is not None:
            # The weights are not a registered buffer, so model.to(device) does not move them.
            weight = weight.to(logits.device)

        loss_fct = nn.CrossEntropyLoss(weight=weight)  # weighted to balance the classes
        loss = loss_fct(logits.view(-1, self.classifier.out_features), labels.view(-1))
        return loss, logits

## Load, split and tokenize data

In [None]:
# Location of the annotated comment dataset on the mounted Drive
dataset_filename = "Dataset_anonymized_annotated_comments_same_annotation_final.csv"
csv_path = os.path.join('/content/drive/MyDrive/model/data', dataset_filename)

# Read the semicolon-separated file (malformed rows are skipped) and keep only text and label columns
data = pd.read_csv(csv_path, delimiter=';', on_bad_lines='skip')[['comment', 'annotation']]
logger.info(f"Dataset loaded with shape: {data.shape}")

# Number of samples per class, ordered by label value so the weights line up with the label indices
class_counts = data['annotation'].value_counts().sort_index()
num_classes = len(class_counts)
total_samples = class_counts.sum()

# Normalized inverse-frequency weights: total_samples / (num_classes * samples_in_class)
class_weights = torch.tensor(
    (total_samples / (num_classes * class_counts)).values,
    dtype=torch.float32,
).to(device)

logger.info(f"Computed class weights: {class_weights}")

2025-01-05 00:46:24,444 - INFO - Dataset loaded with shape: (23580, 2)
2025-01-05 00:46:24,999 - INFO - Computed class weights: tensor([0.7221, 1.6258], device='cuda:0')


In [None]:
# Switch between the full dataset (1) and a small balanced subset for quick tests (any other value)
use_full_dataset = 1

# Split into training (70%), validation (15%), and test (15%) datasets
if use_full_dataset == 1:

  # Stratified 70/30 split; the 30% remainder is then halved (again stratified) into validation and test
  train_data, temp_data = train_test_split(data, test_size=0.3, stratify=data['annotation'], random_state=42)
  val_data, test_data = train_test_split(temp_data, test_size=0.5, stratify=temp_data['annotation'], random_state=42)

  logger.info(f"Train size: {len(train_data)} rows | Test size: {len(test_data)} rows | Validation size: {len(val_data)} rows")

else:
  # Separate the dataset into the two label groups
  hate_data = data[data['annotation'] == 1]
  no_hate_data = data[data['annotation'] == 0]

  # Draw the same number of random comments from each group to create a balanced dataset
  subset_size_per_class = 500
  positive_subset = hate_data.sample(n=subset_size_per_class, random_state=42)
  negative_subset = no_hate_data.sample(n=subset_size_per_class, random_state=42)

  # Combine both groups into one dataset
  balanced_subset = pd.concat([positive_subset, negative_subset])

  # Shuffle and reset index
  balanced_subset = balanced_subset.sample(frac=1, random_state=42).reset_index(drop=True)

  # Verify balance
  logger.info(f"Balanced subset size: {len(balanced_subset)} rows")
  logger.info(f"Annotation distribution: {balanced_subset['annotation'].value_counts()}")

  # Consecutive 70% / 15% / remainder slices of the already shuffled subset (not stratified)
  train_size = int(0.7 * len(balanced_subset))
  val_size = int(0.15 * len(balanced_subset))

  train_data = balanced_subset[:train_size]
  val_data = balanced_subset[train_size:train_size + val_size]
  test_data = balanced_subset[train_size + val_size:]

  # Log sizes of each split
  logger.info(f"Small Train size: {len(train_data)} rows | Small Test size: {len(test_data)} rows | Small Validation size: {len(val_data)} rows")


2025-01-05 00:46:25,023 - INFO - Train size: 16506 rows | Test size: 3537 rows | Validation size: 3537 rows


In [None]:
# Load the tokenizer for the 'deepset/gbert-base' checkpoint (the same checkpoint name the model is built from)
tokenizer = AutoTokenizer.from_pretrained('deepset/gbert-base')

# Define tokenization function
def tokenize_texts(texts, tokenizer, max_length=256):
    """
    Runs the given tokenizer over a collection of texts.

    The texts are materialized into a list and tokenized in a single call with
    truncation and padding enabled; the result is requested as PyTorch tensors.

    :param texts: Iterable of input text strings (e.g. a pandas Series).
    :param tokenizer: Callable tokenizer to be used (e.g., BERT tokenizer).
    :param max_length: Maximum sequence length passed to the tokenizer.
    :return: Whatever the tokenizer returns for the batch (input IDs, attention masks, ...).
    """
    tokenizer_options = {
        "truncation": True,
        "padding": True,
        "max_length": max_length,
        "return_tensors": "pt",
    }
    text_batch = list(texts)
    return tokenizer(text_batch, **tokenizer_options)

# Tokenize the comment text of the train, validation and test splits (each split in one call)
train_encodings = tokenize_texts(train_data['comment'], tokenizer)
val_encodings = tokenize_texts(val_data['comment'], tokenizer)
test_encodings = tokenize_texts(test_data['comment'], tokenizer)

# Define Dataset class
class HateSpeechDataset(Dataset):
    """Pairs tokenized encodings with their classification labels, one sample per index."""

    def __init__(self, encodings, labels):
        """
        Stores the tokenized inputs and their labels.

        :param encodings: Mapping of field name to indexable tokenized data.
        :param labels: Indexable sequence of labels, aligned with the encodings.
        """
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """
        Number of samples, taken from the number of labels.

        :return: Integer count of samples.
        """
        return len(self.labels)

    def __getitem__(self, idx):
        """
        Builds the sample at position ``idx``.

        :param idx: Index of the sample to retrieve.
        :return: Dictionary with every encoding field at ``idx`` plus a 'labels' long tensor.
        """
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx]
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Wrap each split's encodings and labels in a dataset; `.values` passes the labels as a plain NumPy array
train_dataset = HateSpeechDataset(train_encodings, train_data['annotation'].values)
val_dataset = HateSpeechDataset(val_encodings, val_data['annotation'].values)
test_dataset = HateSpeechDataset(test_encodings, test_data['annotation'].values)
logger.info(f"Training samples: {len(train_dataset)} | Test samples: {len(test_dataset)} | Validation samples: {len(val_dataset)}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/83.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/362 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/240k [00:00<?, ?B/s]

2025-01-05 00:46:32,300 - INFO - Training samples: 16506 | Test samples: 3537 | Validation samples: 3537


## Define the function for calculating metrics and training

In [None]:
def compute_metrics(eval_pred, trial=None, trainer=None):
    """
    Turns raw model outputs into classification metrics and optionally feeds Optuna's pruner.

    The combined score ``S`` is the mean of the F2 score and the MCC rescaled from
    [-1, 1] to [0, 1]. When both ``trial`` and ``trainer`` are given, the F2 score is
    reported to Optuna at the trainer's current (integer) epoch.

    :param eval_pred: Pair of (predictions, labels); predictions are per-class scores.
    :param trial: Optional Optuna trial object for hyperparameter tuning.
    :param trainer: Optional Trainer object whose state supplies the current epoch.
    :return: Dictionary with accuracy, precision, recall, f1, f2, mcc, mcc_normalized and S.
    :raises optuna.TrialPruned: If the pruner decides to stop the trial.
    """
    scores, y_true = eval_pred
    y_pred = scores.argmax(axis=1)  # predicted class = index of the highest score

    # Individual metrics (binary averaging: the positive class is label 1)
    accuracy = accuracy_score(y_true, y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary')
    f2 = fbeta_score(y_true, y_pred, beta=2, average='binary')
    mcc = matthews_corrcoef(y_true, y_pred)
    mcc_normalized = (mcc + 1) / 2

    metrics = dict(
        accuracy=accuracy,
        precision=precision,
        recall=recall,
        f1=f1,
        f2=f2,
        mcc=mcc,
        mcc_normalized=mcc_normalized,
        S=(f2 + mcc_normalized) / 2,
    )

    # Without a trial/trainer pair there is nothing to report to Optuna
    if not (trial and trainer):
        return metrics

    # Report the F2 score for this epoch and let the pruner decide whether to stop
    trial.report(f2, step=int(trainer.state.epoch))
    if trial.should_prune():
        raise optuna.TrialPruned()

    return metrics

In [None]:
def suggest_hyperparameters(trial):
    """
    Suggests hyperparameters for the model training using Optuna.

    The warm-up length is sampled as a ratio (5-10%) and converted into an absolute number
    of optimizer steps based on the size of the module-level ``train_dataset``.

    :param trial: Optuna trial object used for suggesting hyperparameters.
    :return: Dictionary with num_train_epochs, learning_rate, batch_size, weight_decay and warmup_steps.
    """
    import math

    num_epochs = trial.suggest_int("num_train_epochs", 2, 4)
    batch_size = trial.suggest_categorical("batch_size", [16, 32])

    # The last, partial batch of an epoch is still a training step (the training arguments in
    # this notebook do not enable dropping it), so round up instead of flooring.
    # Assumes a single device and no gradient accumulation, as configured in this notebook.
    steps_per_epoch = math.ceil(len(train_dataset) / batch_size)
    total_steps = steps_per_epoch * num_epochs

    # Compute warmup steps as 5-10% of total_steps
    warmup_ratio = trial.suggest_float("warmup_ratio", 0.05, 0.1)
    warmup_steps = int(warmup_ratio * total_steps)

    # The suggestion order is kept stable (epochs, batch size, warm-up, learning rate, weight decay)
    learning_rate = trial.suggest_float("learning_rate", 2e-5, 1e-4, log=True)
    weight_decay = trial.suggest_float("weight_decay", 0.01, 0.1)

    return {
        "num_train_epochs": num_epochs,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "weight_decay": weight_decay,
        "warmup_steps": warmup_steps
    }

def create_training_arguments(params, trial_number):
    """
    Builds the TrainingArguments for one trial.

    Tuned values come from ``params``; everything else is fixed: evaluation after every
    epoch, no checkpoint saving, logging every 10 steps to wandb, mixed precision.

    :param params: Dictionary of hyperparameters for training.
    :param trial_number: Trial number, used in the run name.
    :return: TrainingArguments object with specified settings.
    """
    learning_rate = params["learning_rate"]
    batch_size = params["batch_size"]

    settings = {
        "output_dir": './results',
        "num_train_epochs": params["num_train_epochs"],
        "per_device_train_batch_size": batch_size,
        "per_device_eval_batch_size": 64,
        "warmup_steps": params["warmup_steps"],
        "weight_decay": params["weight_decay"],
        "learning_rate": learning_rate,
        "eval_strategy": "epoch",
        "save_strategy": "no",
        "logging_dir": './logs',
        "logging_steps": 10,
        "report_to": 'wandb',
        "fp16": True,  # Mixed precision
        "run_name": f'Trial_{trial_number}_lr_{params["learning_rate"]:.1e}_bs_{params["batch_size"]}',
    }
    return TrainingArguments(**settings)

## Define the objective function for training, optimization and logging

Initializes a wandb run for each trial, trains the model, and logs the metrics to that run. It also plots the confusion matrix for the test set, returns the S score, and keeps track of the best score seen so far.

In [None]:
# Best validation S score seen so far and the weights of the model that achieved it
best_score = None
best_model_state_dict = None

# One row per trial (completed or pruned); collected here for the Excel export
all_results = []

# The function returns the S score -> that is the value the study maximizes
def objective(trial):
    """
    Defines the objective function for Optuna hyperparameter tuning.

    For one trial this opens a wandb run, trains a fresh model with the suggested
    hyperparameters, evaluates it on the validation and test sets, plots the test
    confusion matrix, records the results in ``all_results`` and updates the
    module-level best score / best model snapshot.

    :param trial: Optuna trial object.
    :return: S score on the validation set (mean of F2 and normalized MCC).
    :raises optuna.TrialPruned: Re-raised when the pruner stops the trial during training.
    """
    global best_score, best_model_state_dict

    # Create distinctive name for wandb run
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_name = f"BERT_Extra_Layer_Trial_{trial.number}_{timestamp}"

    # Initialize WandB with a unique run ID
    wandb.init(
        project="German_Hate_Speech_Classification",
        name=run_name,
        reinit=True,  # Force new run
        id=run_name,  # Trial number + timestamp as unique ID
    )

    pruned_status = 0
    trainer = None  # bound up front so the pruning handler below can always reference it

    try:
        # Suggest hyperparameters
        params = suggest_hyperparameters(trial)
        training_args = create_training_arguments(params, trial.number)

        logger.info(f"num_train_epochs: {training_args.num_train_epochs}")
        logger.info(f"per_device_train_batch_size: {training_args.per_device_train_batch_size}")
        logger.info(f"per_device_eval_batch_size: {training_args.per_device_eval_batch_size}")
        logger.info(f"warmup_steps: {training_args.warmup_steps}")
        logger.info(f"weight_decay: {training_args.weight_decay}")
        logger.info(f"learning_rate: {training_args.learning_rate}")

        wandb.config.update(training_args)

        # Initialize model
        model = BERT_WithExtraLayer(bert_model_name="deepset/gbert-base", num_labels=2, class_weights=class_weights)
        model.to(device)

        # Trainer; during training every per-epoch evaluation also reports to Optuna and may prune
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=lambda eval_pred: compute_metrics(eval_pred, trial, trainer),
        )

        # Train and Evaluate
        trainer.train()

        # Training is over: switch to plain metric computation so the predict() calls below
        # neither re-report an already reported step to Optuna nor evaluate pruning
        # (in particular not on the test set).
        trainer.compute_metrics = compute_metrics

        # Get predictions on val_dataset
        predictions = trainer.predict(val_dataset)

        # Calculate metrics using compute_metrics function
        eval_metrics = compute_metrics((predictions.predictions, predictions.label_ids))
        logger.info(f"Trial {trial.number} - Validation Metrics: {eval_metrics}")

        # Log metrics
        wandb.log(eval_metrics)

        # Get predictions on test_dataset
        test_predictions = trainer.predict(test_dataset)
        test_metrics = compute_metrics((test_predictions.predictions, test_predictions.label_ids))
        logger.info(f"Trial {trial.number} - Test Metrics: {test_metrics}")

        # Prefix the test metrics (same naming as in all_results) so they do not end up
        # under the same wandb keys as the validation metrics.
        prefixed_test_metrics = {f"test_{k}": v for k, v in test_metrics.items()}
        wandb.log(prefixed_test_metrics)

        logits, labels = test_predictions.predictions, test_predictions.label_ids
        preds = np.argmax(logits, axis=1)
        cm = confusion_matrix(labels, preds)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["No Hate-Speech", "Hate-Speech"])

        # Plot the Confusion Matrix on an explicitly created axes; without `ax=` the display
        # opens its own figure, leaving an empty extra figure behind and ignoring the figsize.
        fig, ax = plt.subplots(figsize=(8, 6))
        disp.plot(cmap=plt.cm.Blues, ax=ax)
        ax.set_yticklabels(ax.get_yticklabels(), rotation=90, va='center')
        ax.set_title(f"Confusion Matrix for Trial {trial.number}")

        fig.tight_layout()  # Adjust layout for better spacing
        plt.show()
        plt.close(fig)  # one figure per trial would otherwise accumulate over the study

        all_results.append({
            "Trial_Name": run_name,
            "Dataset": dataset_filename,
            "Pruned": pruned_status,
            **eval_metrics,
            **prefixed_test_metrics
        })

        current_score = eval_metrics["S"]

        # Compare and update the best model
        if best_score is None or current_score > best_score:
            best_score = current_score
            # Snapshot the weights on the CPU: state_dict() only returns references to the live
            # (GPU) tensors, which would keep this model's device memory pinned for the rest of the study.
            best_model_state_dict = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
            logger.info(f"New best S score: {best_score}")

        return current_score  # Return Score for Optuna

    except optuna.TrialPruned:
        pruned_epoch = trainer.state.epoch if trainer is not None else None
        logger.info(f"Trial {trial.number} pruned at epoch {pruned_epoch}")
        pruned_status = 1
        all_results.append({
            "Trial_Name": run_name,
            "Dataset": dataset_filename,
            "Pruned": pruned_status,  # Log pruned status
        })
        raise

    finally:
        # Ensure WandB run is closed properly
        wandb.finish()

## Ensure that wandb is terminating and session ends after training

In [None]:
def safe_wandb_finish(timeout=45):
    """
    Ensures safe termination of the WandB session with a timeout.

    ``wandb.finish()`` runs in a background thread; this function waits at most
    ``timeout`` seconds for it and then returns regardless of the outcome.
    Errors raised by ``wandb.finish()`` are logged, never propagated.

    :param timeout: Maximum time (in seconds) to wait for WandB termination.
    """
    errors = []  # filled by the worker so the outcome can be reported correctly

    def finish_task():
        try:
            wandb.finish()
        except Exception as e:
            errors.append(e)
            logger.error(f"Error in wandb.finish(): {e}")

    # Daemon thread: a hanging wandb.finish() must not keep the process alive after the timeout.
    finish_thread = threading.Thread(target=finish_task, daemon=True)
    finish_thread.start()
    finish_thread.join(timeout=timeout)  # Wait for up to `timeout` seconds

    if finish_thread.is_alive():
        logger.warning("wandb.finish() timed out. Proceeding with runtime.unassign().")
    elif errors:
        # The thread ended, but only because wandb.finish() raised; do not report success.
        logger.warning("wandb.finish() ended with an error. Proceeding with runtime.unassign().")
    else:
        logger.info("wandb.finish() completed successfully.")

## Main code for execution
This code runs the objective function with Bayesian optimization, selects and saves the best model, and logs the results. The MedianPruner stops unpromising trials early.

In [None]:
if __name__ == "__main__":
  # Driver: run the Optuna study, export all trial results to Excel, save the best model
  # and tokenizer to Drive, then release the Colab runtime (also on failure).
  try:
      # Optuna Study. Uses MedianPruner as 'Early Stopping Method'
      # NOTE(review): compute_metrics reports one step per epoch and trials run 2-4 epochs, so with
      # n_warmup_steps=3 pruning can presumably only trigger at a 4th epoch - confirm this is intended.
      study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(), pruner=optuna.pruners.MedianPruner(n_startup_trials=15, n_warmup_steps=3))

      logger.info(f"Sampler is {study.sampler.__class__.__name__}") # TPESampler is default in Optuna. uses by default 10 Trials for its startup

      study.optimize(objective, n_trials=50) # should be set to at least 10

      # Additional logging in Excel file
      excel_file_path = "/content/drive/MyDrive/model/logged_model_metrics.xlsx" # Check if the Excel file exists
      if os.path.exists(excel_file_path):
          # File exists, load and append new data
          existing_df = pd.read_excel(excel_file_path, index_col="Trial_Name")
          new_df = pd.DataFrame(all_results)
          new_df.set_index("Trial_Name", inplace=True)
          results_df = pd.concat([existing_df, new_df])  # Concatenate DataFrames
      else:
          # File doesn't exist, create new one
          results_df = pd.DataFrame(all_results)
          results_df.set_index("Trial_Name", inplace=True)

      # Save the DataFrame to the Excel file
      results_df.to_excel(excel_file_path)

      logger.info(f"Trial results saved to: {excel_file_path}")

      # Log the best hyperparameters
      best_params = study.best_params
      logger.info(f"Best Hyperparameters: {best_params}")

      # Minute-resolution timestamp used in the output folder and model file names
      timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")

      # Save the best model and tokenizer
      # (best_model_state_dict is filled by objective() whenever a trial reaches a new best S score)
      if best_model_state_dict is not None:
          best_model_dir = f"/content/drive/MyDrive/model/best_model_extra_Layer_{timestamp}"
          os.makedirs(best_model_dir, exist_ok=True)
          best_trial = study.best_trial
          logger.info(f"Best Trial Number: {best_trial.number}")

          # create distinct model file name
          model_filename = f"model_extra_Layer_trial_{best_trial.number}_{timestamp}.bin"

          # model_filename = f"pytorch_model_trial_{best_trial.number}_{timestamp}.pth" # alternative file format

          # Save model
          torch.save(best_model_state_dict, os.path.join(best_model_dir, model_filename)) # this passes the model.state_dict of the best model

          # Save tokenizer
          tokenizer.save_pretrained(best_model_dir)

          logger.info(f"Best model from Trial Number {best_trial.number} saved to {best_model_dir}")

          # wandb.finish()
          safe_wandb_finish(timeout=45) # function needs to be revised again

          logger.info("Process finished and terminated")

          # Allow threads to finish
          time.sleep(5)

          # Release the Colab runtime now that everything is saved
          runtime.unassign()

      else:
          logger.error("No best model was found. Model could not be saved. Runtime will be terminated...")
          runtime.unassign()

  except Exception as e:
        # Logging and runtime termination
        # (the runtime is released on failure as well; trial results not yet written to Excel are not saved here)
        logger.error(f"An error occurred during execution: {str(e)}", exc_info=True)
        runtime.unassign()

Output hidden; open in https://colab.research.google.com to view.