### Fetch Data

**Importing modules**

In [43]:
import os
import tarfile
import urllib
from collections import Counter
import re
from html import unescape

import email
from email import policy

import numpy as np
import pandas as pd
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.model_selection import cross_val_score

from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.feature_extraction.text import CountVectorizer

import nltk

In [2]:
DOWNLOAD_ROOT = "https://spamassassin.apache.org/old/publiccorpus/"
HAM_URL = DOWNLOAD_ROOT + "20021010_hard_ham.tar.bz2"
SPAM_URL = DOWNLOAD_ROOT + "20021010_spam.tar.bz2"
SPAM_PATH = os.path.join("data", "spam")

def fetch_spam_data(spam_url=SPAM_URL, spam_path=SPAM_PATH):

    if not os.path.isdir(spam_path):
        os.makedirs(spam_path)

    for filename, url in (("ham.tar.bz2", HAM_URL), ("spam.tar.bz2", SPAM_URL)):

        path = os.path.join(spam_path, filename)
        
        if not os.path.isfile(path):
            urllib.request.urlretrieve(url, path)
        tar_bz2_file = tarfile.open(path)
        tar_bz2_file.extractall(path=SPAM_PATH)
        tar_bz2_file.close()

fetch_spam_data()
        

**Loading Emails**

In [129]:
HAM_DIR = os.path.join(SPAM_PATH, "hard_ham")
SPAM_DIR = os.path.join(SPAM_PATH, "spam")

ham_list = os.listdir(HAM_DIR)
spam_list = os.listdir(SPAM_DIR)



In [130]:
def load_emails(is_spam, filename, spam_path=SPAM_PATH):
    dir = "spam" if is_spam else "hard_ham"
    
    with open(os.path.join(spam_path, dir, filename), "rb") as file:
        return email.parser.BytesParser(policy=policy.default).parse(file)

ham = [load_emails(False, name) for name in ham_list]
spam = [load_emails(True, name) for name in spam_list]



In [131]:
def email_structure(email):
    if isinstance(email, str):
        return email
    payload = email.get_payload()
    if isinstance(email, list):
        return "multipart({})".format(
            ",".join([
                email_structure(sub) for sub in payload
            ])
        )
    else:
        return email.get_content_type()


def structures_counter(emails):
    structures = Counter()
    for email in emails:
        structure = email_structure(email)
        structures[structure] += 1
    return structures

structures_counter(ham).most_common()
structures_counter(spam).most_common()



[('text/plain', 222),
 ('text/html', 181),
 ('multipart/alternative', 47),
 ('multipart/mixed', 43),
 ('multipart/related', 8)]

In [132]:
X = np.array(ham + spam, dtype='object')
y = np.array([0] * len(ham) + [1] * len(spam))

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [143]:
def html_converter(html):
    text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
    text = re.sub('<a\s.*?>', 'HYPERLINK', text, flags=re.M | re.S | re.I)
    text = re.sub('<.*?>', "", text, flags=re.M | re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.S | re.M)

    return unescape(text)


def email_converter(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if not ctype in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except:
            content = str(part.get_payload())

        if ctype == "text/plain":
            return content
        else:
            html = content

    if html:
        return html_converter(html)





X_train_email =pd.DataFrame([email_converter(x) for x in X_train], columns=["text"])
X_train_email

X_test_email = pd.DataFrame([email_converter(x) for x in X_test], columns=["text"])

y_train, y_test = pd.Series(y_train), pd.Series(y_test)

X_test_email.fillna('', inplace=True)


In [134]:
def email_converter(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if not ctype in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except:
            content = str(part.get_payload())

        if ctype == "text/plain":
            return content
        else:
            html = content
    
    if html:
        return html_converter(html)



In [144]:
cv = CountVectorizer(max_features=3000, stop_words='english')
train_X = cv.fit_transform(X_train_email['text']).toarray()
test_X = cv.transform(X_test_email['text']).toarray()



In [145]:
forest_clf = RandomForestClassifier(random_state=0)
forest_clf.fit(train_X, y_train)


RandomForestClassifier(random_state=0)

In [146]:
cross_val_score(forest_clf, train_X, y_train, cv=3, scoring="accuracy")

array([0.91 , 0.945, 0.93 ])

In [151]:
y_pred = forest_clf.predict(test_X)

f1_score(y_test, y_pred)



# test_X

0.9607843137254903

In [152]:
precision_score(y_test, y_pred)


0.9333333333333333

In [153]:
recall_score(y_test, y_pred)


0.98989898989899