In [2]:
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
import os

In [3]:
# Download the set of stopwords from nltk
nltk.download("stopwords", "E:")

[nltk_data] Downloading package stopwords to E:...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
SPAM_FOLDER = r"./spam/"
HAM_FOLDER = r"./ham/"

In [5]:
spam = []
ham = []
stopwords_list = stopwords.words("english")
bag_of_words = []

def tokenize(text, category=None):
    """Function that takes in a line of text and returns tokenized representation."""
    words = re.findall("[a-zA-Z']+", text)
    words = [w.lower() for w in words]
    token = []
    for word in words:
        if len(word) <= 2:
            continue
        if word not in stopwords_list:
            if word not in bag_of_words:
                bag_of_words.append(word)
            token.append(word)
    if category:
        category.append(token)
    else:
        return token

def read_files():
    for is_spam in (True, False):
        parent_path = [HAM_FOLDER, SPAM_FOLDER][is_spam]
        category = [ham, spam][is_spam]
        for file_path in os.listdir(parent_path):
            with open(parent_path + file_path) as f:
                text = "".join(f.readlines()[2:])
                tokenize(text, category)

read_files()
bag_of_words.sort()

In [6]:
# Vectorize training data
from collections import Counter

n_document = len(spam) + len(ham)
n_feature = len(bag_of_words)
n_spam = len(spam)
n_ham = len(ham)

y = np.array([0 for _ in range(n_ham)] + [1 for _ in range(n_spam)])
X = np.zeros((n_document, n_feature))

for i in range(n_document):
    if i < n_ham:
        count = Counter(ham[i])
    else:
        count = Counter(spam[i - n_ham])
    for j in range(n_feature):
        this_word = bag_of_words[j]
        X[i, j] = count[this_word] if this_word in count else 0

In [7]:
X.shape == (n_document, n_feature)

True

In [8]:
X.shape[0] == y.size

True

In [18]:
class MultinomialNaiveBayes:
    @classmethod
    def train_test_split(X, y, ratio_of_train=0.8):
        df = np.append(X, y.reshape((X.shape[0], 1)), axis=1)
        
    def fit(self, X, y):
        # concat y to X so that we can reference by class.
        X = np.append(X, y.reshape((X.shape[0], 1)), axis=1)
        
        # our priori is the proportion of spam.
        self.priori = n_spam / n_document
        
        # then, we need to calculate P(word | class) for each word.
        # here, we select the words only
        words_given_spam = X[X[:,-1] == 1,:-1]
        words_given_ham = X[X[:,-1] == 0,:-1]
        
        # calculate P(word | class) = count(word in class) / count(words in class) plus Laplace smoothing.
        self.p_words_given_spam = (np.sum(words_given_spam, axis=0) + 1) / (np.sum(words_given_spam) + n_feature)
        self.p_words_given_ham = (np.sum(words_given_ham, axis=0) + 1) / (np.sum(words_given_ham) + n_feature)
    
    def predict(self, text):
        # calculate for the likelihood of X given the class of spam/ham.
        words = [w for w in re.findall("[a-zA-Z']+", text.lower()) if w not in stopwords_list]
        count = Counter(words)
        X = np.zeros(n_feature)
        for i in range(n_feature):
            if bag_of_words[i] in count:
                X[i] = count[bag_of_words[i]]
                
        # here, we multiply P(x_i | c) together
        p_words_given_spam = np.product(np.power(self.p_words_given_spam.astype("float64"), X)) * self.priori
        p_words_given_ham = np.product(np.power(self.p_words_given_ham.astype("float64"), X)) * (1 - self.priori)
        
        if p_words_given_spam > p_words_given_ham:
            return "spam"
        else:
            return "ham"
    
    def confusion_matrix(self, X, y):
        pass

In [19]:
nn = MultinomialNaiveBayes()

In [20]:
nn.fit(X, y)

In [47]:
TEST_HAM_PATH = "./test_ham/"
TEST_SPAM_PATH = "./test_spam/"

test_spam_res = []
test_ham_res = []

for path in os.listdir(TEST_SPAM_PATH):
    file_path = TEST_SPAM_PATH + path
    with open(file_path, "r") as f:
        test_spam_res.append(nn.predict("".join(f.readlines()[2:])))

for path in os.listdir(TEST_HAM_PATH):
    file_path = TEST_HAM_PATH + path
    with open(file_path, "r") as f:
        test_ham_res.append(nn.predict("".join(f.readlines()[2:])))

In [51]:
count_spam = Counter(test_spam_res)
count_ham = Counter(test_ham_res)

In [52]:
count_spam

Counter({'spam': 45, 'ham': 36})

In [53]:
count_ham

Counter({'ham': 1995, 'spam': 17})

In [63]:
# The score of our naive-bayes
fpr, tpr, tnr, fnr = 36 / (45 + 36), 45 / (45 + 36), 1995 / (1995 + 17), 17 / (1995 + 17)
print(f"TNR({tnr:.2f})    FPR({fpr:.2f})\nFNR({fnr:.2f})    TPR({tpr:.2f})")

TNR(0.99)    FPR(0.44)
FNR(0.01)    TPR(0.56)


In [55]:
# The following is using model provided by sklearn
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import confusion_matrix

In [58]:
# Instantiate a nn model with Laplace smoothing, learning class priori
nn = MultinomialNB(alpha=1, fit_prior=True)
nn = nn.fit(X, y)

In [60]:
# Score of the training set
nn.score(X, y)

0.99625

In [72]:
# Prepare testing set
testing_words = []
testing_labels = []

for path in os.listdir(TEST_SPAM_PATH):
    file_path = TEST_SPAM_PATH + path
    with open(file_path, "r") as f:
        testing_words.append([w for w in re.findall("[a-zA-Z']+", "".join(f.readlines()[2:]).lower()) if w not in stopwords_list])
        testing_labels.append(1)

for path in os.listdir(TEST_HAM_PATH):
    file_path = TEST_HAM_PATH + path
    with open(file_path, "r") as f:
        testing_words.append([w for w in re.findall("[a-zA-Z']+", "".join(f.readlines()[2:]).lower()) if w not in stopwords_list])
        testing_labels.append(0)

testing_X = np.zeros((len(testing_words), n_feature))
testing_y = np.array(testing_labels)

for i in range(len(testing_words)):
    count = Counter(testing_words[i])
    for j in range(n_feature):
        if bag_of_words[j] in count:
            testing_X[i,j] = count[bag_of_words[j]]

In [76]:
# Show some statistics of the model
testing_y_hat = nn.predict(testing_X)
confusion_matrix(testing_y, testing_y_hat)

array([[1990,   22],
       [   2,   79]], dtype=int64)