In [12]:
import os
import sys
import collections
import re
import math
import copy

In [13]:
class Dataset:
    """
    Holds the train/validation/test document sets together with an
    L2-regularised logistic-regression spam classifier trained on them.

    Every document set is a dictionary whose values expose ``word_bag``
    (token -> count), ``true_class`` and ``learned_class`` attributes.
    ``weights`` maps each token to its weight; the bias term is stored under
    the key ``'weight_zero'`` and must be present before training/predicting.
    """
    classes = ["ham", "spam"]  # ham = 0 for not spam, spam = 1 for is spam
    train_set = None
    validation_set = None
    test_set = None
    weights = None
    learning_rate = .01  # Natural learning rate constant
    penalty = 0.001  # penalty (lambda) constant
    epochs = 10  # Number of iterations

    def learn_weights(self):
        """
        Run one pass of regularised gradient ascent over every weight.

        Weights are updated one after another, so each update already sees
        the weights adjusted earlier in the same pass.
        :return: None, ``self.weights`` is updated in place
        """
        spam_class = self.classes[1]

        # Adjust each weight...
        for weight_string in self.weights:
            # The bias has no token of its own: x0 is implicitly 1 for every document
            is_bias = weight_string == 'weight_zero'
            weight_sum = 0.0

            # ...using all training instances
            for doc in self.train_set.values():
                if is_bias:
                    count = 1.0
                elif weight_string in doc.word_bag:
                    count = float(doc.word_bag[weight_string])
                else:
                    # Token absent: its count is 0, so it contributes nothing
                    continue

                # y_sample is true y value (classification) of the doc
                y_sample = 1.0 if doc.true_class == spam_class else 0.0
                probability = self.predict_probability(spam_class, doc)
                weight_sum += count * (y_sample - probability)

            old_weight = self.weights[weight_string]
            self.weights[weight_string] = old_weight + (
                    (self.learning_rate * weight_sum) - (float(self.penalty) * old_weight))

    def train_loop(self):
        """
        Adjust the weights once per epoch, ``self.epochs`` times in total.
        :return: None
        """
        for x in range(0, self.epochs):
            print('Epoch {}/{}'.format(x + 1, self.epochs))
            # Train this instance rather than whatever global happens to be named `dataset`
            self.learn_weights()

    def predict_probability(self, guess, doc):
        """
        Calculate the conditional probability P(guess | doc) under the current weights.
        :param guess: class label to score, one of ``self.classes``
        :param doc: document exposing a ``word_bag`` mapping of token -> count
        :return: probability as a float, or None if ``guess`` is not a known class
        """
        # Linear score: bias + sum of weight * token count for each weighted token
        weight_sum = self.weights['weight_zero']
        for word, count in doc.word_bag.items():
            # A literal 'weight_zero' token must not be mistaken for the bias entry
            if word != 'weight_zero' and word in self.weights:
                weight_sum += self.weights[word] * float(count)

        # Only ever exponentiate a non-positive number so math.exp cannot overflow
        score = float(weight_sum)
        if score >= 0.0:
            decay = math.exp(-score)
            spam_probability = 1.0 / (1.0 + decay)
            ham_probability = decay / (1.0 + decay)
        else:
            growth = math.exp(score)
            spam_probability = growth / (1.0 + growth)
            ham_probability = 1.0 / (1.0 + growth)

        if guess == self.classes[0]:
            return ham_probability
        if guess == self.classes[1]:
            return spam_probability
        return None

    def apply_logistic_regression(self, doc):
        """
        Guess the class of a single document.
        :param doc: document exposing a ``word_bag`` mapping of token -> count
        :return: ``self.classes[1]`` (spam) if it is strictly more probable, otherwise ``self.classes[0]``
        """
        ham_score = self.predict_probability(self.classes[0], doc)
        spam_score = self.predict_probability(self.classes[1], doc)
        # Ties are resolved in favour of ham
        return self.classes[1] if spam_score > ham_score else self.classes[0]

    def _count_correct(self, doc_set):
        """Classify every document of doc_set (storing learned_class) and return the number of correct guesses."""
        correct_guesses = 0.0
        for doc in doc_set.values():
            doc.learned_class = self.apply_logistic_regression(doc)
            if doc.learned_class == doc.true_class:
                correct_guesses += 1.0
        return correct_guesses

    @staticmethod
    def _accuracy_percent(correct, total):
        """Return the accuracy in percent, or 0.0 for an empty set instead of dividing by zero."""
        return 100.0 * float(correct) / float(total) if total else 0.0

    def logistic_regression(self):
        """
        Classify the test set, with and without stop words, and print both accuracies.
        :return: None
        """
        # Apply algorithm on test set
        correct_guesses = self._count_correct(self.test_set)

        # Apply algorithm on filtered test set
        filtered_test_set = filter_doc_set(self.test_set)
        correct_guesses_filtered = self._count_correct(filtered_test_set)

        print("Correct guesses before filtering stop words:\t%d/%s" % (correct_guesses, len(self.test_set)))
        print("Accuracy before filtering stop words:\t\t\t%.4f%%" % (
            self._accuracy_percent(correct_guesses, len(self.test_set))))
        print(
            "Correct guesses after filtering stop words:\t\t%d/%s" % (correct_guesses_filtered, len(filtered_test_set)))
        print("Accuracy after filtering stop words:\t\t\t%.4f%%" % (
            self._accuracy_percent(correct_guesses_filtered, len(filtered_test_set))))

In [14]:
class Document:
    """
    Record describing one email instance: its file title, its bag of words
    and its true / learned classification.
    """
    # Class-level defaults; __init__ overrides title, word_bag and true_class per instance
    title = None
    word_bag = None  # token -> count mapping handed to the constructor
    true_class = None  # spam or ham
    weights = None  # not assigned anywhere in this class
    learned_class = None  # spam or ham, stays None until a classifier stores its guess

    def __init__(self, title, bag, true_class):
        """
        :param title: title of the email (callers in this file pass the file name)
        :param bag: word bag of the email, stored as-is (not copied)
        :param true_class: true classification of the email
        """
        self.title, self.word_bag, self.true_class = title, bag, true_class

In [15]:
def bag_of_words(text):
    """
    Build a token -> frequency dictionary for the given text.
    Tokens are maximal runs of word characters; their order is irrelevant.
    :param text: string to tokenize
    :return: plain dict mapping every token to the number of times it occurs
    """
    frequencies = {}
    for token in re.findall(r'\w+', text):
        frequencies[token] = frequencies.get(token, 0) + 1
    return frequencies

In [16]:
def make_data_set(directory, true_class, split=False):
    """
    Read all text files in the given directory and construct the data set, D
    :param directory: the directory path should just be like "train/ham" for example
    :param true_class: True class is the true classification of the email (spam or ham)
    :param split: whether or not to split the dataset into a second validation set.
        When truthy, the first 70% of the files (in sorted name order) go to the
        first dictionary and the rest to the second one
    :return: tuple ``(dictionary, dictionary_2)`` of key -> Document; ``dictionary_2``
        stays empty unless ``split`` is truthy
    """
    # Only regular files become documents, so the 70% threshold is computed on them alone.
    # Sorting makes the split reproducible: os.listdir returns entries in arbitrary order.
    file_names = sorted(
        entry for entry in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, entry))
    )
    part_1 = int(len(file_names) * .7)

    dictionary = dict()
    dictionary_2 = dict()

    for count, dir_entry in enumerate(file_names):
        with open(os.path.join(directory, dir_entry), 'r', errors='ignore') as text_file:
            text = text_file.read()

        # Documents are keyed by the first four characters of the file name
        key = dir_entry[:4]
        if key in dictionary or key in dictionary_2:
            # Two files share that prefix: fall back to the full name rather than
            # silently overwriting the document stored earlier
            key = dir_entry

        # Logical `and`: a bitwise `&` misbehaves for truthy non-bool flags (e.g. 2 & True == 0)
        target = dictionary_2 if (split and count >= part_1) else dictionary
        target[key] = Document(dir_entry, bag_of_words(text), true_class)

    return dictionary, dictionary_2

In [17]:
def set_stop_words(stop_words_path='stop_words.txt'):
    """
    Load the stop words, one per line, from a text file
    :param stop_words_path: path of the stop word file; defaults to 'stop_words.txt'
        resolved against the current working directory
    :return: list of stop words in file order
    """
    with open(stop_words_path, 'r') as txt:
        lines = txt.read().splitlines()
    # Tokens never contain whitespace, so padded entries could never match and
    # blank lines carry no stop word at all: strip the former, drop the latter
    return [word for word in (line.strip() for line in lines) if word]

In [18]:
def remove_stop_words(stops, data_set):
    """
    Produce a deep copy of the data set whose word bags no longer hold any stop word.
    The data set passed in is left untouched.
    :param stops: iterable of stop words
    :param data_set: dictionary of key -> document, each document exposing ``word_bag``
    :return: the filtered deep copy
    """
    banned_words = tuple(stops)
    cleaned = copy.deepcopy(data_set)
    for doc_key in cleaned:
        bag = cleaned[doc_key].word_bag
        for banned in banned_words:
            # pop with a default is a no-op when the word is absent
            bag.pop(banned, None)
    return cleaned

In [19]:
def extract_vocab(data_set):
    """
    Extracts the vocabulary of all the text in a data set from a word_bag
    :param data_set: dictionary of key -> document, each document exposing ``word_bag``
    :return: v, a list of the distinct tokens in first-seen order
    """
    v = []
    # Set membership is O(1); scanning the growing list for every token was quadratic
    seen = set()
    for key in data_set:
        for token in data_set[key].word_bag:
            if token not in seen:
                seen.add(token)
                v.append(token)
    return v

In [20]:
def filter_doc_set(doc_set):
    """
    Load the stop word list and return the document set with those words removed.
    :param doc_set: dictionary of key -> document
    :return: whatever remove_stop_words builds for the loaded stop words and doc_set
    """
    return remove_stop_words(set_stop_words(), doc_set)

In [21]:
def build_data_sets(train_spam_dir, train_ham_dir, test_spam_dir, test_ham_dir):
    """
    Build a Dataset from directories holding the email text files ("train/ham" for example).
    The training directories are split into a training and a validation part,
    the test directories are loaded whole.
    :param train_spam_dir: directory of spam training emails
    :param train_ham_dir: directory of ham training emails
    :param test_spam_dir: directory of spam test emails
    :param test_ham_dir: directory of ham test emails
    :return: Dataset with train/validation/test sets filled in and every weight set to 0.0
    """
    dataset = Dataset()
    ham_label = dataset.classes[0]
    spam_label = dataset.classes[1]

    # Each call returns (main part, validation part); the validation part of the test sets is ignored
    spam_train, spam_validation = make_data_set(train_spam_dir, spam_label, True)
    ham_train, ham_validation = make_data_set(train_ham_dir, ham_label, True)
    spam_test, _ = make_data_set(test_spam_dir, spam_label, False)
    ham_test, _ = make_data_set(test_ham_dir, ham_label, False)

    # Emails are stored as dictionaries, key : Document.
    # NOTE(review): on a shared key the ham entry replaces the spam one - confirm
    # that keys produced for the two directories can never coincide
    dataset.train_set = {**spam_train, **ham_train}
    dataset.validation_set = {**spam_validation, **ham_validation}
    dataset.test_set = {**spam_test, **ham_test}

    # The weight vector only covers training tokens that survive stop word filtering
    vocabulary = extract_vocab(filter_doc_set(dataset.train_set))

    # token : weight value, everything (including the bias w0, 'weight_zero') starts at 0.0
    initial_weights = {'weight_zero': 0.0}
    initial_weights.update(dict.fromkeys(vocabulary, 0.0))
    dataset.weights = initial_weights

    return dataset

In [22]:
dataset_roots = ["data/dataset 1/", "data/dataset 2/", "data/dataset 3/"]

# Build the data sets of every corpus in turn; `dataset` ends up holding the last one.
for i in range(len(dataset_roots)):
    training_spam_dir, training_ham_dir, test_spam_dir, test_ham_dir = (
        dataset_roots[i] + sub_dir
        for sub_dir in ("train/spam", "train/ham", "test/spam", "test/ham")
    )
    print("Start Dataset {}".format(i + 1))
    dataset = build_data_sets(training_spam_dir, training_ham_dir, test_spam_dir, test_ham_dir)
    # Training and evaluation are currently switched off; re-enable with:
    # dataset.train_loop()
    # dataset.logistic_regression()


Start Dataset 1
Start Dataset 2
Start Dataset 3


In [23]:
def test():
    """
    Test/debugging: smoke-run every helper once against "data/dataset 1/".
    Nothing is asserted; the run passes if no call raises.
    :return: None
    """
    bag_of_words("hello world it is i Achilles")

    # make_data_set returns (data set, validation part); only the former is needed here
    temp_dataset, _ = make_data_set("data/dataset 1/train/spam", "spam", False)

    remove_stop_words(set_stop_words(), temp_dataset)

    extract_vocab(temp_dataset)

    temp_datasets = build_data_sets("data/dataset 1/train/spam", "data/dataset 1/train/ham",
                                    "data/dataset 1/test/spam", "data/dataset 1/test/ham")

    # Prefer the document keyed "0100", but do not depend on that key having
    # landed in the training part of the split
    train_docs = temp_datasets.train_set
    temp_doc = train_docs["0100"] if "0100" in train_docs else next(iter(train_docs.values()))
    temp_datasets.predict_probability("spam", temp_doc)

    temp_datasets.learn_weights()

    temp_datasets.logistic_regression()