In [None]:
import os
import tarfile
import urllib
import urlextract
import sklearn.model_selection
import pandas as pd
import numpy as np
import nltk
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from zlib import crc32
from pandas.plotting import scatter_matrix
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import ElasticNet
from sklearn import linear_model
from sklearn.model_selection import GridSearchCV
from scipy import stats
from collections import Counter
import email
import email.policy
import re
from html import unescape


In [None]:
# Local directory that receives the downloaded archives and their contents.
SPAM_PATH = os.path.join("datasets", "spam")
# Base URL of the corpus; the two archive URLs below are built from it.
DOWNLOAD_ROOT = "https://spamassassin.apache.org/old/publiccorpus/"
SPAM_URL = f"{DOWNLOAD_ROOT}20030228_spam.tar.bz2"
HAM_URL = f"{DOWNLOAD_ROOT}20030228_easy_ham.tar.bz2"
def fetch_spam_data(spam_url, ham_url, spam_path):
    """Download the ham and spam archives (if missing) and unpack them.

    Args:
        spam_url: URL of the spam ``.tar.bz2`` archive.
        ham_url: URL of the ham ``.tar.bz2`` archive.
        spam_path: Directory that receives both archives (saved as
            ``ham.tar.bz2`` and ``spam.tar.bz2``) and their extracted
            contents. It is created, including missing parents, when absent.

    An archive file already present under ``spam_path`` is not downloaded
    again, but every call re-extracts both archives.
    """
    # Imported here because a bare ``import urllib`` does not guarantee that
    # the ``urllib.request`` submodule has been loaded.
    import urllib.request

    # makedirs rather than mkdir: the parent directory may not exist yet.
    os.makedirs(spam_path, exist_ok=True)
    for filename, url in (("ham.tar.bz2", ham_url), ("spam.tar.bz2", spam_url)):
        path = os.path.join(spam_path, filename)
        if not os.path.isfile(path):
            urllib.request.urlretrieve(url, path)
        # The context manager closes the archive even if extraction fails.
        with tarfile.open(path) as tar_bz2_file:
            if hasattr(tarfile, "data_filter"):
                # The archive comes from the network: where the interpreter
                # supports extraction filters, refuse members that would
                # escape ``spam_path`` or create special files.
                tar_bz2_file.extractall(path=spam_path, filter="data")
            else:
                tar_bz2_file.extractall(path=spam_path)
        

In [None]:
# Fetch (when not already on disk) and unpack both archives under SPAM_PATH.
fetch_spam_data(spam_url=SPAM_URL, ham_url=HAM_URL, spam_path=SPAM_PATH)

In [None]:
# The two corpus sub-directories of SPAM_PATH.
HAM_DIR, SPAM_DIR = (os.path.join(SPAM_PATH, sub) for sub in ("easy_ham", "spam"))
# Keep only entries whose name is longer than 20 characters, in sorted order.
# NOTE(review): presumably this skips non-message files in the archive - confirm.
ham_filenames = sorted(name for name in os.listdir(HAM_DIR) if len(name) > 20)
spam_filenames = sorted(name for name in os.listdir(SPAM_DIR) if len(name) > 20)

In [None]:
def load_emails(is_spam, filename, spam_path=SPAM_PATH):
    """Parse one message file from the corpus.

    Args:
        is_spam: Truthy to read from the ``spam`` sub-directory, falsy to read
            from ``easy_ham``.
        filename: Name of the message file inside that sub-directory.
        spam_path: Root directory containing both sub-directories.

    Returns:
        The message object produced by ``BytesParser`` using
        ``email.policy.default``.
    """
    # Imported explicitly: ``import email`` and ``import email.policy`` do not
    # guarantee that the ``email.parser`` submodule has been loaded.
    from email.parser import BytesParser

    directory = "spam" if is_spam else "easy_ham"
    with open(os.path.join(spam_path, directory, filename), "rb") as f:
        return BytesParser(policy=email.policy.default).parse(f)


In [None]:
# Parse every listed file; is_spam selects which sub-directory is read.
spam_emails = [load_emails(is_spam=True, filename=name) for name in spam_filenames]
ham_emails = [load_emails(is_spam=False, filename=name) for name in ham_filenames]

In [None]:
# Show the content of the second ham message, stripped of surrounding whitespace.
print(ham_emails[1].get_content().strip())

In [None]:
# Show the content of the fourth spam message, stripped of surrounding whitespace.
print(spam_emails[3].get_content().strip())

In [None]:
# Features are the parsed messages themselves (ham first, then spam);
# labels follow the same order: 0 for ham, 1 for spam.
X = np.array([*ham_emails, *spam_emails], dtype=object)
y = np.array([0 for _ in ham_emails] + [1 for _ in spam_emails])

# Hold out 20% of the messages for testing, with a fixed random seed.
train_x, test_x, train_y, test_y = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [None]:
def html_to_plain_text(html):
    """Reduce an HTML string to rough plain text using regular expressions.

    Steps, in order:
      1. drop the whole ``<head>...</head>`` section;
      2. replace every opening ``<a ...>`` tag with the word ``HYPERLINK``;
      3. remove all remaining tags;
      4. collapse runs of blank/whitespace-only line breaks into one newline;
      5. convert HTML entities (``&amp;`` etc.) to their characters.

    Args:
        html: The HTML source as a string.

    Returns:
        The resulting plain-text string.
    """
    # All patterns are raw strings so that ``\s`` reaches the regex engine
    # verbatim instead of being an invalid string escape sequence.
    text = re.sub(r'<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
    text = re.sub(r'<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
    text = re.sub(r'<.*?>', '', text, flags=re.M | re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
    return unescape(text)
def get_email_structure(email):
    """Describe the MIME layout of a message as a string.

    A string argument is returned unchanged. A message whose payload is a
    list yields ``"multipart(...)"`` wrapping the comma-separated descriptions
    of its sub-parts (computed recursively); any other message yields its
    content type.
    """
    if isinstance(email, str):
        return email
    parts = email.get_payload()
    if not isinstance(parts, list):
        return email.get_content_type()
    nested = ", ".join(get_email_structure(part) for part in parts)
    return f"multipart({nested})"
def structures_counter(emails):
    """Tally how many of ``emails`` share each structure string.

    Returns a ``Counter`` keyed by the value ``get_email_structure`` gives
    for each message.
    """
    return Counter(get_email_structure(message) for message in emails)
def email_to_text(email):
    """Return the text of a message, preferring plain text over HTML.

    Walks the parts of ``email`` and returns the content of the first
    ``text/plain`` part. When there is none, the last ``text/html`` part seen
    is converted with ``html_to_plain_text``.

    Returns:
        The text as a string, or ``None`` when the message has no plain-text
        part and no non-empty HTML part.
    """
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if ctype not in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except Exception:
            # Best-effort fallback for parts whose content cannot be
            # retrieved. Only Exception is caught so KeyboardInterrupt and
            # SystemExit still propagate.
            content = str(part.get_payload())
        if ctype == "text/plain":
            return content
        html = content
    if html:
        return html_to_plain_text(html)
    return None
class EmailWordCounterTransformer(BaseEstimator, TransformerMixin):
    """Turn each email into a ``Counter`` mapping words to occurrence counts.

    Each option enables one preprocessing step in ``transform``:

    * ``lower_case``: lower-case the text.
    * ``replace_urls``: replace every URL found by the module-level
      ``url_extractor`` with the token ``URL``.
    * ``replace_numbers``: replace numbers with the token ``NUMBER``.
    * ``remove_punctuation``: replace runs of non-word characters with a
      single space.
    * ``stemming``: merge counts of words sharing a stem, using the
      module-level ``stemmer`` (skipped when ``stemmer`` is ``None``).
    * ``strip_headers``: stored on the instance but not used by ``transform``.

    ``url_extractor`` and ``stemmer`` are looked up as globals at transform
    time, so they must be defined before ``transform`` is called.
    """

    def __init__(self, strip_headers=True, lower_case=True,
                 remove_punctuation=True, replace_urls=True,
                 replace_numbers=True, stemming=True):
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming

    def fit(self, X, y=None):
        """Nothing to learn; return ``self``."""
        return self

    def transform(self, X, y=None):
        """Return an object array holding one word ``Counter`` per email in ``X``."""
        transformed_text = []
        for email in X:
            # email_to_text may return None when no text part exists.
            text = email_to_text(email) or ""
            if self.lower_case:
                text = text.lower()
            if self.replace_urls:
                # Longest URLs first, so a URL that is a substring of a longer
                # one does not break the longer one's replacement.
                urls = sorted(set(url_extractor.find_urls(text)), key=len, reverse=True)
                for url in urls:
                    text = text.replace(url, "URL")
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)
            word_counts = Counter(text.split())
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts
            # Collected for every email, whether or not stemming ran.
            transformed_text.append(word_counts)
        # Returned once, after all emails have been processed.
        return np.array(transformed_text, dtype=object)

In [None]:
# Structure strings found among the ham messages, most frequent first.
structures_counter(ham_emails).most_common()

In [None]:
# Structure strings found among the spam messages, most frequent first.
structures_counter(spam_emails).most_common()

In [None]:
# Training-set spam (label 1) whose structure is exactly "text/html".
html_spam_emails = [
    msg
    for msg in train_x[train_y == 1]
    if get_email_structure(msg) == "text/html"
]
sample_spam = html_spam_emails[7]
# Preview the first 1000 characters of the sample's stripped content.
print(sample_spam.get_content().strip()[:1000], "")

In [None]:
# Same sample after html_to_plain_text, again limited to 1000 characters.
print(html_to_plain_text(sample_spam.get_content())[:1000], "")

In [None]:
# Text extracted from every training message, in training-set order.
train_email_text = list(map(email_to_text, train_x))
print(train_email_text[2])

In [None]:
# Module-level helpers that EmailWordCounterTransformer.transform reads as
# globals: `stemmer` for word stemming, `url_extractor` for finding URLs.
stemmer = nltk.PorterStemmer()
url_extractor = urlextract.URLExtract()

In [None]:
# Try the word-count transformer on the first ten training messages.
sample_transform = train_x[:10]
# The class is defined as EmailWordCounterTransformer; the previous spelling
# (EmailToWordCounterTransformer) does not exist and raised NameError.
sample_transform_wordcounts = EmailWordCounterTransformer().fit_transform(sample_transform)

In [None]:
# Display the transformer output for the sample messages.
sample_transform_wordcounts