# Educational Reading Level Classification

## Import Libraries

In [None]:
!pip install transformers datasets evaluate accelerate

In [None]:
import datasets, evaluate, accelerate
import random
import numpy as np
import pandas as pd
import torch
import transformers
from transformers import AutoTokenizer
from datasets import Dataset

## Load Data

In [None]:
# Read the fiction and nonfiction passages from their CSV files
all_fiction_data, all_nonfiction_data = map(
    pd.read_csv, ("data/fiction.csv", "data/nonfiction.csv")
)

In [None]:
# Shuffle data randomly.
# A fixed random_state keeps the row order identical across runs; without it the
# seeded train/test split further down would still pick different rows each time.
all_fiction_data = all_fiction_data.sample(frac=1, random_state=42)
all_nonfiction_data = all_nonfiction_data.sample(frac=1, random_state=42)

## Clean Data

In [None]:
# Rename the fiction columns to "text" and "labels" for consistency
column_renames = {"passage": "text", "reading_level": "labels"}
all_fiction_data = all_fiction_data.rename(columns=column_renames)

In [None]:
# Check unique classes: list every distinct value in the 'labels' column so
# inconsistent spellings of the same reading level can be spotted
all_fiction_data['labels'].unique()

In [None]:
# Combine classes that are the same but differ by stray whitespace
# (e.g. 'Middle ' vs 'Middle').
map_levels = {
    'Middle ': 'Middle',
    'High ': 'High',
}

# Strip surrounding whitespace first so *any* padded variant collapses to its
# canonical name (an unhandled variant would otherwise fail the label2id lookup
# later on); the explicit mapping is still applied afterwards for compatibility.
all_fiction_data['labels'] = (
    all_fiction_data['labels'].str.strip().replace(map_levels)
)

In [None]:
# Check amount of data in each class (number of rows per distinct label)
all_fiction_data['labels'].value_counts()

In [None]:
# View data: display the fiction DataFrame after the renaming and label clean-up
all_fiction_data

In [None]:
# Wrap both pandas DataFrames as Hugging Face Dataset objects
all_fiction_data, all_nonfiction_data = (
    Dataset.from_pandas(frame)
    for frame in (all_fiction_data, all_nonfiction_data)
)

## Prepare Data for Classification

In [None]:
# Class index -> reading-level name, and the inverse lookup built from it
id2label = dict(enumerate(("Elementary", "Middle", "High")))
label2id = {level_name: class_id for class_id, level_name in id2label.items()}

In [None]:
# Inspect the fiction Dataset before its labels are converted to integers
all_fiction_data

In [None]:
# Encode the string labels as integer class ids (0, 1, or 2)
def map_labels_to_number(example):
    """Replace the example's string label with its integer id from `label2id`."""
    label_name = example["labels"]
    example["labels"] = label2id[label_name]
    return example


# Apply the encoding to every example of the fiction dataset
fiction_data = all_fiction_data.map(map_labels_to_number)

In [None]:
# Split data into training and test sets
# (20% of the examples are held out for testing; the seed is fixed at 42)
fiction_data = fiction_data.train_test_split(test_size=0.2, seed=42)

In [None]:
# Inspect the result of the train/test split
fiction_data

## Tokenize Data

In [None]:
# Load the fast tokenizer that matches the "roberta-large" checkpoint
tokenizer = AutoTokenizer.from_pretrained("roberta-large", use_fast=True)

tokenizer

In [None]:
def tokenize_text(examples):
    """
    Run the module-level tokenizer over the "text" entry of `examples`
    with padding and truncation enabled, and return the tokenizer output.
    """
    encoded = tokenizer(examples["text"], truncation=True, padding=True)
    return encoded

def tokenize(batch, max_length=256):
    """
    Tokenize the "text" entry of a batch to a fixed sequence length.

    Args:
        batch: Mapping with a "text" entry (e.g. a batch handed over by
            `Dataset.map(..., batched=True)`).
        max_length: Length every sequence is padded or truncated to.
            Defaults to 256, the value that used to be hard-coded.

    Returns:
        The tokenizer output for the batch.
    """
    return tokenizer(
        batch["text"],
        truncation=True,
        padding="max_length",
        max_length=max_length,
    )

# Tokenize both splits with the fixed-length `tokenize` function
# (used for RoBERTa instead of `tokenize_text`)
tokenized_dataset = fiction_data.map(tokenize, batched=True)

In [None]:
# # Map tokenize_text function to the dataset
# tokenized_dataset = fiction_data.map(function=tokenize_text,
#                               batched=True,
#                               batch_size=1000)

# tokenized_dataset

In [None]:
# Take the first example of each split
train_tokenized_sample = tokenized_dataset["train"][0]
test_tokenized_sample = tokenized_dataset["test"][0]

# Print every field of the two samples side by side
for key, train_value in train_tokenized_sample.items():
    print(f"[INFO] Key: {key}")
    print(f"Train sample: {train_value}")
    print(f"Test sample: {test_tokenized_sample[key]}")
    print("")


## Set Up Evaluation Metric

In [None]:
import evaluate
import numpy as np
from typing import Tuple

In [None]:
# Accuracy metric loaded through the `evaluate` library
accuracy_metric = evaluate.load("accuracy")


def compute_accuracy(predictions_and_labels: Tuple[np.ndarray, np.ndarray]):
    """
    Compute the accuracy of a model by comparing its predictions to the labels.

    Args:
        predictions_and_labels: A (predictions, labels) pair. Predictions may
            be per-class scores (2 or more dimensions) or already-decoded
            class ids (1 dimension).

    Returns:
        The result of `accuracy_metric.compute` for the decoded predictions.
    """
    predictions, labels = predictions_and_labels

    # Per-class scores: keep the index of the highest score for each example
    if predictions.ndim >= 2:
        predictions = np.argmax(predictions, axis=1)

    return accuracy_metric.compute(predictions=predictions, references=labels)


## Set Up Model for Training

In [None]:
from transformers import AutoModelForSequenceClassification

# Load the pretrained "roberta-large" weights with a sequence-classification
# head for 3 classes, and attach the id <-> label mappings to the model config
model = AutoModelForSequenceClassification.from_pretrained(
    "roberta-large",
    num_labels=3,
    id2label=id2label,
    label2id=label2id,
)

In [None]:
# Keep only input_ids, attention_mask and labels; drop every other column
columns_to_keep = ["input_ids", "attention_mask", "labels"]
columns_to_drop = [
    name
    for name in tokenized_dataset["train"].column_names
    if name not in columns_to_keep
]
tokenized_dataset = tokenized_dataset.remove_columns(columns_to_drop)

In [None]:
def count_params(model):
    """
    Tally the parameter elements of a PyTorch model.

    Returns a dict with the number of elements that require gradients
    ("trainable_parameters") and the number of all elements
    ("total_parameters").
    """
    total_parameters = 0
    trainable_parameters = 0

    for parameter in model.parameters():
        element_count = parameter.numel()
        total_parameters += element_count
        if parameter.requires_grad:
            trainable_parameters += element_count

    return {"trainable_parameters": trainable_parameters, "total_parameters": total_parameters}

# Count the trainable and total parameters of the model
count_params(model)

No layers have been frozen, so all parameters in the model are trainable: this is full fine-tuning.

### Create Directory for Saving Models

In [None]:
# Create model output directory
from pathlib import Path

# Top-level folder for saved models (left alone if it already exists)
models_dir = Path("models")
models_dir.mkdir(exist_ok=True)

# Folder name for this fine-tuned model
model_save_name = "reading_level_text_classifier-roberta-large"

# Full save path: <models_dir>/<model_save_name>
model_save_dir = models_dir / model_save_name

model_save_dir

### Set Up Training Arguments

In [None]:
from transformers import TrainingArguments

print(f"[INFO] Saving model checkpoints to: {model_save_dir}")

# Create training arguments
training_args = TrainingArguments(
    # Checkpoints are written under the model save directory
    output_dir=model_save_dir,

    # Optimisation hyperparameters
    learning_rate=2e-5,
    weight_decay=0.01,
    warmup_ratio=0.1,

    # 8 examples per device per step, accumulated over 2 steps before each update
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,
    num_train_epochs=5,

    # Mixed precision needs a CUDA device; fall back to full precision when none
    # is available instead of failing at start-up
    fp16=torch.cuda.is_available(),

    # Evaluate and checkpoint once per epoch, keep at most 3 checkpoints and
    # reload the best one when training finishes
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=3,
    load_best_model_at_end=True,

    no_cuda=False,

    # Log once per epoch; no external experiment tracker
    logging_strategy="epoch",
    report_to="none",

    hub_private_repo=False,
)

### Set Up Trainer Instance

In [None]:
import os
import torch
# NOTE(review): CUDA_LAUNCH_BLOCKING=1 makes CUDA kernel launches synchronous.
# It is a debugging aid that slows training; presumably left over from a
# debugging session - confirm whether it is still wanted.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
from transformers import Trainer

# Setup Trainer: ties together the model, the training arguments, the
# train/test splits of the tokenized dataset, the tokenizer and the
# accuracy metric function
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_accuracy
)

## Training Text Classification Model

In [None]:
# Train a text classification model
# (`results` keeps the value returned by `trainer.train()`)
results = trainer.train()

## Save Model for Later Use

In [None]:
# Save model: write the trained model to the model save directory
print(f"[INFO] Saving model to {model_save_dir}")
trainer.save_model(output_dir=model_save_dir)

## Inspect Model Training Metrics

In [None]:
# Pull the full log history recorded by the Trainer
trainer_history_all = trainer.state.log_history

# The last entry holds the training time metrics (the same value as
# results.metrics from above); every entry before it is a regular metrics log
trainer_history_training_time = trainer_history_all[-1]
trainer_history_metrics = trainer_history_all[:-1]

# View the first 4 metrics from the training history
trainer_history_metrics[:4]

In [None]:
import pprint  # pretty-printer for the lists of metric dicts

# Buckets for the training and evaluation log entries
trainer_history_training_set = []
trainer_history_eval_set = []

# An entry counts as an evaluation entry when any of its keys contains "eval";
# every other entry is treated as a training entry
for entry in trainer_history_metrics:
    is_eval_entry = any("eval" in key for key in entry)
    bucket = trainer_history_eval_set if is_eval_entry else trainer_history_training_set
    bucket.append(entry)

# Show the first two items in each metric set
print("[INFO] First two items in training set:")
pprint.pprint(trainer_history_training_set[:2])

print("\n[INFO] First two items in evaluation set:")
pprint.pprint(trainer_history_eval_set[:2])

In [None]:
# Create pandas DataFrames for the training and evaluation metrics
# (one row per logged entry, one column per metric key)
trainer_history_training_df = pd.DataFrame(trainer_history_training_set)
trainer_history_eval_df = pd.DataFrame(trainer_history_eval_set)

# NOTE(review): if these two lines run in a single notebook cell, only the last
# bare expression (the evaluation DataFrame) is rendered - confirm whether the
# training DataFrame was meant to be shown as well.
trainer_history_training_df
trainer_history_eval_df

In [None]:
# Plot training and evaluation loss per epoch
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(trainer_history_training_df["epoch"], trainer_history_training_df["loss"], label="Training loss")
plt.plot(trainer_history_eval_df["epoch"], trainer_history_eval_df["eval_loss"], label="Evaluation loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
# The model fine-tuned in this notebook is roberta-large, so the title names it
plt.title("Text classification with RoBERTa-large training and evaluation loss over time")
plt.legend()
plt.show()