# Spam classifier with MLP

In [None]:

import glob
import os
import re
import shutil
import tarfile
from urllib.request import urlretrieve
from tqdm import tqdm

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.utils import shuffle


class EmailCleaner(BaseEstimator, TransformerMixin):
    def __init__(
        self,
        no_header=True,
        to_lowercase=True,
        url_to_word=True,
        num_to_word=True,
        remove_punc=True,
    ):
        self.no_header = no_header
        self.to_lowercase = to_lowercase
        self.url_to_word = url_to_word
        self.num_to_word = num_to_word
        self.remove_punc = remove_punc

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        cleaned_emails = []
        for email in X:
            if self.no_header:
                email = self.remove_header(email)
            if self.to_lowercase:
                email = email.lower()

            words = email.split()
            if self.url_to_word:
                words = self.convert_url_to_word(words)
            if self.num_to_word:
                words = self.convert_num_to_word(words)
            email = " ".join(words)
            if self.remove_punc:
                email = "".join([c for c in email if c.isalnum() or c.isspace()])
            cleaned_emails.append(email)
        return cleaned_emails

    @staticmethod
    def remove_header(email):
        return email[email.index("\n\n") :]

    @staticmethod
    def is_url(string):
        return re.match(
            "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
            string,
        )

    @staticmethod
    def convert_url_to_word(words):
        return ["URL" if EmailCleaner.is_url(word) else word for word in words]

    @staticmethod
    def convert_num_to_word(words):
        return ["NUM" if word.isdigit() else word for word in words]


def download_and_extract(url, dataset_dir="data"):
    tar_dir = os.path.join(dataset_dir, "tar")
    os.makedirs(tar_dir, exist_ok=True)
    filename = url.rsplit("/", 1)[-1]
    tarpath = os.path.join(tar_dir, filename)

    class DownloadProgressBar(tqdm):
        def update_to(self, b=1, bsize=1, tsize=None):
            if tsize is not None:
                self.total = tsize
            self.update(b * bsize - self.n)

    if not os.path.exists(tarpath):
        print(f"Downloading {filename}...")
        with DownloadProgressBar(
            unit="B", unit_scale=True, miniters=1, desc=url.split("/")[-1]
        ) as t:
            urlretrieve(url, tarpath, reporthook=t.update_to)
        print("\nDownload completed.")
    else:
        print(f"{filename} already downloaded.")

    print("Extracting files...")
    with tarfile.open(tarpath) as tar:
        dirname = os.path.join(dataset_dir, tar.getmembers()[0].name.split("/")[0])
        if os.path.isdir(dirname):
            shutil.rmtree(dirname)
        tar.extractall(path=dataset_dir)
    print("Extraction completed.")

    cmds_path = os.path.join(dirname, "cmds")
    if os.path.isfile(cmds_path):
        os.remove(cmds_path)
    return dirname


def load_dataset(dirpath):
    files = []
    filepaths = glob.glob(os.path.join(dirpath, "*"))
    for path in filepaths:
        with open(path, "rb") as f:
            content = f.read().decode("utf-8", errors="ignore")
            files.append(content)
    return files


def download_datasets():
    spam_url = "https://github.com/comp3314/hw-data/releases/download/hw3/20050311_spam_2.tar.bz2"
    easy_ham_url = "https://github.com/comp3314/hw-data/releases/download/hw3/20030228_easy_ham_2.tar.bz2"
    hard_ham_url = "https://github.com/comp3314/hw-data/releases/download/hw3/20030228_hard_ham.tar.bz2"

    spam = load_dataset(download_and_extract(spam_url))
    easy_ham = load_dataset(download_and_extract(easy_ham_url))
    hard_ham = load_dataset(download_and_extract(hard_ham_url))

    X = spam + easy_ham + hard_ham
    y = np.concatenate((np.ones(len(spam)), np.zeros(len(easy_ham) + len(hard_ham))))
    return X, y

In [None]:


# Download and prepare the dataset
print("Starting dataset download and preparation...")
X, y = download_datasets()
print("Dataset preparation completed.")

# Shuffle and split the dataset
X, y = shuffle(X, y, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
print(f"The number of training samples: {len(X_train)}")
print(f"The number of test samples: {len(X_test)}")

# Preprocess the data
print("Starting preprocessing...")
email_cleaner = EmailCleaner()
count_vectorizer = CountVectorizer()
prepare_pipeline = Pipeline(
    [
        ("email_cleaner", email_cleaner),
        ("count_vectorizer", count_vectorizer),
    ]
)
X_all = X_train + X_test
prepare_pipeline.fit(X_all)
X_all_transformed = prepare_pipeline.transform(X_all)
num_train = len(X_train)
X_train = X_all_transformed[:num_train]
X_test = X_all_transformed[num_train:]
print("Preprocessing completed.")

print(X_train.shape)
print(X_test.shape)

Starting dataset download and preparation...
20050311_spam_2.tar.bz2 already downloaded.
Extracting files...
Extraction completed.
20030228_easy_ham_2.tar.bz2 already downloaded.
Extracting files...
Extraction completed.
20030228_hard_ham.tar.bz2 already downloaded.
Extracting files...
Extraction completed.
Dataset preparation completed.
The number of training samples: 2436
The number of test samples: 610
Starting preprocessing...
Preprocessing completed.
(2436, 108735)
(610, 108735)


## Train spam classifiers with MLP 


In [3]:
# === Your code here ===
# ======================
from sklearn.neural_network import MLPClassifier

configurations = [(10,), (20,), (40,), (5, 5), (10, 10), (20, 20)]

models = {}

for config in configurations:
    model = MLPClassifier(hidden_layer_sizes=config)
    model.fit(X_train, y_train)
    models[config] = model
    

## Evaluating classifiers



In [8]:
# === Your code here ===
# ======================
from sklearn.metrics import accuracy_score, precision_score, recall_score

for config, model in models.items():
    y_pred = model.predict(X_test)
    
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)

    print(f"Configuration: {config}")
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print()

Configuration: (10,)
Accuracy: 0.9721311475409836
Precision: 0.9781818181818182
Recall: 0.9607142857142857

Configuration: (20,)
Accuracy: 0.9737704918032787
Precision: 0.9748201438848921
Recall: 0.9678571428571429

Configuration: (40,)
Accuracy: 0.9737704918032787
Precision: 0.9714285714285714
Recall: 0.9714285714285714

Configuration: (5, 5)
Accuracy: 0.9672131147540983
Precision: 0.9814814814814815
Recall: 0.9464285714285714

Configuration: (10, 10)
Accuracy: 0.9721311475409836
Precision: 0.9713261648745519
Recall: 0.9678571428571429

Configuration: (20, 20)
Accuracy: 0.9704918032786886
Precision: 0.9781021897810219
Recall: 0.9571428571428572



## Step 4: Ensemble of classifiers



In [None]:
# === Your code here ===

# ======================
from sklearn.ensemble import VotingClassifier

ensemble_configs = [((40,), models[(40,)]), ((5, 5), models[(5, 5)]), ((20,), models[(20,)])]
ensemble_model = VotingClassifier(estimators=ensemble_configs)

ensemble_model.fit(X_train, y_train)

y_pred = ensemble_model.predict(X_test)

accuracy_ensemble = accuracy_score(y_test, y_pred)
precision_ensemble = precision_score(y_test, y_pred)
recall_ensemble = recall_score(y_test, y_pred)

print("Ensemble classifier performance:")
print(f"Accuracy: {accuracy_ensemble}")
print(f"Precision: {precision_ensemble}")
print(f"Recall: {recall_ensemble}")

Ensemble classifier performance:
Accuracy: 0.9737704918032787
Precision: 0.9714285714285714
Recall: 0.9714285714285714
