# Imports

In [114]:
import os
import torch
import torch.optim as optim
import torch.nn as nn

In [115]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


# Dataset

In [116]:
email_data_path = '/content/drive/MyDrive/ColabNotebooks/Winter Semester 2023 24/NSI/Datasets-20231011/email/'
n = 50

In [117]:
def parse_email_data(folder_path, no_of_emails=n):
    emails = []
    labels = []  # Spam 1, Ham 0
    counter = 1

    # Iterate through ham emails
    ham_folder_path = os.path.join(folder_path, 'ham')
    for filename in os.listdir(ham_folder_path):
        if counter >= no_of_emails:
            break

        with open(os.path.join(ham_folder_path, filename), 'r', encoding='utf-8', errors='ignore') as file:
            lines = file.readlines()
            if len(lines) >= 3:
                body = ''.join(lines[2:]).strip()
                emails.append(body)
                labels.append(0)
                counter += 1

    counter = 1

    # Iterate through spam emails
    spam_folder_path = os.path.join(folder_path, 'spam')
    for filename in os.listdir(spam_folder_path):
        if counter >= no_of_emails:
            break

        with open(os.path.join(spam_folder_path, filename), 'r', encoding='utf-8', errors='ignore') as file:
            lines = file.readlines()
            if len(lines) >= 3:
                body = ''.join(lines[2:]).strip()
                emails.append(body)
                labels.append(1)
                counter += 1

    return emails, labels

In [118]:
train_emails, train_labels =  parse_email_data(os.path.join(email_data_path, 'train'), n)
# test_emails, test_labels =  parse_email_data(os.path.join(email_data_path, 'test'), n)
# val_emails, val_labels =  parse_email_data(os.path.join(email_data_path, 'val'), n)

# Model

In [119]:
def make_context_vector(context, word_to_ix):
    idxs = [word_to_ix[w] for w in context]
    return torch.tensor(idxs, dtype=torch.long)

In [125]:
EMDEDDING_DIM = 128

vocab = set(word for email in train_emails for word in email)
vocab_size = len(vocab)

word_to_ix = {word: ix for ix, word in enumerate(vocab)}
ix_to_word = {ix: word for ix, word in enumerate(vocab)}

In [126]:
# Commented out parts = actual CBOW model (taken from the Github link provided)
class CBOW(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(CBOW, self).__init__()

        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(embedding_dim, 256)
        self.activation_function1 = nn.ReLU()
        # self.linear2 = nn.Linear(128, vocab_size)
        # self.activation_function2 = nn.LogSoftmax(dim = -1)


    def forward(self, inputs):
        embeds = sum(self.embeddings(inputs)).view(1,-1)
        out = self.linear1(embeds)
        out = self.activation_function1(out)
        # out = self.linear2(out)
        # out = self.activation_function2(out)
        return out

    def get_word_embedding(self, word):
        word = torch.tensor([word_to_ix[word]])
        return self.embeddings(word).view(1, -1)

In [127]:
class SpamClassifier(nn.Module):
    def __init__(self, input_size):
        super(SpamClassifier, self).__init__()
        self.linear = nn.Linear(input_size, 1)
        self.activation_function = nn.Sigmoid()

    def forward(self, inputs):
        out = self.linear(inputs)
        out = self.activation_function(out)
        return out

In [128]:
# Instantiate the models
cbow_model = CBOW(vocab_size, EMDEDDING_DIM)

classifier_model = SpamClassifier(64)

loss_function = nn.BCELoss()
optimizer = torch.optim.SGD(classifier_model.parameters(), lr=0.001)

## For original CBOW

In [None]:
model = CBOW(vocab_size, EMDEDDING_DIM)

loss_function = nn.NLLLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

#TRAINING
for epoch in range(50):
    total_loss = 0

    for context, target in data:
        context_vector = make_context_vector(context, word_to_ix)

        log_probs = model(context_vector)

        total_loss += loss_function(log_probs, torch.tensor([word_to_ix[target]]))

    #optimize at the end of each epoch
    optimizer.zero_grad()
    total_loss.backward()
    optimizer.step()

# Training

In [129]:
# TRAINING
for epoch in range(2):
    total_loss = 0
    correct_predictions = 0
    total_samples = 0

    for i, (email, label) in enumerate(zip(train_emails, train_labels)):
        for j in range(2, len(email) - 2):
            context = [email[j - 2], email[j - 1],
                       email[j + 1], email[j + 2]]
            target = email[j]

            context_vector = make_context_vector(context, word_to_ix)

            # Use CBOW representation for classification
            cbow_representation = cbow_model(context_vector)
            output = classifier_model(cbow_representation)

            # labels (0 for 'ham', 1 for 'spam')
            label_tensor = torch.tensor([[label]], dtype=torch.float32)

            # Compute loss and backpropagate
            loss = loss_function(output, label_tensor)
            total_loss += loss.item()

            # Calculate accuracy
            predicted_label = 1 if output.item() > 0.5 else 0

            if (predicted_label == label):
              correct_predictions += 1

            total_samples += 1

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Calculate accuracy
    accuracy = correct_predictions / total_samples

    print(f'Epoch {epoch + 1}, Accuracy: {accuracy * 100:.2f}%')

Epoch 1, Accuracy: 99.67%
Epoch 2, Accuracy: 99.37%
