In [None]:
# Apache Spam Classifier

# Get the Data

In [44]:
import requests
import os
import shutil

# Base URL the corpus archives are downloaded from, and the two archive names
# (one ham archive, one spam archive) appended to it by download_spam().
URL_ROOT = 'https://spamassassin.apache.org/old/publiccorpus/'
HAM_FILE = '20021010_easy_ham.tar.bz2'
SPAM_FILE = '20021010_spam.tar.bz2'


# All data directories are built relative to the directory the kernel was started in.
MAIN_PATH = os.getcwd()
# Where the downloaded archives are stored.
SOURCE_PATH = os.path.join(MAIN_PATH, 'data/source/')

def reset_directory(directory):
    """Leave ``directory`` existing and empty: drop any current tree, then create it afresh."""
    stale = os.path.exists(directory)
    if stale:
        # Remove the old tree together with everything inside it.
        shutil.rmtree(directory)
    os.makedirs(directory)

def download_spam():
    """Download the ham and spam archives into SOURCE_PATH.

    SOURCE_PATH is wiped and recreated first, so anything previously stored
    underneath it is removed.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection problems or timeouts.
    """
    reset_directory(SOURCE_PATH)

    for filename in [HAM_FILE, SPAM_FILE]:
        # A timeout keeps a stalled connection from hanging the notebook forever.
        r = requests.get(URL_ROOT + filename, timeout=60)
        # Fail loudly instead of saving an HTML error page under a .tar.bz2 name,
        # which would only surface later as a confusing extraction error.
        r.raise_for_status()
        filepath = os.path.join(SOURCE_PATH, filename)
        with open(filepath, 'wb') as destination:
            destination.write(r.content)



In [45]:
# Wipes SOURCE_PATH and fetches both archives over the network on every run.
download_spam()

In [46]:
import tarfile

# Directory the archives are unpacked into; it lives inside SOURCE_PATH.
EMAILS_PATH = os.path.join(SOURCE_PATH, 'emails/')

def extract_spam():
    """Unpack every ``.bz2`` archive found in SOURCE_PATH into EMAILS_PATH.

    EMAILS_PATH is wiped and recreated before extraction. Entries in
    SOURCE_PATH that do not end in ``.bz2`` are ignored.
    """
    reset_directory(EMAILS_PATH)

    for file in os.listdir(SOURCE_PATH):
        if not file.endswith(".bz2"):
            continue
        filepath = os.path.join(SOURCE_PATH, file)
        # The context manager closes the archive handle even if extraction fails.
        with tarfile.open(filepath, mode='r:bz2') as tar:
            # NOTE(review): extractall() trusts the member paths stored in the
            # archive. Only use this on archives from a trusted source, or pass
            # an extraction filter on Python versions that support it.
            tar.extractall(EMAILS_PATH)


In [47]:
# Recreates EMAILS_PATH and unpacks every .bz2 archive found in SOURCE_PATH.
extract_spam()

# Prepare the Data

In [48]:
import random
import glob
import re



# Destination directories for the train/test split of the raw email files.
TRAINING_PATH = os.path.join(MAIN_PATH, 'data/training/source/')
TEST_PATH = os.path.join(MAIN_PATH, 'data/test/source/')

# Sub-directories of EMAILS_PATH holding the extracted spam and ham emails.
EMAIL_SPAM_PATH = os.path.join(EMAILS_PATH, 'spam/')
EMAIL_HAM_PATH = os.path.join(EMAILS_PATH, 'easy_ham/')

# Entry counts are taken when this cell runs, so the email directories must
# already exist (i.e. extract_spam() has been executed) or os.listdir raises.
NUM_SPAM = len(os.listdir(EMAIL_SPAM_PATH))
NUM_HAM = len(os.listdir(EMAIL_HAM_PATH))



def _copy_labelled_split(source_dir, label, percent_test):
    """Copy one class of emails into TEST_PATH / TRAINING_PATH, appending ``.<label>`` to each name."""
    # Email files are named "<numeric id>.<rest>"; anything else (hidden files,
    # helper files) is skipped instead of aborting the whole split.
    # Sorting makes the sample independent of the order os.listdir() returns.
    filenames = sorted(
        name for name in os.listdir(source_dir)
        if name.split('.')[0].isdigit()
    )
    # Sample the actual file names so exactly percent_test % (rounded down)
    # of the files present end up in the test set; a set gives O(1) lookups.
    test_names = set(random.sample(filenames, k=int(len(filenames) * percent_test / 100)))

    for name in filenames:
        target_dir = TEST_PATH if name in test_names else TRAINING_PATH
        # shutil.copy finishes before returning and needs no shell, unlike a
        # fire-and-forget `cp` started through os.popen.
        shutil.copy(os.path.join(source_dir, name),
                    os.path.join(target_dir, f"{name}.{label}"))


def split_spam_train_test(seed=50, percent_test=20):
    """Split the extracted spam and ham emails into test/train directories.

    TRAINING_PATH and TEST_PATH are wiped and recreated, then every email is
    copied into one of them with a ``.spam`` or ``.ham`` suffix appended to
    its file name.

    Args:
        seed: seed for the ``random`` module, making the split reproducible.
        percent_test: percentage (0-100) of each class placed in the test set.
    """
    reset_directory(TRAINING_PATH)
    reset_directory(TEST_PATH)
    random.seed(seed)

    # Spam is sampled before ham so a given seed always yields the same split.
    _copy_labelled_split(EMAIL_SPAM_PATH, 'spam', percent_test)
    _copy_labelled_split(EMAIL_HAM_PATH, 'ham', percent_test)




In [49]:
# Rebuilds the training/test source directories with the defaults (seed=50, percent_test=20).
split_spam_train_test()

In [244]:
import nltk
import pandas as pd
import email




class EmailParser():
    """Parse one labelled email file into a spam flag and its plain-text body.

    The label is read from the file name: a path whose last dot-separated
    piece is ``spam`` is treated as spam, anything else as not spam.
    """

    def __init__(self, file_path):
        # y: True when the file name ends in '.spam'.
        self.y = self.isSpam(file_path.split('.')[-1])
        # Parsed email.message.Message for the file.
        self.mail_objc = self.open_mail_objc(file_path)
        # Placeholder for features; nothing in this class fills it yet.
        self.X = None

    def isSpam(self, spam_string):
        """Return True only when ``spam_string`` is exactly ``'spam'``."""
        return spam_string == 'spam'

    def open_mail_objc(self, file_path):
        """Read ``file_path`` as latin1 text and parse it into an email Message."""
        # latin1 maps every byte to a character, so reading never fails on
        # odd encodings; the handle is closed as soon as the text is read.
        with open(file_path, encoding='latin1') as mail_file:
            msg_str = mail_file.read()
        return email.message_from_string(msg_str)

    def process_mail_component(self, mail_objc):
        """Return the payload of a ``text/plain`` part, or None for any other content type."""
        part_type = mail_objc.get_content_type()
        if part_type in ['text/plain']:
            return mail_objc.get_payload()
        else:
            return None

    def build_msg_content(self):
        """Return ``(y, text)`` for the parsed email.

        ``text`` is the concatenation of all ``text/plain`` payloads: every
        such part for a multipart message, or the message itself otherwise.
        It is an empty string when the email has no plain-text content.
        """
        mail_objc = self.mail_objc
        if mail_objc.is_multipart():
            parts = mail_objc.walk()
        else:
            parts = [mail_objc]

        prcssd_msg = ''
        for msg in parts:
            payload = self.process_mail_component(msg)
            if payload:
                prcssd_msg += payload

        # Same (target, text) shape on every path so callers can always unpack.
        return self.y, prcssd_msg
            
    
             



def spam_email_transform(directory, limit=2):
    """Print the plain-text content of the first ``limit`` entries of ``directory``.

    Args:
        directory: folder containing labelled email files.
        limit: how many directory entries to look at (default 2, a quick
            preview); pass None to go through every entry. Hidden files
            count towards the limit but are not parsed.
    """
    for file in os.listdir(directory)[0:limit]:
        if file.startswith('.'):
            continue
        # Join with a separator so `directory` works with or without a trailing slash.
        file_path = os.path.join(directory, file)
        email_parser = EmailParser(file_path)
        y, final_msg_str = email_parser.build_msg_content()
        print(final_msg_str)
        print()






In [None]:
import nltk
import pandas as pd
import email
nltk.download('punkt')
nltk.download('stopwords')
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.tokenize import word_tokenize


def tokenize_text(text: str, remove_stop=True, agg_ints=True, agg_leading_zeros=True, stemmer=True):
    """Turn raw email text into a list of normalised tokens.

    Args:
        text: the text to tokenize.
        remove_stop: drop English stop words (NLTK list).
        agg_ints: replace tokens made only of digits with ``'INTEGER'``.
        agg_leading_zeros: replace tokens starting with ``0`` with ``'LEADING_ZERO'``.
        stemmer: apply Porter stemming to every remaining token.

    Returns:
        list of str tokens.
    """
    # Keep only letters and digits, and lowercase so that the stop-word filter
    # (whose entries are lowercase) also catches capitalised words.
    text = re.sub(r"[^a-zA-Z0-9]", ' ', text).lower()

    tokens = word_tokenize(text)
    if agg_ints:
        tokens = [re.sub(r"^[0-9]+$", 'INTEGER', token) for token in tokens]
    if agg_leading_zeros:
        tokens = [re.sub(r"^[0]+.*", 'LEADING_ZERO', token) for token in tokens]
    if remove_stop:
        # Build the stop-word set once instead of reloading the list per token.
        stop_words = set(stopwords.words("english"))
        tokens = [token for token in tokens if token not in stop_words]
    if stemmer:
        # One stemmer instance is enough for the whole token list.
        porter = PorterStemmer()
        tokens = [porter.stem(w) for w in tokens]
    return tokens

    
def parse_msg_list(msg_list: list):
    """Concatenate the payloads of all messages in ``msg_list`` into one string.

    A message whose payload is itself a list is flattened recursively, so
    the result follows the nested structure depth-first.
    """
    pieces = []
    for part in msg_list:
        body = part.get_payload()
        if type(body) is list:
            # Nested container: collect the text of its children first.
            body = parse_msg_list(body)
        pieces.append(body)
    return ''.join(pieces)


    
def create_target_tokenset_pairs(directory: str):
    """Build ``(target, tokens)`` pairs for every email file in ``directory``.

    The target is the last dot-separated piece of the file name (the label
    suffix, e.g. ``spam`` or ``ham``). Hidden files are skipped.

    Args:
        directory: folder containing the labelled email files.

    Returns:
        list of ``(target, tokens)`` tuples, one per email that could be tokenized.
    """
    target_tokenset_pairs = []

    for file_path in os.listdir(directory):
        if file_path.startswith('.'):
            continue
        target = file_path.split('.')[-1]
        # latin1 decodes any byte sequence; the handle is closed right after reading.
        with open(os.path.join(directory, file_path), encoding='latin1') as mail_file:
            msg_str = mail_file.read()
        payload = email.message_from_string(msg_str).get_payload()
        try:
            # Multipart emails come back as a list of sub-messages that has
            # to be flattened into a single string before tokenizing.
            if isinstance(payload, list):
                full_msg = parse_msg_list(payload)
            else:
                full_msg = payload
            msg_tokens = tokenize_text(full_msg)
        except TypeError:
            # Skip the email rather than appending tokens left over from a
            # previous iteration.
            print(f"Skipping {file_path}: payload of type {type(payload)} could not be tokenized")
            continue
        target_tokenset_pairs.append((target, msg_tokens))
    return target_tokenset_pairs



def create_wordset(list_of_target_tokensets:list):
    """Return the sorted list of distinct tokens found across all token lists."""
    vocabulary = set()
    for tokenset in list_of_target_tokensets:
        vocabulary.update(tokenset)
    return sorted(vocabulary)
    
    


In [None]:
# TRAINING_PATH / TEST_PATH are the directories filled by split_spam_train_test().
training_target_tokenset_pairs = create_target_tokenset_pairs(TRAINING_PATH)
test_target_tokenset_pairs = create_target_tokenset_pairs(TEST_PATH)


# The vocabulary is built from the training emails only.
training_wordset = create_wordset([t[1] for t in training_target_tokenset_pairs])

In [None]:
# Display the sorted training vocabulary.
training_wordset

In [None]:


def calculate_BOW(vocab:list, tokens: list):
    """Count how often each vocabulary word occurs in ``tokens``.

    Args:
        vocab: the words that make up the feature space.
        tokens: the tokens of one document.

    Returns:
        dict with exactly one key per vocabulary word (in ``vocab`` order)
        mapped to its count. Tokens that are not in ``vocab`` are ignored,
        so every document gets the same set of keys.
    """
    bow_dict = dict.fromkeys(vocab,0)
    for token in tokens:
        # Single pass; out-of-vocabulary tokens must not create new keys.
        if token in bow_dict:
            bow_dict[token] += 1
    return bow_dict

def create_bow_df(target_token_pairs:list, vocab:list, max_rows=10):
    """Build the target series and bag-of-words feature frame.

    Args:
        target_token_pairs: list of ``(target, tokens)`` pairs.
        vocab: vocabulary passed to calculate_BOW for every document.
        max_rows: only the first ``max_rows`` pairs are used. The default of
            10 keeps the frame small; pass None to use every pair.

    Returns:
        ``(y, X)``: ``y`` is the ``target_isSpam`` column (1 for spam, 0
        otherwise) and ``X`` is the frame of counts without that column.
    """
    bows = []
    for pair in target_token_pairs[:max_rows]:
        target, tokens = pair[0], pair[1]
        bow = calculate_BOW(vocab, tokens)
        bow['target_isSpam'] = 1 if target == 'spam' else 0
        bows.append(bow)

    # One DataFrame construction from all records instead of growing it row by row.
    df = pd.DataFrame(bows)

    return df.loc[:,'target_isSpam'], df.drop('target_isSpam', axis=1)

 
        




In [None]:
# Both frames use the training vocabulary so they share the same feature columns.
y_train, X_train = create_bow_df(training_target_tokenset_pairs, training_wordset)
y_test, X_test = create_bow_df(test_target_tokenset_pairs, training_wordset)





In [None]:
# Output directories for the processed features, built from MAIN_PATH like
# the other data directories in this notebook.
PROCESSED_TRAINING_PATH = os.path.join(MAIN_PATH, 'data/training/processed/')
PROCESSED_TEST_PATH = os.path.join(MAIN_PATH, 'data/test/processed/')
for processed_dir in (PROCESSED_TRAINING_PATH, PROCESSED_TEST_PATH):
    # exist_ok keeps the cell re-runnable without deleting earlier results.
    os.makedirs(processed_dir, exist_ok=True)


y_train.to_csv(os.path.join(PROCESSED_TRAINING_PATH, 'y_train.csv'))
X_train.to_csv(os.path.join(PROCESSED_TRAINING_PATH, 'X_train.csv'))

X_test.to_csv(os.path.join(PROCESSED_TEST_PATH, 'X_test.csv'))
y_test.to_csv(os.path.join(PROCESSED_TEST_PATH, 'y_test.csv'))