In [9]:
import string
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.naive_bayes import GaussianNB
from sklearn.naive_bayes import MultinomialNB
from sklearn import metrics
import nltk
from nltk.stem import WordNetLemmatizer
nltk.download("wordnet",quiet=True)
nltk.download("omw-1.4",quiet=True)
nltk.download('punkt',quiet=True)
nltk.download("stopwords",quiet=True)
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords


class NaiveBayesImplementation():
    """Naive Bayes text classifier with selectable feature extraction.

    Three feature pipelines are supported by ``fit``/``predict``:

    * ``"tficf"`` - hand-rolled term-frequency x inverse-class-frequency
      features, one block of ``len(vocab)`` weights per class;
    * ``"tfidf"`` - scikit-learn's ``TfidfVectorizer`` with default settings;
    * anything else - scikit-learn's ``CountVectorizer`` with the requested
      ``ngram`` range.

    The features are fed to either ``MultinomialNB`` or ``GaussianNB``.
    Documents are expected to be whitespace-tokenisable strings.
    """

    def __init__(self):
        # Per-instance state (previously shared, mutable class attributes).
        self.vocab = {}                     # token -> column id
        self.token_class_matrix = {}        # TF-ICF features of the training set
        self.naive_bayes_classifier = None  # fitted sklearn estimator
        self.classes = {}                   # class label -> class id
        self.token_class_matrix_org = {}    # token -> {class: count, "freq": n}
        self.vectorizer = None              # fitted sklearn vectorizer, if any
        self.processing_technique = None    # pipeline chosen in fit()
        self.word_count_mat = {}            # token -> corpus count (+ "tot_words")
        self.class_prior_prob = {}          # class label -> prior probability

    def calculate_prior(self,data_y):
        """Compute, store and return the relative frequency of each class.

        Args:
            data_y: pandas Series of class labels.

        Returns:
            dict mapping each label (in sorted order) to its share of
            ``data_y``. Also stored in ``self.class_prior_prob``.
        """
        total = len(data_y)
        prior = {
            label: len(data_y[data_y == label]) / total
            for label in sorted(data_y.unique())
        }
        self.class_prior_prob = prior
        return prior

    def _icf(self, token):
        """Return log10((n_classes + 1) / (classes containing token + 1))."""
        class_freq = self.token_class_matrix_org[token]["freq"]
        return np.log10((len(self.classes) + 1) / (class_freq + 1))

    def prob_token_given_class(self,token,cl):
        """Estimate P(token | class) via Bayes' rule on TF-ICF weights.

        Requires ``get_token_class_tficf_matrix`` (or a ``"tficf"`` fit) and
        ``calculate_prior`` to have been run first.

        Args:
            token: a token seen during training.
            cl: a class label seen during training.

        Returns:
            ``P(class | token) * P(token) / P(class)``, where
            ``P(class | token)`` is the token's TF-ICF weight for ``cl``
            normalised over all classes. Returns ``0.0`` when every weight is
            zero (e.g. the token occurs in every class, so its ICF is
            log10(1) = 0) instead of dividing by zero.

        Raises:
            KeyError: if ``token`` or ``cl`` was not seen during training.
        """
        prob_word = self.word_count_mat[token]/self.word_count_mat["tot_words"]
        prob_class = self.class_prior_prob[cl]
        counts = self.token_class_matrix_org[token]
        icf_val = self._icf(token)  # identical for every class of this token
        numerator = counts[cl] * icf_val
        denominator = sum(counts[label] * icf_val for label in self.classes)
        if denominator == 0:
            # All TF-ICF weights vanish: the token carries no class signal.
            return 0.0
        prob_class_given_token = numerator/denominator
        return (prob_class_given_token * prob_word)/(prob_class)

    def get_token_class_matrix(self,data_X, data_y):
        """Count how often every token occurs in the documents of each class.

        Args:
            data_X: iterable of document strings.
            data_y: pandas Series of class labels aligned with ``data_X``.

        Returns:
            dict ``token -> {class_label: count, ..., "freq": k}`` where
            ``k`` is the number of classes in which the token occurs.
        """
        zero_counts = {label: 0 for label in data_y.unique()}
        mat = {}
        for article, this_class in zip(data_X, data_y):
            for token in article.split():
                if token not in mat:
                    mat[token] = zero_counts.copy()
                mat[token][this_class] += 1
        # One pass over the finished table; no need to re-scan the corpus.
        for counts in mat.values():
            counts["freq"] = sum(1 for count in counts.values() if count > 0)
        return mat

    def get_vocabulary(self,data_X):
        """Assign column ids to tokens and record corpus word counts.

        Side effect: ``self.word_count_mat`` is replaced with a dict of
        ``token -> occurrences`` plus the total under the key ``"tot_words"``.

        Args:
            data_X: iterable of document strings.

        Returns:
            dict ``token -> id`` with ids assigned in order of first
            appearance.
        """
        vocabulary = {}
        word_counts = {}
        tot_words = 0
        for article in data_X:
            for token in article.split():
                # len() is evaluated before insertion, i.e. the next free id.
                vocabulary.setdefault(token, len(vocabulary))
                word_counts[token] = word_counts.get(token, 0) + 1
                tot_words += 1
        word_counts["tot_words"] = tot_words
        self.word_count_mat = word_counts
        return vocabulary

    def _tficf_features(self, data_X):
        """Build per-document TF-ICF features from the fitted tables.

        For every document and every class, a vector of ``len(self.vocab)``
        entries is produced; the entry of each token present in the document
        is (count of the token in that class) x ICF(token). Tokens that are
        not in the vocabulary are skipped.

        Returns:
            Nested list indexed ``[document][class][token id]``.
        """
        vocab_size = len(self.vocab)
        features = []
        for article in data_X:
            known_tokens = [t for t in article.split() if t in self.vocab]
            article_mat = []
            for cl in self.classes:
                class_vec = [0] * vocab_size
                for token in known_tokens:
                    tf_val = self.token_class_matrix_org[token][cl]
                    class_vec[self.vocab[token]] = tf_val * self._icf(token)
                article_mat.append(class_vec)
            features.append(article_mat)
        return features

    @staticmethod
    def _flatten_documents(features):
        """Turn ``[document][class][token]`` features into one row per document."""
        return np.array([doc.flatten() for doc in np.array(features)])

    @staticmethod
    def _dense_if_required(classifier, features):
        """Densify sparse features for GaussianNB, which needs dense input."""
        if isinstance(classifier, GaussianNB) and hasattr(features, "toarray"):
            return features.toarray()
        return features

    def get_token_class_tficf_matrix(self,data_X, data_y):
        """Fit the TF-ICF tables on a corpus and return its features.

        Side effects: sets ``self.token_class_matrix_org``, ``self.classes``
        (label -> id, in order of first appearance) and ``self.vocab`` (and,
        through ``get_vocabulary``, ``self.word_count_mat``).

        Args:
            data_X: iterable of document strings.
            data_y: pandas Series of class labels aligned with ``data_X``.

        Returns:
            Nested list indexed ``[document][class][token id]`` holding the
            TF-ICF weight of every token present in the document.
        """
        self.token_class_matrix_org = self.get_token_class_matrix(data_X,data_y)
        self.classes = {cl: id_val for id_val, cl in enumerate(data_y.unique())}
        self.vocab = self.get_vocabulary(data_X)
        return self._tficf_features(data_X)

    def fit(self,data_X,data_y,model = "Multinomial", processing_technique = "tficf",ngram = (1,1)):
        """Extract features from the training documents and fit the classifier.

        Args:
            data_X: training documents (strings).
            data_y: training labels.
            model: ``"Multinomial"`` or ``"Gaussian"``.
            processing_technique: ``"tficf"``, ``"tfidf"``, or any other value
                for plain ``CountVectorizer`` counts.
            ngram: ``ngram_range`` passed to ``CountVectorizer``; it is only
                used by the count pipeline.

        Raises:
            ValueError: if ``model`` is not one of the supported names.
        """
        # Validate first so that an unknown model can neither crash later nor
        # silently reuse a classifier left over from a previous fit.
        if model not in ("Gaussian", "Multinomial"):
            raise ValueError(
                "Unknown model %r; expected 'Gaussian' or 'Multinomial'" % (model,)
            )
        self.processing_technique = processing_technique
        if(processing_technique == "tficf"):
            self.token_class_matrix = self.get_token_class_tficf_matrix(data_X,data_y)
            classifier_arr = self._flatten_documents(self.token_class_matrix)
        elif(processing_technique == "tfidf"):
            cv = TfidfVectorizer()
            classifier_arr = cv.fit_transform(data_X)
            self.vectorizer = cv
        else:
            cv = CountVectorizer(ngram_range = ngram)
            classifier_arr = cv.fit_transform(data_X)
            self.vectorizer = cv
        classifier = GaussianNB() if model == "Gaussian" else MultinomialNB()
        classifier.fit(self._dense_if_required(classifier, classifier_arr), data_y)
        self.naive_bayes_classifier = classifier

    def predict(self,data_X):
        """Predict class labels for new documents with the fitted pipeline.

        Args:
            data_X: documents (strings) to classify.

        Returns:
            Array of predicted labels, as returned by the sklearn classifier.

        Raises:
            AttributeError: if called before ``fit``.
        """
        if self.naive_bayes_classifier is None:
            raise AttributeError("predict() called before fit()")
        if(self.processing_technique == "tficf"):
            classifier_arr = self._flatten_documents(self._tficf_features(data_X))
        else:
            # Both the tfidf and the count pipelines reuse the fitted vectorizer.
            classifier_arr = self.vectorizer.transform(data_X)
        classifier_arr = self._dense_if_required(
            self.naive_bayes_classifier, classifier_arr
        )
        return self.naive_bayes_classifier.predict(classifier_arr)

    def show_metrics(self,y_test,y_pred):
        """Print accuracy (in percent) and micro-averaged recall/precision/F1.

        NOTE(review): with micro averaging on single-label predictions the
        three averaged scores coincide with accuracy, as the captured output
        shows - confirm whether macro averaging was intended.
        """
        score1 = metrics.accuracy_score(y_test, y_pred)
        print("Accuracy:   %0.3f" % (score1*100))

        score2 = metrics.recall_score(y_test, y_pred, average = "micro")
        print("Recall:   %0.3f" % (score2))

        score3 = metrics.precision_score(y_test, y_pred, average = "micro")
        print("Precision:   %0.3f" % (score3))

        score4 = metrics.f1_score(y_test, y_pred, average = "micro")
        print("F1 Score:   %0.3f" % (score4))

def preProcessData(dataset):
    """Return a cleaned copy of a collection of raw article strings.

    Each article is lower-cased, has its punctuation replaced by spaces, is
    tokenised with NLTK, stripped of English stop words, lemmatised as verbs
    and re-joined with single spaces. The input object is not modified.

    Args:
        dataset: pandas Series (any index) or other indexable, copyable
            sequence of strings.

    Returns:
        A copy of ``dataset`` whose entries are the processed strings.
    """
    data_X = dataset.copy()
    wnl = WordNetLemmatizer()
    # Loop invariants: build the punctuation table and stop-word set once.
    punctuation_to_space = str.maketrans(
        string.punctuation, " " * len(string.punctuation)
    )
    stop_words = set(stopwords.words("english"))
    # Access rows by position: ``series[i]`` is a label lookup and breaks on
    # a Series whose index is not 0..n-1 (e.g. a train/test split).
    cells = data_X.iloc if hasattr(data_X, "iloc") else data_X
    for i in range(len(data_X)):
        text = cells[i].lower().translate(punctuation_to_space)
        tokens = [
            wnl.lemmatize(word, pos="v")  # reduce every kept token to its root form
            for word in word_tokenize(text)
            if word not in stop_words
        ]
        cells[i] = " ".join(tokens)
    return data_X

def read_dataset(path = 'Q2_Dataset/BBC News Train.csv', preprocess = True):
    """Load the news CSV and return its text and category columns.

    Args:
        path: location of a CSV file with ``Text`` and ``Category`` columns;
            an ``ArticleId`` column is dropped when present.
        preprocess: when true, the text column is cleaned with
            ``preProcessData`` before being returned.

    Returns:
        Tuple ``(data_X, data_y)`` of pandas Series: article texts and labels.

    Raises:
        KeyError: if the ``Text`` or ``Category`` column is missing.
    """
    dataset = pd.read_csv(path)
    # ``drop`` returns a new frame, so the result has to be kept; the id
    # column is not needed, hence a file without it is accepted as well.
    dataset = dataset.drop(columns=["ArticleId"], errors="ignore")
    data_X = dataset['Text']
    data_y = dataset['Category']
    if(preprocess):
        data_X = preProcessData(dataset = data_X)
    return data_X,data_y

def check_test_train(data_X, data_y, test = 0.30, train = 0.70):
    """Split the data once and report metrics for every feature pipeline.

    The same train/test split (``random_state=0``) is used to fit and score
    five models: TF-ICF, TF-IDF, and count vectors over unigrams, bigrams and
    unigrams+bigrams. Results are printed through ``show_metrics``.

    Args:
        data_X: documents.
        data_y: labels aligned with ``data_X``.
        test: fraction of the data held out for testing.
        train: fraction of the data used for training.

    Returns:
        ``-1`` (after printing "Invalid Split") when the two fractions sum to
        more than 1, otherwise ``None``.
    """
    if train + test > 1:
        print("Invalid Split")
        return -1

    X_train, X_test, y_train, y_test = train_test_split(
        data_X, data_y, test_size=test, train_size=train, random_state=0
    )

    # (banner, extra keyword arguments for fit) for each experiment, in order.
    experiments = (
        ("Using TFICF:", {}),
        ("Using TFIDF Vectorizer:", {"processing_technique": "tfidf"}),
        ("Using n-gram(Unigram):", {"processing_technique": "count_vec"}),
        ("Using n-gram(Bigram):",
         {"processing_technique": "count_vec", "ngram": (2, 2)}),
        ("Using n-gram(Unigram and Bigram):",
         {"processing_technique": "count_vec", "ngram": (1, 2)}),
    )

    for position, (banner, fit_options) in enumerate(experiments):
        if position > 0:
            print()  # blank line between consecutive reports
        print(banner)
        model = NaiveBayesImplementation()
        model.fit(X_train, y_train, **fit_options)
        predictions = model.predict(X_test)
        model.show_metrics(y_test, predictions)

# Load and preprocess the corpus once, then evaluate every pipeline for test
# fractions 0.1 through 0.5, training on the remaining share each time.
data_X, data_y = read_dataset()
splits = [(tenth / 10, 1 - tenth / 10) for tenth in range(1, 6)]

for test_fraction, train_fraction in splits:
    print("Data For Test =", test_fraction, "and Train =", train_fraction, ":")
    check_test_train(data_X, data_y, test_fraction, train_fraction)
    print("---------------------------------------------")

Data For Test = 0.1 and Train = 0.9 :
Using TFICF:
Accuracy:   93.289
Recall:   0.933
Precision:   0.933
F1 Score:   0.933

Using TFIDF Vectorizer:
Accuracy:   95.973
Recall:   0.960
Precision:   0.960
F1 Score:   0.960

Using n-gram(Unigram):
Accuracy:   95.973
Recall:   0.960
Precision:   0.960
F1 Score:   0.960

Using n-gram(Bigram):
Accuracy:   95.302
Recall:   0.953
Precision:   0.953
F1 Score:   0.953

Using n-gram(Unigram and Bigram):
Accuracy:   95.302
Recall:   0.953
Precision:   0.953
F1 Score:   0.953
---------------------------------------------
Data For Test = 0.2 and Train = 0.8 :
Using TFICF:
Accuracy:   95.638
Recall:   0.956
Precision:   0.956
F1 Score:   0.956

Using TFIDF Vectorizer:
Accuracy:   96.644
Recall:   0.966
Precision:   0.966
F1 Score:   0.966

Using n-gram(Unigram):
Accuracy:   97.315
Recall:   0.973
Precision:   0.973
F1 Score:   0.973

Using n-gram(Bigram):
Accuracy:   96.980
Recall:   0.970
Precision:   0.970
F1 Score:   0.970

Using n-gram(Unigram and