<a href="https://colab.research.google.com/github/christopherormerod/AI-AES-Colab/blob/main/FullExample.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install datasets
!pip install transformers

In [None]:
import datasets
import sklearn
import pandas as pd
import numpy as np

# Import the splitter from its submodule explicitly: a bare `import sklearn`
# is not guaranteed to expose `sklearn.model_selection` as an attribute.
from sklearn.model_selection import train_test_split

# Hugging Face Hub identifier of the essay dataset used in this notebook.
data_id = "llm-aes/asap-7-original"
data = datasets.load_dataset(data_id)

# Work with the "train" split as a DataFrame so columns can be selected by name.
df = pd.DataFrame(data["train"])

# Hold out 20% of the rows as a test set; the fixed seed keeps the split reproducible.
train, test = train_test_split(df, test_size=0.2, random_state=42)

In [None]:
def aes_metrics(y1, y2):
    """
    Compute agreement metrics between two sets of scores.

    Args:
        y1: Sequence of scores (e.g., the reference scores).
        y2: Sequence of scores to compare against ``y1``, same length.

    Returns:
        dict: ``{"QWK": ..., "Acc": ..., "SMD": ...}`` where
            - "QWK" is Cohen's kappa with quadratic weights,
            - "Acc" is the exact-match accuracy,
            - "SMD" is ``mean(y1) - mean(y2)`` divided by the root of the
              average of the two (population) variances.
    """
    # Import the metric functions from the submodule explicitly: a bare
    # `import sklearn` is not guaranteed to expose `sklearn.metrics`.
    from sklearn.metrics import accuracy_score, cohen_kappa_score

    qwk = cohen_kappa_score(y1, y2, weights="quadratic")
    acc = accuracy_score(y1, y2)

    # Standardized mean difference: mean gap scaled by the pooled spread.
    mean_gap = np.mean(y1) - np.mean(y2)
    pooled_std = np.sqrt((np.var(y1) + np.var(y2)) / 2)
    smd = mean_gap / pooled_std

    return {"QWK": qwk, "Acc": acc, "SMD": smd}

In [None]:
# Import necessary libraries
from transformers import AutoModelForSequenceClassification, AutoTokenizer  # For loading pre-trained models and tokenizers
import torch  # PyTorch library for tensors and neural networks
from tqdm.notebook import tqdm, trange  # For displaying progress bars

class FineTunedEssayScorer:
    """
    Fine-tune and apply a transformer classifier for automated essay scoring.

    Scoring is treated as a classification problem: every integer score from
    ``min_score`` to ``max_score`` (inclusive) is one class, and class labels
    are the scores shifted to start at zero (``score - min_score``).
    """

    # Token budget per essay: longer essays are truncated, shorter ones padded.
    MAX_LENGTH = 512

    def __init__(self, model_id, max_score, min_score):
        """
        Initializes the essay scorer.

        Args:
            model_id (str): The identifier of the pre-trained model from the Hugging Face Hub (e.g., "bert-base-uncased").
            max_score (int): The maximum possible score for the essays (e.g., 6).
            min_score (int): The minimum possible score for the essays (e.g., 1).
        """
        self.max_score = max_score
        self.min_score = min_score

        # One output label per distinct score point in [min_score, max_score]
        num_labels = max_score - min_score + 1

        # Load the pre-trained sequence classification model with the correct number of output labels
        self.classifier = AutoModelForSequenceClassification.from_pretrained(model_id, num_labels=num_labels)

        # Load the corresponding tokenizer for the pre-trained model
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

        # Check if a GPU is available and move the model to the GPU if so
        if torch.cuda.is_available():
            self.classifier.cuda()

    def _encode(self, text):
        """Tokenize one essay into tensors placed on the classifier's device."""
        return self.tokenizer(
            text,
            return_tensors='pt',
            padding="max_length",
            truncation=True,
            max_length=self.MAX_LENGTH,
        ).to(self.classifier.device)

    def _snapshot_state(self):
        """
        Return an independent copy of the classifier's current weights.

        ``state_dict()`` hands back references to the live tensors, so each
        tensor is cloned; otherwise the "snapshot" would keep changing as
        training continues.
        """
        return {
            name: tensor.detach().clone()
            for name, tensor in self.classifier.state_dict().items()
        }

    def train(self, X, y, epochs=10, dev_size=0.15):
        """
        Fine-tunes the model on the provided training data.

        The data is split into a training part and a validation (dev) part.
        After every epoch the model is evaluated on the dev part, and at the
        end the weights from the epoch with the highest dev QWK are restored.
        If no epoch yields a comparable QWK, the final weights are kept.

        Note: This implementation uses a batch size of 1, which is highly inefficient.
        Consider modifying this to process data in batches for faster training.

        Args:
            X (list[str]): A list of all essay texts (strings).
            y (list[int]): A list of all corresponding integer scores.
            epochs (int, optional): The number of training epochs. Defaults to 10.
            dev_size (float, optional): The proportion of the dataset to use as a validation (dev) set. Defaults to 0.15.
        """
        # Imported locally so the method does not depend on a module-level
        # name that the rest of the file never brings into scope.
        from sklearn.model_selection import train_test_split

        # Split data into training and validation (dev) sets
        X_train, X_dev, y_train, y_dev = train_test_split(
            X, y, test_size=dev_size, random_state=42  # random_state for reproducibility
        )

        print(f"Total samples: {len(X)}, Training samples: {len(X_train)}, Validation samples: {len(X_dev)}")

        # Initialize the AdamW optimizer (common for transformers)
        optimizer = torch.optim.AdamW(self.classifier.parameters(), lr=5e-5)

        N = len(X_train)  # Total number of training samples

        # Decay the learning rate linearly to zero over all optimizer steps
        # (one step per sample per epoch).
        scheduler = torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=1.0, end_factor=0.0, total_iters=epochs * N)

        # No best checkpoint yet; any real QWK beats negative infinity.
        self.best_state = None
        best_score = float("-inf")

        print(f"Starting training for {epochs} epochs...")
        for e in range(epochs):
            # evaluate() leaves the model in eval mode, so training mode
            # (dropout, etc.) must be re-enabled at the start of every epoch.
            self.classifier.train()

            # Loop over each training sample individually (batch size = 1)
            for essay, target in tqdm(zip(X_train, y_train), total=N, desc=f"Epoch {e+1}/{epochs}"):
                # Clear previous gradients
                optimizer.zero_grad()

                # Tokenize the current training essay text
                X_batch = self._encode(essay)

                # Adjust the label to be zero-indexed (e.g., score 1 -> label 0)
                labels = torch.tensor([target - self.min_score], device=self.classifier.device)

                # Forward pass; passing labels makes the model return the loss
                outputs = self.classifier(**X_batch, labels=labels)
                loss = outputs.loss

                # Backward pass first: gradients must exist before they can be clipped
                loss.backward()

                # Clip gradients to prevent exploding gradients
                torch.nn.utils.clip_grad_norm_(self.classifier.parameters(), 1.0)

                # Update model parameters, then the learning rate
                optimizer.step()
                scheduler.step()

            print("Evaluating model on dev set...")
            metrics = self.evaluate(X_dev, y_dev)

            print(f"Epoch {e+1} Validation Metrics: {metrics}")

            # Check if the current model is the best one based on QWK
            if metrics["QWK"] > best_score:
                print(f"New best score: {metrics['QWK']:.4f}")
                self.best_state = self._snapshot_state()  # Independent copy of the weights
                best_score = metrics["QWK"]

        # After all epochs, load the weights of the best-performing model
        if self.best_state is not None:
            print(f"Training complete. Loading best model state with QWK: {best_score:.4f}")
            self.classifier.load_state_dict(self.best_state)
        else:
            print("Training complete. No epoch produced a comparable QWK; keeping the final weights.")

    def evaluate(self, X, y):
        """
        Evaluates the model on a given dataset.

        Leaves the classifier in evaluation mode.

        Args:
            X (list[str]): A list of essay texts.
            y (list[int]): A list of corresponding true scores.

        Returns:
            dict: The metrics returned by ``aes_metrics(y, predictions)``
                (keys "QWK", "Acc" and "SMD").
        """
        # Set the model to evaluation mode (disables dropout, etc.)
        self.classifier.eval()

        # Get the model's predicted scores for the essays
        scores = self.score(X)

        # Compute and return metrics (assumes 'aes_metrics' function is defined)
        return aes_metrics(y, scores)

    def score(self, X):
        """
        Generates scores for a list of new essays (inference).

        Leaves the classifier in evaluation mode.

        Args:
            X (list[str]): A list of essay texts to score.

        Returns:
            list[int]: A list of predicted integer scores.
        """
        # Set the model to evaluation mode
        self.classifier.eval()

        scores = []  # List to hold the predicted scores

        # Disable gradient calculations to save memory and computation
        with torch.no_grad():
            # Process each essay one by one
            for essay in tqdm(X, desc="Scoring"):
                # Forward pass (inference) on the tokenized essay
                outputs = self.classifier(**self._encode(essay))

                # The predicted class is the index of the largest logit
                predicted_label = int(outputs.logits.cpu().argmax(dim=1))

                # Convert the zero-indexed label back to the original score
                scores.append(predicted_label + self.min_score)

        return scores

    def save(self, path):
        """
        Saves the fine-tuned model and tokenizer to a directory.

        Args:
            path (str): The directory path to save the model and tokenizer.
        """
        print(f"Saving model and tokenizer to {path}")
        # Save the model's weights and configuration
        self.classifier.save_pretrained(path)
        # Save the tokenizer's vocabulary and configuration
        self.tokenizer.save_pretrained(path)

    def load(self, path):
        """
        Loads a fine-tuned model and tokenizer from a directory.

        Only the model and tokenizer are replaced; ``min_score`` and
        ``max_score`` keep the values given to the constructor, so they must
        match the score range the saved model was trained on.

        Args:
            path (str): The directory path to load from.
        """
        print(f"Loading model and tokenizer from {path}")
        # Load the saved model
        self.classifier = AutoModelForSequenceClassification.from_pretrained(path)
        # Load the saved tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(path)

        # Move the loaded model to the GPU if available
        if torch.cuda.is_available():
            self.classifier.cuda()

In [None]:
# Build the scorer on a small ELECTRA discriminator; the score range is taken
# from the first rater's scores observed in the training split.
Pete = FineTunedEssayScorer(
    model_id="google/electra-small-discriminator",
    max_score=max(train['rater1_domain1']),
    min_score=min(train['rater1_domain1']),
)

# Fine-tune for four epochs on the training essays and the first rater's scores.
Pete.train(
    X=list(train['essay']),
    y=list(train['rater1_domain1']),
    epochs=4,
)