In [None]:
# Colab-only step: prompt for a file upload so the Kaggle API token
# (kaggle.json) is available to the shell cell that copies it into ~/.kaggle.
from google.colab import files
files.upload()  # Upload kaggle.json


In [None]:
# Copy the uploaded token into ~/.kaggle and make it readable/writable by the
# current user only (mode 600).
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [None]:
!pip install -q kaggle


In [None]:
# Download the dataset archive with the Kaggle CLI; the next cell expects it
# as spam-emails.zip in the working directory.
!kaggle datasets download -d abdallahwagih/spam-emails


In [None]:
!unzip spam-emails.zip -d spam_emails


In [None]:
import pandas as pd

# Load the extracted CSV. Later cells read its 'Category' (label) and
# 'Message' (text) columns.
df = pd.read_csv("spam_emails/spam.csv")
df.head()


In [None]:
# (rows, columns) of the raw frame
df.shape

In [None]:
# Class balance: number of rows per 'Category' value
print(df['Category'].value_counts())

In [None]:
# Shape of the spam-only subset
print(df[df['Category']=='spam'].shape)

In [None]:
# Peek at five randomly drawn ham rows (unseeded, so the rows differ per run)
df[df['Category']=='ham'].sample(5)

In [None]:
# Encode the string labels as integers: spam -> 0, ham -> 1.
LABEL_TO_ID = {'spam': 0, 'ham': 1}

# Only map while the column still holds the raw labels. Mapping an already
# encoded column would turn every value into NaN, so this guard makes the
# cell safe to re-run.
if not pd.api.types.is_integer_dtype(df['Category']):
    encoded_category = df['Category'].map(LABEL_TO_ID)
    # Fail loudly on labels outside the mapping instead of carrying NaN forward.
    unmapped = df.loc[encoded_category.isna(), 'Category'].unique()
    assert len(unmapped) == 0, f"Unexpected 'Category' values: {list(unmapped)!r}"
    df['Category'] = encoded_category.astype('int64')
df[:5]

In [None]:
import numpy as np

# Fixed seed so the shuffle, and therefore the membership of the
# train/test/validation splits cut from it, is the same on every run.
SHUFFLE_SEED = 42
shuffled_df = df.sample(frac=1, random_state=SHUFFLE_SEED).reset_index(drop=True)
shuffled_df[:5]

In [None]:
# Shape of the shuffled frame
shuffled_df.shape

In [None]:
# Fractions of the shuffled rows intended for the train, test and validation
# splits.
train_percent, test_percent, val_percent = 0.75, 0.2, 0.05

In [None]:
# Row offsets at which the shuffled frame is cut. Each split size is rounded
# up with ceil before being converted to an int.
n_rows = shuffled_df.shape[0]
train_end = int(np.ceil(train_percent * n_rows))
test_end = train_end + int(np.ceil(test_percent * n_rows))

train_df = shuffled_df[:train_end]
test_df = shuffled_df[train_end:test_end]
val_df = shuffled_df[test_end:]  # validation receives whatever rows remain
train_df[:5], test_df[:5], val_df[:5]

In [None]:
# Shapes of the three splits, shown side by side
train_df.shape, test_df.shape, val_df.shape

In [None]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader


class Classification(Dataset):
    """Dataset of tokenized messages, all padded to one common length.

    Args:
        data_frame: Frame with a "Message" column (text) and a "Category"
            column (label).
        tokenizer: Object exposing ``encode(text)`` that returns a list of
            token ids.
        max_length: Common sequence length. When ``None`` the longest
            tokenized message sets the length and nothing is truncated;
            otherwise longer messages are cut to ``max_length``.
        pad_token_id: Token id appended to shorter messages (default 50256).

    Each item is a ``(token_ids, label)`` pair of ``torch.long`` tensors.
    """

    def __init__(self, data_frame, tokenizer, max_length=None, pad_token_id=50256):
        self.data = data_frame

        # Tokenize every message once, up front.
        token_lists = []
        for message in self.data["Message"]:
            token_lists.append(tokenizer.encode(message))
        self.encoded_msgs = token_lists

        if max_length is not None:
            self.max_length = max_length
            # Cut anything longer than the requested length.
            self.encoded_msgs = [
                token_ids[:self.max_length] for token_ids in self.encoded_msgs
            ]
        else:
            self.max_length = self._longest_encoded_length()

        # Right-pad every sequence so all of them are self.max_length long.
        padded_msgs = []
        for token_ids in self.encoded_msgs:
            n_missing = self.max_length - len(token_ids)
            padded_msgs.append(token_ids + [pad_token_id] * n_missing)
        self.encoded_msgs = padded_msgs

    def __getitem__(self, index):
        """Return the padded token ids and the label at position ``index``."""
        token_ids = self.encoded_msgs[index]
        target = self.data.iloc[index]["Category"]
        ids_tensor = torch.tensor(token_ids, dtype=torch.long)
        target_tensor = torch.tensor(target, dtype=torch.long)
        return ids_tensor, target_tensor

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

    def _longest_encoded_length(self):
        """Length of the longest tokenized message currently stored."""
        return max(map(len, self.encoded_msgs))


In [None]:
import tiktoken
# tiktoken's "gpt2" encoding; its encode() output is what the dataset class
# and classify_review feed to the model.
tokenizer = tiktoken.get_encoding("gpt2")

In [None]:
# No max_length is given for the training set, so every message is padded to
# the longest tokenized training message.
train_dataset = Classification(train_df, max_length=None, tokenizer=tokenizer)
print(train_dataset.max_length)
# The test set reuses the training length: longer test messages are truncated
# and shorter ones padded, so both sets share one input length.
test_dataset = Classification(test_df, max_length=train_dataset.max_length, tokenizer=tokenizer)


In [None]:
# Loader settings shared by both splits
batch_size = 6
num_workers = 2
# Training batches are drawn in shuffled order; the test loader keeps dataset order.
train_loader = DataLoader(dataset=train_dataset,batch_size=batch_size,shuffle=True,num_workers=num_workers)
test_loader = DataLoader(dataset=test_dataset,batch_size=batch_size,shuffle=False,num_workers=num_workers)


In [None]:
# NOTE(review): GPT2Tokenizer is imported but not used in the visible cells;
# tokenization goes through tiktoken.
from transformers import GPT2Model, GPT2Tokenizer

# Load GPT-2 small (124M)
model = GPT2Model.from_pretrained("gpt2")
print(model)

In [None]:
# Freeze every parameter the model has at this point; later cells attach a
# new head and switch gradients back on for selected layers.
for param in model.parameters():
    param.requires_grad = False

In [None]:
num_classes = 2

# Seed before creating the head so its random initialisation (and the random
# draws that follow in later cells) is repeatable across runs.
torch.manual_seed(123)

# Read the hidden size from the model config instead of hard-coding 768, so
# the head still fits if a differently sized GPT-2 checkpoint is loaded.
model.out_head = torch.nn.Linear(in_features=model.config.n_embd, out_features=num_classes)
print(model)
# Optional experiment (disabled): raise the dropout rates inside every block.
#for block in model.h:
#    block.attn.attn_dropout.p = 0.3     # Attention dropout
#    block.attn.resid_dropout.p = 0.3    # Residual dropout
#    block.mlp.dropout.p = 0.3           # MLP dropout

In [None]:
# Switch gradients back on for the last transformer block (`h` is the list of
# transformer blocks) and for the final layer norm (`ln_f`).
for trainable_module in (model.h[-1], model.ln_f):
    for weight in trainable_module.parameters():
        weight.requires_grad = True

In [None]:
# Tokenize a short prompt and add a leading batch dimension
inputs = tokenizer.encode("Good day")
inputs = torch.tensor(inputs).unsqueeze(0)
print("Inputs:", inputs)
print("Inputs dimensions:", inputs.shape) # shape: (batch_size, num_tokens)

In [None]:
# Run the backbone without tracking gradients
with torch.no_grad():
    outputs = model(inputs)

hidden_states = outputs[0]  # shape: [1, 2, 768]
# The head is applied outside the no_grad block above
logits = model.out_head(hidden_states)  # shape: [1, 2, 2]
probas = torch.softmax(logits, dim=-1)
print("Logits:\n", logits.shape)
print('probas', probas)

In [None]:
# Classify from the last token position only
probas = torch.softmax(model.out_head(hidden_states)[:, -1, :], dim=-1)
label = torch.argmax(probas)
print("Class label:", label.item())

In [None]:
def calc_accuracy_loader(data_loader, model, num_batches=None):
    """Return the fraction of correctly classified examples in ``data_loader``.

    Args:
        data_loader: Sized iterable yielding ``(input_batch, target_batch)``.
        model: Callable whose output's first element holds the hidden states
            and which carries an ``out_head`` mapping them to class logits.
        num_batches: Evaluate at most this many batches; ``None`` means all.

    Returns:
        ``correct / seen`` as a float, or ``nan`` when no example was
        evaluated (empty loader or ``num_batches=0``).
    """
    model.eval()
    correct_predictions, num_examples = 0, 0

    if num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))

    # Pure evaluation: keep the backbone AND the trainable head out of
    # autograd so no graph is built for the head's forward pass.
    with torch.no_grad():
        for i, (input_batch, target_batch) in enumerate(data_loader):
            if i >= num_batches:
                break
            outputs = model(input_batch)
            # Classify from the last token position: [batch_size, num_classes]
            logits_last = model.out_head(outputs[0])[:, -1, :]
            # softmax is monotonic, so argmax over logits picks the same class
            predicted_labels = torch.argmax(logits_last, dim=-1)

            correct_predictions += (predicted_labels == target_batch).sum().item()
            num_examples += target_batch.size(0)

    if num_examples == 0:
        # Nothing evaluated: report "undefined" instead of dividing by zero.
        return float("nan")
    return correct_predictions / num_examples

In [None]:
# Baseline before any fine-tuning: accuracy over at most the first 10 batches
# of each loader.
train_accuracy = calc_accuracy_loader(train_loader, model, num_batches=10)
test_accuracy = calc_accuracy_loader(test_loader, model, num_batches=10)

print(f"Training accuracy: {train_accuracy*100:.2f}%")
print(f"Test accuracy: {test_accuracy*100:.2f}%")

In [None]:
def train_step(model, train_loader, optimizer):
    """Run one training epoch and return ``(mean_loss, accuracy)``.

    Both metrics are averaged per example, not per batch, so a shorter final
    batch is not over-weighted and the accuracy is computed the same way as
    in ``calc_accuracy_loader``.

    Args:
        model: Callable whose output's first element holds the hidden states
            and which carries an ``out_head`` mapping them to class logits.
        train_loader: Iterable yielding ``(input_batch, target_batch)``.
        optimizer: Optimizer updating the model's trainable parameters.

    Returns:
        Tuple of floats ``(loss, accuracy)``; ``(nan, nan)`` if the loader
        yielded no examples.
    """
    loss_sum, correct_sum, examples_seen = 0.0, 0, 0

    model.train()  # Set model to training mode
    for input_batch, target_batch in train_loader:
        hidden_states = model(input_batch)[0]
        # Classify from the last token position: [batch_size, num_classes]
        logits = model.out_head(hidden_states)[:, -1, :]
        loss = torch.nn.functional.cross_entropy(logits, target_batch)

        optimizer.zero_grad()  # Reset gradients from the previous batch
        loss.backward()        # Calculate loss gradients
        optimizer.step()       # Update model weights using loss gradients

        # cross_entropy returns the batch mean; scale it back to a sum so the
        # epoch average is taken over examples. The metrics describe the
        # weights as they were before this batch's update.
        n_in_batch = target_batch.size(0)
        loss_sum += loss.item() * n_in_batch
        predicted_labels = torch.argmax(logits, dim=-1)
        correct_sum += (predicted_labels == target_batch).sum().item()
        examples_seen += n_in_batch

    if examples_seen == 0:
        # Empty loader: report "undefined" instead of dividing by zero.
        return float("nan"), float("nan")
    return loss_sum / examples_seen, correct_sum / examples_seen

In [None]:
def test_step(model, test_loader):
    # Put model in eval mode
    model.eval()

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0

    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(test_loader):
            # Send data to target device

            # 1. Forward pass
            test_pred_logits = model(X)
            logits = model.out_head(test_pred_logits[0])[:, -1, :]

            # 2. Calculate and accumulate loss
            loss = torch.nn.functional.cross_entropy(logits, y)
            test_loss+=loss.item()

            # Calculate and accumulate accuracy
            probas = torch.softmax(logits, dim=-1)
            test_pred_labels = torch.argmax(probas, dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(test_loader)
    test_acc = test_acc / len(test_loader)
    return test_loss, test_acc


In [None]:
import time
from tqdm.auto import tqdm

# The timer starts before the optimizer is built, so setup time is included.
start_time = time.time()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5, weight_decay=0.1)

num_epochs = 5

# Early-stopping state: stop once the monitored loss has failed to improve
# for `patience` consecutive epochs.
best_loss = float('inf')
epochs_no_improve = 0
patience = 2

# Per-epoch metric history, one list per metric
results = {
    metric_name: []
    for metric_name in ("train_loss", "train_acc", "test_loss", "test_acc")
}

for epoch in tqdm(range(num_epochs)):
    train_loss, train_acc = train_step(model, train_loader, optimizer)
    test_loss, test_acc = test_step(model,test_loader)

    print(
        f"Epoch: {epoch+1} | "
        f"train_loss: {train_loss:.4f} | "
        f"train_acc: {train_acc:.4f} | "
        f"test_loss: {test_loss:.4f} | "
        f"test_acc: {test_acc:.4f}"
    )

    # Store plain Python numbers; unwrap tensors if a step ever returns one.
    epoch_metrics = {
        "train_loss": train_loss,
        "train_acc": train_acc,
        "test_loss": test_loss,
        "test_acc": test_acc,
    }
    for metric_name, metric_value in epoch_metrics.items():
        if isinstance(metric_value, torch.Tensor):
            metric_value = metric_value.item()
        results[metric_name].append(metric_value)

    # The loss on test_loader is the early-stopping signal.
    if test_loss < best_loss:
        best_loss, epochs_no_improve = test_loss, 0
    else:
        epochs_no_improve += 1

    if epochs_no_improve >= patience:
        print(f"\nEarly stopping triggered after {epoch+1} epochs.")
        break

end_time = time.time()
execution_time_minutes = (end_time - start_time) / 60
print(f"Training completed in {execution_time_minutes:.2f} minutes.")

In [None]:
from typing import Tuple, Dict, List
import matplotlib.pyplot as plt
def plot_loss_curves(results):
    """Draw loss and accuracy curves side by side from a training history.

    Args:
        results: Dict with the keys 'train_loss', 'test_loss', 'train_acc'
            and 'test_acc', each mapping to a sequence with one value per
            epoch.
    """
    # (panel title, ((legend label, values), ...)) for the two subplots
    panels = (
        ('Loss', (
            ('train_loss', results['train_loss']),
            ('test_loss', results['test_loss']),
        )),
        ('Accuracy', (
            ('train_accuracy', results['train_acc']),
            ('test_accuracy', results['test_acc']),
        )),
    )

    # x axis: one tick per recorded epoch, starting at 0
    epochs = range(len(results['train_loss']))

    plt.figure(figsize=(10, 5))

    for position, (panel_title, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for curve_label, values in curves:
            plt.plot(epochs, values, label=curve_label)
        plt.title(panel_title)
        plt.xlabel('Epochs')
        plt.legend()


plot_loss_curves(results)

In [None]:
def classify_review(text, model, tokenizer, max_length=None, pad_token_id=50256):
    """Predict the integer class label for a single piece of text.

    Args:
        text: Raw message to classify.
        model: Model exposing ``wpe`` (position embeddings, whose row count
            is taken as the supported context length) and ``out_head``.
        tokenizer: Object exposing ``encode(text)`` returning token ids.
        max_length: Length the input is truncated and padded to. Values above
            the model's context length are capped to it. ``None`` (default)
            truncates to the context length and adds no padding.
        pad_token_id: Token id used for right-padding.

    Returns:
        The predicted class index as a Python int.
    """
    model.eval()

    # Prepare inputs to the model
    input_ids = tokenizer.encode(text)
    supported_context_length = model.wpe.weight.shape[0]

    # Never feed more positions than the model has position embeddings for.
    # The original min(max_length, ...) raised TypeError for the default None.
    if max_length is None:
        target_length = supported_context_length
    else:
        target_length = min(max_length, supported_context_length)
    input_ids = input_ids[:target_length]

    if max_length is not None:
        # Pad to the capped length, not to max_length itself, so padding can
        # never push the input past the context window.
        input_ids = input_ids + [pad_token_id] * (target_length - len(input_ids))
    elif not input_ids:
        # Empty text without padding would produce a zero-length sequence.
        input_ids = [pad_token_id]

    input_tensor = torch.tensor(input_ids).unsqueeze(0)  # add batch dimension

    # Model inference; the head runs inside no_grad too, so no graph is built
    with torch.no_grad():
        outputs = model(input_tensor)
        logits_last = model.out_head(outputs[0])[:, -1, :]  # last token position
    predicted_label = torch.argmax(logits_last, dim=-1).item()

    # Return the classified result
    return predicted_label

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Score the validation split one message at a time, using the same input
# length the model was trained with.
y_true_list = val_df['Category'].tolist()
y_pred_list = [
    classify_review(message, model, tokenizer, max_length=train_dataset.max_length)
    for message in val_df['Message']
]

# Pin the label order so the matrix is always 2x2 and its rows/columns line
# up with target_names, even if one class never occurs in y_true or y_pred.
cm = confusion_matrix(y_true_list, y_pred_list, labels=[0, 1])
target_names = ['Spam', 'Not Spam']  # index 0 = spam, index 1 = ham

# Plot the confusion matrix
plt.figure(figsize=(7, 6))
sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

In [None]:
# Hand-written example resembling a prize scam; the printed value is the
# integer class index returned by classify_review.
text_1 = (
    "You are a winner you have been specially"
    " selected to receive $1000 cash or a $2000 award."
)

print(classify_review(
    text_1, model, tokenizer, max_length=train_dataset.max_length
))

In [None]:
# Hand-written example resembling an ordinary personal message
text_2 = (
    "Hey, just wanted to check if we're still on"
    " for dinner tonight? Let me know!"
)

print(classify_review(
    text_2, model, tokenizer, max_length=train_dataset.max_length
))

In [None]:
# Persist the model's state_dict (which includes the attached out_head) to the
# working directory.
torch.save(model.state_dict(), "spam_classifier.pth")

In [None]:
import os

# Save model
model_save_path = "./custom_gpt2_classifier"
# torch.save does not create missing parent directories, so make sure the
# target folder exists before writing into it.
os.makedirs(model_save_path, exist_ok=True)
torch.save(model.state_dict(), f"{model_save_path}/pytorch_model.bin")

In [None]:

# Reload the weights written to spam_classifier.pth and copy them into the
# existing model object.
model_state_dict = torch.load("spam_classifier.pth", weights_only=True)
model.load_state_dict(model_state_dict)