In [1]:
import json
import numpy as np
from collections import Counter
from sklearn.metrics import accuracy_score

In [2]:
def get_vocabulary(D):
    
    """
    The vocabulary is a set of tokens which appear more than once in the entire
    document collection plus the "<unk>" token.
    
    Args:
        D (list): A list of documents, where each document is represented as a list of tokens.
        
    Returns:
        vocabulary (set): A set of tokens.
            
    """
    
    # get words to vocabulary if repeated more than once
    min_threshold = 1 
    D = [j for i in D for j in i]
    D_count = Counter(D)
    vocab = {k for k,v in D_count.items() if v > min_threshold}
    vocab.add('unk')
    
    return vocab

In [3]:
class BBoWFeaturizer(object):
    
    def convert_document_to_feature_dictionary(self, doc, vocab):
        
        """
        This function returns a dictionary which maps from the name of the
        feature to the value of that feature.
        
        Args:
        doc (list): A list of tokens.
        vocab (set): A set of tokens.
        
        Returns:
        dictionary (dict): A feature representation.
            
        """

        # assign value of 1 to words present in vocab
        feature_dictionary = {k:1 for k in doc if k in vocab} 
        
        unk_count = len(set(doc).difference(vocab)) # handle unk
        if unk_count > 0:
            feature_dictionary['unk'] = 1
        
        return feature_dictionary

In [4]:
class CBoWFeaturizer(object):
    
    def convert_document_to_feature_dictionary(self, doc, vocab):
        
        """
        This function computes the count bag-of-words feature representation.
        and returns a dictionary which maps from the name of the
        feature to the value of that feature.
        
        Args:
        doc (list): A list of tokens.
        vocab (set): A set of tokens.
        
        Returns:
        dictionary (dict): A feature representation.
        
        """
        
        counter = Counter(doc)
        
        # assign freq to words present in vocab
        feature_dictionary = {k:l for k,l in counter.items() if k in vocab} 
        
        unk_count = len(set(doc).difference(vocab)) # handle unk
        if unk_count > 0:
            feature_dictionary['unk'] = unk_count
        
        return feature_dictionary

In [5]:
def compute_idf(D, vocab):
    
    """
    Where each document is represented as a list of tokens, 
    this function returns the IDF scores for every token in the vocab. 
    The IDFs is represented as a dictionary that 
    maps from the token to the IDF value. 
    If a token is not present in the vocab, it is mapped to "<unk>".
    
    Args:
    D (list): A list of documents.
    vocab (set): A set of tokens.

    Returns:
    idf (dict): IDF value.
    
    """

    D_len = len(D)
    
    idf = {}
    
    for v in vocab:
        idf[v] = 0
    
    idf_unk_temp = 0
    for d in D:
        idf['unk'] += idf_unk_temp  # handle unk
        idf_unk_temp = 0
        for v in set(d): # unique set of tokens in a single doc d
            if v in vocab:
                idf[v] += 1
            if v not in vocab:
                idf_unk_temp = 1
                
    for v in vocab:
        idf[v] = np.log(D_len / idf[v])
    
    return idf
    
class TFIDFFeaturizer(object):
    
    def __init__(self, idf):
        
        """The idf scores computed via `compute_idf`."""
        
        self.idf = idf
    
    def convert_document_to_feature_dictionary(self, doc, vocab):
        
        """
        This function computes the TF-IDF feature representation 
        and return a dictionary which maps from the name of the
        feature to the value of that feature.
        
        Args:
        doc (list): A list of tokens.
        vocab (set): A set of tokens.

        Returns:
        tf_idf (dict): TF-IDF feature.
        
        """
        
        counter = Counter(doc)
        
        # assign freq to words present in vocab
        tf = {k:l for k,l in counter.items() if k in vocab} 
        
        unk_count = len(set(doc).difference(vocab))  # handle unk
        if unk_count > 0:
            tf['unk'] = unk_count
        
        tf_idf = {}
        for v in tf.keys():
            tf_idf[v] = tf[v] * idf[v]
        
        return tf_idf

In [6]:
# Load Dataset
def load_dataset(file_path):
    D = []
    y = []
    with open(file_path, 'r') as f:
        for line in f:
            instance = json.loads(line)
            D.append(instance['document'])
            y.append(instance['label'])
    return D, y

def convert_to_features(D, featurizer, vocab):
    X = []
    for doc in D:
        X.append(featurizer.convert_document_to_feature_dictionary(doc, vocab))
    return X

In [7]:
def train_naive_bayes(X, y, k, vocab):
    
    """
    
    Computes the statistics for the Naive Bayes classifier.
    
    Args:
    X (list): A list of feature representations, where each representation
    is a dictionary that maps from the feature name to the value.
    y (list): A list of integers that represent the labels.
    k (float): A float which is the smoothing parameters.
    vocab (set): A set of vocabulary tokens.
    
    Returns:
        p_y: A dictionary from the label to the corresponding p(y) score
        p_v_y: A nested dictionary where the outer dictionary's key is
            the label and the innner dictionary maps from a feature
            to the probability p(v|y). For example, `p_v_y[1]["hello"]`
            should be p(v="hello"|y=1).
            
    """

    # Calculate prior probability
    y_unique = list(set(y))
    p_y = {}
    for i in y_unique:
        p_y[i] = sum(1 for x in y if x == i)/len(X)
    
    # Calculate conditional probability
    p_v_y = {}
    V_d = len(vocab)
    for j in y_unique:
        p_v_y[j] = {} # create nested dict
        for v in vocab:
            p_v_y[j][v] = 0

        # choose a class
        y_c = [c == j for c in y]  
        # filter X based on chosen class
        x_c = [_x for _x, _y in zip(X, y_c) if _y == True]  

        c = Counter()
        denom = 0
        for d in x_c:
            c.update(d) # calculate numerator
            denom += sum(d.values()) # calculate denominator
        num = dict(c)
        
        # numerator value update
        p_v_y[j] = {i: p_v_y[j].get(i, 0) + num.get(i, 0) for i in set(p_v_y[j]).union(num)} 

        # calculate cond prob by numerator / denominator
        for v in vocab:
            p_v_y[j][v] = (p_v_y[j][v] + k)/(denom + (k * V_d)) 
    
    return p_y, p_v_y

In [8]:
def predict_naive_bayes(D, p_y, p_v_y):
    
    """
    
    Runs the prediction rule for Naive Bayes.
    
    Note that any token which is not in p_v_y is mapped to
    "<unk>". Further, the input dictionaries are probabilities. They
    are converted to log-probabilities while computing
    the Naive Bayes prediction rule to prevent underflow errors.
    
    Args:
    D (list): A list of documents, where each document is a list of tokens.
    p_y (list): Output from `train_naive_bayes`.
    p_v_y (float): Output from `train_naive_bayes`.
    
    Returns:
        predictions: A list of integer labels, one for each document,
            that is the predicted label for each instance.
        confidences: A list of floats, one for each document, that is
            p(y|d) for the corresponding label that is returned.
            
    """

    predictions = []
    confidences = []
    for d in D:
        max_obj = {}
        for y in p_y.keys():
            max_obj[y] = p_y[y]
            for v in d:
                max_obj[y] += np.log(p_v_y[y][v] + 0.00000000000001) # handle 0 log -> -inf
        predictions.append(max(max_obj, key=max_obj.get))
        confidences.append(max(max_obj.values()))
    
    return predictions, confidences

In [9]:
# Variables that are named D_* are lists of documents where each
# document is a list of tokens. y_* is a list of integer class labels.
# X_* is a list of the feature dictionaries for each document.
D_train, y_train = load_dataset('data/imdb/train.jsonl')
D_valid, y_valid = load_dataset('data/imdb/valid.jsonl')
D_test, y_test = load_dataset('data/imdb/test.jsonl')

vocab = get_vocabulary(D_train)

In [10]:
# Training and Prediction

# UNK prep
for i in range(0,len(D_valid)):
    for j in range(0,len(D_valid[i])):
        if D_valid[i][j] not in vocab:
            D_valid[i][j] = 'unk'

# Hyperparameter selection
k = [0.001, 0.01, 0.1, 1.0, 10.0]

print('BBoWFeaturizer...')
best_acc = 0
for k_c in k:
    print('k: ', k_c)
    featurizer = BBoWFeaturizer()
    X_train = convert_to_features(D_train, featurizer, vocab)
    p_y, p_v_y = train_naive_bayes(X_train, y_train, k_c, vocab)
    preds, conf = predict_naive_bayes(D_valid, p_y, p_v_y)
    acc = accuracy_score(preds, y_valid)
    print('validation accuracy for k ' + str(k_c) + ' is ', acc)
    if acc > best_acc:
        BBoW_k = k_c
        best_acc = acc
print('Chosen k for BBoWFeaturizer is ', BBoW_k)   

print('CBoWFeaturizer...')
best_acc = 0
for k_c in k:
    print('k: ', k_c)
    featurizer = CBoWFeaturizer()
    X_train = convert_to_features(D_train, featurizer, vocab)
    p_y, p_v_y = train_naive_bayes(X_train, y_train, k_c, vocab)
    preds, conf = predict_naive_bayes(D_valid, p_y, p_v_y)
    acc = accuracy_score(preds, y_valid)
    print('validation accuracy for k ' + str(k_c) + ' is ', acc)
    if acc > best_acc:
        CBoW_k = k_c
        best_acc = acc
print('Chosen k for CBoWFeaturizer is ', CBoW_k) 

print('TFIDFFeaturizer...')
best_acc = 0
idf = compute_idf(D_train, vocab)
for k_c in k:
    print('k: ', k_c)
    featurizer = TFIDFFeaturizer(idf)
    X_train = convert_to_features(D_train, featurizer, vocab)
    p_y, p_v_y = train_naive_bayes(X_train, y_train, k_c, vocab)
    preds, conf = predict_naive_bayes(D_valid, p_y, p_v_y)
    acc = accuracy_score(preds, y_valid)
    print('validation accuracy for k ' + str(k_c) + ' is ', acc)
    if acc > best_acc:
        TFIDF_k = k_c
        best_acc = acc
print('Chosen k for TFIDFFeaturizer is ', TFIDF_k) 

BBoWFeaturizer...
k:  0.001
validation accuracy for k 0.001 is  0.8584
k:  0.01
validation accuracy for k 0.01 is  0.8648
k:  0.1
validation accuracy for k 0.1 is  0.8672
k:  1.0
validation accuracy for k 1.0 is  0.8612
k:  10.0
validation accuracy for k 10.0 is  0.8624
Chosen k for BBoWFeaturizer is  0.1
CBoWFeaturizer...
k:  0.001
validation accuracy for k 0.001 is  0.8556
k:  0.01
validation accuracy for k 0.01 is  0.8616
k:  0.1
validation accuracy for k 0.1 is  0.8632
k:  1.0
validation accuracy for k 1.0 is  0.8616
k:  10.0
validation accuracy for k 10.0 is  0.8604
Chosen k for CBoWFeaturizer is  0.1
TFIDFFeaturizer...
k:  0.001
validation accuracy for k 0.001 is  0.8308
k:  0.01
validation accuracy for k 0.01 is  0.8348
k:  0.1
validation accuracy for k 0.1 is  0.8368
k:  1.0
validation accuracy for k 1.0 is  0.8352
k:  10.0
validation accuracy for k 10.0 is  0.8384
Chosen k for TFIDFFeaturizer is  10.0


In [11]:
# Train models with chosen parameter k on train & validation data 
# and test the model on test data

# UNK prep
for i in range(0,len(D_test)):
    for j in range(0,len(D_test[i])):
        if D_test[i][j] not in vocab:
            D_test[i][j] = 'unk'
    
# Append train & validation data
D_train = D_train + D_valid
y_train = y_train + y_valid

In [12]:
print('BBoWFeaturizer...')
featurizer = BBoWFeaturizer()
X_train = convert_to_features(D_train, featurizer, vocab)
p_y, p_v_y = train_naive_bayes(X_train, y_train, BBoW_k, vocab)
preds, conf = predict_naive_bayes(D_test, p_y, p_v_y)
acc = accuracy_score(preds, y_test)
print('Test accuracy for BBoWFeaturizer ', acc)

BBoWFeaturizer...
Test accuracy for BBoWFeaturizer  0.8428


In [13]:
print('CBoWFeaturizer...')
featurizer = CBoWFeaturizer()
X_train = convert_to_features(D_train, featurizer, vocab)
p_y, p_v_y = train_naive_bayes(X_train, y_train, CBoW_k, vocab)
preds, conf = predict_naive_bayes(D_test, p_y, p_v_y)
acc = accuracy_score(preds, y_test)
print('Test accuracy for CBoWFeaturizer ', acc)

CBoWFeaturizer...
Test accuracy for CBoWFeaturizer  0.8488


In [14]:
print('TFIDFFeaturizer...')
idf = compute_idf(D_train, vocab)
featurizer = TFIDFFeaturizer(idf)
X_train = convert_to_features(D_train, featurizer, vocab)
p_y, p_v_y = train_naive_bayes(X_train, y_train, TFIDF_k, vocab)
preds, conf = predict_naive_bayes(D_test, p_y, p_v_y)
acc = accuracy_score(preds, y_test)
print('Test accuracy for CBoWFeaturizer ', acc)

TFIDFFeaturizer...
Test accuracy for CBoWFeaturizer  0.812
