In [1]:
import numpy as np
import codecs
import string

In [2]:
def test_train_split(X, y):
    train = np.array([], dtype=np.int64)
    test = np.array([], dtype=np.int64)
    for cl in np.unique(y):
        class_indeces = np.where(y == cl)[0]
        train = np.append(train, class_indeces[:int(0.8*class_indeces.shape[0])])
        test = np.append(test, class_indeces[int(0.8*class_indeces.shape[0]):])
        
    return X[train], y[train], X[test], y[test]

In [3]:
def get_words(X):
    """Tokenize every message in ``X`` into lowercase words.

    Each string is lowercased, every punctuation character and every digit
    is replaced by a space, and the result is split on whitespace.

    Parameters
    ----------
    X : array-like of str
        Messages to tokenize.

    Returns
    -------
    numpy.ndarray
        Object array with the same shape as ``X``; each element is the list
        of words of the corresponding message.
    """
    # ASCII punctuation plus a few non-ASCII symbols that are also stripped.
    punctuation = string.punctuation+'…'+'“'+'–'+'‘'+'£'+'»'
    removable = punctuation + string.digits
    # One table mapping every removable character to a space, so a single
    # translate pass handles both punctuation and digits.
    to_space = str.maketrans(removable, ' ' * len(removable))

    # Public np.char functions instead of unbound chararray methods reached
    # through the private np.core namespace.
    lowered = np.char.lower(X)
    cleaned = np.char.translate(lowered, to_space)
    return np.char.split(cleaned)

In [4]:
def vectorize(X):
    """Build a bag-of-words count matrix for the messages in ``X``.

    The vocabulary is the sorted set of all words produced by ``get_words``
    over ``X``; each word is assigned its position in that sorted order.

    Parameters
    ----------
    X : sequence of str
        Messages to vectorize.

    Returns
    -------
    vectorization : numpy.ndarray
        ``int64`` array of shape ``(len(X), len(dictionary))`` where entry
        ``[i, j]`` is the number of times word ``j`` occurs in message ``i``.
    dictionary : dict
        Mapping from word to its column index in ``vectorization``.
    """
    pure_words = get_words(X)

    vocabulary = sorted({word for message in pure_words for word in message})
    dictionary = {word: column for column, word in enumerate(vocabulary)}

    vectorization = np.zeros((len(X), len(dictionary)), dtype=np.int64)
    # Count straight from the token lists.  Packing the per-message index
    # lists into one np.array first would create a ragged array, which
    # NumPy rejects when messages have different lengths.
    for row, message in enumerate(pure_words):
        for word in message:
            vectorization[row, dictionary[word]] += 1

    return vectorization, dictionary

In [5]:
# Read the tab-separated file 'spam': loadtxt parses both columns as bytes and,
# with unpack=True, returns them column-wise, so the first column becomes the
# labels y and the second the messages X.  np.char.decode then turns the bytes
# back into str using the default codec.
# NOTE(review): loadtxt's default comments='#' is still in effect here —
# confirm that messages containing '#' are not being truncated.
with codecs.open('spam', encoding='latin1') as spam_file:  # closed on exit
    y, X = np.char.decode(np.loadtxt(
        spam_file, dtype=np.bytes_, delimiter='\t', unpack=True))

In [6]:
# Split the loaded messages and labels into train and test parts, class by class.
X_train, y_train, X_test, y_test = test_train_split(X, y)

In [7]:
class NaiveBayes:
    """Naive Bayes text classifier over word counts with additive smoothing.

    Parameters
    ----------
    alpha : number
        Smoothing constant added to every word count before normalizing.

    Attributes set by ``fit``
    -------------------------
    classes : distinct labels, sorted (from ``np.unique``).
    dictionary : word -> column index mapping returned by ``vectorize``.
    dict_size : number of words in ``dictionary``.
    classes_prior_prob : share of training messages belonging to each class.
    classes_words_count : total number of word occurrences per class.
    words_cond_prob : array of shape ``(n_classes, dict_size + 1)``; the last
        column is reserved for words that are not in ``dictionary``.
    """

    def __init__(self, alpha):
        self.alpha = alpha
        self.classes_prior_prob = np.array([], dtype=np.float64)

    def fit(self, X, y):
        """Estimate class priors and smoothed per-class word probabilities.

        Parameters
        ----------
        X : sequence of str
            Training messages.
        y : array-like
            Label of each message.
        """
        y = np.asarray(y)
        self.classes = np.unique(y)
        n_classes = len(self.classes)

        vectorization, self.dictionary = vectorize(X)
        self.dict_size = vectorization.shape[1]
        self.classes_words_count = np.zeros(n_classes, dtype=np.float64)

        # Rebuilt from scratch on every call so that fitting twice does not
        # leave priors from the previous fit in front of the new ones.
        priors = np.zeros(n_classes, dtype=np.float64)
        # One extra, initially empty, column per class for unknown words;
        # sized from the number of classes rather than assuming two.
        word_counts = np.zeros((n_classes, self.dict_size + 1), dtype=np.float64)

        for i, cl in enumerate(self.classes):
            members = y == cl
            class_rows = vectorization[members]
            priors[i] = np.sum(members) / len(y)
            self.classes_words_count[i] = np.sum(class_rows)
            word_counts[i, :self.dict_size] = class_rows.sum(axis=0)

        self.classes_prior_prob = priors

        # The denominator uses dict_size (not dict_size + 1), so the
        # unknown-word column is extra mass on top of a normalized row.
        denominators = self.classes_words_count + self.alpha * self.dict_size
        self.words_cond_prob = (word_counts + self.alpha) / denominators[:, np.newaxis]

    def predict(self, X):
        """Return the most probable class for every message in ``X``.

        Parameters
        ----------
        X : sequence of str
            Messages to classify.

        Returns
        -------
        list
            One entry of ``self.classes`` per message.  A message without
            any words gets the class with the largest prior.
        """
        # Logs do not depend on the message; compute them once.
        log_prior = np.log(self.classes_prior_prob)
        log_cond = np.log(self.words_cond_prob)
        unknown = self.dict_size  # index of the unknown-word column

        result = []
        for message in get_words(X):
            # Explicit integer dtype keeps an empty message a valid index,
            # contributing a zero sum so only the prior decides.
            ind = np.array([self.dictionary.get(word, unknown) for word in message],
                           dtype=np.intp)
            scores = log_prior + np.sum(log_cond[:, ind], axis=1)
            result.append(self.classes[np.argmax(scores)])
        return result

    def score(self, X, y):
        """Return the fraction of messages in ``X`` predicted as ``y``."""
        # Convert both sides to arrays so the comparison is element-wise
        # even when y is a plain list.
        y = np.asarray(y)
        return np.sum(np.asarray(self.predict(X)) == y) / len(y)

In [8]:
# Create the classifier with smoothing constant alpha = 1.
nb = NaiveBayes(1)

In [9]:
# Fit the classifier on the training part only.
nb.fit(X_train, y_train)

In [10]:
# Fraction of test messages whose predicted label matches the true label.
nb.score(X_test, y_test)

0.97670250896057342

In [11]:
# Same metric on the data the classifier was fitted on, for comparison.
nb.score(X_train, y_train)

0.99147599820547327