In [5]:
# Define data directories

%cd C:\Users\Natallia\PycharmProjects\spam-classifier

SPAM = "spam"
NON_SPAM = "non_spam"

EMAILS_DIR = os.path.join("training_emails")
EMAILS_SPAM_DIR = os.path.join(EMAILS_DIR, SPAM)
EMAILS_NON_SPAM_DIR = os.path.join(EMAILS_DIR, NON_SPAM)


C:\Users\Natallia\PycharmProjects\spam-classifier


In [4]:
# Helper to fetch training emails
# Skip if the emails are already downloaded

import email.policy

import os
import tarfile
from six.moves import urllib

# Names of the "ham" variants in the corpus. EASY_HAM is also the directory
# name the non-spam archive unpacks to; HARD_HAM is not referenced in the
# cells shown here.
EASY_HAM = "easy_ham"
HARD_HAM = "hard_ham"

# Base URL of the SpamAssassin public corpus and the two archives we fetch.
DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"
NON_SPAM_URL = "".join([DOWNLOAD_ROOT, "20030228_", EASY_HAM, ".tar.bz2"])
SPAM_URL = "".join([DOWNLOAD_ROOT, "20021010_spam.tar.bz2"])


def fetch_spam_data(data_path=EMAILS_DIR):
    """Download and unpack the spam and non-spam archives into ``data_path``.

    For each archive the function downloads the ``.tar.bz2`` file (unless it
    is already present), extracts it into ``data_path``, deletes the archive
    and, for the non-spam archive, renames the extracted ``EASY_HAM``
    directory to ``NON_SPAM``. The spam archive is expected to unpack
    directly into a ``SPAM`` directory (the rest of the notebook reads it
    from there without renaming).

    A class whose final directory already exists is skipped, so the function
    can be re-run safely instead of failing on the rename.

    Args:
        data_path: Directory that receives the extracted email folders.
            Created if it does not exist.

    Raises:
        OSError / urllib.error.URLError: if downloading, extracting or
            renaming fails.
    """
    os.makedirs(data_path, exist_ok=True)

    # (archive file name, download URL, directory inside the archive, final directory name)
    archives = (
        ("ham.tar.bz2", NON_SPAM_URL, EASY_HAM, NON_SPAM),
        ("spam.tar.bz2", SPAM_URL, SPAM, SPAM),
    )
    for filename, url, extracted_name, final_name in archives:
        final_dir = os.path.join(data_path, final_name)
        if os.path.isdir(final_dir):
            # Already fetched on a previous run; nothing to do for this class.
            continue

        path = os.path.join(data_path, filename)
        if not os.path.isfile(path):
            urllib.request.urlretrieve(url, path)

        # The context manager closes the archive even if extraction fails.
        # NOTE(review): extractall() trusts the member paths of a file fetched
        # over plain HTTP; consider tarfile extraction filters where available.
        with tarfile.open(path) as tar_bz2_file:
            tar_bz2_file.extractall(data_path)
        os.remove(path)

        if extracted_name != final_name:
            # Rename inside data_path (not EMAILS_DIR) so a custom data_path works.
            os.rename(os.path.join(data_path, extracted_name), final_dir)


fetch_spam_data()


In [6]:
# Load the training emails (spam and non-spam) from disk

import email.parser
import email.policy
import os
import re


# Collect the email file names of each class.
# - sorted(): os.listdir() returns entries in arbitrary order; sorting makes
#   the notebook reproducible across runs and machines.
# - isfile(): skip sub-directories, which load_email() could not open.
# NOTE(review): any non-email helper file shipped inside the archives would
# still be listed here - confirm against the corpus contents.
spam_fileNames = sorted(
    name for name in os.listdir(EMAILS_SPAM_DIR)
    if os.path.isfile(os.path.join(EMAILS_SPAM_DIR, name))
)
non_spam_fileNames = sorted(
    name for name in os.listdir(EMAILS_NON_SPAM_DIR)
    if os.path.isfile(os.path.join(EMAILS_NON_SPAM_DIR, name))
)


# Load emails (text) from the predefined folders
def load_email(is_spam, filename, file_path=EMAILS_DIR):
    """Read one stored email from disk and parse it into a message object.

    Args:
        is_spam: Selects the class sub-directory (SPAM when true, NON_SPAM otherwise).
        filename: Name of the email file inside that sub-directory.
        file_path: Root directory that contains the class sub-directories.

    Returns:
        The message parsed by ``email.parser.BytesParser`` with the default policy.
    """
    if is_spam:
        subfolder = SPAM
    else:
        subfolder = NON_SPAM
    email_path = os.path.join(file_path, subfolder, filename)
    parser = email.parser.BytesParser(policy=email.policy.default)
    # Binary mode: the parser does its own decoding of the raw bytes.
    with open(email_path, "rb") as raw_file:
        return parser.parse(raw_file)


# Parse every listed file into a message object, one list per class.
spam_emails = []
for spam_name in spam_fileNames:
    spam_emails.append(load_email(is_spam=True, filename=spam_name))

non_spam_emails = []
for non_spam_name in non_spam_fileNames:
    non_spam_emails.append(load_email(is_spam=False, filename=non_spam_name))


In [7]:
# Extract plain text from the loaded training emails

from html import unescape


def html_to_plain_text(html):
    """Reduce an HTML document to rough plain text.

    The substitutions run in order: drop the whole ``<head>`` section, turn
    every opening ``<a ...>`` tag into the token `` HYPERLINK ``, strip all
    remaining tags, collapse runs of blank lines into a single newline, and
    finally decode HTML entities.

    Args:
        html: HTML source as a string.

    Returns:
        The plain-text approximation of ``html``.
    """
    base_flags = re.M | re.S
    substitutions = (
        ('<head.*?>.*?</head>', '', base_flags | re.I),
        # TODO test changes
        ('<a\\s.*?>', ' HYPERLINK ', base_flags | re.I),
        ('<.*?>', '', base_flags),
        (r'(\s*\n)+', '\n', base_flags),
    )
    plain = html
    for pattern, replacement, flags in substitutions:
        plain = re.sub(pattern, replacement, plain, flags=flags)
    # Entities are decoded last, so an escaped "&lt;b&gt;" is not stripped as a tag.
    return unescape(plain)


def email_to_text(email):
    """Return the text content of a parsed email, preferring plain text.

    Walks all parts of the message. The first ``text/plain`` part is returned
    as-is. If there is none, the last ``text/html`` part seen is converted
    with ``html_to_plain_text``.

    Args:
        email: A parsed message object exposing ``walk()``; each part must
            provide ``get_content_type()``, ``get_content()`` and
            ``get_payload()``.

    Returns:
        The extracted text, or ``None`` when the message has no non-empty
        ``text/plain`` or ``text/html`` part.
    """
    html = None
    for part in email.walk():
        c_type = part.get_content_type()
        if c_type not in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except Exception:
            # Best-effort fallback for encoding issues (e.g. unknown charset).
            # Catching Exception rather than everything lets KeyboardInterrupt
            # and SystemExit still stop a long-running cell.
            content = str(part.get_payload())
        if c_type == "text/plain":
            return content
        html = content
    if html:
        return html_to_plain_text(html)
    return None


# Convert every parsed message to text; entries keep the order of the
# corresponding *_emails list.
spam_data = list(map(email_to_text, spam_emails))
non_spam_data = list(map(email_to_text, non_spam_emails))


In [8]:
# Save the extracted email text under training_data/<spam|non_spam>/

# Output layout: training_data/<spam|non_spam>/<original file name>
DATA_DIR = "training_data"
DATA_SPAM_DIR = os.path.join(DATA_DIR, SPAM)
DATA_NON_SPAM_DIR = os.path.join(DATA_DIR, NON_SPAM)

# exist_ok=True creates missing directories and tolerates existing ones
# without a separate existence check.
for directory in [DATA_DIR, DATA_SPAM_DIR, DATA_NON_SPAM_DIR]:
    os.makedirs(directory, exist_ok=True)
        

# save preprocessed email text 
def save_email_text(is_spam, filename, email, spam_path=DATA_DIR):
    """Write the extracted text of one email to its class sub-directory.

    Args:
        is_spam: Selects the target sub-directory (SPAM when true, NON_SPAM otherwise).
        filename: Name of the file to create inside that sub-directory.
        email: Text to store. ``None`` (an email without any text part) is
            stored as an empty text rather than the literal word "None".
        spam_path: Root directory that contains the class sub-directories.
    """
    directory = SPAM if is_spam else NON_SPAM
    target = os.path.join(spam_path, directory, filename)
    text = "" if email is None else email
    # The context manager closes the file deterministically; the previous
    # inline open() left one handle per email for the garbage collector.
    with open(target, 'w', encoding='utf-8') as out_file:
        print(text, file=out_file)
   

# Each file name must have exactly one extracted text, otherwise pairing the
# two lists position by position would mislabel files.
assert len(spam_fileNames) == len(spam_data)
assert len(non_spam_fileNames) == len(non_spam_data)


# Store every text under the file name of the email it came from.
for name, text in zip(spam_fileNames, spam_data):
    save_email_text(is_spam=True, filename=name, email=text)

for name, text in zip(non_spam_fileNames, non_spam_data):
    save_email_text(is_spam=False, filename=name, email=text)
