# Load the dataset

In [None]:
!pip install -U "datasets==3.6.0"

In [None]:
### YOU MUST NOT CHANGE THIS CELL! ###

from datasets import load_dataset

full_dataset = load_dataset("skeskinen/TinyStories-GPT4", split="train")
full_dataset = full_dataset.remove_columns([c for c in full_dataset.column_names if c not in ["story", "features"]])
assert len(full_dataset) == 2745100

splits = full_dataset.train_test_split(test_size=10000, seed=42, shuffle=True)

train_dataset = splits["train"]
test_dataset  = splits["test"]

assert len(train_dataset) == 2735100
assert len(test_dataset)  == 10000

assert train_dataset[0]["story"][:33] == "One day, a little girl named Lily"
assert train_dataset[0]["features"] == ["Dialogue", "Conflict"]

In [None]:
# Print the first example of the train dataset to inspect its fields
# (the dataset was reduced to the "story" and "features" columns above)

from pprint import pprint
pprint(train_dataset[0])

In [None]:
!pip install gdown
import gdown
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import random
import copy
import pandas as pd
import string

from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.nn import BCEWithLogitsLoss
from torchsummary import summary
from sklearn.metrics import f1_score, accuracy_score, precision_recall_curve, confusion_matrix, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
from transformers import AutoModelForSequenceClassification, AutoTokenizer

In [None]:
# Fix the seed of every random number generator used in the notebook
# (Python, NumPy, PyTorch CPU and CUDA) for reproducibility
SEED = 42

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Seed the current GPU and all other visible GPUs
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

# Ask cuDNN for deterministic algorithms and turn off its auto-tuner,
# which could otherwise select different kernels from run to run
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
# Define the device: use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Pull the raw texts and their tag lists out of the held-out test split
stories_test = test_dataset['story']
labels_test = test_dataset['features']

In [None]:
# The six tags to predict; their order here is the order used when zipping
# per-tag scores with tag names below
TAGS = ["BadEnding", "Conflict", "Dialogue", "Foreshadowing", "MoralValue", "Twist"]

def f1_per_tag(true_tag_lists, pred_tag_lists):
    """Compute one F1 score per tag.

    Both arguments are expected to already be binary indicator matrices
    (one row per story, one column per entry of ``TAGS``).

    Returns:
        dict: tag name -> F1 score as a Python float. A tag for which F1 is
        undefined is scored 0 (``zero_division=0``).
    """
    # average=None makes scikit-learn return one score per column instead of an aggregate
    per_tag_scores = f1_score(true_tag_lists, pred_tag_lists, average=None, zero_division=0)
    return dict(zip(TAGS, map(float, per_tag_scores)))

Convert every label to a binary vector:

We define a mapping from each label to a unique index, then create a function to convert lists of labels into binary vectors. This allows us to represent the presence or absence of each label as 1s and 0s for model training.

In [None]:
# Position of each tag inside the binary label vector, following the order of TAGS
label_to_index = {label: i for i, label in enumerate(TAGS)} # Map each label to a unique index (BadEnding->0, Conflict->1,...)

def list_to_binary(label_list):
    """Turn a list of tag names into a 0/1 vector aligned with ``TAGS``.

    Surrounding whitespace of every label is ignored. A label that is not in
    ``label_to_index`` raises ``KeyError``.
    """
    # Indices of the tags that are present (duplicates collapse naturally)
    active_positions = {label_to_index[raw_label.strip()] for raw_label in label_list}
    return [1 if position in active_positions else 0 for position in range(len(TAGS))]

def binary_to_list(vector):
    """Inverse of ``list_to_binary``: return the tag names whose entry equals 1."""
    selected_tags = []
    for position, flag in enumerate(vector):
        if flag == 1:
            selected_tags.append(TAGS[position])
    return selected_tags

# Binary label vectors for the whole test split
labels_bin_test = list(map(list_to_binary, labels_test))

# Model A: pretrained model RoBERTa

Load the best model for RoBERTa obtained from the training procedure and a pre-trained tokenizer for further fine-tuning or inference.

In [None]:
# Google Drive share link of the fine-tuned RoBERTa checkpoint
# (fuzzy=True lets gdown extract the file id from the full link)
file_id_roberta = 'https://drive.google.com/file/d/1bavSp2TvsToWA7AIjMI4NQkOTUwqJcNj/view?usp=sharing'
file_name_roberta = 'best_model_RoBERTa.pt'
best_model_roberta = gdown.download(file_id_roberta, file_name_roberta, fuzzy=True, quiet=True)

# NOTE(review): some gdown releases report a failed download by returning None
# instead of raising; stop here with a clear message rather than letting
# torch.load fail later on a None path.
if best_model_roberta is None:
    raise RuntimeError(f"Download of '{file_name_roberta}' from Google Drive failed")

In [None]:
# Tokenizer of the same base checkpoint ("roberta-base") that the classifier below is built from
tokenizer = AutoTokenizer.from_pretrained("roberta-base")

Create the DataLoader

In [None]:
# Tokenize all test stories at once: every story is truncated or padded to
# exactly 256 tokens, so the resulting tensors all have the same length
test_encodings = tokenizer(stories_test, truncation=True, padding='max_length', max_length=256, return_tensors="pt")

# Convert the binary label vectors to a float tensor
test_labels_tensor = torch.tensor(labels_bin_test, dtype=torch.float)

# Create the TensorDataset; each item is (input_ids, attention_mask, labels)
test_dataset_A = TensorDataset(test_encodings["input_ids"], test_encodings["attention_mask"], test_labels_tensor)

# DataLoader: no shuffling, so predictions stay aligned with the order of the test set
test_dataloader_A = DataLoader(test_dataset_A, batch_size=32, shuffle=False)

In [None]:
# Evaluate function
def evaluate_model(model, loader, loss_fn, thresholds=None):
    """Evaluate a multi-label sequence classifier on a data loader.

    Args:
        model: Model called as ``model(input_ids=..., attention_mask=...)``
            whose output exposes ``.logits``.
        loader: Iterable of batches ``(input_ids, attention_mask, labels)``.
        loss_fn: Callable ``loss_fn(logits, labels)`` returning a scalar
            tensor. It is assumed to return the *mean* loss of the batch
            (the default reduction of ``BCEWithLogitsLoss``).
        thresholds: Decision thresholds applied to the sigmoid probabilities,
            one per entry of ``TAGS`` (a single scalar also works through
            broadcasting). Defaults to 0.5 for every tag.

    Returns:
        tuple: ``(loss, f1_values, pred_probabilities, pred_tags, all_targets)``
        where ``loss`` is the mean loss per sample, ``f1_values`` is a NumPy
        array of per-tag F1 scores in ``TAGS`` order, and the last three are
        NumPy arrays with one row per sample.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.to(device)
    model.eval()  # Evaluation mode

    if thresholds is None:
        thresholds = [0.5] * len(TAGS)
    # Explicit array so the comparison below broadcasts over the tag axis
    thresholds = np.asarray(thresholds, dtype=float)

    # Initialization
    total_loss = 0.0
    num_samples = 0
    all_targets = []
    all_outputs = []

    with torch.no_grad():
        for batch in loader:
            input_ids = batch[0].to(device)  # move input data to the device
            attention_mask = batch[1].to(device)
            labels = batch[2].float().to(device)

            output = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = output.logits  # raw logits, as expected by the loss function

            # Weight each batch mean by its size: the last batch may be
            # smaller, and a plain mean of batch means would over-weight it
            batch_size = labels.size(0)
            total_loss += loss_fn(logits, labels).item() * batch_size
            num_samples += batch_size

            all_targets.append(labels.cpu().numpy())
            all_outputs.append(logits.cpu().numpy())

    if num_samples == 0:
        raise ValueError("evaluate_model received an empty loader: nothing to evaluate")

    # Average loss per sample
    loss = total_loss / num_samples

    all_targets = np.concatenate(all_targets, axis=0)
    all_outputs = np.concatenate(all_outputs, axis=0)

    # Apply sigmoid and the per-tag thresholds to get binary predictions
    pred_probabilities = torch.sigmoid(torch.tensor(all_outputs)).numpy()
    pred_tags = (pred_probabilities > thresholds).astype(int)

    # Calculate F1 scores per label
    f1_scores = f1_per_tag(all_targets, pred_tags)
    current_f1_values = np.array([f1_scores[tag] for tag in TAGS])

    print(f"Evaluation loss: {loss:.4f}")
    print("F1 Score per tag:")
    for tag, score in f1_scores.items():
        print(f"  {tag}: {score:.4f}")

    return loss, current_f1_values, pred_probabilities, pred_tags, all_targets

In [None]:
# Rebuild the architecture used for fine-tuning: roberta-base with a
# 6-output multi-label classification head (one output per tag in TAGS)
model_roberta = AutoModelForSequenceClassification.from_pretrained(
    'roberta-base',
    num_labels=6,
    problem_type="multi_label_classification"
)

# Load the fine-tuned weights from the downloaded checkpoint
# (map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine)
model_roberta.load_state_dict(torch.load(best_model_roberta, map_location=device))
model_roberta.to(device)

In [None]:
loss_fn = BCEWithLogitsLoss()  # loss function suitable for RoBERTa

# One threshold per tag, in the same order as TAGS (they are compared column-wise
# with the predicted probabilities inside evaluate_model)
thresholds = [0.84, 0.39, 0.46, 0.20, 0.47, 0.42]   # best thresholds computed using the precision_recall_curve in the training procedure

# Keep probabilities, binary predictions and targets; the loss and F1 values are only printed
_, _, pred_probabilities_A, pred_tags_A , true_tags_A = evaluate_model(model_roberta, test_dataloader_A, loss_fn, thresholds=thresholds)

- Precision (Positive Predictive Value): measures how many of the predicted positive instances are actually correct
$$ Precision = \frac{TP}{TP +FP} $$

- Recall (Sensitivity, True Positive Rate): measures how many actual positive instances are correctly predicted:
$$ Recall = \frac{TP}{TP +FN} $$

- F1-Score: harmonic mean of precision and recall:
$$ F1 = 2 \times \frac{Precision \times Recall}{Precision + Recall}$$

In [None]:
# Per-tag precision and recall of Model A (F-score and support are not needed here)
precision_A, recall_A = precision_recall_fscore_support(
    true_tags_A, pred_tags_A, average=None, zero_division=0
)[:2]

# Collect the metrics in a single table, one row per tag
results_df = pd.DataFrame({"Tag": TAGS, "Precision": precision_A, "Recall": recall_A})

print(results_df)

- Accuracy:
measures how often the model's predictions are correct:

$$ Accuracy = \frac{Number \ of \ correct \ predictions}{Total \  number \ of \ predictions} $$

In [None]:
# Per-tag accuracy of Model A, in percent: each tag is scored as its own
# binary problem by comparing one column of targets with one column of predictions
correct_predictions_A = {
    tag: accuracy_score(true_tags_A[:, column], pred_tags_A[:, column]) * 100
    for column, tag in enumerate(TAGS)
}

print("Accuracy for each tag:")
for tag, accuracy in correct_predictions_A.items():
    print(f"- {tag}: {accuracy:.2f}%")

A confusion matrix shows how many examples were correctly or incorrectly classified into each class. It is very useful because it highlights what kinds of errors the model is making and whether the model favors one class.

In [None]:
# Confusion matrices of Model A: one panel per tag
fig, axes = plt.subplots(2, 3, figsize=(12, 8))  # 6 labels → 2 rows x 3 columns
axes = axes.flatten()

for i, tag in enumerate(TAGS):
    # labels=[0, 1] guarantees a 2x2 matrix even if a tag never occurs (or always occurs)
    cm = confusion_matrix(true_tags_A[:, i], pred_tags_A[:, i], labels=[0, 1])
    ax = axes[i]
    im = ax.imshow(cm, cmap='Reds')

    ax.set_title(f"Confusion Matrix - {tag}")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    ax.set_xticks([0, 1])
    ax.set_yticks([0, 1])

    # Write the count in every cell; white text on dark cells, black on light ones
    for r in range(2):
        for c in range(2):
            ax.text(c, r, cm[r, c], ha="center", va="center", color="white" if cm[r, c] > 0.5 * cm.max() else "black")

plt.tight_layout()

# Save one image per tag only after every panel is fully drawn and laid out.
# Each file is cropped to its own panel (title, labels and counts included).
fig.canvas.draw()
renderer = fig.canvas.get_renderer()
for ax, tag in zip(axes, TAGS):
    # get_tightbbox is in pixels; savefig expects the bounding box in inches
    panel_bbox = ax.get_tightbbox(renderer).transformed(fig.dpi_scale_trans.inverted())
    fig.savefig(f"model_A_cm_{tag}.png", bbox_inches=panel_bbox.expanded(1.05, 1.05))

plt.show()

# Model B: CNN model

We download the small dataset that we created earlier, so that the vocabulary of our CNN model is built consistently with the one used during training.

In [None]:
# Small dataset (400.000 stories) stored on Google Drive
url = 'https://drive.google.com/file/d/1lTZhwFsV55W_O9oBX6mSi3MLh-lXTmJF/view?usp=drive_link'
file_name = 'small_dataset_400.000.csv'

# Fetch the CSV next to the notebook, then read the file that was just written
gdown.download(url, file_name, fuzzy=True, quiet=False)
df = pd.read_csv(file_name)

In [None]:
# Convert stories and labels to plain Python lists
import ast

stories = df['stories'].tolist()
# The CSV stores each label list as text; literal_eval parses it back into a
# Python object without executing arbitrary code
df_labels = df['labels'].apply(ast.literal_eval)
labels = df_labels.tolist()

In [None]:
# Binary label vectors of the small dataset
# NOTE(review): labels_bin is not referenced again in the visible part of this notebook
labels_bin = [list_to_binary(label) for label in labels]

In [None]:
# Define a function that separate the words from the punctuation with an empty space
def simple_tokenizer(stories):
    """Lowercase every story and split it into word and punctuation tokens.

    Each character of ``string.punctuation`` becomes a token of its own;
    everything else is split on whitespace.

    Args:
        stories: Iterable of strings.

    Returns:
        list[list[str]]: One token list per story, in input order.
    """
    # Translation table: every punctuation mark -> the same mark padded with spaces
    spaced_punctuation = {ord(mark): f' {mark} ' for mark in string.punctuation}

    return [
        story.lower().translate(spaced_punctuation).split()
        for story in stories
    ]

In [None]:
# Tokenize all the stories of the small dataset (one token list per story);
# the vocabulary is built from these tokens
stories_split = simple_tokenizer(stories)

Create the vocabulary

In [None]:
# Special tokens: padding gets index 0 and "unknown word" gets index 1
PAD_TOKEN, UNK_TOKEN = '[PAD]', '[UNK]'

# Every token of every story, in one flat list
all_tokens = [token for story in stories_split for token in story]

# Unique tokens in sorted order, so the token -> index mapping is deterministic
vocab_tokens = sorted(set(all_tokens))

# Full vocabulary: special tokens first, then the corpus tokens
V = [PAD_TOKEN, UNK_TOKEN, *vocab_tokens]
token_to_index = dict(zip(V, range(len(V))))

vocab_size = len(V)

In [None]:
# Define a function to encode a story
def encode(x):
    """Map a token list to vocabulary indices; unseen tokens get the ``UNK_TOKEN`` index."""
    unknown_index = token_to_index[UNK_TOKEN]
    return [token_to_index.get(token, unknown_index) for token in x]

In [None]:
# How to choose max_length: use the 95th percentile of the story lengths
# (in tokens) of the small dataset, so only the longest ~5% get truncated

lengths = [len(story) for story in stories_split]              # Compute the length of each story
max_length = int(np.percentile(lengths, 95))                    # Fix max_length s.t. 95% of the stories are shorter than max_length

print("Max length chosen:", max_length)

In [None]:
# Define a function to have all the stories of the same length
def truncate_and_pad(sequence):
    """Return a copy of an encoded story with exactly ``max_length`` entries.

    Longer stories keep only their last ``max_length`` tokens (the end of the
    story is favoured over the beginning); shorter ones are right-padded with
    the index of ``PAD_TOKEN``. The input is never modified.
    """
    sequence = copy.copy(sequence)
    surplus = len(sequence) - max_length

    if surplus > 0:
        # Too long: drop tokens from the start
        return sequence[-max_length:]
    if surplus < 0:
        # Too short: append the missing number of PAD indices
        return sequence + [token_to_index[PAD_TOKEN]] * (-surplus)
    return sequence

Create the DataLoader

In [None]:
# Prepare the test set for the CNN: tokenize, map tokens to vocabulary indices,
# then truncate/pad every story to max_length
test_stories_split = simple_tokenizer(stories_test)
test_stories_encoded = [encode(story) for story in test_stories_split]
test_stories_padded = [truncate_and_pad(story) for story in test_stories_encoded]

# Construct the TensorDataset; each item is (token indices, binary label vector)
test_dataset_B = TensorDataset(torch.tensor(test_stories_padded, dtype = torch.long), torch.tensor(labels_bin_test, dtype = torch.float))

# Create the DataLoader; no shuffling, so predictions stay aligned with the order of the test set
batch_size = 32
test_dataloader_B = DataLoader(test_dataset_B, batch_size=batch_size, shuffle= False)

Define the evaluating procedure:

This function evaluates the best-performing model (previously saved during training) on the test set.

In [None]:
# Evaluate function
def evaluate(model, loader, device, thresholds=None):
    """Evaluate a multi-label classifier that maps a batch of inputs to logits.

    Args:
        model: Module called as ``model(data)`` and returning one logit per tag.
        loader: Iterable of ``(data, target)`` batches.
        device: Device on which the model and the batches are placed.
        thresholds: Decision thresholds applied to the sigmoid probabilities,
            one per entry of ``TAGS`` (a single scalar also works through
            broadcasting). Defaults to 0.5 for every tag.

    Returns:
        tuple: ``(loss, f1_values, pred_probabilities, pred_tags, all_targets)``
        where ``loss`` is the mean binary cross-entropy per sample,
        ``f1_values`` is a NumPy array of per-tag F1 scores in ``TAGS`` order,
        and the last three are NumPy arrays with one row per sample.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.to(device)
    model.eval()    # Evaluation mode

    if thresholds is None:
        thresholds = [0.5] * len(TAGS)
    # Explicit array so the comparison below broadcasts over the tag axis
    thresholds = np.asarray(thresholds, dtype=float)

    loss_fn = nn.BCEWithLogitsLoss()  # default reduction: mean over the batch
    total_loss = 0.0
    num_samples = 0
    all_targets = []
    all_outputs = []

    with torch.no_grad():
        for data, target in loader:
            data = data.to(device)      # Model inputs
            target = target.to(device)  # Binary label vectors

            output = model(data)

            # Weight each batch mean by its size: the last batch may be
            # smaller, and a plain mean of batch means would over-weight it
            batch_size = target.size(0)
            total_loss += loss_fn(output, target).item() * batch_size
            num_samples += batch_size

            all_targets.append(target.cpu().numpy())
            all_outputs.append(output.cpu().numpy())

    if num_samples == 0:
        raise ValueError("evaluate received an empty loader: nothing to evaluate")

    # Average loss per sample
    loss = total_loss / num_samples

    all_targets = np.concatenate(all_targets)
    all_outputs = np.concatenate(all_outputs)

    # Apply sigmoid and the per-tag thresholds to get binary predictions
    pred_probabilities = torch.sigmoid(torch.tensor(all_outputs)).numpy()
    pred_tags = (pred_probabilities > thresholds).astype(int)

    # Calculate F1 scores per label
    f1_scores = f1_per_tag(all_targets, pred_tags)
    current_f1_values = np.array([f1_scores[tag] for tag in TAGS]) # np.array with F1 values

    print(f'Evaluation loss: {loss:.4f}')
    print("F1 Score per tag:")
    for tag, score in f1_scores.items():
        print(f"  {tag}: {score:.4f}")

    return loss, current_f1_values, pred_probabilities, pred_tags, all_targets

Load the best CNN model obtained from the training procedure.

In [None]:
# Google Drive share link of the best CNN checkpoint (trained on the 400.000-story dataset);
# fuzzy=True lets gdown extract the file id from the full link
file_id_CNN = 'https://drive.google.com/file/d/1yanvtYTnJEqD4f7EMwanwWX2CmXlWjbB/view?usp=sharing' # Best model with 400.000 training dataset
file_name_CNN = 'best_model_CNN.pt'
best_model_CNN = gdown.download(file_id_CNN, file_name_CNN, fuzzy=True, quiet=True)

# NOTE(review): some gdown releases report a failed download by returning None
# instead of raising; stop here with a clear message rather than letting
# torch.load fail later on a None path.
if best_model_CNN is None:
    raise RuntimeError(f"Download of '{file_name_CNN}' from Google Drive failed")

We define a PyTorch module called `Transpose` that we use inside our neural network to swap two dimensions of a tensor. In PyTorch, `nn.Embedding()` returns a b×l×d tensor (where b is the batch size, l is the sequence length, and d is the embedding dimension), whereas `nn.Conv1d()` expects a b×d×l tensor. Therefore, when these layers are used together in a 1-dimensional CNN, the second and third dimensions must be swapped between the embedding layer and the first convolutional layer.

In [None]:
class Transpose(nn.Module):
    """Parameter-free layer that exchanges two dimensions of its input.

    Lets a transpose be placed between layers of an ``nn.Sequential``.
    """

    def __init__(self, dim0: int, dim1: int):
        super().__init__()
        self.dim0 = dim0
        self.dim1 = dim1

    def forward(self, x):
        # transpose() hands back a view on the same storage: no data is copied
        swapped = x.transpose(self.dim0, self.dim1)
        return swapped

We obtained the best hyperparameters through the Grid Search after the training procedure.

In [None]:
# Best hyperparameters; the architecture values must match the saved checkpoint,
# otherwise its weights cannot be loaded into the model defined below
best_lr = 0.0005  # NOTE(review): not used in the visible part of this notebook
best_embedding_dim = 300
best_drop_out = 0.4
best_filters_conv1 = 128
best_kernel_size_conv1 = 3
best_filters_conv2 = 64
best_kernel_size_conv2 = 3

In [None]:
# Definition of the model: embedding -> two 1-D convolutions -> global max pooling -> linear head
model_CNN = nn.Sequential(
    nn.Embedding(vocab_size, best_embedding_dim),
    # Swap dimensions 1 and 2 so the embedding dimension becomes the channel axis expected by Conv1d
    Transpose(1, 2),
    nn.Conv1d(in_channels=best_embedding_dim, out_channels=best_filters_conv1, kernel_size=best_kernel_size_conv1, padding="same"),
    nn.ReLU(),
    nn.MaxPool1d(kernel_size=2),
    nn.Conv1d(in_channels=best_filters_conv1, out_channels=best_filters_conv2, kernel_size=best_kernel_size_conv2, padding="same"),
    nn.ReLU(),
    # Global max pooling: keep one value per filter, whatever the sequence length
    nn.AdaptiveMaxPool1d(1),
    nn.Flatten(),
    nn.Dropout(best_drop_out),
    # 6 outputs: one logit per tag in TAGS
    nn.Linear(best_filters_conv2, 6)
    )

# Load the trained weights from the downloaded checkpoint
# (map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine)
model_CNN.load_state_dict(torch.load(best_model_CNN, map_location=device))
model_CNN.to(device)
model_CNN.eval()

In [None]:
# Test of the model: keep probabilities, binary predictions and targets;
# the loss and F1 values are only printed by evaluate
_, _, pred_probs_B, pred_tags_B, true_tags_B = evaluate(model_CNN, test_dataloader_B, device)

In [None]:
# Per-tag precision and recall of Model B (F-score and support are not needed here)
precision_B, recall_B = precision_recall_fscore_support(
    true_tags_B, pred_tags_B, average=None, zero_division=0
)[:2]

# Collect the metrics in a single table, one row per tag
results_df = pd.DataFrame({"Tag": TAGS, "Precision": precision_B, "Recall": recall_B})

print(results_df)

In [None]:
# Per-tag accuracy of Model B, in percent: each tag is scored as its own
# binary problem by comparing one column of targets with one column of predictions
correct_predictions_B = {
    tag: accuracy_score(true_tags_B[:, column], pred_tags_B[:, column]) * 100
    for column, tag in enumerate(TAGS)
}

print("Accuracy for each tag:")
for tag, accuracy in correct_predictions_B.items():
    print(f"- {tag}: {accuracy:.2f}%")

In [None]:
# Confusion matrices of Model B: one panel per tag
fig, axes = plt.subplots(2, 3, figsize=(12, 8))   # 6 labels → 2 rows x 3 columns
axes = axes.flatten()

for i, tag in enumerate(TAGS):
    # labels=[0, 1] guarantees a 2x2 matrix even if a tag never occurs (or always occurs)
    cm = confusion_matrix(true_tags_B[:, i], pred_tags_B[:, i], labels=[0, 1])
    ax = axes[i]
    im = ax.imshow(cm, cmap='Reds')

    ax.set_title(f"Confusion Matrix - {tag}")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    ax.set_xticks([0, 1])
    ax.set_yticks([0, 1])

    # Write the count in every cell; white text on dark cells, black on light ones
    for r in range(2):
        for c in range(2):
            ax.text(c, r, cm[r, c], ha="center", va="center", color="white" if cm[r, c] > 0.5 * cm.max() else "black")

plt.tight_layout()

# Save one image per tag only after every panel is fully drawn and laid out.
# Each file is cropped to its own panel (title, labels and counts included).
fig.canvas.draw()
renderer = fig.canvas.get_renderer()
for ax, tag in zip(axes, TAGS):
    # get_tightbbox is in pixels; savefig expects the bounding box in inches
    panel_bbox = ax.get_tightbbox(renderer).transformed(fig.dpi_scale_trans.inverted())
    fig.savefig(f"model_B_cm_{tag}.png", bbox_inches=panel_bbox.expanded(1.05, 1.05))

plt.show()