# Sentiment Classification with Multinomial Naive Bayes

In [1]:
import os
import numpy as np
import math
from sklearn.model_selection import train_test_split
from collections import Counter

## Data and Features Extraction

In [2]:
data_path = 'datasets/data-tagged/'
classes = [0, 1]

In [13]:
def process_file(filepath):
    """Given a file, returns a list of tokens for that file"""
    x = []
    with open(filepath, 'r') as f:
        for l in f:
            # Filter lines which consist only of new line operator
            if l == '\n':
                continue
            
            token, pos_tagging = l.split('\t')
            x.append(token)
    return x

def preprocess_data(datapath, sentiment='POS'):
    idx = 0
    X = []
    y = []
    sentiment_value = 1 if sentiment == 'POS' else 0
    
    # For file in the folder
    current_path = datapath + sentiment
    for f in os.listdir(current_path):
        x = process_file(current_path + '/' + f)
        X.append(x)
        y.append(sentiment_value)

    return X, y

def get_unigram_dictionary(X, cutoff=1):
    token_counter = Counter(np.concatenate(X))
    idx = 0
    token_to_idx = {}
    
    for x in X:
        for token in x:
            if token_counter[token] >= cutoff and token not in token_to_idx:
                token_to_idx[token] = idx
                idx += 1
                
    return token_to_idx

def get_bigram_dictionary(X, cutoff=1, token_to_idx={}):
    X_bigram = []
    for x in X:
        X_bigram += [(x[i], x[i + 1]) for i, _ in enumerate(x) if i < len(x) - 1 ]

    token_counter = Counter(X_bigram)
    idx = len(token_to_idx)
    
    for x in X:
        x_bigram = [(x[i], x[i + 1]) for i, _ in enumerate(x) if i < len(x) - 1 ]
        for token in x_bigram:
            if token_counter[token] >= cutoff and token not in token_to_idx:
                token_to_idx[token] = idx
                idx += 1
                
    return token_to_idx

def get_dictionary(X, cutoff=1, unigram=True, bigram=False):
    """
    Returns a dictionary which maps each token to its index in the feature space.
    Tokens which appear less times than specified by the cutoff are discarded
    """
    token_to_idx = {}
    if unigram:
        token_to_idx = get_unigram_dictionary(X, cutoff)
    if bigram:
        token_to_idx = get_bigram_dictionary(X, cutoff, token_to_idx)
    
    print("Generated {} features with cutoff of {}".format(len(token_to_idx), cutoff))
                    
    return token_to_idx

def featurize_data(X, token_to_idx):
    """Convert each sample from a list of tokens to a multinomial bag of words representation"""
    X_unigram_and_bigram = []
    for x in X:
        X_unigram_and_bigram.append(x + [(x[i], x[i + 1]) for i, _ in enumerate(x) if i < len(x) - 1 ])
        
    X_feat = []
    for x in X:
        x_feat = np.zeros((len(token_to_idx)))
        for token in x:
            if token in token_to_idx:
                x_feat[token_to_idx[token]] += 1
        X_feat.append(x_feat)
    
    return X_feat

In [14]:
# a = [[('aa', 'bb'), ('aa', 'ccc'), ('aa', 'bb')], [('aa', 'ccc'), ('aa', 'bbb')]]
# b = []

# for l in a:
#     for item in l:
#         b.append(item)
# b

In [15]:
X_pos, y_pos = preprocess_data(data_path, 'POS')
X_neg, y_neg = preprocess_data(data_path, 'NEG')

In [16]:
X_train = X_pos[:900] + X_neg[:900]
y_train = y_pos[:900] + y_neg[:900]

X_test = X_pos[900:] + X_neg[900:]
y_test = y_pos[900:] + y_neg[900:]

print("X_train has size {}".format(np.array(X_train).shape))
print("y_train has size {}".format(np.array(y_train).shape))
print()
print("X_test has size {}".format(np.array(X_test).shape))
print("y_test has size {}".format(np.array(y_test).shape))

X_train has size (1800,)
y_train has size (1800,)

X_test has size (200,)
y_test has size (200,)


In [24]:
X_train = X_pos[:90] + X_neg[:90]
y_train = y_pos[:90] + y_neg[:90]

X_test = X_pos[900:] + X_neg[900:]
y_test = y_pos[900:] + y_neg[900:]

In [25]:
token_to_idx = get_dictionary(X_train, cutoff=1, unigram=False, bigram=True)

X_train = featurize_data(X_train, token_to_idx)
X_test = featurize_data(X_test, token_to_idx)

Generated 74720 features with cutoff of 1


## Multinomial Naive Bayes Model

In [26]:
class MultinomialNaiveBayes():
    def __init__(self, classes, num_feat, smoothing_value=0):
        # Number of features the model uses
        self.num_feat = num_feat
        # List of the classes
        self.classes = classes
        # Dictionary mapping each class to the prior probability p(C=c)
        self.class_to_prior = {c: 0 for c in classes}
        # self.class_to_feature_to_cond_prob[c][x] is used to store the estimate of the conditional probability p(X=x|C=c)
        self.class_to_feature_to_cond_prob = {c: np.zeros((num_feat,)) for c in classes}
        # A smoothing value of 0 is equivalent to no smoothing
        self.smoothing_value = smoothing_value
        
    def fit(self, X, y):
        y = np.array(y)
        X = np.array(X)
        # Computer priors
        for c in y:
            self.class_to_prior[c] += 1
        self.class_to_prior.update({c: self.class_to_prior[c] / len(y) for c in self.classes})
        
        # Compute estimate of the conditional probability p(X=x|C=c)
        for c in self.classes:
            X_c = X[y == c]
            features_frequencies = np.sum(X_c, axis=0)
            self.class_to_feature_to_cond_prob[c] = (features_frequencies + self.smoothing_value) / sum(features_frequencies + self.smoothing_value)
        
    def predict(self, X):
        return np.argmax(np.stack([self.compute_scores(X, c) for c in self.classes], axis=-1), axis=1)
    
    def compute_scores(self, X, c):
        # If smoothing is not applied, some conditional probability will be zero and so we can't take the log of them
        # To solve the problem, we change 0 to a very small number (10 ** -50)
        adjusted_cond_prob = np.array([p if p != 0 else 10 ** -50 for p in self.class_to_feature_to_cond_prob[c]])
        return np.log(self.class_to_prior[c]) + np.matmul(X, np.log(adjusted_cond_prob))

#     def compute_score(self, x, c):
#         # Compute score (unnormalized log probability) for given class
#         return np.log(self.class_to_prior[c]) + np.dot(x, np.log(self.smooth(self.class_to_feature_to_cond_prob[c])))

## Test Model

In [27]:
%%time
model = MultinomialNaiveBayes(classes, len(X_train[0]), smoothing_value=1)

model.fit(X_train, y_train)

CPU times: user 130 ms, sys: 140 ms, total: 270 ms
Wall time: 270 ms


In [28]:
%%time
y_pred = model.predict(X_test)
n_correct = sum(1 for i, _ in enumerate(y_pred) if y_pred[i] == y_test[i])

print("{0:.2f}% of sentences are correctly classified".format(n_correct * 100 / len(X_test)))

50.00% of sentences are correctly classified
CPU times: user 204 ms, sys: 136 ms, total: 339 ms
Wall time: 389 ms


## Comparison with Sklearn

In [29]:
%%time
from sklearn.naive_bayes import MultinomialNB

n_correct = 0

clf = MultinomialNB(alpha=1.0)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
n_correct = sum(1 for i, _ in enumerate(y_pred) if y_pred[i] == y_test[i])

print("{0:.2f}% of sentences are correctly classified".format(n_correct * 100 / len(X_test)))

50.00% of sentences are correctly classified
CPU times: user 173 ms, sys: 56 ms, total: 229 ms
Wall time: 183 ms
