In [None]:
!mkdir data
!unzip data.zip -d data

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import os
import torch.nn.functional as F

from sklearn.model_selection import train_test_split

def load_data(directory):
    categories = ['ham', 'spam']
    data = []
    labels = []
    # Iterate through each enron directory
    for enron_dir in [d for d in os.listdir(directory) if d.startswith('enron')]:
        for label, category in enumerate(categories):
            category_dir = os.path.join(directory, enron_dir, category)
            for filename in os.listdir(category_dir):
                file_path = os.path.join(category_dir, filename)
                with open(file_path, 'r', encoding='latin-1') as file:
                    text = file.read()
                    data.append(text)
                    labels.append(label)
    return data, labels


data_directory = 'data'
emails, labels = load_data(data_directory)

X_train, X_test, y_train, y_test = train_test_split(emails, labels, test_size=0.2, random_state=42)

In [None]:
from transformers import BertTokenizer, BertModel


tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')


encoded_train_data = []
encoded_test_data = []

for email_text in X_train:
    inputs = tokenizer(email_text, return_tensors="pt", max_length=32, truncation=True)

    with torch.no_grad():
        outputs = model(**inputs)

    email_representation = outputs.last_hidden_state[:, 0, :].numpy()

    encoded_train_data.append(email_representation)

encoded_train_data = np.array(encoded_train_data)

print("Shape of the encoded train data:", encoded_train_data.shape)


for email_text in X_test:
    inputs = tokenizer(email_text, return_tensors="pt", max_length=32, truncation=True)

    with torch.no_grad():
        outputs = model(**inputs)

    email_representation = outputs.last_hidden_state[:, 0, :].numpy()

    encoded_test_data.append(email_representation)

encoded_test_data = np.array(encoded_test_data)

print("Shape of the encoded test data:", encoded_test_data.shape)

In [15]:
train_da = torch.from_numpy(encoded_train_data).view(encoded_train_data.shape[0], -1)
test_da = torch.from_numpy(encoded_test_data).view(encoded_test_data.shape[0], -1)

In [17]:
batch_size = 32

train_dataset = torch.utils.data.TensorDataset(train_da, torch.tensor(y_train))
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = torch.utils.data.TensorDataset(test_da, torch.tensor(y_test))
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [49]:

class BayesianNeuralNetworkWithBN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, dropout_rate):
        super(BayesianNeuralNetworkWithBN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        self.weights_in_hidden_mu = nn.Parameter(torch.randn(input_size, hidden_size))
        self.weights_in_hidden_log_var = nn.Parameter(torch.randn(input_size, hidden_size))
        self.weights_hidden_out_mu = nn.Parameter(torch.randn(hidden_size, output_size))
        self.weights_hidden_out_log_var = nn.Parameter(torch.randn(hidden_size, output_size))
        self.bias_hidden_mu = nn.Parameter(torch.randn(hidden_size))
        self.bias_hidden_log_var = nn.Parameter(torch.randn(hidden_size))
        self.bias_out_mu = nn.Parameter(torch.randn(output_size))
        self.bias_out_log_var = nn.Parameter(torch.randn(output_size))

        self.bn_input = nn.BatchNorm1d(input_size)
        self.bn_hidden = nn.BatchNorm1d(hidden_size)

        self.dropout = nn.Dropout(p=dropout_rate)

    def forward(self, x):
        x = self.bn_input(x)

        weights_in_hidden = self.weights_in_hidden_mu + torch.exp(0.5 * self.weights_in_hidden_log_var) * torch.randn_like(self.weights_in_hidden_log_var)
        weights_hidden_out = self.weights_hidden_out_mu + torch.exp(0.5 * self.weights_hidden_out_log_var) * torch.randn_like(self.weights_hidden_out_log_var)
        bias_hidden = self.bias_hidden_mu + torch.exp(0.5 * self.bias_hidden_log_var) * torch.randn_like(self.bias_hidden_log_var)
        bias_out = self.bias_out_mu + torch.exp(0.5 * self.bias_out_log_var) * torch.randn_like(self.bias_out_log_var)

        hidden = torch.tanh(torch.matmul(x, weights_in_hidden) + bias_hidden)

        hidden = self.bn_hidden(hidden)

        hidden = self.dropout(hidden)

        output = torch.matmul(hidden, weights_hidden_out) + bias_out
        return output, (weights_in_hidden, self.weights_in_hidden_log_var,
                        weights_hidden_out, self.weights_hidden_out_log_var,
                        bias_hidden, self.bias_hidden_log_var,
                        bias_out, self.bias_out_log_var)


# Hyperparameters
input_size = train_da.shape[1]
hidden_size = 64
output_size = 2  
learning_rate = 0.01
num_epochs = 20

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = BayesianNeuralNetworkWithBN(input_size, hidden_size, output_size, dropout_rate=0.5).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

def compute_kl_div(mu, log_var):
    return -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())


In [50]:
for epoch in range(num_epochs):
    total_loss = 0.0
    model.train() 

    for batch_inputs, batch_labels in train_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        outputs, params = model(batch_inputs)

        weights_in_hidden, log_var_weights_in_hidden, weights_hidden_out, log_var_weights_hidden_out, bias_hidden, log_var_bias_hidden, bias_out, log_var_bias_out = params

        nll_loss = F.cross_entropy(outputs, batch_labels)

        kl_loss = compute_kl_div(weights_in_hidden, log_var_weights_in_hidden) + \
                  compute_kl_div(weights_hidden_out, log_var_weights_hidden_out) + \
                  compute_kl_div(bias_hidden, log_var_bias_hidden) + \
                  compute_kl_div(bias_out, log_var_bias_out)

        loss = nll_loss + 1e-4 * kl_loss
        total_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    average_loss = total_loss / len(train_loader)


    model.eval()
    with torch.no_grad():
        test_len = 0
        right = 0
        for batch_test, batch_test_label in test_loader:
            batch_test = batch_test.to(device)
            batch_test_label = batch_test_label.to(device)

            test_outputs, _ = model(batch_test)
            predictions = torch.argmax(test_outputs, dim=1)

            right += torch.sum(predictions == batch_test_label).item()
            test_len += batch_test_label.shape[0]

    accuracy = right/test_len

    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {average_loss}  ', "Accuracy on test data:", accuracy)

Epoch [1/20], Average Loss: 9.40502793933149   Accuracy on test data: 0.827846975088968
Epoch [2/20], Average Loss: 6.496457071066751   Accuracy on test data: 0.8852313167259787
Epoch [3/20], Average Loss: 4.9913874684986554   Accuracy on test data: 0.9068801897983393
Epoch [4/20], Average Loss: 3.82845545097885   Accuracy on test data: 0.9137010676156584
Epoch [5/20], Average Loss: 3.018462588920005   Accuracy on test data: 0.923932384341637
Epoch [6/20], Average Loss: 2.5432858523666364   Accuracy on test data: 0.9295670225385528
Epoch [7/20], Average Loss: 2.3033707068342615   Accuracy on test data: 0.9298635824436536
Epoch [8/20], Average Loss: 2.1967540368490797   Accuracy on test data: 0.9289739027283511
Epoch [9/20], Average Loss: 2.1646005980060616   Accuracy on test data: 0.9292704626334519
Epoch [10/20], Average Loss: 2.1464634034981507   Accuracy on test data: 0.9314946619217082
Epoch [11/20], Average Loss: 2.1490673734624193   Accuracy on test data: 0.9297153024911032
Epoch