**Note:**

This Jupyter notebook was written for Google Colab and uses Hugging Face.

To use this notebook:
- you may need to update every file path marked with the comment "UPDATE_TO_YOUR_FILE_PATH"
- you need to get an access token from your Hugging Face account and add it to your Google Colab as a secret named "HF_TOKEN"

# Import Modules & Setup

In [None]:
!pip install optuna

In [5]:
import pandas as pd
import numpy as np
import optuna
import torch
from torch import nn
import gc
import os
from pathlib import Path

from sklearn.model_selection import StratifiedKFold
from transformers import (
    RobertaConfig,
    RobertaTokenizer,
    RobertaForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback
)
from sklearn.metrics import (
    precision_recall_fscore_support,
    accuracy_score,
    f1_score,
    classification_report,
)
import logging
from google.colab import userdata

In [3]:
# Set up logging
# Configure the root logger to emit INFO-level (and more severe) messages.
logging.basicConfig(level=logging.INFO)
# Notebook-wide logger; prepare_data and train_model use it to report errors.
logger = logging.getLogger(__name__)

In [4]:
# Mount Google Drive at /content/drive; the dataset, checkpoint, log and result
# paths used later in this notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from googleapiclient.discovery import build
from google.colab import auth

# Authenticate
# Authenticates the current Colab user so the Drive API client below can be used.
auth.authenticate_user()

# Create the Drive API service
# `service` is used inside train_model to empty the Drive trash after each fold.
service = build('drive', 'v3')

# Import Data

In [None]:
def prepare_data(file_path):
    """Load the labelled comment CSV and build the model inputs.

    The CSV must contain the columns ``title``, ``comment_text`` and
    ``comment_consensus``. Title and comment are joined into a single text
    field, and the consensus values -1 / 0 / 1 are remapped to the class ids
    0 / 1 / 2. Rows whose consensus is missing or not one of those three
    values are dropped.

    Args:
        file_path: Path (or any object accepted by ``pd.read_csv``) of the CSV.

    Returns:
        A tuple ``(texts, labels, label_distribution)``:
        ``texts`` is the Series of combined strings, ``labels`` is the Series
        of integer class ids, and ``label_distribution`` is the per-class row
        count sorted by class id.

    Raises:
        Exception: Any error raised while reading or transforming the data is
            logged and re-raised unchanged.
    """
    try:
        # Load dataset
        df = pd.read_csv(file_path)

        # Combine title and comment text (missing parts become empty strings).
        # NOTE(review): ' [SEP] ' is inserted as literal text; RoBERTa's own
        # separator token is '</s>' -- confirm this is the intended separator.
        df['combined_text'] = df['title'].fillna('') + ' [SEP] ' + df['comment_text'].fillna('')

        # Convert labels to proper format (-1 → 0, 0 → 1, 1 → 2);
        # anything else becomes NaN and is dropped below.
        df['labels'] = df['comment_consensus'].map({-1: 0, 0: 1, 1: 2})

        # Remove any rows with NaN values
        df = df.dropna(subset=['combined_text', 'labels']).copy()

        # `map` produces a float column whenever at least one value was NaN.
        # Cast back to int so the labels become integer tensors, which is what
        # the classification loss expects.
        df['labels'] = df['labels'].astype(int)

        # Print dataset statistics
        print("\nDataset Statistics:")
        print(f"Total samples: {len(df)}")
        print("Label distribution:")
        label_distribution = df['labels'].value_counts().sort_index()
        print(label_distribution)

        return df['combined_text'], df['labels'], label_distribution

    except Exception as e:
        logger.error(f"Error preparing data: {str(e)}")
        raise

In [None]:
# Load and prepare data
# X: combined title/comment texts, y: class ids (0, 1, 2),
# label_distribution: number of rows per class id.
dataset_path = '/content/drive/MyDrive/SP2024-GPT/dataset/ground_truth_hn_ai_comment.csv' # UPDATE_TO_YOUR_FILE_PATH
X, y, label_distribution = prepare_data(dataset_path)


Dataset Statistics:
Total samples: 385
Label distribution:
labels
0     72
1    237
2     76
Name: count, dtype: int64


# Training Setup

In [10]:
# Sequence length every text is padded/truncated to by tokenize_function.
MAX_LENGTH = 512
# Hugging Face model id used for both the tokenizer and the classifier.
MODEL = 'FacebookAI/roberta-base'
# Seed shared by the fold splitter, the Trainer and the Optuna sampler.
SEED = 42
# Number of stratified cross-validation folds.
N_SPLITS = 5
# Number of distinct class ids present in the loaded labels.
NUM_LABELS = len(set(y))
print(NUM_LABELS)

3


In [None]:
# Load the tokenizer matching MODEL, authenticating with the Colab secret HF_TOKEN.
# `token` is the current name of the argument; `use_auth_token` is deprecated.
tokenizer = RobertaTokenizer.from_pretrained(MODEL, token=userdata.get('HF_TOKEN'))

In [12]:
def tokenize_function(texts):
    """Tokenize `texts` with the notebook-level tokenizer.

    Every sequence is truncated and padded to exactly MAX_LENGTH tokens.
    Returns whatever the tokenizer call returns.
    """
    tokenizer_options = {
        'padding': 'max_length',
        'truncation': True,
        'max_length': MAX_LENGTH,
    }
    return tokenizer(texts, **tokenizer_options)

In [13]:
class MyDataset(torch.utils.data.Dataset):
    """Torch dataset pairing tokenizer encodings with their labels.

    `encodings` is a mapping from field name to an indexable collection of
    per-sample values; `labels` is an indexable collection with one entry
    per sample.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        # The dataset size is defined by the number of labels.
        return len(self.labels)

    def __getitem__(self, idx):
        # Build one sample: every encoding field plus its label, as tensors.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

In [None]:
# For experiment, not used
# Per-class weight = total samples / (number of classes * samples in that class),
# so less frequent classes receive larger weights.
n_classes = len(label_distribution)
n_samples = label_distribution.sum()
class_weights = torch.tensor(
    [
        n_samples / (n_classes * label_distribution[class_id])
        for class_id in range(n_classes)
    ]
)

In [None]:
# For experiment, not used
class CustomTrainer(Trainer):
    """Trainer whose loss is a cross-entropy with optional per-class weights.

    Args:
        class_weights: Optional 1-D tensor with one weight per class. It is
            converted to float and moved to the training device. When None,
            an unweighted cross-entropy is used.
        *args, **kwargs: Forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights.float().to(self.args.device) if class_weights is not None else None

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the (optionally class-weighted) cross-entropy loss.

        Args:
            model: The model being trained or evaluated.
            inputs: Batch dict; must contain ``"labels"``.
            return_outputs: When True, return ``(loss, outputs)``.
            num_items_in_batch: Accepted because recent transformers releases
                pass it as a keyword to ``compute_loss``; without it the call
                raises TypeError. It is not used here.

        Returns:
            The loss tensor, or ``(loss, outputs)`` when `return_outputs` is True.
        """
        labels = inputs.get("labels")
        # forward pass
        outputs = model(**inputs)
        logits = outputs.get('logits')

        # CrossEntropyLoss needs float logits and integer (long) class targets.
        logits = logits.float()
        labels = labels.long()

        # compute custom loss
        if self.class_weights is not None:
            loss_fct = nn.CrossEntropyLoss(weight=self.class_weights)
        else:
            loss_fct = nn.CrossEntropyLoss()

        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))

        return (loss, outputs) if return_outputs else loss

In [14]:
# Compute metric
def compute_metrics(y_true, y_preds, labels=(0, 1, 2)):
    """Compute accuracy, weighted F1 and per-class precision/recall/F1/support.

    Args:
        y_true: Ground-truth class ids.
        y_preds: Predicted class ids.
        labels: Class ids to report on. The per-class arrays follow exactly
            this order, even when a class is absent from `y_true` and
            `y_preds` (its scores are then 0).

    Returns:
        Dict with the keys ``accuracy``, ``weighted_f1``, ``precision``,
        ``recall``, ``f1`` and ``support``; the last four are per-class arrays.
    """
    labels = list(labels)
    # Pass `labels` here as well so the per-class arrays always have one entry
    # per class and agree with the label set used for the weighted F1.
    precision, recall, f1, support = precision_recall_fscore_support(
        y_true, y_preds, labels=labels, average=None, zero_division=0
    )
    acc = accuracy_score(y_true, y_preds)
    weighted_f1 = f1_score(y_true, y_preds, labels=labels, average="weighted", zero_division=0)
    return {
        "accuracy": acc,
        "weighted_f1": weighted_f1,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "support": support
    }

In [None]:
# UPDATE_TO_YOUR_FILE_PATH
def get_first_checkpoint_folder(directory="/content/drive/MyDrive/SP2024-GPT/models/roberta/"):
    """Return the path of the lowest-numbered ``checkpoint-*`` folder.

    Args:
        directory: Folder to search. A trailing path separator is optional.

    Returns:
        The joined path of the first checkpoint folder, or None when
        `directory` contains no sub-folder whose name starts with
        ``checkpoint-``.

    Raises:
        OSError: If `directory` cannot be listed (e.g. it does not exist).
    """
    # Get all directories that start with "checkpoint-"
    checkpoint_folders = [
        d for d in os.listdir(directory)
        if d.startswith("checkpoint-") and os.path.isdir(os.path.join(directory, d))
    ]

    if not checkpoint_folders:
        return None

    def _step_order(name):
        # Order by the numeric step so "checkpoint-90" precedes "checkpoint-100";
        # names without a numeric suffix sort last, alphabetically.
        suffix = name[len("checkpoint-"):]
        if suffix.isdecimal():
            return (0, int(suffix), name)
        return (1, 0, name)

    # os.path.join keeps the result valid with or without a trailing separator.
    return os.path.join(directory, min(checkpoint_folders, key=_step_order))

In [16]:
# Shuffled, seeded stratified splitter shared by every call to train_model,
# so each trial is evaluated on the same N_SPLITS folds.
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)

In [None]:
def train_model(X, y, trial, model_params):
    """Fine-tune and evaluate the classifier with stratified k-fold CV for one trial.

    For every fold produced by the notebook-level `skf`, a fresh model is
    loaded, trained with early stopping, evaluated on the held-out fold, and
    its weighted F1 is reported to Optuna so the trial can be pruned.

    Besides its arguments, this function relies on notebook-level names:
    `skf`, `N_SPLITS`, `MODEL`, `NUM_LABELS`, `device`, `service`,
    `tokenize_function`, `MyDataset`, `compute_metrics`, and the result
    containers `itr_metrics`, `cross_val_f1`, `cross_val_loss`, `loss_df`
    and `avg_score_df`, which it appends to.

    Args:
        X: pandas Series of input texts.
        y: pandas Series of class ids aligned with `X`.
        trial: The Optuna trial being evaluated.
        model_params: Dict of hyperparameters and paths built by `objective`.

    Returns:
        A tuple ``(avg_weighted_f1, avg_eval_loss, per_fold_metrics)``.

    Raises:
        optuna.TrialPruned: When the pruner decides to stop the trial.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        _itr_metrics = []
        _cross_val_f1 = []
        _cross_val_loss = []
        # Read the Hugging Face token once instead of once per fold.
        hf_token = userdata.get('HF_TOKEN')
        for fold, (train_index, val_index) in enumerate(skf.split(X.to_list(), y.to_list())):
            print(f"Fold {fold + 1}/{N_SPLITS}")

            # Split data into training and validation sets
            X_train, X_val = X.iloc[train_index], X.iloc[val_index]
            y_train, y_val = y.iloc[train_index], y.iloc[val_index]

            # Tokenize and create dataset
            train_encodings = tokenize_function(X_train.tolist())
            val_encodings = tokenize_function(X_val.tolist())

            train_dataset = MyDataset(train_encodings, y_train.tolist())
            val_dataset = MyDataset(val_encodings, y_val.tolist())

            # Build the config first and hand it to the model. Previously the
            # config was created after loading and never passed to the model,
            # so the tuned dropout values had no effect.
            config = RobertaConfig.from_pretrained(
                MODEL,
                num_labels=NUM_LABELS,
                hidden_dropout_prob=model_params['hidden_dropout_prob'],
                attention_probs_dropout_prob=model_params['attention_probs_dropout_prob'],
                token=hf_token,
            )

            # Load model from pretrained (`token` replaces the deprecated `use_auth_token`)
            model = RobertaForSequenceClassification.from_pretrained(
                MODEL,
                config=config,
                token=hf_token,
            ).to(device)

            # Training Arguments
            training_args = TrainingArguments(
                output_dir=model_params['output_dir'],
                eval_strategy="epoch",
                per_device_train_batch_size=model_params['batch_size'],
                per_device_eval_batch_size=model_params['batch_size'],
                learning_rate=model_params['learning_rate'],
                weight_decay=model_params['weight_decay'],
                max_grad_norm=model_params['max_grad_norm'],
                num_train_epochs=model_params['num_epochs'],
                warmup_ratio=model_params['warmup_ratio'],
                save_strategy="epoch",
                logging_dir=model_params['logging_dir'],
                logging_strategy="epoch",
                seed=SEED,
                save_total_limit=1,
                load_best_model_at_end=True,
                metric_for_best_model="eval_loss",
                greater_is_better=False,
                label_smoothing_factor=model_params['label_smoothing_factor'],
                optim=model_params['optimizer'],
                report_to="none",
                save_only_model=True
            )

            # Initialize and train your model
            trainer = Trainer(
                model=model,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=val_dataset,
                # class_weights=class_weights,
                callbacks=[EarlyStoppingCallback(early_stopping_patience=3, early_stopping_threshold=1e-4)]
            )

            # Train the model
            trainer.train()

            # Empty the trash (frees Drive space taken by rotated checkpoints)
            service.files().emptyTrash().execute()

            # Evaluate the model on the validation set
            val_score = trainer.evaluate(val_dataset)
            _cross_val_loss.append(val_score["eval_loss"])
            print(f'Validation Score fold-{fold+1}: {val_score["eval_loss"]:.4f}')
            cross_val_loss[fold+1].append(val_score["eval_loss"])

            # Make prediction on validation.
            # load_best_model_at_end=True has already restored the best weights
            # into the trainer, so predict with it directly. Reloading the
            # alphabetically first checkpoint from the shared output_dir could
            # pick a stale checkpoint left by another fold or trial.
            val_preds_raw, val_labels , _ = trainer.predict(val_dataset)
            val_preds = np.argmax(val_preds_raw, axis=-1)


            # Compute & keep metrics
            metrics = compute_metrics(val_labels, val_preds)
            metrics['fold'] = fold + 1
            _itr_metrics.append(metrics)
            itr_metrics.append(metrics)
            print(classification_report(val_labels, val_preds, digits=3))
            intermediate_value = metrics['weighted_f1']
            print(f'Intermediate weighted f1: {intermediate_value:.4f}')
            _cross_val_f1.append(intermediate_value)
            cross_val_f1[fold+1].append(intermediate_value)

            # Save intermediate values to file
            loss_df.loc[len(loss_df.index)] = [trial.number, fold+1, val_score["eval_loss"], intermediate_value]
            fname = '/content/drive/MyDrive/SP2024-GPT/results/roberta_intermediate.csv' # UPDATE_TO_YOUR_FILE_PATH
            loss_df.to_csv(fname, index=False)

            # Report intermediate objective value
            trial.report(intermediate_value, fold)

            # Clean up GPU memory
            del model
            del trainer
            torch.cuda.empty_cache()
            gc.collect()

            if trial.should_prune():
                raise optuna.TrialPruned()

        avg_cross_val_f1 = np.mean(_cross_val_f1)
        avg_cross_val_loss = np.mean(_cross_val_loss)
        print(f'Average weighted f1 in 5-fold: {avg_cross_val_f1:.4f}')
        print(f'Average eval loss in 5-fold: {avg_cross_val_loss:.4f}')
        # Save average to file (row index now comes from avg_score_df itself, not loss_df)
        avg_score_df.loc[len(avg_score_df.index)] = [trial.number, avg_cross_val_loss, avg_cross_val_f1]
        fname = '/content/drive/MyDrive/SP2024-GPT/results/roberta_avg.csv' # UPDATE_TO_YOUR_FILE_PATH
        avg_score_df.to_csv(fname, index=False)
        return avg_cross_val_f1, avg_cross_val_loss, _itr_metrics

    except optuna.TrialPruned:
        # Pruning is a normal outcome, not a failure: propagate without an error log.
        raise
    except Exception as e:
        logger.error(f"Training failed: {str(e)}")
        raise

In [None]:
def objective(trial):
    """Optuna objective: sample hyperparameters and return the mean CV weighted F1.

    The sampled values and fixed settings are packed into the dict that
    `train_model` expects; training runs on the notebook-level `X` and `y`.
    """
    # Hyperparameters are drawn one by one, in the same order as the dict below.
    hidden_dropout = trial.suggest_float('hidden_dropout_prob', 0.15, 0.35, step=0.01)
    attention_dropout = trial.suggest_float('attention_probs_dropout_prob', 0.1, 0.35, step=0.01)
    batch_size = trial.suggest_categorical('batch_size', [4, 8, 16])
    learning_rate = trial.suggest_float('lr', 9e-6, 4e-5, step=1e-7)
    weight_decay = trial.suggest_float('weight_decay', 0.05, 0.15, step=0.01)
    max_grad_norm = trial.suggest_float('max_grad_norm', 0.5, 1.0, step=0.01)
    warmup_ratio = trial.suggest_float('warmup_ratio', 0.1, 0.3, step=0.01)
    label_smoothing = trial.suggest_float('label_smoothing_factor', 0.05, 0.22, step=0.01)

    # Model parameters
    model_params = dict(
        hidden_dropout_prob=hidden_dropout,
        attention_probs_dropout_prob=attention_dropout,
        output_dir='/content/drive/MyDrive/SP2024-GPT/models/roberta/',  # UPDATE_TO_YOUR_FILE_PATH
        batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        max_grad_norm=max_grad_norm,
        num_epochs=15,
        warmup_ratio=warmup_ratio,
        label_smoothing_factor=label_smoothing,
        logging_dir='/content/drive/MyDrive/SP2024-GPT/logs/roberta/',  # UPDATE_TO_YOUR_FILE_PATH
        optimizer='adamw_torch',
    )

    # Train model on X, y returned by prepare_data; only the mean F1 is optimized.
    cv_results = train_model(X, y, trial, model_params)
    return cv_results[0]

In [20]:
def create_dict(n):
    """Return a dict mapping each integer 1..n to its own new empty list."""
    buckets = {}
    for key in range(1, n + 1):
        buckets[key] = []
    return buckets

In [21]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


# Training

In [None]:
# Notebook-level result containers that train_model appends to:
# per-fold metric dicts, and per-fold lists of weighted F1 / eval loss keyed by fold number.
itr_metrics = []
cross_val_f1 = create_dict(N_SPLITS)
cross_val_loss = create_dict(N_SPLITS)
print(cross_val_f1)

# Tables that train_model fills row by row and writes to CSV on Drive.
loss_df = pd.DataFrame({'trial': [], 'fold': [], 'loss': [], 'weighted-f1': []})
avg_score_df = pd.DataFrame({'trial': [], 'avg_loss': [], 'avg_f1': []})

# Pruner consulted after each fold, where train_model reports the fold's weighted F1.
pruner = optuna.pruners.HyperbandPruner(
            min_resource=1,
            max_resource=5,
            reduction_factor=2
        )
# Seeded TPE sampler; the study maximizes the value returned by `objective`.
sampler = optuna.samplers.TPESampler(seed=SEED)
study = optuna.create_study(direction='maximize', sampler=sampler, pruner=pruner)
# Run 50 trials; each trial performs a full cross-validation unless it is pruned.
study.optimize(objective, n_trials=50)

# Result

In [None]:
# Hyperparameters of the best trial found by the study.
print("best params")
print(study.best_params)

In [None]:
# Plot the optimization history of the study.
optuna.visualization.plot_optimization_history(study)

In [None]:
# Parallel-coordinate plot of the study's trials.
optuna.visualization.plot_parallel_coordinate(study)

In [None]:
# Plot the hyperparameter importances for the study.
optuna.visualization.plot_param_importances(study)