## Initialize

In [None]:
# Email Classifier

## Initialize

In [2]:
# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt

# Where to save the figures
PROJECT_ROOT_DIR = "."
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images")
os.makedirs(IMAGES_PATH, exist_ok=True)




In [5]:
import importlib
def import_module(package):
  try:
    return importlib.import_module(f"{package}")
  except ImportError:
    !pip install -U {package}
    return importlib.import_module(f"{package}")
#!pip uninstall --y urlextract 
#bs = import_module(package='BeautifulSoup',module='bs4')
#urlextract = import_module('urlextract')
#url_extractor = urlextract.URLExtract()
#bs = import_module('bs4').BeautifulSoup


## Fetch Data

In [8]:

# Generic method to fetch and optionally unzip any file
import tarfile
from urllib.request import urlretrieve
from urllib.parse import urlparse

def fetch_data(data_url, data_path, unzip=True):
    #create local path if not present
    filename = os.path.basename(urlparse(data_url).path)
    if not os.path.isdir(data_path):
        os.makedirs(data_path)
    file_path = os.path.join(data_path, filename)
    if not os.path.isfile(file_path):
        urllib.request.urlretrieve(data_url, file_path)
    if unzip:
        tar_bz2_file = tarfile.open(file_path)
        tar_bz2_file.extractall(path=data_path)
        tar_bz2_file.close()


In [9]:
# Fetch spam data

DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"
HAM_URL = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"
SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"
SPAM_PATH = os.path.join("datasets", "spam")
fetch_data(HAM_URL, SPAM_PATH)
fetch_data(SPAM_URL, SPAM_PATH)

In [11]:
#Load spam data

def files_in_dir(dir_path, min_file_len=20):
    return [name for name in sorted(os.listdir(dir_path)) if len(name) > min_file_len]


In [12]:
ham_path = os.path.join(SPAM_PATH, "easy_ham")
spam_path = os.path.join(SPAM_PATH, "spam")
ham_filenames = files_in_dir(ham_path)
spam_filenames = files_in_dir(spam_path)

In [13]:
#load emails from filesystem

import email
import email.policy

def load_email(filename, file_path):
    with open(os.path.join(file_path, filename), "rb") as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)

In [14]:
ham_emails = [load_email(name, ham_path) for name in ham_filenames]
spam_emails = [load_email(name, spam_path) for name in spam_filenames]

In [15]:
#return email content-type

def get_email_structure(email):
    if isinstance(email, str):
        return email
    payload = email.get_payload()
    if isinstance(payload, list):
        return "multipart({})".format(", ".join([
            get_email_structure(sub_email)
            for sub_email in payload
        ]))
    else:
        return email.get_content_type()

In [16]:
#count emails per content-type
from collections import Counter

def structures_counter(emails):
    structures = Counter()
    for email in emails:
        structure = get_email_structure(email)
        structures[structure] += 1
    return structures

## Prepare Data

In [17]:
#split tran and test data

import numpy as np
from sklearn.model_selection import train_test_split

X = np.array(ham_emails + spam_emails)
#y=0 for ham and y=1 for spam
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [22]:
#sperate spam emails with text/html content-type

html_spam_emails = [email for email in X_train[y_train==1]
                    if get_email_structure(email) == "text/html"]
sample_html_spam = html_spam_emails[7]


In [23]:
#convert html content to text

bs = import_module('bs4').BeautifulSoup
def email_to_text(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if not ctype in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except: # in case of encoding issues
            content = str(part.get_payload())
        if ctype == "text/plain":
            return content
        else:
            html = content
    if html:
        return bs(html).get_text()

In [24]:
#sample email conversion
print(email_to_text(sample_html_spam)[:100], "...")

A:link {TEX-DECORATION: none}A:active {TEXT-DECORATION: none}A:visited {TEXT-DECORATION: none}A:hove ...


In [25]:
nltk = import_module('nltk')
stemmer = nltk.PorterStemmer()
for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
    print(word, "=>", stemmer.stem(word))


Computations => comput
Computation => comput
Computing => comput
Computed => comput
Compute => comput
Compulsive => compuls


In [38]:
urlextract = import_module('urlextract')    
url_extractor = urlextract.URLExtract()


In [27]:
from sklearn.base import BaseEstimator, TransformerMixin
import re

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                 replace_urls=True, replace_numbers=True, stemming=True):
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = email_to_text(email) or ""
            if self.lower_case:
                text = text.lower()
            if self.replace_urls and url_extractor is not None:
                urls = list(set(url_extractor.find_urls(text)))
                urls.sort(key=lambda url: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, " URL ")
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*(?:[eE]\d+))?', 'NUMBER', text)
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)
            word_counts = Counter(text.split())
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts
            X_transformed.append(word_counts)
        return np.array(X_transformed)

In [41]:
from scipy.sparse import csr_matrix

class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocabulary_size=1000):
        self.vocabulary_size = vocabulary_size
    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)
        most_common = total_count.most_common()[:self.vocabulary_size]
        self.most_common_ = most_common
        self.vocabulary_ = {word: index + 1 for index, (word, count) in enumerate(most_common)}
        return self
    def transform(self, X, y=None):
        rows = []
        cols = []
        data = []
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                rows.append(row)
                cols.append(self.vocabulary_.get(word, 0))
                data.append(count)
        return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size + 1))

In [42]:
from sklearn.pipeline import Pipeline

preprocess_pipeline = Pipeline([
    ("email_to_wordcount", EmailToWordCounterTransformer()),
    ("wordcount_to_vector", WordCounterToVectorTransformer()),
])

X_train_transformed = preprocess_pipeline.fit_transform(X_train)

## Train Model

In [43]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

log_clf = LogisticRegression(solver="lbfgs", random_state=42, max_iter=1000)
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)
score.mean()

[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


[CV]  ................................................................
[CV] .................................... , score=0.980, total=   0.3s
[CV]  ................................................................


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.3s remaining:    0.0s


[CV] .................................... , score=0.981, total=   0.3s
[CV]  ................................................................


[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    0.6s remaining:    0.0s


[CV] .................................... , score=0.989, total=   0.4s


[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:    1.0s finished


0.9833333333333334

In [45]:
from sklearn.metrics import precision_score, recall_score

X_test_transformed = preprocess_pipeline.transform(X_test)

log_clf = LogisticRegression(solver="lbfgs", random_state=42, max_iter=1000)
log_clf.fit(X_train_transformed, y_train)

y_pred = log_clf.predict(X_test_transformed)

print("Precision: {:.2f}%".format(100 * precision_score(y_test, y_pred)))
print("Recall: {:.2f}%".format(100 * recall_score(y_test, y_pred)))

Precision: 94.90%
Recall: 97.89%
