In [1]:
import urllib.request
import tarfile
from pathlib import Path
import os

def fetch_all_spam_data():
    spam_root = "http://spamassassin.apache.org/old/publiccorpus/"
    files = {
        "non_spam": [
            "20021010_easy_ham.tar.bz2",
            "20030228_easy_ham.tar.bz2",
            "20030228_easy_ham_2.tar.bz2",
            "20021010_hard_ham.tar.bz2",
            "20030228_hard_ham.tar.bz2",
        ],
        "spam": [
            "20021010_spam.tar.bz2",
            "20030228_spam.tar.bz2",
            "20030228_spam_2.tar.bz2",
            "20050311_spam_2.tar.bz2",
        ],
    }

    user_home = Path(os.environ["USERPROFILE"])
    base_path = user_home / "SpamFilter" / "Data" / "Raw"

    for category, filenames in files.items():
        target_dir = base_path / ("Non-Spam" if category == "non_spam" else "Spam")
        target_dir.mkdir(parents=True, exist_ok=True)

        for filename in filenames:
            file_url = spam_root + filename
            local_tar_path = target_dir / filename

            if not local_tar_path.exists():
                urllib.request.urlretrieve(file_url, local_tar_path)

            with tarfile.open(local_tar_path, "r:bz2") as tar:
                for member in tar.getmembers():
                    if member.isfile():
                        member.name = Path(member.name).name
                        tar.extract(member, path=target_dir)

    return base_path / "Non-Spam", base_path / "Spam"


In [2]:
ham_dir, spam_dir = fetch_all_spam_data()

  tar.extract(member, path=target_dir)


In [3]:
ham_filenames = [f for f in sorted(ham_dir.rglob("*")) if f.is_file() and len(f.name) > 20]
spam_filenames = [f for f in sorted(spam_dir.rglob("*"))if f.is_file() and len(f.name) > 20]

In [23]:
len(ham_filenames)

6956

In [24]:
len(spam_filenames)

2402

In [25]:
import email
import email.policy

def load_email(filepath):
    with open(filepath, "rb") as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)

In [26]:
ham_emails = [load_email(filepath) for filepath in ham_filenames]
spam_emails = [load_email(filepath) for filepath in spam_filenames]

In [27]:
msg = spam_emails[6]
def get_email_body(msg):
    charset = msg.get_content_charset()
    if not charset or charset.lower() == "default":
        charset = "utf-8"
    try:
        return msg.get_payload(decode=True).decode(charset, errors="ignore").strip()
    except Exception as e:
        return f"[Error: {e}]"

print(get_email_body(spam_emails[6]))

<html>
<body>
<center>
<b>
<font color="blue">
*****Bonus Fat Absorbers As Seen On TV, Included Free With Purchase Of 2 Or More Bottle, $24.95 Value*****
</font>
<br>
<br>
***TAKE $10.00 OFF 2 & 3 MONTH SUPPLY ORDERS, $5.00 OFF 1 MONTH SUPPLY!
***AND STILL GET YOUR BONUS!  PRICE WILL BE DEDUCTED DURING PROCESSING.
<br>
<br>
***FAT ABSORBERS ARE GREAT FOR THOSE WHO WANT TO LOSE WEIGHT,  BUT CAN'T STAY ON A DIET***
<br>
<br>
***OFFER GOOD UNTIL MAY 27, 2002!  FOREIGN ORDERS INCLUDED!
<br>
<br>

<font color="blue">

LOSE 30 POUNDS  IN 30 DAYS... GUARANTEED!!!
<br>
<br>

All Natural Weight-Loss Program, Speeds Up The Metabolism Safely
Rated #1 In Both Categories of SAFETY & EFFECTIVENESS In<br>
(THE United States Today)
<br><br>
WE'LL HELP YOU GET THINNER!
WE'RE GOING TO HELP YOU LOOK GOOD, FEEL GOOD AND TAKE CONTROL IN
2002
<br>
<br>
</b>
</font color="blue">
</center>

Why Use Our Amazing Weight Loss Capsules?
<br><br>
*  They act like a natural magnet to attract fat.<br>
*  Stimulates t

# Split the train and test data

In [28]:
import numpy as np
from sklearn.model_selection import train_test_split

X = np.array(ham_emails + spam_emails, dtype=object) 
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
                                                    random_state=42)

# Data preprocessing

In [29]:
from bs4 import BeautifulSoup

def html_to_plain_text(msg):
    def extract_email_body(msg):
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                if content_type in ["text/html", "text/plain"]:
                    payload = part.get_payload(decode=True)
                    charset = part.get_content_charset()
                    if not charset or charset.lower() == "default":
                        charset = "utf-8"
                    try:
                        return payload.decode(charset, errors='ignore')
                    except LookupError:
                        return payload.decode("utf-8", errors='ignore')
        else:
            payload = msg.get_payload(decode=True)
            charset = msg.get_content_charset()
            if not charset or charset.lower() == "default":
                charset = "utf-8"
            try:
                return payload.decode(charset, errors='ignore')
            except LookupError:
                return payload.decode("utf-8", errors='ignore')
        return ""

    raw_body = extract_email_body(msg)
    soup = BeautifulSoup(raw_body, "html.parser")
    text = soup.get_text(separator=' ', strip=True)
    return text


In [30]:
html_to_plain_text(msg)

'*****Bonus Fat Absorbers As Seen On TV, Included Free With Purchase Of 2 Or More Bottle, $24.95 Value***** ***TAKE $10.00 OFF 2 & 3 MONTH SUPPLY ORDERS, $5.00 OFF 1 MONTH SUPPLY!\n***AND STILL GET YOUR BONUS!  PRICE WILL BE DEDUCTED DURING PROCESSING. ***FAT ABSORBERS ARE GREAT FOR THOSE WHO WANT TO LOSE WEIGHT,  BUT CAN\'T STAY ON A DIET*** ***OFFER GOOD UNTIL MAY 27, 2002!  FOREIGN ORDERS INCLUDED! LOSE 30 POUNDS  IN 30 DAYS... GUARANTEED!!! All Natural Weight-Loss Program, Speeds Up The Metabolism Safely\nRated #1 In Both Categories of SAFETY & EFFECTIVENESS In (THE United States Today) WE\'LL HELP YOU GET THINNER!\nWE\'RE GOING TO HELP YOU LOOK GOOD, FEEL GOOD AND TAKE CONTROL IN\n2002 Why Use Our Amazing Weight Loss Capsules? *  They act like a natural magnet to attract fat. *  Stimulates the body\'s natural metabolism. *  Controls appetite naturally and makes it easier to\n   eat the right foods consistently. *  Reduces craving for sweets. *  Aids in the absorption of fat and in

In [31]:
import nltk

stemmer = nltk.PorterStemmer()

In [32]:
import urlextract

url_extractor = urlextract.URLExtract()

In [33]:
from sklearn.base import BaseEstimator, TransformerMixin
import re
from collections import Counter

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, lower_case=True,
                 remove_punctuation=True, replace_urls=True,
                 replace_numbers=True, stemming=True):
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = html_to_plain_text(email) or ""
            if self.lower_case:
                text = text.lower()
            if self.replace_urls and url_extractor is not None:
                urls = list(set(url_extractor.find_urls(text)))
                urls.sort(key=lambda url: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, " URL ")
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)
            word_counts = Counter(text.split())
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts
            X_transformed.append(word_counts)
        return np.array(X_transformed)

In [34]:
X_few = X_train[:3]
X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)
X_few_wordcounts

array([Counter({'number': 5, 'to': 4, 'use': 3, 'perl': 3, 'you': 3, 'messag': 3, 'your': 3, 'on': 2, 'url': 2, 'receiv': 2, 'thi': 2, 'and': 2, 'from': 2, 'or': 2, 'chang': 2, 'prefer': 2, 'daili': 1, 'headlin': 1, 'mailer': 1, 'two': 1, 'oscon': 1, 'lightn': 1, 'talk': 1, 'onlin': 1, 'post': 1, 'by': 1, 'gnat': 1, 'friday': 1, 'august': 1, 'news': 1, 'copyright': 1, 'pudg': 1, 'all': 1, 'right': 1, 'reserv': 1, 'have': 1, 'becaus': 1, 'subscrib': 1, 'it': 1, 'stop': 1, 'other': 1, 'add': 1, 'more': 1, 'pleas': 1, 'go': 1, 'user': 1, 'page': 1, 'can': 1, 'log': 1, 'in': 1, 'there': 1}),
       Counter({'the': 9, 'and': 9, 'number': 7, 'marla': 7, 'jack': 7, 'i': 7, 'you': 6, 'have': 6, 'to': 5, 'take': 5, 'parasit': 5, 'of': 4, 'meet': 4, 'it': 4, 'want': 4, 'fork': 3, 'com': 3, 'a': 3, 'can': 3, 'blood': 3, 'brain': 3, 'll': 3, 'that': 3, 'we': 3, 'admin': 2, 'xent': 2, 'on': 2, 'r': 2, 'hettinga': 2, 'aa': 2, 'hottest': 2, 'place': 2, 'women': 2, 'with': 2, 'big': 2, 'buck': 2, 'wha

In [35]:
from scipy.sparse import csr_matrix

class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocabulary_size=1000):
        self.vocabulary_size = vocabulary_size
    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)
        most_common = total_count.most_common()[:self.vocabulary_size]
        self.vocabulary_ = {word: index + 1
                            for index, (word, count) in enumerate(most_common)}
        return self
    def transform(self, X, y=None):
        rows = []
        cols = []
        data = []
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                rows.append(row)
                cols.append(self.vocabulary_.get(word, 0))
                data.append(count)
        return csr_matrix((data, (rows, cols)),
                          shape=(len(X), self.vocabulary_size + 1))

In [36]:
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
X_few_vectors

<Compressed Sparse Row sparse matrix of dtype 'int64'
	with 26 stored elements and shape (3, 11)>

In [37]:
X_few_vectors.toarray()

array([[ 60,   5,   4,   0,   2,   3,   1,   0,   2,   0,   0],
       [187,   7,   5,   9,   9,   6,   6,   0,   0,   4,   7],
       [192,   9,  10,  10,   7,   4,   4,   8,   5,   3,   0]])

In [38]:
vocab_transformer.vocabulary_

{'number': 1,
 'to': 2,
 'the': 3,
 'and': 4,
 'you': 5,
 'have': 6,
 'name': 7,
 'thi': 8,
 'of': 9,
 'marla': 10}

In [41]:
from sklearn.pipeline import Pipeline

preprocess_pipeline = Pipeline([
    ("email_to_wordcount", EmailToWordCounterTransformer()),
    ("wordcount_to_vector", WordCounterToVectorTransformer()),
])

X_train_transformed = preprocess_pipeline.fit_transform(X_train)
X_test_transformed = preprocess_pipeline.transform(X_test)

In [42]:
from sklearn.metrics import precision_score, recall_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier

models = {
    "Random Forest": RandomForestClassifier(random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(random_state=42),
    "XGBoost": XGBClassifier(random_state=42),
    "SVM": SVC(kernel='linear', probability=True, random_state=42),
    "KNN": KNeighborsClassifier(),
    "LogReg": LogisticRegression(max_iter=4000, random_state=42)
}

for name, model in models.items():
    model.fit(X_train_transformed, y_train)
    y_pred = model.predict(X_test_transformed)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    print(f"{name} in test set -> Precision: {precision:.2%}, Recall: {recall:.2%}")


Random Forest in test set -> Precision: 98.93%, Recall: 94.50%
Gradient Boosting in test set -> Precision: 97.61%, Recall: 91.65%
XGBoost in test set -> Precision: 98.95%, Recall: 95.93%
SVM in test set -> Precision: 96.96%, Recall: 97.35%
KNN in test set -> Precision: 90.53%, Recall: 83.71%
LogReg in test set -> Precision: 97.75%, Recall: 97.15%


As we can see, the best performing model in the test set what the logistic regression!