In [1]:
import numpy as np
from sklearn.metrics import classification_report
# from load_data import *

import pandas as pd
from sklearn.model_selection import train_test_split

import collections

In [2]:
class SnipsDataLoader():
    """Load SNIPS intent data from tab-separated files and integer-encode the labels.

    Every file is read as a headerless, tab-delimited table whose first column
    is the label and whose second column is the text. Label indices are assigned
    in order of first appearance in the training labels.

    Attributes:
        train_data, valid_data, test_data: dicts with keys ``'X'`` (texts) and
            ``'y'`` (labels) holding pandas Series; ``valid_data`` / ``test_data``
            are ``None`` when no such split has been loaded or created.
        text_to_index_label_mapping: dict mapping label text -> integer index.
        index_to_text_label_mapping: dict mapping integer index -> label text.
    """

    def __init__(self, train_path, valid_path=None, test_path=None):
        """Read the given splits and encode their labels.

        Args:
            train_path: path of the training file (required).
            valid_path: optional path of the validation file.
            test_path: optional path of the test file.
        """
        self.train_data = self.load_dataset(train_path)
        self.valid_data = self.load_dataset(valid_path) if valid_path is not None else None
        self.test_data = self.load_dataset(test_path) if test_path is not None else None

        self.create_label_mapping()

    def load_dataset(self, data_path):
        """Read one headerless tab-separated file.

        Returns:
            dict with ``'X'`` (the text column) and ``'y'`` (the label column).
        """
        data = pd.read_csv(data_path, header=None, delimiter='\t')
        data.columns = ['labels', 'texts']
        return {'X': data['texts'], 'y': data['labels']}

    def create_label_mapping(self):
        """Build the label <-> index mappings and encode the labels of every loaded split.

        The mappings come from the training labels only. This method rewrites the
        ``'y'`` entries in place and is meant to run once (it is called from
        ``__init__``); calling it again would rebuild the mappings from the
        already-encoded integers.

        Raises:
            KeyError: if a validation or test label does not occur in the training data.
        """
        train_labels = self.train_data['y'].unique()
        self.text_to_index_label_mapping = {label: i for i, label in enumerate(train_labels)}
        self.index_to_text_label_mapping = {i: label for i, label in enumerate(train_labels)}

        for split_name in ('train', 'valid', 'test'):
            split = getattr(self, split_name + '_data')
            if split is not None:
                split['y'] = self._encode_labels(split['y'], split_name)

    def _encode_labels(self, labels, split_name):
        """Map a Series of label texts to indices, naming the split if a label is unknown."""
        mapping = self.text_to_index_label_mapping
        unknown = [label for label in labels.unique() if label not in mapping]
        if unknown:
            # Same exception type the bare dict lookup would raise, but with
            # enough context to locate the offending rows.
            raise KeyError(
                "{} split contains labels not present in the training data: {}".format(
                    split_name, unknown))
        return labels.map(lambda label: mapping[label])

    def split_train_valid(self, valid_size, keep_class_ratios=True, random_state=0):
        """Replace the train/valid splits with a fresh split of the current training data.

        Args:
            valid_size: forwarded to ``train_test_split`` as ``test_size``.
            keep_class_ratios: when true, the split is stratified on the labels.
            random_state: seed forwarded to ``train_test_split``.
        """
        X, y = self.train_data['X'], self.train_data['y']
        X_train, X_valid, y_train, y_valid = train_test_split(
            X, y,
            test_size=valid_size,
            random_state=random_state,
            stratify=y if keep_class_ratios else None,
        )

        self.train_data = {'X': X_train, 'y': y_train}
        self.valid_data = {'X': X_valid, 'y': y_valid}

    def get_train_data(self):
        """Return ``(texts, labels)``: texts as a list, labels as a NumPy array."""
        return list(self.train_data['X']), self.train_data['y'].to_numpy()

    def get_valid_data(self):
        """Return ``(texts, labels)`` for the validation split.

        Requires a validation split (loaded from ``valid_path`` or created by
        ``split_train_valid``); with ``valid_data`` still ``None`` the lookup
        raises ``TypeError``.
        """
        return list(self.valid_data['X']), self.valid_data['y'].to_numpy()

    def get_test_data(self):
        """Return ``(texts, labels)`` for the test split.

        Requires a test split to have been loaded; with ``test_data`` still
        ``None`` the lookup raises ``TypeError``.
        """
        return list(self.test_data['X']), self.test_data['y'].to_numpy()
    
    
    
class FeatureExtractor():
    """Turn raw texts into binary bag-of-words encodings.

    The vocabulary is built from the training texts only; validation and test
    texts are encoded against that same vocabulary.

    Attributes:
        X_train, X_valid, X_test: raw texts before ``extract_features`` is
            called, 2-D NumPy encodings afterwards (``None`` for absent splits).
        vocab: dict mapping word -> column index (set by ``create_vocab``).
        vocab_size: number of words in ``vocab``.
    """

    def __init__(self, X_train, X_valid=None, X_test=None):
        """Store the raw texts of each split (any iterable of strings)."""
        self.X_train = X_train
        self.X_valid = X_valid
        self.X_test = X_test

    def extract_features(self, keep_words_threshold=5):
        """Tokenise every split, build the vocabulary and encode every split.

        The stored splits are replaced by their encodings, so this is a one-shot
        operation per instance.

        Args:
            keep_words_threshold: minimum number of training examples a word
                must occur in to be kept in the vocabulary.
        """
        self.keep_words_threshold = keep_words_threshold

        # `is not None` rather than truthiness: arrays / Series have no
        # unambiguous truth value, and an empty split should still be encoded.
        has_valid = self.X_valid is not None
        has_test = self.X_test is not None

        self.X_train = self.preprocess_data(self.X_train)
        if has_valid:
            self.X_valid = self.preprocess_data(self.X_valid)
        if has_test:
            self.X_test = self.preprocess_data(self.X_test)

        self.create_vocab(self.X_train)

        self.X_train = self.create_encodings(self.X_train)
        if has_valid:
            self.X_valid = self.create_encodings(self.X_valid)
        if has_test:
            self.X_test = self.create_encodings(self.X_test)

    def preprocess_data(self, text_data):
        """Split each text on whitespace and lower-case the words.

        Returns:
            list of word lists, one per input text.
        """
        return [[word.lower() for word in example.split()] for example in text_data]

    def create_vocab(self, text_data):
        """Build ``self.vocab`` from tokenised texts.

        A word is kept when the number of examples containing it is at least
        ``self.keep_words_threshold``. Kept words are indexed in sorted order.
        """
        document_frequency = collections.Counter()
        for example in text_data:
            # set(): a word counts once per example regardless of repetitions.
            document_frequency.update(set(example))

        vocab_words = sorted(word for word, freq in document_frequency.items()
                             if freq >= self.keep_words_threshold)
        self.vocab = {word: index for index, word in enumerate(vocab_words)}
        self.vocab_size = len(self.vocab)

    def create_encodings(self, text_data):
        """Encode tokenised texts as a ``(num_examples, vocab_size)`` 0/1 array.

        Entry ``[row, col]`` is 1 when the word with index ``col`` occurs in
        example ``row``; words outside the vocabulary are ignored.
        """
        encodings = np.zeros((len(text_data), self.vocab_size))

        for row, example in enumerate(text_data):
            for word in set(example):
                col = self.vocab.get(word)
                if col is not None:
                    encodings[row, col] = 1

        return encodings

    def get_word_counts(self, word_list):
        """Return a mapping word -> number of occurrences in ``word_list``."""
        return collections.Counter(word_list)

    def get_train_encodings(self):
        """Return the stored training split (encodings once extracted)."""
        return self.X_train

    def get_valid_encodings(self):
        """Return the stored validation split (``None`` if there is none)."""
        return self.X_valid

    def get_test_encodings(self):
        """Return the stored test split (``None`` if there is none)."""
        return self.X_test

In [3]:
# Locations of the SNIPS splits, relative to the notebook's working directory.
# SnipsDataLoader reads them as headerless, tab-separated (label, text) files.
TRAIN_PATH = './snips/snips_train_actual.csv'
TEST_PATH = './snips/snips_test_actual.csv'

In [4]:
# No separate validation file is given (valid_path=None), so a stratified 5%
# validation split is carved out of the training data (random_state defaults to 0).
data_loader = SnipsDataLoader(TRAIN_PATH, None, TEST_PATH)
data_loader.split_train_valid(valid_size=0.05, keep_class_ratios=True)

In [5]:
# Texts come back as Python lists of strings, labels as NumPy arrays of class indices.
X_train, y_train = data_loader.get_train_data()
X_valid, y_valid = data_loader.get_valid_data()

In [6]:
# Build the vocabulary from the training texts only (words present in at least
# 5 training examples) and encode both splits as 0/1 bag-of-words arrays.
# NOTE: this rebinds X_train / X_valid from raw texts to encodings, so the cell
# is not re-runnable on its own; re-run the cell that fetches the raw splits first.
feature_extractor = FeatureExtractor(X_train, X_valid)
feature_extractor.extract_features(keep_words_threshold=5)
X_train = feature_extractor.get_train_encodings()
X_valid = feature_extractor.get_valid_encodings()

In [16]:
class BernoulliNaiveBayes():
    """Naive Bayes classifier over binary (present / absent) features.

    After ``fit`` the instance holds, per feature and class, the smoothed
    probability of the feature being present (``vocab_probs_1``) or absent
    (``vocab_probs_0``), the class priors (``prior_probs``), and the natural
    logarithms of all three (``vocab_log_probs_1``, ``vocab_log_probs_0``,
    ``prior_log_probs``).
    """

    def __init__(self):
        pass

    def fit(self, X, y):
        """Estimate the per-class feature probabilities and the class priors.

        Args:
            X: 2-D array with one row per example and one column per feature.
            y: integer class indices, one per row of ``X``; the number of
                classes is taken to be ``max(y) + 1``.
        """
        _row_count, _column_count = X.shape  # unpacking insists on a 2-D X
        class_count = np.amax(y) + 1
        membership = np.eye(class_count)[y]  # one-hot row per example

        # Column c: how many examples of class c have each feature switched on.
        present_per_class = np.dot(X.T, membership)
        examples_per_class = np.sum(membership, axis=0)

        # Add-one smoothing over the two outcomes (present / absent).
        self.vocab_probs_1 = (1 + present_per_class) / (2 + examples_per_class)
        self.vocab_probs_0 = 1 - self.vocab_probs_1
        self.prior_probs = np.mean(membership, axis=0)

        self.vocab_log_probs_1 = np.log(self.vocab_probs_1)
        self.vocab_log_probs_0 = np.log(self.vocab_probs_0)
        self.prior_log_probs = np.log(self.prior_probs)

    def predict(self, X):
        """Return, for every row of ``X``, the index of the highest-scoring class.

        A class score is the sum of the log-probabilities of the observed
        present / absent features plus the log prior.
        """
        present_part = np.dot(X, self.vocab_log_probs_1)
        absent_part = np.dot(1 - X, self.vocab_log_probs_0)
        class_scores = present_part + absent_part + self.prior_log_probs
        return np.argmax(class_scores, axis=1)

In [17]:
def calculate_accuracy(predictions, targets):
    """Return the fraction of positions where ``predictions`` equals ``targets``.

    Both arguments are converted to NumPy arrays first, so plain Python
    sequences are compared element-wise (comparing two lists with ``==``
    directly would yield a single bool).

    Args:
        predictions: array-like of predicted labels.
        targets: array-like of reference labels, same shape as ``predictions``.

    Returns:
        The mean of the element-wise equality (NaN for empty input).

    Raises:
        ValueError: if the two inputs do not have the same shape.
    """
    predictions = np.asarray(predictions)
    targets = np.asarray(targets)
    if predictions.shape != targets.shape:
        # Guard against silent broadcasting, e.g. (n,) against (n, 1).
        raise ValueError(
            "predictions and targets must have the same shape, got {} and {}".format(
                predictions.shape, targets.shape))
    return np.mean(predictions == targets)

In [18]:
# Fit on the training encodings, predict the validation split and show the
# validation accuracy (the trailing expression is the cell's displayed value).
model = BernoulliNaiveBayes()
model.fit(X_train, y_train)
y_predict = model.predict(X_valid)
calculate_accuracy(y_predict, y_valid)

0.9782608695652174

In [19]:
# Per-class precision / recall / F1 on the validation split. Rows are class
# indices; data_loader.index_to_text_label_mapping translates them to label names.
print(classification_report(y_valid, y_predict))

              precision    recall  f1-score   support

           0       0.99      1.00      0.99        97
           1       0.99      1.00      0.99        99
           2       1.00      1.00      1.00       100
           3       1.00      1.00      1.00        98
           4       0.92      0.94      0.93        98
           5       0.99      0.93      0.96        98
           6       0.96      0.98      0.97       100

    accuracy                           0.98       690
   macro avg       0.98      0.98      0.98       690
weighted avg       0.98      0.98      0.98       690



In [15]:
# Lookup from class index to label text, for reading the report above.
data_loader.index_to_text_label_mapping

{0: 'AddToPlaylist',
 1: 'BookRestaurant',
 2: 'GetWeather',
 3: 'RateBook',
 4: 'SearchCreativeWork',
 5: 'SearchScreeningEvent',
 6: 'PlayMusic'}