In [116]:
from __future__ import division, print_function
import numpy as np
import math

class NaiveBayes():
    """Gaussian Naive Bayes classifier.

    Each feature is modelled, per class, as an independent normal
    distribution whose mean and variance are estimated from the training
    data.  Prediction picks the class with the highest posterior score,
    computed in log space so that products of many small likelihoods do
    not underflow to zero.
    """

    def fit(self, X, y):
        """Estimate per-class priors and per-feature Gaussian parameters.

        Args:
            X: 2-D array-like of shape (n_samples, n_features).
            y: 1-D array-like of class labels, one per row of ``X``.
        """
        X, y = np.asarray(X), np.asarray(y)
        self.X, self.y = X, y
        self.classes = np.unique(y)
        self.parameters = []
        self.priors = []
        # For every class: its relative frequency P(Y) and the mean and
        # variance of each feature column restricted to that class.
        for c in self.classes:
            X_c = X[np.where(y == c)]
            self.priors.append(len(X_c) / float(len(X)))
            self.parameters.append(
                [{"mean": col.mean(), "var": col.var()} for col in X_c.T]
            )

    def _calculate_likelihood(self, mean, var, x):
        """Return the Gaussian density of ``x`` for the given mean/variance.

        A small constant (1e-4) is added to both denominators so that a
        zero variance does not cause a division by zero.
        """
        coeff = 1.0 / math.sqrt(2.0 * math.pi * var + 1e-4)
        exponent = math.exp(-(math.pow(x - mean, 2) / (2 * var + 1e-4)))
        return coeff * exponent

    def _calculate_log_likelihood(self, mean, var, x):
        """Return the natural log of ``_calculate_likelihood(mean, var, x)``.

        Computed analytically (without calling ``exp``) so that extremely
        unlikely values yield a large negative number instead of log(0).
        """
        log_coeff = -0.5 * math.log(2.0 * math.pi * var + 1e-4)
        return log_coeff - (x - mean) ** 2 / (2 * var + 1e-4)

    def _classify(self, sample):
        """Return the most probable class label for a single sample.

        Uses Bayes' rule P(Y|X) ~ P(X|Y) * P(Y) with the naive independence
        assumption P(X|Y) = P(x1|Y) * P(x2|Y) * ...; the evidence P(X) is
        the same for every class and is therefore ignored.  The score is
        accumulated as log P(Y) + sum(log P(xi|Y)), which ranks classes
        exactly like the product but cannot underflow.
        """
        log_posteriors = []
        for prior, class_params in zip(self.priors, self.parameters):
            log_posterior = math.log(prior)  # log P(Y)
            for feature_value, params in zip(sample, class_params):
                log_posterior += self._calculate_log_likelihood(
                    params["mean"], params["var"], feature_value
                )
            log_posteriors.append(log_posterior)
        return self.classes[np.argmax(log_posteriors)]

    def predict(self, X_test):
        """Return a list with the predicted class label of each row of ``X_test``."""
        y_pred = [self._classify(sample) for sample in X_test]
        return y_pred

In [118]:
# Toy training data: the 2x6 literal is transposed, giving six samples
# with two features each.
X = np.array([[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7]]).T
# One label per sample: three classes with two samples each.
y = np.array([0, 0, 1, 1, 2, 2])
# Two query samples to classify.
X_t=np.array([[7,8], [0,1]])
NB = NaiveBayes()
NB.fit(X, y)
# Last expression of the cell: the predicted labels for X_t.
NB.predict(X_t)

[2, 0]

In [None]:
import re
import collections
import numpy as np
import math

def tokenize(message):
    """Return the set of distinct lower-cased words found in ``message``.

    A word is any maximal run of letters a-z, digits, or apostrophes.
    Because the result is a set, repeated words are collapsed.
    """
    return set(re.findall("[a-z0-9']+", message.lower()))

"""training set consists of pairs (message, is_spam)"""
def count_words(training_set):
    """Count, per word, how many spam and non-spam messages contain it.

    Args:
        training_set: iterable of ``(message, is_spam)`` pairs.

    Returns:
        A defaultdict mapping each word to a two-element list
        ``[spam_count, non_spam_count]``.
    """
    tallies = collections.defaultdict(lambda: [0, 0])
    for text, spam_flag in training_set:
        # Slot 0 accumulates spam messages, slot 1 non-spam messages.
        slot = 0 if spam_flag else 1
        for token in tokenize(text):
            tallies[token][slot] += 1
    return tallies

"""turn the word_counts into a list of triplets - w, p(w | spam) and p(w | ~spam)"""
def word_probabilities(counts, total_spams, total_non_spams, k=0.5):
    """Convert word counts into smoothed per-class probabilities.

    Args:
        counts: mapping of word -> ``(spam_count, non_spam_count)``.
        total_spams: number of spam messages in the training set.
        total_non_spams: number of non-spam messages in the training set.
        k: smoothing pseudo-count added to every count.

    Returns:
        A list of triplets ``(word, p(word | spam), p(word | ~spam))``.
    """
    spam_denominator = total_spams + 2 * k
    non_spam_denominator = total_non_spams + 2 * k
    triplets = []
    for word, (spam_count, non_spam_count) in counts.items():
        p_given_spam = (spam_count + k) / spam_denominator
        p_given_non_spam = (non_spam_count + k) / non_spam_denominator
        triplets.append((word, p_given_spam, p_given_non_spam))
    return triplets

def spam_probability(word_probs, message):
    """Return the estimated probability that ``message`` is spam.

    Every vocabulary word contributes either its presence probability (if
    it occurs in the message) or its absence probability (if it does not)
    to the spam and non-spam scores.  Both classes are treated as equally
    likely a priori.

    Args:
        word_probs: iterable of ``(word, p(word | spam), p(word | ~spam))``.
        message: the raw message text.

    Returns:
        A float in [0, 1]: p_spam / (p_spam + p_not_spam).

    Raises:
        ValueError: if a probability in ``word_probs`` is exactly 0 or 1 so
            that a logarithm of zero would be required.
    """
    message_words = tokenize(message)
    log_prob_if_spam = log_prob_if_not_spam = 0.0
    for word, prob_if_spam, prob_if_not_spam in word_probs:
        if word in message_words:
            log_prob_if_spam += math.log(prob_if_spam)
            log_prob_if_not_spam += math.log(prob_if_not_spam)
        else:
            log_prob_if_spam += math.log(1.0 - prob_if_spam)
            log_prob_if_not_spam += math.log(1.0 - prob_if_not_spam)

    # p_spam / (p_spam + p_not_spam) == 1 / (1 + exp(log_not - log_spam)).
    # Working with the difference of the log scores avoids exponentiating
    # the individual sums, which underflow to 0.0 for large vocabularies
    # and would make the division fail with ZeroDivisionError.
    log_odds_against = log_prob_if_not_spam - log_prob_if_spam
    if log_odds_against > 0:
        # Use the algebraically equivalent form whose exp() argument is
        # non-positive, so exp() can never overflow.
        ratio = math.exp(-log_odds_against)
        return ratio / (1.0 + ratio)
    return 1.0 / (1.0 + math.exp(log_odds_against))

class NaiveBayesClassifier:
    """Word-based Naive Bayes spam classifier.

    ``k`` is the smoothing pseudo-count passed to ``word_probabilities``;
    ``word_probs`` holds the per-word probability triplets after training.
    """

    def __init__(self, k=0.5):
        self.word_probs = []
        self.k = k

    def train(self, training_set):
        """Learn word probabilities from ``(message, is_spam)`` pairs."""
        spam_total = sum(1 for _, flagged in training_set if flagged)
        non_spam_total = len(training_set) - spam_total

        self.word_probs = word_probabilities(
            count_words(training_set),
            spam_total,
            non_spam_total,
            self.k,
        )

    def classify(self, message):
        """Return the spam probability of ``message`` under the trained model."""
        return spam_probability(self.word_probs, message)


In [None]:
from sklearn.model_selection import train_test_split
import pickle
import numpy as np

# NOTE(review): pickle.load executes arbitrary code embedded in the file.
# Only load these pickles if they come from a trusted source.

# Load the label for each e-mail.  The context manager guarantees the
# file is closed even if unpickling raises.
authors_file = "data/email_authors.pkl"
with open(authors_file, "rb") as authors_file_handler:
    authors = pickle.load(authors_file_handler)

# Load the text of each e-mail.
words_file = "data/word_data.pkl"
with open(words_file, "rb") as words_file_handler:
    word_data = pickle.load(words_file_handler)

# Hold out 10% of the data for testing; the fixed seed keeps the split
# reproducible.
features_train, features_test, labels_train, labels_test = train_test_split(
    word_data, authors, test_size=0.1, random_state=42
)

# Pair every message with its label, as expected by NaiveBayesClassifier.
train_data = list(zip(features_train, labels_train))
test_data = list(zip(features_test, labels_test))

In [None]:
# Fit the spam classifier on the first 100 training pairs.
classifier = NaiveBayesClassifier()
classifier.train(train_data[:100])

# For the first 100 test pairs keep (message, true label, predicted
# spam probability).
classified = [
    (text_body, label, classifier.classify(text_body))
    for text_body, label in test_data[:100]
]

In [None]:
from numpy import linalg as LA

# Use the first 30 training documents as a small corpus for the TF-IDF demo.
text = features_train[:30]

def vector_counts(text):
    """Build a document-by-word occurrence matrix.

    Args:
        text: sequence of documents (strings).

    Returns:
        A NumPy array with one row per document and one column per
        distinct word.  Since ``tokenize`` returns a set, each entry is
        1 if the word occurs in the document and 0 otherwise.
    """
    doc_total = len(text)
    # One list of per-document counts for every word seen so far.
    per_word = collections.defaultdict(lambda: [0] * doc_total)
    for doc_index, document in enumerate(text):
        for token in tokenize(document):
            per_word[token][doc_index] += 1
    word_rows = list(per_word.values())
    # Rows are words at this point; transpose so that rows are documents.
    return np.array(word_rows).T

def Tfidf_transformer(vector):
    """Convert a document-by-term count matrix into L2-normalised TF-IDF.

    The inverse document frequency of a term is ``log(n_d / df_t) + 1``,
    where ``n_d`` is the number of documents (rows) and ``df_t`` the number
    of documents in which the term is non-zero.  Every row of the weighted
    matrix is then scaled to unit Euclidean length.

    Args:
        vector: 2-D array-like of shape (n_documents, n_terms).

    Returns:
        A float array of the same shape.  Rows that contain no terms stay
        all-zero instead of becoming NaN.
    """
    vector = np.asarray(vector, dtype=float)
    n_d = vector.shape[0]
    df_t = np.count_nonzero(vector, axis=0)
    # A term that occurs in no document has an all-zero column, so its idf
    # value is irrelevant; substitute 1 to avoid log(n_d / 0) = inf, which
    # would turn the column into NaN (0 * inf).
    safe_df_t = np.where(df_t == 0, 1, df_t)
    idf_t = np.log(float(n_d) / safe_df_t) + 1
    tf_idf = vector * idf_t[None, :]
    row_norms = LA.norm(tf_idf, axis=1)
    # An empty document has norm 0; dividing by 1 instead keeps the row at
    # zero rather than producing 0 / 0 = NaN.
    row_norms[row_norms == 0] = 1.0
    return tf_idf / row_norms[:, None]
    
# Last expression of the cell: the TF-IDF matrix of the sample corpus.
Tfidf_transformer(vector_counts(text))