<a href="https://colab.research.google.com/github/AngelCBC/spam-classifier/blob/main/spam_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Switch to /content so the relative paths used by later cells (archive
# downloads, extraction into the current directory, folder listing) resolve there.
%cd "/content"

/content


In [None]:
# Install the urlextract module.

!pip3 install urlextract

Collecting urlextract
  Downloading urlextract-1.5.0-py3-none-any.whl (20 kB)
Collecting uritools
  Downloading uritools-3.0.2-py3-none-any.whl (12 kB)
Collecting platformdirs
  Downloading platformdirs-2.4.0-py3-none-any.whl (14 kB)
Installing collected packages: uritools, platformdirs, urlextract
Successfully installed platformdirs-2.4.0 uritools-3.0.2 urlextract-1.5.0


In [None]:
# Download some spamassassin datasets.

import os, tarfile
from urllib.request import urlretrieve

DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"

FILE_NAMES = ["20030228_easy_ham.tar.bz2",
              "20030228_spam.tar.bz2",
              "20021010_hard_ham.tar.bz2",
              "20030228_easy_ham_2.tar.bz2",
              "20030228_spam_2.tar.bz2"]

def fetch_data(download_url=DOWNLOAD_ROOT, file_names=FILE_NAMES):
    """Download each archive and unpack it into the current working directory.

    Args:
        download_url: Base URL; each entry of ``file_names`` is appended to it
            verbatim, so it must end with a separator.
        file_names: Iterable of archive file names to fetch and extract.

    Raises:
        Whatever ``urlretrieve`` or ``tarfile`` raise (e.g. ``URLError``,
        ``tarfile.TarError``). The downloaded archive is deleted in every case,
        so a failed run does not leave a partial or corrupt file behind.
    """
    target_dir = os.getcwd()
    # The archives come from the network, so restrict what extraction may do
    # (no absolute paths, no links escaping the target) when the running
    # Python provides extraction filters; older versions keep the old behavior.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
    for name in file_names:
        try:
            urlretrieve(download_url + name, name)
            # The context manager closes the archive even if extraction fails.
            with tarfile.open(name) as tar_file:
                tar_file.extractall(path=target_dir, **extract_kwargs)
        finally:
            # Always drop the temporary download, on success and on failure.
            if os.path.exists(name):
                os.remove(name)

# Download and unpack every archive listed in FILE_NAMES into the current
# working directory (uses the function's defaults; requires network access).
fetch_data()

In [None]:
# Parse the emails.

from glob import glob
from email.policy import default
from email.parser import BytesParser
from os import listdir

def parse_emails(root=None, skip_names=("cmds",)):
    """Parse every message file found in the corpus folders under ``root``.

    A folder is treated as a corpus folder when its name contains "spam" or
    "ham"; messages from folders whose name contains "spam" are labelled 1,
    all others 0.

    Args:
        root: Directory that holds the extracted corpus folders. Defaults to
            the current working directory.
        skip_names: File names inside a corpus folder that are not messages
            and must be ignored.
            NOTE(review): the SpamAssassin archives appear to ship a ``cmds``
            index file next to the messages -- confirm against the extracted
            folders; pass ``skip_names=()`` to parse every file.

    Returns:
        ``(email_list, type_list)``: parsed ``EmailMessage`` objects and their
        0/1 labels, in matching order.
    """
    base_dir = os.getcwd() if root is None else root
    # Directory listings come back in arbitrary order; sorting makes the
    # corpus order (and therefore the seeded train/test split) reproducible.
    folder_names = sorted(
        entry for entry in listdir(base_dir)
        if ("spam" in entry or "ham" in entry)
        and os.path.isdir(os.path.join(base_dir, entry))
    )
    email_list, type_list = [], []
    for folder in folder_names:
        email_type = 1 if "spam" in folder else 0
        for path in sorted(glob(os.path.join(base_dir, folder, "*"))):
            if os.path.basename(path) in skip_names or not os.path.isfile(path):
                continue
            with open(path, "rb") as fp:
                mail = BytesParser(policy=default).parse(fp)
            email_list.append(mail)
            type_list.append(email_type)
    return email_list, type_list

# Parse the extracted corpus folders: `emails` holds the parsed messages and
# `labels` the matching 0 (ham) / 1 (spam) flags.
emails, labels = parse_emails()
print(f"{len(emails)} emails are included")

6051 emails are included


In [None]:
# Training and testing split.

from sklearn.model_selection import train_test_split

# Hold out 33% of the messages for testing. The split is shuffled with a fixed
# seed and stratified on the labels so both parts keep the same spam/ham ratio.
X_train, X_test, y_train, y_test = train_test_split(
    emails,
    labels,
    test_size=0.33,
    random_state=42,
    shuffle=True,
    stratify=labels,
)

In [None]:
# Replace urls with "URL".

from urlextract import URLExtract

def replace_urls(doc):
    """Return ``doc`` with every URL found by ``URLExtract`` replaced by " URL ".

    Args:
        doc: Text to scan.

    Returns:
        The text with each detected URL substituted by the padded placeholder.
    """
    # Reuse a single extractor across calls instead of constructing a new
    # URLExtract for every document.
    extractor = getattr(replace_urls, "_extractor", None)
    if extractor is None:
        extractor = replace_urls._extractor = URLExtract()
    # Replace the longest URLs first: if a shorter URL is a prefix/substring
    # of a longer one, substituting it first would break the longer match and
    # leave its tail in the text. De-duplicating avoids redundant passes.
    unique_urls = sorted(set(extractor.find_urls(doc)),
                         key=lambda url: (-len(url), url))
    for url in unique_urls:
        doc = doc.replace(url, " URL ")
    return doc

In [None]:
# Replace numbers with "NUM".

import re

# Digits, with an optional fractional part and an optional exponent.
_NUMBER_PATTERN = re.compile(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?')

def replace_nums(doc):
    """Return ``doc`` with every numeric literal swapped for the token 'NUM'."""
    return _NUMBER_PATTERN.sub('NUM', doc)

In [None]:
# Normalize a certain word.

import nltk, string

# Built once: normalize_token runs for every word of every email, and neither
# the translation table nor the stemmer depends on the word being processed.
_PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)
_STEMMER = nltk.PorterStemmer()

def normalize_token(word):
    """Strip ASCII punctuation from ``word``, lowercase it and return its Porter stem.

    Args:
        word: A single whitespace-delimited token.

    Returns:
        The stemmed token; an empty string if the word was only punctuation.
    """
    return _STEMMER.stem(word.translate(_PUNCTUATION_TABLE).lower())

In [None]:
# Create the vocabulary for one email. 

def build_vocabulary(doc):
    """Return the set of normalized tokens occurring in ``doc``.

    The text is split on whitespace and each piece goes through
    ``normalize_token``; duplicates collapse because the result is a set.
    """
    return {normalize_token(word) for word in doc.split()}

In [None]:
# Whole vocabulary creation with the training set emails.

# Tokens of this many characters or more are dropped from the vocabulary.
MAX_TOKEN_LENGTH = 10

whole_vocab = set()
usable_docs, usable_labels = [], []

for doc, label in zip(X_train, y_train):
    try:
        text = replace_urls(doc.get_body().get_content())
        text = replace_nums(text)
        doc_vocab = build_vocabulary(text)
    except Exception:
        # Best effort: a message whose body cannot be extracted or processed
        # is left out of the training set. `Exception` (rather than a bare
        # `except`) lets KeyboardInterrupt still stop a long run.
        continue
    whole_vocab.update(doc_vocab)  # in-place; avoids copying the set per email
    usable_docs.append(doc)
    usable_labels.append(label)

# Build fresh lists instead of popping while enumerating: removing item `ix`
# during iteration shifts the remaining items and makes the loop skip the
# element right after every removal.
X_train, y_train = usable_docs, usable_labels

# Get rid of common words:

no_info_words = {"", "from", "to", "a", "an", "is", "of", "and", "are", "the"}
whole_vocab = whole_vocab - no_info_words

# Get rid of long words (sorted so the feature column order does not depend
# on set iteration order, which varies between interpreter runs):

filtered_vocab = sorted(x for x in whole_vocab if len(x) < MAX_TOKEN_LENGTH)

print(f"The vocabulary consists of {len(filtered_vocab)} words")

The vocabulary consists of 33348 words


In [None]:
# Clean the test set.

usable_docs, usable_labels = [], []

for doc, label in zip(X_test, y_test):
    try:
        content = doc.get_body().get_content()
    except Exception:
        # Best effort: skip messages whose body cannot be extracted.
        continue
    if not isinstance(content, str):
        # The text helpers applied later (URL/number replacement, split)
        # operate on str, so non-text bodies are left out as well.
        continue
    usable_docs.append(doc)
    usable_labels.append(label)

# Build fresh lists instead of popping while enumerating: removing item `ix`
# during iteration makes the loop skip the element right after every removal,
# so some unreadable messages could survive the cleaning.
X_test, y_test = usable_docs, usable_labels

In [None]:
# Word X_train matrix creation.

import pandas as pd

from collections import Counter


def build_count_matrix(docs, vocabulary, row_prefix):
    """Build a documents x vocabulary table of token counts.

    Args:
        docs: Parsed email messages; each must expose a text body through
            ``get_body().get_content()``.
        vocabulary: Ordered tokens that become the columns.
        row_prefix: Row labels are ``"<row_prefix> <position>"``.

    Returns:
        A ``pandas.DataFrame`` with one row per document and one column per
        vocabulary token, holding how often the token occurs in the document.
    """
    rows = {}
    for ix, doc in enumerate(docs):
        text = replace_nums(replace_urls(doc.get_body().get_content()))
        # Count every token once; calling list.count() per vocabulary entry
        # rescans the whole document for each of the vocabulary's tokens.
        token_counts = Counter(normalize_token(word) for word in text.split())
        # A Counter returns 0 for tokens the document does not contain.
        rows["{} {}".format(row_prefix, ix)] = {
            token: token_counts[token] for token in vocabulary
        }
    return pd.DataFrame(rows).T


X_train_matrix = build_count_matrix(X_train, filtered_vocab, "doc_train")

# Word X_test matrix creation.

X_test_matrix = build_count_matrix(X_test, filtered_vocab, "doc_test")

In [None]:
# Sanity check: each feature matrix should have exactly one row per label.
tuple(len(part) for part in (X_train_matrix, y_train, X_test_matrix, y_test))

(4006, 4006, 1972, 1972)

In [None]:
# Training classification accuracy with a LogisticRegression model and cross-validation.

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

# Score the classifier with 3-fold cross-validation on the training matrix
# and show the mean of the three fold scores as the cell result.
log_clf = LogisticRegression(random_state=42, max_iter=1000)
score = cross_val_score(
    estimator=log_clf,
    X=X_train_matrix,
    y=y_train,
    cv=3,
    verbose=3,
)
score.mean()

[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


[CV] END ................................ score: (test=0.974) total time= 3.2min


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:  3.3min remaining:    0.0s


[CV] END ................................ score: (test=0.973) total time=  41.3s


[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:  3.9min remaining:    0.0s


[CV] END ................................ score: (test=0.971) total time= 3.3min


[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:  7.2min finished


0.9725408733095606

In [None]:
# Precision and Recall computation of the test set classification.

from sklearn.metrics import precision_score, recall_score

# Fit on the full training matrix, then report precision and recall on the
# held-out test matrix.
# NOTE: the recorded run of this cell printed lbfgs's "TOTAL NO. of ITERATIONS
# REACHED LIMIT" warning, i.e. the solver stopped at max_iter; the warning
# itself suggests raising max_iter or scaling the features.
log_clf = LogisticRegression(solver="lbfgs", max_iter=1000, random_state=42)
log_clf = log_clf.fit(X_train_matrix, y_train)

y_pred = log_clf.predict(X_test_matrix)

test_precision = precision_score(y_test, y_pred)
test_recall = recall_score(y_test, y_pred)

print("Precision: {:.2f}%".format(100 * test_precision))
print("Recall: {:.2f}%".format(100 * test_recall))

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression


Precision: 97.14%
Recall: 95.85%
