In [None]:
import torch
import pandas as pd

from datasets import Dataset

from sklearn.metrics import (
    accuracy_score, 
    precision_recall_fscore_support)

from transformers import (
    DistilBertTokenizerFast,        
    DistilBertForSequenceClassification,  
    Trainer,                     
    TrainingArguments,
    EarlyStoppingCallback
)

from peft import get_peft_model, PromptTuningConfig, TaskType

import optuna


In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MAX_LENGTH = 128  # Maximum token sequence length per example
NUM_LABELS = 3  # Number of output classes given to the classification head in get_model
MODEL_NAME = 'distilbert-base-uncased'
# `main_path` is concatenated directly in front of every relative path below,
# so a non-empty value must end with a path separator.
main_path = '' # Path to main directory
# Forward slashes are accepted on Windows as well as POSIX systems; the
# previous hard-coded backslashes only resolved correctly on Windows.
train_path = f'{main_path}data/cleaned/train.csv'
test_path = f'{main_path}data/cleaned/test.csv'

# Get Data

In [None]:
def load_data(train_path: str, test_path: str) -> tuple[Dataset, Dataset]:
    """Read the train and test CSV files and wrap each one in a `Dataset`.

    Args:
        train_path: Location of the training CSV file.
        test_path: Location of the test CSV file.

    Returns:
        A `(train, test)` pair of `Dataset` objects built from the two
        DataFrames.
    """
    # Both files are parsed before either one is converted.
    train_frame = pd.read_csv(train_path)
    test_frame = pd.read_csv(test_path)

    train_split = Dataset.from_pandas(train_frame)
    test_split = Dataset.from_pandas(test_frame)
    return train_split, test_split

In [None]:
def add_prompt(examples, prompt: str)->dict:
    """Prefix every entry of the batch's 'post' column with `prompt`.

    The prompt and the post are joined by plain concatenation (no separator
    is inserted). The batch is updated in place and the same object is
    returned.

    Args:
        examples: Mapping with a 'post' entry holding an iterable of strings.
        prompt: Text placed in front of each post.

    Returns:
        The `examples` object that was passed in, with 'post' replaced.
    """
    prefixed_posts = []
    for text in examples['post']:
        prefixed_posts.append(prompt + text)

    examples['post'] = prefixed_posts
    return examples

In [None]:
def tokenize_data(tokenizer, dataset: Dataset, prompt: str = None, max_length: int = None) -> Dataset:
    """Optionally prefix a prompt, tokenize the 'post' column and expose torch tensors.

    Args:
        tokenizer: Callable tokenizer invoked on the list of posts of each batch.
        dataset: Dataset with at least the 'post' and 'label' columns.
        prompt: Text prepended to every post before tokenization. Skipped
            when `None` or empty.
        max_length: Length every sequence is padded/truncated to. Defaults
            to the notebook-level `MAX_LENGTH` constant so the sequence
            length is configured in a single place.

    Returns:
        The mapped dataset, formatted to yield torch tensors for the
        'input_ids', 'attention_mask' and 'label' columns.
    """
    # Resolve the default at call time so a later change to MAX_LENGTH in the
    # configuration cell is picked up without redefining this function.
    if max_length is None:
        max_length = MAX_LENGTH

    if prompt:
        dataset = dataset.map(lambda examples: add_prompt(examples, prompt), batched=True)

    def tokenize(examples):
        return tokenizer(examples['post'], padding='max_length', truncation=True, max_length=max_length)

    dataset = dataset.map(tokenize, batched=True)
    # Only these columns are exposed (as torch tensors) when the dataset is indexed.
    dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

    return dataset

In [None]:
def get_datasets(train_path: str, 
                 test_path: str, 
                 tokenizer, 
                 prompt: str = None) -> tuple[Dataset, Dataset]:
    """Load both CSV splits and tokenize them with the same settings.

    Args:
        train_path: Location of the training CSV file.
        test_path: Location of the CSV file used for evaluation.
        tokenizer: Tokenizer forwarded to `tokenize_data`.
        prompt: Optional text prepended to every post in both splits.

    Returns:
        A `(train_dataset, val_dataset)` pair of tokenized datasets. Note
        that `val_dataset` is built from `test_path`.
    """
    train_dataset, val_dataset = load_data(train_path, test_path)
    train_dataset = tokenize_data(tokenizer, train_dataset, prompt)
    val_dataset = tokenize_data(tokenizer, val_dataset, prompt)

    return train_dataset, val_dataset

# Get Model

In [None]:
def freeze_all_layers(model):
    """Turn off gradient tracking for every parameter of `model`.

    Args:
        model: A torch module; it is modified in place.

    Returns:
        The same `model` object, now with all parameters frozen.
    """
    # Module-level switch that sets requires_grad on each parameter of the module.
    model.requires_grad_(False)
    return model

In [None]:
def unfreeze_specific_layers(model, parameters_to_unfreeze: list = None):
    """Re-enable gradients for parameters whose name starts with a given prefix.

    Args:
        model: A torch module; it is modified in place.
        parameters_to_unfreeze: Name prefixes to match against
            `model.named_parameters()`. When `None`, the weights and biases
            of `classifier` and `pre_classifier` are selected.

    Returns:
        The same `model` object.
    """
    if parameters_to_unfreeze is None:
        parameters_to_unfreeze = ['classifier.bias', 
                                  'classifier.weight', 
                                  'pre_classifier.bias',
                                  'pre_classifier.weight']

    for full_name, tensor in model.named_parameters():
        for prefix in parameters_to_unfreeze:
            if full_name.startswith(prefix):
                tensor.requires_grad = True
                # One matching prefix is enough; move on to the next parameter.
                break

    return model

In [None]:
def get_model(freeze_all: bool = False, unfreeze_specific: bool = False, parameters_to_unfreeze: list = None):
    """Load the pretrained classifier and its tokenizer, optionally freezing weights.

    Args:
        freeze_all: When true, disable gradients for every parameter.
        unfreeze_specific: When true, re-enable gradients for the parameters
            selected by `parameters_to_unfreeze` (applied after `freeze_all`).
        parameters_to_unfreeze: Name prefixes forwarded to
            `unfreeze_specific_layers`; `None` selects that function's defaults.

    Returns:
        A `(model, tokenizer)` pair; the model has been moved to `DEVICE`.
    """
    classifier = DistilBertForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=NUM_LABELS,
    )
    tokenizer = DistilBertTokenizerFast.from_pretrained(MODEL_NAME)

    # Freezing runs first so that the selective unfreeze can carve out exceptions.
    if freeze_all:
        classifier = freeze_all_layers(classifier)
    if unfreeze_specific:
        classifier = unfreeze_specific_layers(classifier, parameters_to_unfreeze)

    classifier.to(DEVICE)

    return classifier, tokenizer

# Evaluation Metrics

In [None]:
def compute_metrics(pred)->dict:
    """Compute accuracy and weighted precision/recall/F1 for a prediction object.

    Args:
        pred: Object exposing `label_ids` (true labels) and `predictions`
            (scores whose last axis is reduced with argmax to obtain the
            predicted label).

    Returns:
        Dict with the keys 'accuracy', 'f1', 'precision' and 'recall'.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)

    # zero_division=0 reports 0 for a label with no samples instead of warning.
    prf_scores = precision_recall_fscore_support(
        y_true,
        y_pred,
        average='weighted',
        zero_division=0,
    )
    # The fourth element (support) is not used.
    weighted_precision, weighted_recall, weighted_f1 = prf_scores[:3]

    return {
        'accuracy': accuracy_score(y_true, y_pred),
        'f1': weighted_f1,
        'precision': weighted_precision,
        'recall': weighted_recall,
    }

In [None]:
def print_metrics(metrics: dict):
    """Print accuracy, F1, precision and recall with four decimal places.

    Args:
        metrics: Mapping containing the keys 'eval_accuracy', 'eval_f1',
            'eval_precision' and 'eval_recall'.

    Raises:
        KeyError: If one of the expected keys is missing.
    """
    labelled_keys = (
        ("Accuracy", 'eval_accuracy'),
        ("F1", 'eval_f1'),
        ("Precision", 'eval_precision'),
        ("Recall", 'eval_recall'),
    )
    for label, key in labelled_keys:
        print(f"{label}: {metrics[key]:.4f}")

# Fine-tuning

## Layer Unfreezing

In [None]:
def fine_tune_with_layer_unfreezing():
    """Fine-tune only the default unfrozen layers of the model and report metrics.

    Loads the model with every parameter frozen except the defaults chosen by
    `unfreeze_specific_layers`, trains for up to 10 epochs with early stopping
    (patience 3), then evaluates and prints the metrics.

    Returns:
        A `(model, tokenizer)` pair. Because `load_best_model_at_end=True`,
        the trainer restores the best checkpoint at the end of training.
    """
    model, tokenizer = get_model(freeze_all=True, unfreeze_specific=True)

    # NOTE(review): the evaluation split is built from `test_path`, and it
    # also drives checkpoint selection and early stopping.
    train_dataset, val_dataset = get_datasets(train_path, test_path, tokenizer)

    # Forward slashes keep the output locations valid on both Windows and
    # POSIX, matching the separators used when the model is saved.
    training_args = TrainingArguments(
        output_dir=f'{main_path}results/unfrozen_layers',
        do_train=True,
        do_eval=True,
        num_train_epochs=10,
        logging_dir=f'{main_path}logs/unfrozen_layers',
        logging_steps=400,
        eval_strategy='epoch',
        save_strategy='epoch',
        seed=42,
        load_best_model_at_end=True,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
    )

    print("Fine-tuning model...")
    tr_metrics=trainer.train()
    print("Training metrics:")
    print(tr_metrics)

    print("Evaluating model...")
    eval_results = trainer.evaluate()

    print("Evaluation results:")
    print_metrics(eval_results)

    return model, tokenizer

## Hyperparameter Search

In [None]:
class CustomTrainer(Trainer):
    """Trainer that reports the training loss to an Optuna trial for pruning.

    After every training step the loss is reported to `trial` at the current
    global step, and the trial is aborted with `optuna.TrialPruned` when the
    study's pruner asks for it.

    Args:
        trial: The Optuna trial this training run belongs to.
        *args, **kwargs: Forwarded unchanged to `Trainer.__init__`.
    """

    def __init__(self, trial, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.trial = trial

    def training_step(self, model, inputs, *args, **kwargs):
        """Run one training step, report its loss and prune if requested.

        Extra positional/keyword arguments are passed through to the base
        implementation: newer transformers releases call `training_step`
        with an additional `num_items_in_batch` argument, which a fixed
        `(model, inputs)` signature would reject with a TypeError.

        Returns:
            The loss tensor produced by `Trainer.training_step`.

        Raises:
            optuna.TrialPruned: When the pruner decides to stop this trial.
        """
        loss = super().training_step(model, inputs, *args, **kwargs)

        # Pruners compare intermediate values in the study's direction. A
        # loss is "lower is better", so in a maximizing study it has to be
        # negated; otherwise the best-converging trials are the ones pruned.
        reported_value = loss.item()
        if self.trial.study.direction == optuna.study.StudyDirection.MAXIMIZE:
            reported_value = -reported_value

        self.trial.report(reported_value, step=self.state.global_step)

        if self.trial.should_prune():
            raise optuna.TrialPruned()

        return loss

In [None]:
def hyperparameter_search():
    """Search training hyperparameters with Optuna and return the best setup.

    Each trial samples the number of epochs, batch size, learning rate,
    warmup steps and weight decay, trains a freshly loaded model (classifier
    head trainable, everything else frozen) and is scored by evaluation
    accuracy. The study maximizes that accuracy over 40 trials with a TPE
    sampler and a median pruner.

    Returns:
        A `(best_params, model, tokenizer)` tuple where `model` is the model
        trained in the highest-scoring completed trial.

    Raises:
        ValueError: Raised by Optuna when `best_params` is requested and no
            trial has completed.
    """
    tokenizer = DistilBertTokenizerFast.from_pretrained(MODEL_NAME)

    train_dataset, val_dataset = get_datasets(train_path, test_path, tokenizer)

    # Only the best model seen so far is kept alive, so at most two models
    # (best + current trial) are held in memory at once.
    best_so_far = {'accuracy': float('-inf'), 'model': None}

    def objective(trial):
        num_train_epochs = trial.suggest_int('num_train_epochs', 2, 15)
        per_device_train_batch_size = trial.suggest_categorical('per_device_train_batch_size', [8, 16, 32, 64, 128])
        # suggest_float(..., log=True) is the supported replacement for the
        # deprecated suggest_loguniform and samples the same distribution.
        learning_rate = trial.suggest_float('learning_rate', 1e-5, 0.1, log=True)
        warmup_steps = trial.suggest_int('warmup_steps', 0, 500)
        weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)

        # Every trial starts from the pretrained weights. Reusing one model
        # across trials would let each trial continue from the previous
        # trial's training, so their scores would not be comparable.
        trial_model, _ = get_model(freeze_all=True, unfreeze_specific=True)

        # Forward slashes keep the output locations valid on both Windows
        # and POSIX, matching the separators used when the model is saved.
        training_args = TrainingArguments(
            output_dir=f'{main_path}results/hyperparam_tuning',
            num_train_epochs=num_train_epochs,
            per_device_train_batch_size=per_device_train_batch_size,
            per_device_eval_batch_size=64,
            logging_dir=f'{main_path}logs/hyperparam_tuning',
            logging_steps=400,
            learning_rate=learning_rate,
            lr_scheduler_type='linear',
            warmup_steps=warmup_steps,
            weight_decay=weight_decay,
            eval_strategy='epoch',
            save_strategy='epoch',
            seed=42,
            load_best_model_at_end=True,
            metric_for_best_model='accuracy'
        )

        trainer = CustomTrainer(
            model=trial_model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            trial=trial
        )

        # A pruned trial raises optuna.TrialPruned from inside train(), so
        # it never reaches the bookkeeping below.
        trainer.train()
        eval_results = trainer.evaluate()
        accuracy = eval_results['eval_accuracy']

        if accuracy > best_so_far['accuracy']:
            best_so_far['accuracy'] = accuracy
            best_so_far['model'] = trial_model

        return accuracy

    study = optuna.create_study(study_name='hyperparam_tuning',
                                sampler=optuna.samplers.TPESampler(seed=42),
                                direction='maximize',
                                pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=5, interval_steps=1))

    study.optimize(objective, n_trials=40)
    best_params = study.best_params

    print("Best hyperparameters:")
    print(best_params)

    return best_params, best_so_far['model'], tokenizer



## Prompt Tuning

In [None]:
def prompt_tuning(best_params: dict, prompts: list):
    """Train one prompt-tuned PEFT model per text prompt and keep the most accurate.

    For every prompt the posts are prefixed with the prompt text, a fully
    frozen base model is wrapped with a PEFT prompt-tuning adapter (15
    virtual tokens initialised from a fixed instruction text), and the
    result is trained and evaluated with the supplied hyperparameters.

    Args:
        best_params: Dict providing 'num_train_epochs',
            'per_device_train_batch_size', 'learning_rate', 'warmup_steps'
            and 'weight_decay'.
        prompts: Text prompts to try, one training run each.

    Returns:
        `(all_metrics, best_model, best_tokenizer, best_prompt)` where
        `all_metrics` maps each prompt to its evaluation results and the
        other three describe the run with the highest evaluation accuracy.
        They are `None` only when `prompts` is empty.
    """
    all_metrics = {}
    best_model = None
    best_tokenizer = None
    # Start below any achievable accuracy so the first evaluated prompt is
    # always recorded, even when its accuracy is exactly 0.
    max_accuracy = float('-inf')
    best_prompt = None

    # enumerate supplies the position directly; looking it up with
    # prompts.index(prompt) is a linear scan and returns the first match,
    # so duplicate prompts would share an output directory.
    for prompt_idx, prompt in enumerate(prompts):
        print(f"Using prompt: {prompt}")

        model, tokenizer = get_model(freeze_all=True)

        train_dataset, val_dataset = get_datasets(train_path, test_path, tokenizer, prompt)

        config = PromptTuningConfig(
            peft_type='PROMPT_TUNING',
            task_type=TaskType.SEQ_CLS,
            num_virtual_tokens=15, 
            num_transformer_submodules=1,
            num_attention_heads=model.config.num_attention_heads,
            num_layers=model.config.num_hidden_layers,
            token_dim=model.config.dim,
            prompt_tuning_init='TEXT',
            prompt_tuning_init_text="Predict if sentiment of this review is positive, negative, or neutral.",
            tokenizer_name_or_path=MODEL_NAME
        )

        peft_model = get_peft_model(model, config)

        # Forward slashes keep the output locations valid on both Windows
        # and POSIX, matching the separators used when the model is saved.
        training_args = TrainingArguments(
            output_dir=f'{main_path}results/prompt_tuning_{prompt_idx}',
            num_train_epochs=best_params['num_train_epochs'],
            per_device_train_batch_size=best_params['per_device_train_batch_size'],
            per_device_eval_batch_size=64,
            logging_dir= f'{main_path}logs/prompt_tuning_{prompt_idx}',
            logging_steps=400,
            learning_rate=best_params['learning_rate'],
            lr_scheduler_type='linear',
            warmup_steps=best_params['warmup_steps'],
            weight_decay=best_params['weight_decay'],
            eval_strategy='epoch',
            save_strategy='epoch',
            seed=42
        )

        trainer = Trainer(
            model=peft_model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics
        )

        print(f"Starting prompt tuning with current prompt: {prompt_idx}")
        tr_metrics = trainer.train()
        print("Training metrics:")
        print(tr_metrics)

        print("Evaluating model...")
        eval_results = trainer.evaluate()
        print("Evaluation results:")
        print_metrics(eval_results)

        all_metrics[prompt] = eval_results

        # Strict comparison: on a tie the earlier prompt stays the winner.
        if eval_results['eval_accuracy'] > max_accuracy:
            max_accuracy = eval_results['eval_accuracy']
            best_model = peft_model
            best_tokenizer = tokenizer
            best_prompt = prompt

    print("\nSummary of evaluation metrics for all prompts:")
    for prompt, metrics in all_metrics.items():
        print(f"\nPrompt: {prompt}")
        print_metrics(metrics)

    return all_metrics, best_model, best_tokenizer, best_prompt

# Main

### Layer Unfreezing

In [None]:
# Run the layer-unfreezing fine-tuning and keep the returned model/tokenizer.
model, tokenizer = fine_tune_with_layer_unfreezing()

print("\nSaving model...")
# Model and tokenizer are written side by side into the same directory.
model.save_pretrained(f'{main_path}models/unfrozen_layers')
tokenizer.save_pretrained(f'{main_path}models/unfrozen_layers')

### Hyperparameter Tuning

In [None]:
%%time

# hyperparameter_search returns the best Optuna parameters together with a
# model and tokenizer; `best_params` is reused by the prompt-tuning cell below.
best_params, model, tokenizer = hyperparameter_search()

print("\nSaving model...")
# Model and tokenizer are written side by side into the same directory.
model.save_pretrained(f'{main_path}models/hyperparam_tuning')
tokenizer.save_pretrained(f'{main_path}models/hyperparam_tuning')

### Prompt Tuning

In [None]:
# The triple-quoted block below is an inert string, not an assignment: running
# this cell does not define `best_params`. Its keys match the ones read by
# prompt_tuning, so removing the quotes lets the prompt-tuning cell run without
# the hyperparameter search.
# NOTE(review): presumably these values come from an earlier search run — confirm.
'''
best_params = {'num_train_epochs': 8, 
               'per_device_train_batch_size': 16, 
               'learning_rate': 0.0006672367170464204, 
               'warmup_steps': 393, 
               'weight_decay': 6.290644294586145e-06}
'''

In [None]:
# Candidate text prompts; prompt_tuning prepends each one to every post and
# trains a separate prompt-tuned model per prompt.
prompts = [
    "The sentiment of this review is:", 
    "This tweet expresses a sentiment that is:",
    "Sentiment classification of this message:",
]

# Requires `best_params` to be defined already (e.g. by the hyperparameter
# tuning cell). Returns per-prompt metrics plus the best model/tokenizer/prompt.
metrics, model, tokenizer, prompt = prompt_tuning(best_params, prompts)
print(metrics)

print("\nSaving model...")
# Model and tokenizer are written side by side into the same directory.
model.save_pretrained(f'{main_path}models/prompt_tuning')
tokenizer.save_pretrained(f'{main_path}models/prompt_tuning')