In [1]:
#!/usr/bin/env python
# coding: utf-8
#===============================================================================
#
#           FILE: bigramMNB_4_ntb.py 
#         AUTHOR: Bianca Ciobanica
#	       EMAIL: bianca.ciobanica@student.uclouvain.be
#
#           BUGS: 
#        VERSION: 3.10.6
#        CREATED: 25-10-2023 
#
#===============================================================================
#    DESCRIPTION: sources used for this code : 
#               https://medium.com/@johnm.kovachi/implementing-a-multinomial-naive-bayes-classifier-from-scratch-with-python-e70de6a3b92e
#               https://pandas.pydata.org/docs/getting_started/intro_tutorials/03_subset_data.html
#               https://datasciencedojo.com/blog/naive-bayes-from-scratch-part-1/#
# 
#    
#          USAGE: 
#===============================================================================

In [2]:
import pandas as pd
from nltk.tokenize import WordPunctTokenizer
from nltk.lm import Vocabulary
from nltk.lm.preprocessing import pad_both_ends
from math import log, log2
from nltk.util import ngrams
from collections import Counter

In [3]:
def remove_html_tags(text):
  import re
  clean = re.compile('<.*?>')
  return re.sub(clean, '', text)

In [4]:
def map_tokens_to_UNK(tokens,v):
    return [token if token in v else '<UNK>' for token in tokens]

In [5]:
def create_vocabulary(df):
        tokenized_corpus = [token 
                            for row in df
                            for token in row] # flattened corpus
      
        return Vocabulary(tokenized_corpus, unk_cutoff=unk_threshold)

In [6]:
# initialize training corpus
df_corpus = pd.read_csv("data/train.csv")
unk_threshold = 3

# preprocess 
df_corpus['Body'] = df_corpus['Body'].apply(lambda x: remove_html_tags(x))
df_corpus['Body_tokenized'] = df_corpus['Body'].apply(lambda x: WordPunctTokenizer().tokenize(x))

# add_padding
n_order = 2
df_corpus['Body_tokenized_padded'] = df_corpus['Body_tokenized'].apply(lambda x: list(pad_both_ends(x, n=n_order)))

# restricted voc 
training_padded_v = create_vocabulary(df_corpus['Body_tokenized_padded'])

In [7]:
# map UNK words
df_corpus['Body_tokenized_padded_UNK'] = df_corpus['Body_tokenized_padded'].apply(lambda x: map_tokens_to_UNK(x, training_padded_v))

# add bigrams
df_corpus['Body_padded_bigrams'] = [list(ngrams(tokens, n_order))
                                       for tokens in df_corpus['Body_tokenized_padded_UNK']
                                      ]

df_bigrams_flattened = [bigram for row in df_corpus['Body_padded_bigrams'] for bigram in row] # with restricted voc
df_tokens_flattened = [t for row in df_corpus['Body_tokenized_padded_UNK'] for t in row] # with restricted voc

df_bigrams_counter = Counter(df_bigrams_flattened)
df_unigrams_counter = Counter(df_tokens_flattened)

In [8]:
print(df_corpus.keys())
print(df_corpus.shape)

Index(['Body', 'Y', 'Body_tokenized', 'Body_tokenized_padded',
       'Body_tokenized_padded_UNK', 'Body_padded_bigrams'],
      dtype='object')
(14000, 6)


In [9]:
def test_vocab_on_bigrams():
    # asked chatGPT to create this list of bigrams for my test 
    v = create_vocabulary([
            [('when', 'you'), ('you', 'are'), ('are', 'happy')],
            [('i', 'love'), ('love', 'programming'), ('programming', 'so'), ('so', 'much')],
            [('this', 'is'), ('is', 'a'), ('a', 'test')],
            [('when', 'you'), ('you', 'are'), ('are', 'sad')],
        ])
    return v
#testv = test_vocab_on_bigrams()
#print(testv[('you', 'are')])

In [10]:
classes_counts = len(df_corpus['Y'])
classes = df_corpus['Y'].unique().tolist()
#print(classes)

In [11]:
# P(class | doc) = (P(doc | class) * P(class)) / P(doc)
# p(  x   |  y ) = (P( y  |  x   ) * P( x ) ) / P( y )

In [12]:
class NaiveBayesClassifier:
    # initally created a NB model which i would then initialize per class
    # But then realized i should create a model which iterates through each class as shown in SLP
    
    def __init__(self, docs, classes, alpha=0, voc = None):
        self.bigdoc = docs
        self.classes = classes
        self.alpha = alpha
        self.bigdoc_v = voc # Counter object
        self.bigdoc_v_size = self.bigdoc_v.total()
        
        self.bow = {}
        self.logprior = {}
        self.logll = {}
       
    def train(self):
        if self.bigdoc_v is None or len(self.bigdoc_v) == 0:
            raise ValueError("Cannot train on an empty vocabulary.")
        
        total_doc_counts = len(self.bigdoc)
        # go through each class
        for label in self.classes:
            # get features from class
            class_docs = self.bigdoc[self.bigdoc['Y'] == label]
            class_counts = len(class_docs)
            
            # get P(c)
            self.logprior[label] = log(class_counts / total_doc_counts)
            
            # generate bag of words
            self.bow[label] = self.extract_features(class_docs)
            logll = 0
            
            # calculate log likelihood
            class_bow = self.bow[label]
            total_class_tokens = class_bowl.total()
            self.logll[label] = {}
            
            for bigram in df_bigrams_flattened:
                bigram_prob = class_bow.get(bigram,0)
                self.logll[label][token] = log(( bigram_prob + self.alpha) / (total_class_tokens + self.bigdoc_v_size * self.alpha))

    def check_consistency(self):
        probs = {}
        for label in self.classes:
            p = 0
            for token in self.probability[label]:
                p += self.probability[label][token]
            probs[label] = p
            
        print(probs)
        
        is_consistent = False
        p_class1, p_class2 = probs.values()
        if round(p_class1) == 1 and round (p_class2) == 1:
            is_consistent = True
        return "Is consistent : " + str(is_consistent)
        
    def test_model(self, test_doc):
        sums = {} # {"class" : logprior}
        #C_NB = argmax (logprior + sum logll)
        for label in self.classes:
            v = self.bow[label]
            sums[label] = self.logprior[label]
            # go through each word
            for token in test_doc:
                if token not in v:
                    continue
                sums[label] = sums[label] + self.logll[label][token]
        
        # get class with highest score
        argmax = max(sums, key=sums.get)
        return argmax

    def test_accuracy(self, data=None, data_type=None):
        if data is None or data.empty:
            raise ValueError("Cannot train on an empty corpus.")
            
        correct_predictions = 0;
        total_predictions = 0;
    
        for index, row in data.iterrows(): # used GPT to help me find how to iter on rows in my df
            true_label = row['Y']
            predicted_label = self.test_model(row[data_type])
            #print(true_label, "< true =?= predicted >", predicted_label)
    
            total_predictions += 1
            if predicted_label == true_label:
                correct_predictions += 1
    
        accuracy = correct_predictions / total_predictions
        return round(accuracy * 100, 3)
            
    def __repr__(self):
        s = ""
        s += "total docs    = " + str(len(self.bigdoc)) + "\n"
        s += "labels        = " + str(self.classes)+ "\n"
        s+= "_____________________\n"

        for label in self.classes:
            s += str(label) + "\n"
            s += "P(c)          = " + str(self.logprior[label]) + "\n"
           # s += "Logll         = " + str(self.logll[label]) + "\n"
            s+= "_____________________\n"
        return s 
        

In [23]:
class BigramMultinomialNB(NaiveBayesClassifier):
    def __init__(self, docs, classes, alpha = 0, voc = None):
        
        super().__init__(docs,classes, alpha = alpha, voc = voc)

    def extract_features (self, docs):
        """ creates bigram counts per class
        """
        flattened_bigrams = [bigram 
                          for row in docs['Body_padded_bigrams']
                          for bigram in row]
        
        flattened_unigrams = [token 
                          for row in docs['Body_tokenized_padded_UNK']
                          for token in row]
        
        bigram_counts = Counter(flattened_bigrams)
        unigram_counts = Counter(flattened_unigrams)
        
        return bigram_counts, unigram_counts

    def train(self):
        if self.bigdoc_v is None or len(self.bigdoc_v) == 0:
            raise ValueError("Cannot train on an empty vocabulary.")
        
        total_doc_counts = len(self.bigdoc)
        # go through each class
        for label in self.classes:
            # get features from class
            class_docs = self.bigdoc[self.bigdoc['Y'] == label]
            class_counts = len(class_docs)
            
            # get P(c)
            self.logprior[label] = log(class_counts / total_doc_counts)
            
            # generate bag of words
            self.bow[label] = {}
            class_bigrams, class_unigrams = self.extract_features(class_docs)

            self.bow[label]['bigrams'] = class_bigrams
            self.bow[label]['unigrams'] = class_unigrams
            logll = 0
            
            # calculate log likelihood
            self.logll[label] = {}

            for bigram in df_bigrams_flattened:
                context = bigram[0]
                prob_bigram = class_bigrams.get(bigram,0.0)
                #print(bigram, bigram_prob)
                self.logll[label][bigram] = log((prob_bigram + self.alpha) / (class_unigrams[context] + self.bigdoc_v[context] * self.alpha))

    def test_model(self, test_doc):
        sums = {} # {"class" : logprior}
        #C_NB = argmax (logprior + sum logll)
        for label in self.classes:
            sums[label] = self.logprior[label]
            # go through each bigram
            for bigram in test_doc:
                if bigram in df_bigrams_counter:
                    sums[label] += self.logll[label][bigram]
                    
        # get class with highest score
        argmax = max(sums, key=sums.get)
        return argmax

In [24]:
# initialize test corpus and prepocess it
test_corpus = pd.read_csv("data/test.csv")
# preprocess 
test_corpus['Body'] = test_corpus['Body'].apply(lambda x: remove_html_tags(x))
test_corpus['Body_tokenized'] = test_corpus['Body'].apply(lambda x: WordPunctTokenizer().tokenize(x))
test_corpus['Body_tokenized_padded'] = test_corpus['Body_tokenized'].apply(lambda x: list(pad_both_ends(x, n=n_order)))

# map unk if oov from training corpus
test_corpus['Body_tokenized_padded_UNK'] = test_corpus['Body_tokenized_padded'].apply(lambda x: map_tokens_to_UNK(x, training_padded_v))

# add bigrams
test_corpus['Body_padded_bigrams'] = [list(ngrams(tokens, n_order))
                                       for tokens in test_corpus['Body_tokenized_padded_UNK']
                                      ]
print(test_corpus.keys())
print(test_corpus.shape)

Index(['Body', 'Y', 'Body_tokenized', 'Body_tokenized_padded',
       'Body_tokenized_padded_UNK', 'Body_padded_bigrams'],
      dtype='object')
(3500, 6)


In [25]:
def test_perplexity(test_data, alpha):
    """
        testing perplexity on class **unconditional** bigrams
        p(w_2 | w_1 ) = P( bigram counts / w_1 counts )
    """
    log_prob_res = 0.0
    m = 0
    for i in range(1, len(test_data)): # for each question
        for bigram in test_data[i]:
            # for each token
            context_counts = df_unigrams_counter[bigram[0]]
            total_tokens = len(df_tokens_flattened)
            
            log_prob_res += log2( (df_bigrams_counter[bigram] + alpha) / (context_counts + (total_tokens * alpha)))
            m += 1
        
    ll = log_prob_res / m #log likelihood
    perplexity = pow(2.0, -ll)
    
    return perplexity

In [26]:
def optimal_value(l):
    res = {}
    for alpha in l:
        pp = test_perplexity(df_corpus['Body_padded_bigrams'], alpha)
        res[alpha] = pp
    return min(res, key=res.get)
    
alpha = optimal_value([0.1, 0.01, 0.001, 0.0001])
print("Optimal alpha : \n",alpha)

Optimal alpha : 
 0.0001


In [27]:
# create binary class conditional model
# our voc is the total number of bigrams in training corpus
# restricted voc was used to map any oov  token ( with threshold = 3) to '<UNK>'
bigramMNB_Model = BigramMultinomialNB(df_corpus, classes, alpha=alpha, voc = df_unigrams_counter) # smoothing according to optimal alpha
bigramMNB_Model.train()

print(bigramMNB_Model)

total docs    = 14000
labels        = ['HQ', 'LQ']
_____________________
HQ
P(c)          = -0.6931471805599453
_____________________
LQ
P(c)          = -0.6931471805599453
_____________________



In [28]:
bigramMNB_accuracy = bigramMNB_Model.test_accuracy(test_corpus, data_type='Body_padded_bigrams')
print("Accuracy prediction for bigram MNB on test data :\n", bigramMNB_accuracy)

Accuracy prediction for bigram MNB on test data :
 86.371
