 # Spam classifier

Download examples of spam and ham from Apache SpamAssassin’s public datasets.

In [444]:
# Import necessary libraries
import os
import tarfile
import urllib.request

import email
import email.policy

from collections import Counter

import numpy as np
from sklearn.model_selection import train_test_split

import re
from html import unescape

from sklearn.base import BaseEstimator, TransformerMixin


from scipy.sparse import csr_matrix

from sklearn.pipeline import Pipeline

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

from sklearn.metrics import precision_score, recall_score

In [445]:
# Root URL of the public corpus and the two archives used in this notebook
DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"
HAM_URL = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"
SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"

# Local directory where the archives are stored and extracted
SPAM_PATH = os.path.join("datasets", "spam")

def fetch_spam_data(ham_url=HAM_URL, spam_url=SPAM_URL, spam_path=SPAM_PATH):
    """Download the ham and spam archives (if missing) and extract them.

    Parameters
    ----------
    ham_url, spam_url : str
        URLs of the ham and spam ``.tar.bz2`` archives.
    spam_path : str
        Directory that receives the downloaded archives (saved as
        ``ham.tar.bz2`` and ``spam.tar.bz2``) and their extracted contents.
        It is created if it does not exist.

    Notes
    -----
    An archive is downloaded only when its local copy is absent, but it is
    re-extracted on every call.
    """
    # exist_ok=True replaces the separate isdir() check
    os.makedirs(spam_path, exist_ok=True)

    for filename, url in (("ham.tar.bz2", ham_url), ("spam.tar.bz2", spam_url)):
        path = os.path.join(spam_path, filename)

        # Download the file only if it doesn't already exist
        if not os.path.isfile(path):
            urllib.request.urlretrieve(url, path)

        # The context manager closes the archive even if extraction fails
        with tarfile.open(path) as tar_bz2_file:
            if hasattr(tarfile, "data_filter"):
                # Extraction filters are available: use the "data" filter so a
                # downloaded archive cannot write outside spam_path, and to
                # avoid the DeprecationWarning for an unfiltered extractall().
                tar_bz2_file.extractall(path=spam_path, filter="data")
            else:
                # Older interpreters without extraction-filter support
                tar_bz2_file.extractall(path=spam_path)

In [446]:
fetch_spam_data()

  tar_bz2_file.extractall(path=spam_path)


In [447]:
# Directories expected to hold the extracted ham (non-spam) and spam emails
HAM_DIR = os.path.join(SPAM_PATH, "easy_ham")
SPAM_DIR = os.path.join(SPAM_PATH, "spam")

# Sorted email filenames. Entries whose names are 20 characters or shorter are
# dropped (to exclude system files or metadata like 'cmds').
ham_filenames = sorted(entry for entry in os.listdir(HAM_DIR) if len(entry) > 20)
spam_filenames = sorted(entry for entry in os.listdir(SPAM_DIR) if len(entry) > 20)

In [448]:
len(ham_filenames)

2500

In [449]:
len(spam_filenames)

500

In [450]:
def load_email(is_spam, filename, spam_path=SPAM_PATH):
    """Parse one email file from the extracted corpus.

    Parameters
    ----------
    is_spam : bool
        Selects the ``"spam"`` sub-directory when truthy, ``"easy_ham"`` otherwise.
    filename : str
        Name of the email file inside that sub-directory.
    spam_path : str
        Root directory containing the two sub-directories.

    Returns
    -------
    The message object produced by ``BytesParser`` with ``email.policy.default``.
    """
    # Import the parser explicitly: the notebook's import cell only names
    # `email` and `email.policy`, so `email.parser` would otherwise be reachable
    # only if some other import happened to load the submodule.
    from email.parser import BytesParser

    directory = "spam" if is_spam else "easy_ham"

    # Open in binary mode ("rb"); BytesParser works on raw bytes
    with open(os.path.join(spam_path, directory, filename), "rb") as f:
        return BytesParser(policy=email.policy.default).parse(f)

In [451]:
# Parse every ham (non-spam) file, then every spam file, via load_email
ham_emails = [load_email(False, fname) for fname in ham_filenames]
spam_emails = [load_email(True, fname) for fname in spam_filenames]

In [452]:
print(ham_emails[2].get_content().strip())

Man Threatens Explosion In Moscow 

Thursday August 22, 2002 1:40 PM
MOSCOW (AP) - Security officers on Thursday seized an unidentified man who
said he was armed with explosives and threatened to blow up his truck in
front of Russia's Federal Security Services headquarters in Moscow, NTV
television reported.
The officers seized an automatic rifle the man was carrying, then the man
got out of the truck and was taken into custody, NTV said. No other details
were immediately available.
The man had demanded talks with high government officials, the Interfax and
ITAR-Tass news agencies said. Ekho Moskvy radio reported that he wanted to
talk with Russian President Vladimir Putin.
Police and security forces rushed to the Security Service building, within
blocks of the Kremlin, Red Square and the Bolshoi Ballet, and surrounded the
man, who claimed to have one and a half tons of explosives, the news
agencies said. Negotiations continued for about one and a half hours outside
the building, ITAR-

In [453]:
print(spam_emails[4].get_content().strip())

I thought you might like these:
1) Slim Down - Guaranteed to lose 10-12 lbs in 30 days
http://www.freeyankee.com/cgi/fy2/to.cgi?l=822slim1

2) Fight The Risk of Cancer! 
http://www.freeyankee.com/cgi/fy2/to.cgi?l=822nic1 

3) Get the Child Support You Deserve - Free Legal Advice 
http://www.freeyankee.com/cgi/fy2/to.cgi?l=822ppl1

Offer Manager
Daily-Deals








If you wish to leave this list please use the link below.
http://www.qves.com/trim/?social@linux.ie%7C29%7C134077


-- 
Irish Linux Users' Group Social Events: social@linux.ie
http://www.linux.ie/mailman/listinfo/social for (un)subscription information.
List maintainer: listmaster@linux.ie


In [454]:
def get_email_structure(email):
    """Describe the MIME layout of an email as a string.

    A plain string argument is returned unchanged. A message whose payload is
    a list is rendered as ``"multipart(<part>, <part>, ...)"`` with each part
    described recursively; any other message is described by its content type
    (e.g. ``text/plain``).
    """
    # Strings pass straight through
    if isinstance(email, str):
        return email

    parts = email.get_payload()

    # Single-part message: its content type is the whole description
    if not isinstance(parts, list):
        return email.get_content_type()

    # Multipart message: describe every sub-part, in order
    inner = ", ".join(get_email_structure(part) for part in parts)
    return "multipart({})".format(inner)

In [455]:
def structures_counter(emails):
    """Count how often each email structure occurs in ``emails``.

    Returns a ``Counter`` mapping the string produced by
    ``get_email_structure`` for each email to the number of emails with
    that structure.
    """
    # Counter tallies the structure strings as the generator yields them
    return Counter(get_email_structure(message) for message in emails)

In [456]:
structures_counter(ham_emails).most_common()

[('text/plain', 2408),
 ('multipart(text/plain, application/pgp-signature)', 66),
 ('multipart(text/plain, text/html)', 8),
 ('multipart(text/plain, text/plain)', 4),
 ('multipart(text/plain)', 3),
 ('multipart(text/plain, application/octet-stream)', 2),
 ('multipart(text/plain, text/enriched)', 1),
 ('multipart(text/plain, application/ms-tnef, text/plain)', 1),
 ('multipart(multipart(text/plain, text/plain, text/plain), application/pgp-signature)',
  1),
 ('multipart(text/plain, video/mng)', 1),
 ('multipart(text/plain, multipart(text/plain))', 1),
 ('multipart(text/plain, application/x-pkcs7-signature)', 1),
 ('multipart(text/plain, multipart(text/plain, text/plain), text/rfc822-headers)',
  1),
 ('multipart(text/plain, multipart(text/plain, text/plain), multipart(multipart(text/plain, application/x-pkcs7-signature)))',
  1),
 ('multipart(text/plain, application/x-java-applet)', 1)]

In [457]:
structures_counter(spam_emails).most_common()

[('text/plain', 218),
 ('text/html', 183),
 ('multipart(text/plain, text/html)', 45),
 ('multipart(text/html)', 20),
 ('multipart(text/plain)', 19),
 ('multipart(multipart(text/html))', 5),
 ('multipart(text/plain, image/jpeg)', 3),
 ('multipart(text/html, application/octet-stream)', 2),
 ('multipart(text/plain, application/octet-stream)', 1),
 ('multipart(text/html, text/plain)', 1),
 ('multipart(multipart(text/html), application/octet-stream, image/jpeg)', 1),
 ('multipart(multipart(text/plain, text/html), image/gif)', 1),
 ('multipart/alternative', 1)]

In [458]:
for header, value in spam_emails[0].items():
    print(header, ":", value)

Return-Path : <12a1mailbot1@web.de>
Delivered-To : zzzz@localhost.spamassassin.taint.org
Received : from localhost (localhost [127.0.0.1])	by phobos.labs.spamassassin.taint.org (Postfix) with ESMTP id 136B943C32	for <zzzz@localhost>; Thu, 22 Aug 2002 08:17:21 -0400 (EDT)
Received : from mail.webnote.net [193.120.211.219]	by localhost with POP3 (fetchmail-5.9.0)	for zzzz@localhost (single-drop); Thu, 22 Aug 2002 13:17:21 +0100 (IST)
Received : from dd_it7 ([210.97.77.167])	by webnote.net (8.9.3/8.9.3) with ESMTP id NAA04623	for <zzzz@spamassassin.taint.org>; Thu, 22 Aug 2002 13:09:41 +0100
From : 12a1mailbot1@web.de
Received : from r-smtp.korea.com - 203.122.2.197 by dd_it7  with Microsoft SMTPSVC(5.5.1775.675.6);	 Sat, 24 Aug 2002 09:42:10 +0900
To : dcek1a1@netsgo.com
Subject : Life Insurance - Why Pay More?
Date : Wed, 21 Aug 2002 20:31:57 -1600
MIME-Version : 1.0
Message-ID : <0103c1042001882DD_IT7@dd_it7>
Content-Type : text/html; charset="iso-8859-1"
Content-Transfer-Encoding : qu

In [459]:
spam_emails[1]["Subject"]

'[ILUG] Guaranteed to lose 10-12 lbs in 30 days 10.206'

In [460]:
# Combine ham and spam emails (ham first, then spam) into a single NumPy array (X).
# dtype=object keeps each element as the parsed email message object.
X = np.array(ham_emails + spam_emails, dtype=object)

# Labels in the same order as X: 0 for ham (non-spam), 1 for spam
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

# Split the dataset into training and testing sets:
# test_size=0.2 holds out 20% for testing, leaving 80% for training.
# random_state=42 fixes the shuffle so the split is reproducible.
# NOTE(review): no `stratify` argument is passed, so the ham/spam ratio in each
# split is left to the shuffle.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [461]:
def html_to_plain_text(html):
    """Convert an HTML string to rough plain text using regular expressions.

    Drops the ``<head>`` section, replaces every opening ``<a ...>`` tag with the
    token ``HYPERLINK``, strips all remaining tags, collapses runs of blank
    lines into a single newline, and finally unescapes HTML entities.
    """
    # NOTE(review): this function is defined again, with the same logic, in the
    # following cell; one of the two definitions can be removed.
    # Patterns are raw strings so that `\s` is not parsed as an (invalid) string
    # escape, which triggers a warning on recent Python versions.
    text = re.sub(r'<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
    text = re.sub(r'<a\s.*?>', 'HYPERLINK', text, flags=re.M | re.S | re.I)
    text = re.sub(r'<.*?>', '', text, flags=re.M | re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
    return unescape(text)

In [462]:

def html_to_plain_text(html):
    """Convert an HTML string to rough plain text using regular expressions.

    Parameters
    ----------
    html : str
        HTML markup.

    Returns
    -------
    str
        The text with the ``<head>`` section removed, opening ``<a ...>`` tags
        replaced by ``HYPERLINK``, all other tags stripped, blank-line runs
        collapsed, and HTML entities unescaped.
    """
    # All patterns are raw strings so that `\s` is not parsed as an (invalid)
    # string escape, which triggers a warning on recent Python versions.

    # Remove the content inside <head>...</head> tags (e.g., metadata, scripts, styles)
    text = re.sub(r'<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)

    # Replace every opening anchor tag with the placeholder text "HYPERLINK"
    text = re.sub(r'<a\s.*?>', 'HYPERLINK', text, flags=re.M | re.S | re.I)

    # Remove all remaining HTML tags (e.g., <div>, <p>, <br>, etc.)
    text = re.sub(r'<.*?>', '', text, flags=re.M | re.S)

    # Collapse runs of whitespace that end in a newline into a single newline
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)

    # Convert HTML entities (like &amp;, &gt;) to their corresponding characters
    return unescape(text)

In [463]:
# `sample_html_spam` must be defined before it is displayed; without this the
# cell raises NameError on a fresh kernel. Pick one HTML-only spam email from
# the training set as the running example.
html_spam_emails = [message for message in X_train[y_train == 1]
                    if get_email_structure(message) == "text/html"]
# NOTE(review): index 7 is assumed to be the sample shown in the saved output
# below - TODO confirm after a Restart & Run All.
sample_html_spam = html_spam_emails[7]
print(html_to_plain_text(sample_html_spam.get_content())[:1000], "...")


OTC
 Newsletter
Discover Tomorrow's Winners 
For Immediate Release
Cal-Bay (Stock Symbol: CBYI)
Watch for analyst "Strong Buy Recommendations" and several advisory newsletters picking CBYI.  CBYI has filed to be traded on the OTCBB, share prices historically INCREASE when companies get listed on this larger trading exchange. CBYI is trading around 25 cents and should skyrocket to $2.66 - $3.25 a share in the near future.
Put CBYI on your watch list, acquire a position TODAY.
REASONS TO INVEST IN CBYI
A profitable company and is on track to beat ALL earnings estimates!
One of the FASTEST growing distributors in environmental & safety equipment instruments.
Excellent management team, several EXCLUSIVE contracts.  IMPRESSIVE client list including the U.S. Air Force, Anheuser-Busch, Chevron Refining and Mitsubishi Heavy Industries, GE-Energy & Environmental Research.
RAPIDLY GROWING INDUSTRY
Industry revenues exceed $900 million, estimates indicate that there could be as much as $25 billi

In [464]:
def email_to_text(email):
    """Extract the text of an email, preferring plain text over HTML.

    Walks every part of the message. The first ``text/plain`` part found is
    returned as-is. If there is none, the last ``text/html`` part seen is
    converted with ``html_to_plain_text`` and returned.

    Returns
    -------
    str or None
        The extracted text, or ``None`` when the message has no usable
        ``text/plain`` or (non-empty) ``text/html`` part.
    """
    html = None  # last HTML content seen, used only if no plain text exists

    # Traverse all parts of the email (covers nested multipart messages)
    for part in email.walk():
        ctype = part.get_content_type()

        # Skip parts that are not plain text or HTML
        if ctype not in ("text/plain", "text/html"):
            continue

        try:
            content = part.get_content()
        except Exception:
            # Best-effort fallback when decoding fails (e.g. unknown charset).
            # `Exception` rather than a bare `except` so KeyboardInterrupt and
            # SystemExit are not swallowed.
            content = str(part.get_payload())

        # Plain text wins immediately
        if ctype == "text/plain":
            return content

        # Otherwise remember the HTML in case no plain text is found
        html = content

    # No plain text was found: fall back to the converted HTML, if any
    if html:
        return html_to_plain_text(html)
    return None

In [465]:
print(email_to_text(sample_html_spam)[:100], "...")


OTC
 Newsletter
Discover Tomorrow's Winners 
For Immediate Release
Cal-Bay (Stock Symbol: CBYI)
Wat ...


In [466]:
# NLTK is treated as an optional dependency: when it is missing, `stemmer` is
# set to None so later cells can check for it and skip stemming.
try:
    import nltk

    stemmer = nltk.PorterStemmer()
    # Quick demonstration of what the stemmer does to related word forms
    for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
        print(word, "=>", stemmer.stem(word))
except ImportError:
    # Typo in the user-facing message fixed ("modulle" -> "module")
    print("Error: stemming requires the NLTK module.")
    stemmer = None

Computations => comput
Computation => comput
Computing => comput
Computed => comput
Compute => comput
Compulsive => compuls


In [467]:
# urlextract is treated as an optional dependency: when it is missing,
# `url_extractor` is set to None so later cells can check for it and skip
# URL replacement.
try:
    # Import the urlextract module (its URLExtract class is used below)
    import urlextract

    # Create an instance of the URL extractor
    url_extractor = urlextract.URLExtract()

    # Test the extractor on a sample string containing a bare domain and a full URL
    print(url_extractor.find_urls("Will it detect github.com and https://youtu.be/7Pq-S557XQU?t=3m32s"))

except ImportError:
    # If the urlextract module is not installed, print an error message and set url_extractor to None
    print("Error: replacing URLs requires the urlextract module.")
    url_extractor = None

['github.com', 'https://youtu.be/7Pq-S557XQU?t=3m32s']


In [468]:

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    """Turn each email into a ``Counter`` of its (optionally normalised) words.

    Relies on three names defined elsewhere in the notebook: ``email_to_text``
    and the module-level ``url_extractor`` and ``stemmer`` objects (either of
    which may be ``None``, in which case the corresponding step is skipped).

    Parameters
    ----------
    strip_headers : bool
        Stored but not used by ``transform``.
    lower_case : bool
        Convert the text to lowercase.
    remove_punctuation : bool
        Replace runs of non-word characters with a single space.
    replace_urls : bool
        Replace each URL found by ``url_extractor`` with ``" URL "``.
    replace_numbers : bool
        Replace numbers with the token ``NUMBER``.
    stemming : bool
        Merge word counts by the stem returned by ``stemmer.stem``.
    """

    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                 replace_urls=True, replace_numbers=True, stemming=True):
        # Plain attribute storage, one attribute per constructor argument
        self.strip_headers = strip_headers            # (not used in transform)
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming

    def fit(self, X, y=None):
        """Nothing to learn; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return a 1-D object array holding one word ``Counter`` per email in ``X``."""
        X_transformed = []

        for email in X:
            # email_to_text may return None; treat that as an empty document
            text = email_to_text(email) or ""

            if self.lower_case:
                text = text.lower()

            if self.replace_urls and url_extractor is not None:
                # Unique URLs, longest first, so that a URL which is a substring
                # of a longer one does not cause a partial replacement
                urls = list(set(url_extractor.find_urls(text)))
                urls.sort(key=lambda url: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, " URL ")

            if self.replace_numbers:
                # Digits with an optional fractional part and optional exponent
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)

            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)

            # Count whitespace-separated tokens
            word_counts = Counter(text.split())

            if self.stemming and stemmer is not None:
                # Re-key the counts by stem, summing words that share a stem
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts

            X_transformed.append(word_counts)

        # dtype=object is passed explicitly so the result is an object array
        # even when X is empty (np.array([]) would otherwise be float64)
        return np.array(X_transformed, dtype=object)

In [469]:
X_few = X_train[:3]  # Take the first 3 emails from the training set
X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)  # Transform those emails into word count vectors
X_few_wordcounts  # Display the result

array([Counter({'chuck': 1, 'murcko': 1, 'wrote': 1, 'stuff': 1, 'yawn': 1, 'r': 1}),
       Counter({'the': 11, 'of': 9, 'and': 8, 'all': 3, 'christian': 3, 'to': 3, 'by': 3, 'jefferson': 2, 'i': 2, 'have': 2, 'superstit': 2, 'one': 2, 'on': 2, 'been': 2, 'ha': 2, 'half': 2, 'rogueri': 2, 'teach': 2, 'jesu': 2, 'some': 1, 'interest': 1, 'quot': 1, 'url': 1, 'thoma': 1, 'examin': 1, 'known': 1, 'word': 1, 'do': 1, 'not': 1, 'find': 1, 'in': 1, 'our': 1, 'particular': 1, 'redeem': 1, 'featur': 1, 'they': 1, 'are': 1, 'alik': 1, 'found': 1, 'fabl': 1, 'mytholog': 1, 'million': 1, 'innoc': 1, 'men': 1, 'women': 1, 'children': 1, 'sinc': 1, 'introduct': 1, 'burnt': 1, 'tortur': 1, 'fine': 1, 'imprison': 1, 'what': 1, 'effect': 1, 'thi': 1, 'coercion': 1, 'make': 1, 'world': 1, 'fool': 1, 'other': 1, 'hypocrit': 1, 'support': 1, 'error': 1, 'over': 1, 'earth': 1, 'six': 1, 'histor': 1, 'american': 1, 'john': 1, 'e': 1, 'remsburg': 1, 'letter': 1, 'william': 1, 'short': 1, 'again': 1, 'becom

In [470]:
class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    """Convert word-count mappings into a sparse count matrix.

    ``fit`` learns a vocabulary of the ``vocabulary_size`` most common words
    and assigns them column indices starting at 1. ``transform`` produces a
    CSR matrix with ``vocabulary_size + 1`` columns; column 0 collects the
    counts of every word that is not in the vocabulary.
    """

    def __init__(self, vocabulary_size=1000):
        # Maximum number of words kept in the vocabulary
        self.vocabulary_size = vocabulary_size

    def fit(self, X, y=None):
        """Build ``vocabulary_`` (word -> column index) from the documents in ``X``."""
        clipped_totals = Counter()
        for counts in X:
            for token, occurrences in counts.items():
                # Each document contributes at most 10 per word to the ranking
                clipped_totals[token] += min(occurrences, 10)

        top_words = clipped_totals.most_common(self.vocabulary_size)

        # Indices start at 1; column 0 is kept for out-of-vocabulary words
        self.vocabulary_ = {
            token: position
            for position, (token, _) in enumerate(top_words, start=1)
        }
        return self

    def transform(self, X, y=None):
        """Return a CSR matrix of shape ``(len(X), vocabulary_size + 1)``."""
        row_idx, col_idx, values = [], [], []

        for i, counts in enumerate(X):
            for token, occurrences in counts.items():
                row_idx.append(i)
                # Unknown words map to column 0
                col_idx.append(self.vocabulary_.get(token, 0))
                values.append(occurrences)

        # Entries sharing the same (row, column) pair are summed when the
        # matrix is built, so column 0 holds the total count of unknown words
        n_cols = self.vocabulary_size + 1
        return csr_matrix((values, (row_idx, col_idx)), shape=(len(X), n_cols))

In [471]:
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
X_few_vectors

<Compressed Sparse Row sparse matrix of dtype 'int64'
	with 20 stored elements and shape (3, 11)>

In [472]:
X_few_vectors.toarray()

array([[ 6,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
       [99, 11,  9,  8,  3,  1,  3,  1,  3,  2,  3],
       [67,  0,  1,  2,  3,  4,  1,  2,  0,  1,  0]])

In [473]:
vocab_transformer.vocabulary_

{'the': 1,
 'of': 2,
 'and': 3,
 'to': 4,
 'url': 5,
 'all': 6,
 'in': 7,
 'christian': 8,
 'on': 9,
 'by': 10}

In [474]:
# Two-step pipeline that turns raw email objects into numeric feature vectors.
# Both steps use their default constructor arguments.
preprocess_pipeline = Pipeline([
    # Step 1: convert each email into a Counter of its words
    ("email_to_wordcount", EmailToWordCounterTransformer()),

    # Step 2: convert the word Counters into a sparse count matrix
    ("wordcount_to_vector", WordCounterToVectorTransformer()),
])

# Fit the pipeline on the training emails only, and transform them into feature vectors
X_train_transformed = preprocess_pipeline.fit_transform(X_train)

In [475]:
# Logistic Regression classifier configured with:
# - solver="lbfgs"
# - max_iter=1000, an upper bound on solver iterations
# - random_state=42 for reproducibility
log_clf = LogisticRegression(solver="lbfgs", max_iter=1000, random_state=42)

# 3-fold cross-validation on the training data (cv=3): each fold is held out
# once while the model is trained on the other two.
# verbose=3 prints a progress line per fold.
# No `scoring` argument is given, so the estimator's default score is used.
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)

# Display the mean score across the 3 folds
score.mean()

[CV] END ................................ score: (test=0.981) total time=   0.1s
[CV] END ................................ score: (test=0.981) total time=   0.3s
[CV] END ................................ score: (test=0.990) total time=   0.2s


np.float64(0.9841666666666665)

In [476]:
# Apply the already-fitted preprocessing pipeline to the held-out emails
# (transform only - nothing is re-fitted on test data)
X_test_transformed = preprocess_pipeline.transform(X_test)

# Fresh classifier with the same settings as in cross-validation, trained on
# the full training set; fit() returns the estimator, so the call is chained
log_clf = LogisticRegression(solver="lbfgs", max_iter=1000, random_state=42).fit(
    X_train_transformed, y_train
)

# Predicted labels for the test set (spam=1, ham=0)
y_pred = log_clf.predict(X_test_transformed)

# Precision = TP / (TP + FP): share of predicted spam that really is spam
print("Precision: {:.2f}%".format(100 * precision_score(y_test, y_pred)))

# Recall = TP / (TP + FN): share of actual spam that was detected
print("Recall: {:.2f}%".format(100 * recall_score(y_test, y_pred)))

Precision: 94.90%
Recall: 97.89%
