Before running this code, extract the "20050311_spam_2" and "20021010_easy_ham" folders from the SpamAssassin public corpus into the working directory. Make sure the directory names match exactly.

In [92]:
import os
import numpy as np

# Load dataset into an array and store their labels
# First, the spam emails, all labelled 1
dataset_dir = "20050311_spam_2"
emails = []
labels = []

# Gather the file contents in a plain list first. Calling np.append once per
# file copies the entire (fixed-width string) array on every iteration, which
# is quadratic in the number of emails; a single append at the end is linear.
spam_emails = []
for root, dirs, files in os.walk(dataset_dir):
    for filename in files:
        file_path = os.path.join(root, filename)
        if os.path.isfile(file_path):
            # latin1 maps every byte to a character, so decoding cannot fail
            with open(file_path, "r", encoding="latin1") as file:
                spam_emails.append(file.read())

# Leave `emails` / `labels` as empty lists when nothing was found
if spam_emails:
    emails = np.append(emails, spam_emails)
    labels = np.append(labels, np.ones(len(spam_emails)))

In [None]:
# Next the ham emails, all labelled 0
dataset_dir = "20021010_easy_ham"

# Gather the file contents in a plain list and extend `emails` / `labels`
# once: appending per file with np.append recopies the whole array each time.
ham_emails = []
for root, dirs, files in os.walk(dataset_dir):
    for filename in files:
        file_path = os.path.join(root, filename)
        if os.path.isfile(file_path):
            # latin1 maps every byte to a character, so decoding cannot fail
            with open(file_path, "r", encoding="latin1") as file:
                ham_emails.append(file.read())

# np.append accepts either a list or an array as its first argument, so this
# works whatever container `emails` / `labels` currently are.
if ham_emails:
    emails = np.append(emails, ham_emails)
    labels = np.append(labels, np.zeros(len(ham_emails)))

In [220]:
# Function to remove unwanted html tags from emails
import re

def remove_html_tags(email):
    """Return *email* with every ``<...>`` tag deleted.

    A tag is a ``<``, one or more characters other than ``>``, then a ``>``.
    Each match is replaced by the empty string; all other text is kept.
    """
    tag_pattern = re.compile(r'<[^>]+>')
    return tag_pattern.sub('', email)

In [97]:
# Function to build a word dictionary out of all the emails
# This dictionary contains every word used and its frequency
from collections import Counter

def build_dictionary(emails):
    """Count how often each whitespace-separated word occurs in *emails*.

    Every email has its HTML tags removed with ``remove_html_tags`` before
    being split on whitespace. Returns a ``Counter`` mapping word -> count.
    """
    return Counter(
        token
        for raw_email in emails
        for token in remove_html_tags(raw_email).split()
    )

In [98]:
# Function to keep the 10000 most frequently used words, ordered from most to least frequent, in a dict
def sort_dictionary(emails):
    """Return the 10000 most frequent words of *emails* as a dict.

    Keys are words and values are their counts as produced by
    ``build_dictionary``. Entries are ordered from most to least frequent;
    words with equal counts keep the order ``build_dictionary`` gave them.
    """
    counts = build_dictionary(emails)
    # Sorting on the negated count yields descending order, and because the
    # sort is stable, tied words stay in their original relative order.
    ranked = sorted(counts.items(), key=lambda pair: -pair[1])
    return dict(ranked[:10000])

In [99]:
# Function to convert the email into a datapoint
def make_data(email, sorted_dict):
    """Encode one email as a binary bag-of-words vector.

    Args:
        email: Email text; it is split on whitespace into tokens.
        sorted_dict: Mapping whose keys are the vocabulary words. The i-th
            key in iteration order owns position i of the result.

    Returns:
        A float array of length 10000 whose entry i is 1 when the i-th
        vocabulary word occurs in the email and 0 otherwise. Positions
        beyond the vocabulary size stay 0; vocabulary keys beyond the
        10000th are ignored.
    """
    # NOTE(review): build_dictionary strips HTML tags before tokenizing but
    # this function does not, so tokens glued to a tag never match a
    # vocabulary word - confirm whether that is intended.

    # A set makes each membership test O(1); with a list every one of the
    # (up to) 10000 lookups would rescan the whole email.
    words = set(email.split())
    X = np.zeros(10000)
    for i, key in enumerate(sorted_dict):
        if i >= X.shape[0]:
            break  # vocabulary larger than the feature vector
        if key in words:
            X[i] = 1
    return X

In [235]:
# Function to transform our whole dataset into binary arrays
def data(emails, sorted_dict=None):
    """Turn a collection of emails into a binary feature matrix.

    Args:
        emails: Sequence of email texts.
        sorted_dict: Optional vocabulary mapping (as returned by
            ``sort_dictionary``). When omitted, the vocabulary is built from
            *emails* itself. Pass the training vocabulary here to encode
            unseen emails in the same feature space as the training matrix.

    Returns:
        Array of shape ``(len(emails), 10000)``; row i is
        ``make_data(emails[i], sorted_dict)``.
    """
    if sorted_dict is None:
        sorted_dict = sort_dictionary(emails)
    X = np.empty((len(emails), 10000))
    for i, email in enumerate(emails):
        X[i] = make_data(email, sorted_dict)
    return X

X = data(emails)

In [236]:
# SVM modelling function
# Using Library function SVC()
from sklearn.svm import SVC

def svm(X_train, Y_train, X_test, Y_test, c):
    """Train a linear-kernel SVC and score it on the test split.

    Args:
        X_train, Y_train: Training features and labels.
        X_test, Y_test: Held-out features and labels.
        c: Value passed to ``SVC`` as its ``C`` hyper-parameter.

    Returns:
        ``(Y_pred, accuracy, svm_classifier)``: the predicted test labels,
        the fraction of test labels predicted correctly, and the fitted
        classifier.
    """
    svm_classifier = SVC(kernel='linear', C=c)
    svm_classifier.fit(X_train, Y_train)
    Y_pred = svm_classifier.predict(X_test)

    # Count the positions where prediction and ground truth disagree
    mismatches = sum(
        1 for truth, guess in zip(Y_test, Y_pred) if truth != guess
    )
    accuracy = 1 - mismatches/Y_test.shape[0]
    return Y_pred, accuracy, svm_classifier

In [237]:
# Function to split dataset into test and training data and running SVM
def run_svm(X,labels):
    """Shuffle the dataset, split it 80:20 and train/evaluate the SVM.

    Args:
        X: Feature matrix, one row per sample.
        labels: Label of each row of *X*, indexable by row number.

    Returns:
        Whatever ``svm`` returns for the split:
        ``(Y_pred, accuracy, svm_classifier)``.
    """
    n_samples = X.shape[0]
    order = np.random.permutation(n_samples)   # random row order
    split = int(0.8*n_samples)                 # first 80% train, rest test
    train_idx, test_idx = order[:split], order[split:]

    X_train, X_test = X[train_idx], X[test_idx]
    Y_train = np.array([labels[i] for i in train_idx])
    Y_test = np.array([labels[i] for i in test_idx])

    c = 1e9  # Change depending on problem
    return svm(X_train, Y_train, X_test, Y_test, c)

Y_pred_svm , accuracy_svm, svm_classifier = run_svm(X,labels)

In [238]:
# Function to do the Naive-Bayes' Algorithm
def naive_bayes(X_train, Y_train, X_test, Y_test):
    """Fit a Bernoulli Naive Bayes classifier and evaluate it on a test split.

    Args:
        X_train: Binary feature matrix, shape (n_train, n_features).
        Y_train: Array of training labels, 1 for spam and 0 for ham.
        X_test: Binary feature matrix, shape (n_test, n_features).
        Y_test: Array of test labels.

    Returns:
        ``(p, p_hat, Y_pred, accuracy)`` where
        ``p`` has shape (2, n_features) and ``p[c][j]`` is the
        Laplace-smoothed estimate of P(x_j = 1 | y = c),
        ``p_hat`` is the fraction of training samples labelled spam,
        ``Y_pred`` holds the predicted test labels (0.0 / 1.0), and
        ``accuracy`` is the fraction of test labels predicted correctly.
    """
    n_features = X_train.shape[1]
    spam_mask = Y_train.astype(int) == 1
    n_spam = np.count_nonzero(spam_mask)
    n_ham = Y_train.shape[0] - n_spam
    p_hat = n_spam/(n_ham+n_spam)

    # Per-class word frequencies with add-one (Laplace) smoothing. Without
    # smoothing a word never seen in one class gets probability exactly 0,
    # which wipes out that class's likelihood for any email containing it.
    p = np.zeros((2, n_features))
    p[0] = (X_train[~spam_mask].sum(axis=0) + 1)/(n_ham + 2)
    p[1] = (X_train[spam_mask].sum(axis=0) + 1)/(n_spam + 2)

    # Score in log space: a product of thousands of probabilities underflows
    # to 0.0 for both classes, which would make every prediction "ham".
    log_p = np.log(p)
    log_q = np.log(1 - p)
    with np.errstate(divide="ignore"):   # p_hat may be exactly 0 or 1
        log_prior = np.log(np.array([1 - p_hat, p_hat]))
    # Column c holds log P(y=c) + sum_j log P(x_j | y=c) for each test row
    scores = X_test @ log_p.T + (1 - X_test) @ log_q.T + log_prior
    Y_pred = (scores[:, 1] > scores[:, 0]).astype(float)

    accuracy = 1 - np.count_nonzero(Y_test != Y_pred)/Y_test.shape[0]

    return p, p_hat, Y_pred, accuracy

In [240]:
# Split the dataset and feed it to the Naive-Bayes' function
def run_naive_bayes(X,labels):
    """Shuffle the dataset, split it 80:20 and run Naive Bayes on the split.

    Args:
        X: Feature matrix, one row per sample.
        labels: Label of each row of *X*, indexable by row number.

    Returns:
        Whatever ``naive_bayes`` returns for the split:
        ``(p, p_hat, Y_pred, accuracy)``.
    """
    n_samples = X.shape[0]
    order = np.random.permutation(n_samples)   # random row order
    split = int(0.8*n_samples)                 # first 80% train, rest test
    train_idx, test_idx = order[:split], order[split:]

    X_train, X_test = X[train_idx], X[test_idx]
    Y_train = np.array([labels[i] for i in train_idx])
    Y_test = np.array([labels[i] for i in test_idx])

    return naive_bayes(X_train, Y_train, X_test, Y_test)

p, p_hat, Y_pred_NB, accuracy_NB = run_naive_bayes(X,labels)

In [241]:
# Function to carry out logistic regression
def logistic_gd(X_train, Y_train, X_test, Y_test):
    """Fit logistic regression by batch gradient ascent and evaluate it.

    Starting from zero weights, repeatedly adds ``eta`` times the gradient
    of the log-likelihood until the norm of that step is at most the
    threshold. There is no bias term.

    Args:
        X_train: Feature matrix, shape (n_train, n_features).
        Y_train: Array of 0/1 training labels.
        X_test: Feature matrix, shape (n_test, n_features).
        Y_test: Array of 0/1 test labels.

    Returns:
        ``(w, accuracy)``: the learned weight vector of length n_features
        and the fraction of test labels predicted correctly.
    """
    w = np.zeros(X_train.shape[1])  # one weight per feature column
    eta = 1e-6                      # Step size
    error = 1
    threshold = 0.0005              # Stop once the update is this small

    while error > threshold:
        # Gradient of the log-likelihood, sum_i x_i * (y_i - sigmoid(w.x_i)),
        # computed for all samples at once instead of a per-sample loop.
        residual = Y_train - 1/(1 + np.exp(-(X_train @ w)))
        step = eta*(X_train.T @ residual)
        error = np.linalg.norm(step)
        w = w + step

    # Predict spam (1) when the sigmoid output exceeds 0.5
    probabilities = 1/(1 + np.exp(-(X_test @ w)))
    Y_pred = (probabilities > 0.5).astype(float)

    accuracy = 1 - np.count_nonzero(Y_pred != Y_test)/Y_test.shape[0]
    return w, accuracy

In [242]:
# Split dataset and feed to the logistic regressor function
def run_logistic(X,labels):
    """Shuffle the dataset, split it 80:20 and run logistic regression.

    Args:
        X: Feature matrix, one row per sample.
        labels: Label of each row of *X*, indexable by row number.

    Returns:
        Whatever ``logistic_gd`` returns for the split: ``(w, accuracy)``.
    """
    n_samples = X.shape[0]
    order = np.random.permutation(n_samples)   # random row order
    split = int(0.8*n_samples)                 # first 80% train, rest test
    train_idx, test_idx = order[:split], order[split:]

    X_train, X_test = X[train_idx], X[test_idx]
    Y_train = np.array([labels[i] for i in train_idx])
    Y_test = np.array([labels[i] for i in test_idx])

    return logistic_gd(X_train, Y_train, X_test, Y_test)

w_logistic, accuracy_logistic = run_logistic(X,labels)

In [256]:
# Function to run on test emails
def run_test_emails():
    """Classify every email under the "test" directory with all three models.

    Reads the module-level training state: ``emails`` (the training corpus,
    used to rebuild the vocabulary), ``svm_classifier``, the Naive Bayes
    parameters ``p`` and ``p_hat``, and the logistic weights ``w_logistic``.

    Returns:
        ``(Y_pred_test_svm, Y_pred_test_NB, Y_pred_test_logistic)``: one
        prediction per test email from each model, in the order the files
        were read. All three are empty arrays when no test file is found.
    """
    dataset_dir = "test"  # Name the directory "test"
    test_emails = []

    for root, dirs, files in os.walk(dataset_dir):
        for filename in files:
            file_path = os.path.join(root, filename)
            if os.path.isfile(file_path):
                with open(file_path, "r", encoding="latin1") as file:
                    test_emails.append(file.read())

    if not test_emails:
        # Nothing to classify; avoid calling the models on zero samples
        return np.zeros(0), np.zeros(0), np.zeros(0)

    # Encode the test emails with the vocabulary of the TRAINING corpus.
    # Building a vocabulary from the test emails themselves would assign
    # different words to the feature columns than the ones the models were
    # trained on, making every prediction meaningless.
    sorted_dict = sort_dictionary(emails)
    test_X = np.empty((len(test_emails), 10000))
    for i, email in enumerate(test_emails):
        test_X[i] = make_data(email, sorted_dict)

    Y_pred_test_svm = svm_classifier.predict(test_X)  # SVM predictor results

    # Naive-Bayes' predictor, scored in log space: multiplying 10000
    # probabilities underflows to 0.0 for both classes.
    present = test_X > 0
    with np.errstate(divide="ignore"):   # log(0) = -inf is acceptable here
        log_p = np.log(p)
        log_q = np.log(1 - p)
        log_prior_ham = np.log(1 - p_hat)
        log_prior_spam = np.log(p_hat)
    # np.where picks log(p) for present words and log(1 - p) for absent
    # ones, so a -inf term is only used where it really applies.
    score_ham = log_prior_ham + np.where(present, log_p[0], log_q[0]).sum(axis=1)
    score_spam = log_prior_spam + np.where(present, log_p[1], log_q[1]).sum(axis=1)
    Y_pred_test_NB = (score_spam > score_ham).astype(float)

    # Logistic Regression predictor: spam when the sigmoid exceeds 0.5
    probabilities = 1/(1 + np.exp(-(test_X @ w_logistic)))
    Y_pred_test_logistic = (probabilities > 0.5).astype(float)

    return Y_pred_test_svm, Y_pred_test_NB, Y_pred_test_logistic # Return all 3 prediction values

In [None]:
# Once the test data is in place and every cell above has been run, this
# snippet classifies the test emails and prints each model's predictions
# (SVM, then Naive Bayes, then logistic regression).
test_predictions = run_test_emails()
Y_pred_test_svm, Y_pred_test_NB, Y_pred_test_logistic = test_predictions
for model_predictions in test_predictions:
    print(model_predictions)