# Install and Import Requirements

In [None]:
!pip install -q peft
!pip install -q peft evaluate datasets

In [None]:
import pandas as pd
import numpy as np

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

from transformers import (
    BertTokenizer,
    BertModel,
    DataCollatorWithPadding,
    BertForSequenceClassification,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback,
)

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix

import matplotlib.pyplot as plt
# NOTE: keep this import after `torch.utils.data` so that `Dataset` refers to
# `datasets.Dataset` (the notebook calls `Dataset.from_pandas`).
from datasets import DatasetDict, Dataset

import evaluate
from peft import LoraConfig, TaskType, get_peft_model

In [None]:
# Prefer the GPU when torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Labelled dataset; can be downloaded from the GitHub repo (about 9.5k samples).
df = pd.read_csv("/content/final_dataset.csv")

# load tokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

# create datasets for training, validation, and testing
#
# Two stratified splits: 20% of the data is held out for testing, then 12.5%
# of the remaining 80% (0.125 * 0.8 = 0.10 of the full data) becomes the
# validation set, i.e. roughly a 70/10/20 train/val/test split.
temp_df, test_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df["label"],
    random_state=42,
)
train_df, val_df = train_test_split(
    temp_df,
    test_size=0.125,
    stratify=temp_df["label"],
    random_state=73,
)

# create datasets from dataframes (only the text and label columns, with a
# fresh 0..n-1 index so the shuffled pandas index is not carried along)
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(frame[['input', 'label']].reset_index(drop=True))
    for frame in (train_df, val_df, test_df)
)

def tokenize_func(examples):
    """Tokenize the "input" field of a batch with the module-level tokenizer.

    Every sequence is truncated and padded to exactly 128 tokens.
    """
    texts = examples["input"]
    return tokenizer(
        texts,
        padding="max_length",
        max_length=128,
        truncation=True,
    )

# Tokenize each split in batches; the raw "input" text column is dropped so
# only the tokenizer outputs and the label remain.
tokenized_train, tokenized_val, tokenized_test = (
    split.map(tokenize_func, batched=True, remove_columns="input")
    for split in (train_dataset, val_dataset, test_dataset)
)

# Evaluation Setup

In [None]:
# Load the evaluation metrics once at module level; `metric` is accuracy.
metric, precision_metric, recall_metric, f1_metric = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "precision", "recall", "f1")
)

def compute_metrics(eval_pred):
    """Compute binary-classification metrics for one evaluation pass.

    Args:
        eval_pred: a ``(logits, labels)`` pair; the predicted class is the
            argmax of ``logits`` along the last axis.

    Returns:
        dict with ``accuracy``, ``precision``, ``recall`` and ``f1`` (the
        latter three use ``average='binary'``), plus the four
        confusion-matrix counts as plain Python ints.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    accuracy = metric.compute(predictions=predictions, references=labels)
    precision = precision_metric.compute(predictions=predictions, references=labels, average='binary')
    recall = recall_metric.compute(predictions=predictions, references=labels, average='binary')
    f1 = f1_metric.compute(predictions=predictions, references=labels, average='binary')

    # Pin the label set so the matrix is always 2x2. Without `labels=[0, 1]`,
    # a batch where only one class appears in both labels and predictions
    # yields a 1x1 matrix and the four-way unpacking raises ValueError.
    tn, fp, fn, tp = confusion_matrix(labels, predictions, labels=[0, 1]).ravel()

    return {
        'accuracy': accuracy['accuracy'],
        'precision': precision['precision'],
        'recall': recall['recall'],
        'f1': f1['f1'],
        # for printed confusion matrix (cast from numpy ints to plain ints)
        'true_positives': int(tp),
        'false_positives': int(fp),
        'false_negatives': int(fn),
        'true_negatives': int(tn),
    }

# Training Setup

In [None]:
# LoRA adapter settings for sequence classification (values come from a
# hyperparameter grid search). Adapters are attached to the modules named
# "query", "key" and "value"; bias terms are left untouched.
lora_config = LoraConfig(
    bias="none",
    target_modules=["query", "key", "value"],
    lora_dropout=0.2,
    lora_alpha=32,
    r=8,
    task_type=TaskType.SEQ_CLS,
)

# weighted trainer to improve model training w/ class imbalance
class WeightedTrainer(Trainer):
    """Trainer whose loss is class-weighted cross-entropy.

    Class 0 gets weight 1.0; class 1 gets
    ``(n_negative / n_positive) * POSITIVE_WEIGHT_BOOST``, where the counts
    are read from the module-level ``train_df['label']`` column.
    """

    # Extra multiplier applied on top of the negative/positive ratio.
    POSITIVE_WEIGHT_BOOST = 1.3

    def _get_class_weights(self, device):
        """Return the per-class weight tensor on ``device``.

        The weights depend only on ``train_df``, so they are computed on the
        first call and cached on the trainer instead of being rebuilt on
        every training/evaluation step.
        """
        weights = getattr(self, "_class_weights", None)
        if weights is None:
            n_samples = len(train_df)
            n_positive = int(train_df['label'].sum())
            if n_positive == 0:
                raise ValueError(
                    "train_df contains no positive samples; cannot compute class weights"
                )
            n_negative = n_samples - n_positive
            pos_weight = float(n_negative / n_positive * self.POSITIVE_WEIGHT_BOOST)
            # float32 to match the dtype of the model's logits
            weights = torch.tensor([1.0, pos_weight], dtype=torch.float32)
            self._class_weights = weights
        return weights.to(device)

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted cross-entropy loss for one batch.

        Args:
            model: the model being trained/evaluated.
            inputs: batch mapping that includes a ``"labels"`` entry.
            return_outputs: when true, return ``(loss, outputs)``.
            **kwargs: accepted and ignored, so extra arguments passed by the
                caller do not raise.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        labels = inputs["labels"]
        # Forward pass without labels (so the model does not compute its own
        # unweighted loss) and without mutating the caller's batch.
        model_inputs = {key: value for key, value in inputs.items() if key != "labels"}
        outputs = model(**model_inputs)
        logits = outputs.logits

        loss_fct = nn.CrossEntropyLoss(weight=self._get_class_weights(logits.device))
        loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

In [None]:
# function to train model
def train_full_model(number, train_dataset, val_dataset, tokenizer, lora_config):
    """Fine-tune ``bert-base-cased`` with LoRA adapters and return the model.

    Args:
        number: run identifier, interpolated into the checkpoint directory
            (``./model_train_f_{number}``) and the final save directory
            (``./final_model_{number}``).
        train_dataset: tokenized training split passed to the trainer.
        val_dataset: tokenized validation split, evaluated once per epoch.
        tokenizer: tokenizer handed to the trainer.
        lora_config: PEFT ``LoraConfig`` applied to the base model.

    Returns:
        The PEFT-wrapped model after training.
    """
    # define model
    model = BertForSequenceClassification.from_pretrained('bert-base-cased', num_labels=2)
    model = get_peft_model(model, lora_config)

    training_args = TrainingArguments(
        output_dir=f"./model_train_f_{number}",
        eval_strategy="epoch",
        save_strategy="epoch",
        save_total_limit=None,  # currently saves models from every epoch
        num_train_epochs=10,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=32,
        learning_rate=1e-4,

        # regularization / warmup to decrease memorization and stabilize learning
        weight_decay=0.01,
        warmup_ratio=0.1,

        # prevent unnecessary logging
        report_to=[],
        logging_strategy="no",

        # keep the checkpoint with the highest validation recall
        load_best_model_at_end=True,
        metric_for_best_model="recall",
        greater_is_better=True,
    )

    # early stopping with a patience of 3 evaluations
    early_stopping = EarlyStoppingCallback(
        early_stopping_patience=3
    )

    # create trainer from weighted trainer class
    trainer = WeightedTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        callbacks=[early_stopping],
    )

    # train
    trainer.train()

    # final val metrics: previously computed and discarded, now reported
    eval_results = trainer.evaluate()
    print(f"Final validation metrics (run {number}): {eval_results}")

    model.save_pretrained(f"./final_model_{number}")  # for later reference

    return model

# Model Training

In [None]:
# Run 1: train on the tokenized train split, validate on the tokenized val split.
train_full_model(
    number=1,
    train_dataset=tokenized_train,
    val_dataset=tokenized_val,
    tokenizer=tokenizer,
    lora_config=lora_config,
)