# Naive Bayes Classifier

### Imports

In [1]:
import numpy
from scipy.stats import norm

### Class Definition

In [2]:
class NaiveBayesClassifier:
    """Binary naive Bayes classifier for numeric, whitespace-delimited data.

    Both data files are read with ``numpy.loadtxt``. Every row holds the
    feature values followed by the class label in the last column. A label of
    ``1.0`` marks a positive example; any other value marks a negative one.

    A feature with fewer than ``threshold`` distinct training values is treated
    as categorical (relative-frequency likelihood); every other feature is
    treated as continuous (Gaussian likelihood).
    """

    def __init__(self, trainingFile, testingFile, threshold=5):
        """Load the data and compute the class priors.

        trainingFile: path (or file-like object) with the training examples
        testingFile: path (or file-like object) with the testing examples
        threshold: number of distinct values in a column at or above which the
                   column is considered continuous instead of categorical
        """
        # ndmin=2 keeps a file with a single row two-dimensional, so the
        # shape[1] lookup and the [row][col] indexing below stay valid.
        self.Xtrain = numpy.loadtxt(trainingFile, ndmin=2)
        self.n = self.Xtrain.shape[0]  # number of training examples
        self.d = self.Xtrain.shape[1] - 1  # number of features; also the label column index

        # Threshold for separating categorical and continuous features.
        # Represents the number of distinct values in a column.
        self.threshold = threshold

        self.Xtest = numpy.loadtxt(testingFile, ndmin=2)
        self.nn = self.Xtest.shape[0]  # number of points in the testing data

        self.pos_idx = set()  # indices of positive training examples
        self.neg_idx = set()  # indices of negative training examples

        self.tp = 0  # True Positive
        self.fp = 0  # False Positive
        self.tn = 0  # True Negative
        self.fn = 0  # False Negative

        # The model needs the index sets and the priors before classifying.
        self.find_idx_sets()
        self.p_positive, self.p_negative = self.main_class_probability()

    def col_type(self, column):
        """
        column: the column index to determine the type of
        Determine if the column is categorical or continuous.

        - Returns 0 (categorical) if the number of distinct training values is
          less than the threshold
        - Returns 1 (continuous) if it is greater than or equal to the threshold
        """
        distinct_values = set(self.Xtrain[:, column])
        return 0 if len(distinct_values) < self.threshold else 1

    def calc_col_mean(self, indexes, column):
        """
        indexes: can be pos_idx or neg_idx
        column: the column index to calculate mean for
        Calculate mean of a column in the training data for the given indexes.

        Raises ZeroDivisionError if indexes is empty.
        """
        total = sum(self.Xtrain[i][column] for i in indexes)
        return total / len(indexes)

    def calc_col_std(self, indexes, column):
        """
        indexes: can be pos_idx or neg_idx
        column: the column index to calculate std for
        Calculate the (population) std of a column in the training data for
        the given indexes.

        Raises ZeroDivisionError if indexes is empty.
        """
        # The mean is the same for every term, so compute it once up front
        # instead of once per example.
        mean = self.calc_col_mean(indexes, column)
        total = sum((self.Xtrain[i][column] - mean) ** 2 for i in indexes)
        return (total / len(indexes)) ** 0.5

    def find_idx_sets(self):
        """
        Find the positive and negative example indices in the training data.

        The label is read from the last column (index self.d). Returns the
        sizes of the positive and negative index sets.
        """
        label_col = self.d
        for element in range(self.n):
            if self.Xtrain[element][label_col] == 1.0:
                self.pos_idx.add(element)
            else:
                self.neg_idx.add(element)
        return len(self.pos_idx), len(self.neg_idx)

    def main_class_probability(self):
        """
        Calculate the prior probabilities of positive and negative classes.
        p_positive: probability of positive class
        p_negative: probability of negative class
        """
        p_positive = len(self.pos_idx) / self.n
        p_negative = len(self.neg_idx) / self.n
        return p_positive, p_negative

    @staticmethod
    def _ratio(numerator, denominator):
        """Return numerator / denominator, or 0.0 when the denominator is 0."""
        return numerator / denominator if denominator else 0.0

    def print_stats(self):
        """
        Print the statistics of the classifier.

        A metric whose denominator is zero (for example precision when nothing
        was predicted positive) is reported as 0.00 instead of raising.
        """
        print(f"True Positives: {self.tp}")
        print(f"False Positives: {self.fp}")
        print(f"True Negatives: {self.tn}")
        print(f"False Negatives: {self.fn}")

        total = self.tp + self.tn + self.fp + self.fn
        accuracy = self._ratio(self.tp + self.tn, total)
        print(f"Accuracy: {accuracy:.2f}")

        precision = self._ratio(self.tp, self.tp + self.fp)
        print(f"Precision: {precision:.2f}")

        recall = self._ratio(self.tp, self.tp + self.fn)
        print(f"Recall: {recall:.2f}")

    def classify(self):
        """
        Classify every testing example and update the confusion counters.

        For each row the class-conditional likelihoods of all features are
        multiplied together and with the class prior; the row is predicted
        positive only when the positive score is strictly greater. Each
        misclassified row is printed. The tp/fp/tn/fn counters are reset at
        the start, so calling this method again does not double-count.
        """
        self.tp = self.fp = self.tn = self.fn = 0
        label_col = self.d

        # Column kinds and Gaussian parameters depend only on the training
        # data, so they are computed once rather than once per testing row.
        col_kinds = [self.col_type(col) for col in range(self.d)]
        gaussians = {
            col: (
                self.calc_col_mean(self.pos_idx, col),
                self.calc_col_std(self.pos_idx, col),
                self.calc_col_mean(self.neg_idx, col),
                self.calc_col_std(self.neg_idx, col),
            )
            for col, kind in enumerate(col_kinds)
            if kind == 1
        }

        for row in range(self.nn):
            sample = self.Xtest[row]
            curr_prob_pos = 1.0
            curr_prob_neg = 1.0

            for col, kind in enumerate(col_kinds):
                value = sample[col]
                if kind == 0:
                    # Categorical: likelihood is the fraction of training
                    # examples of the class that share this feature value.
                    count_pos = sum(
                        1 for i in self.pos_idx if self.Xtrain[i][col] == value
                    )
                    count_neg = sum(
                        1 for i in self.neg_idx if self.Xtrain[i][col] == value
                    )
                    curr_prob_pos *= count_pos / len(self.pos_idx)
                    curr_prob_neg *= count_neg / len(self.neg_idx)
                else:
                    # Continuous: likelihood is the normal density with the
                    # per-class mean and std of this feature.
                    pos_mean, pos_std, neg_mean, neg_std = gaussians[col]
                    curr_prob_pos *= norm.pdf(value, pos_mean, pos_std)
                    curr_prob_neg *= norm.pdf(value, neg_mean, neg_std)

            # Weight the likelihoods by the class priors.
            curr_prob_pos *= self.p_positive
            curr_prob_neg *= self.p_negative

            # Same convention as find_idx_sets: 1.0 is positive, anything
            # else is negative.
            actual = sample[label_col]
            actual_is_positive = actual == 1.0

            if curr_prob_pos > curr_prob_neg:
                if actual_is_positive:
                    self.tp += 1
                else:
                    print(f"False Positive at {row=}: ")
                    print("Predicted Class: ", 1.0)
                    print(f"Actual Class: {actual}\n")
                    self.fp += 1
            else:
                if actual_is_positive:
                    print(f"False Negative at {row=}: ")
                    print("Predicted Class: ", -1.0)
                    print(f"Actual Class: {actual}\n")
                    self.fn += 1
                else:
                    self.tn += 1


### Testing and Classifying

In [3]:
# Iris data set: training and testing splits.
irisTrainingFile, irisTestingFile = (
    "./misc/irisTraining.txt",
    "./misc/irisTesting.txt",
)

# "Buy" data set: training and testing splits.
buyTrainingFile, buyTestingFile = (
    "./misc/buyTraining.txt",
    "./misc/buyTesting.txt",
)

In [4]:
# Fit on the iris training split, score the testing split, and report metrics.
iris_classifier = NaiveBayesClassifier(
    trainingFile=irisTrainingFile,
    testingFile=irisTestingFile,
)
iris_classifier.classify()
iris_classifier.print_stats()

False Positive at row=17: 
Predicted Class:  1.0
Actual Class: -1.0

True Positives: 16
False Positives: 1
True Negatives: 33
False Negatives: 0
Accuracy: 0.98
Precision: 0.94
Recall: 1.00


In [5]:
# Fit on the "buy" training split, score the testing split, and report metrics.
buy_classifier = NaiveBayesClassifier(
    trainingFile=buyTrainingFile,
    testingFile=buyTestingFile,
)
buy_classifier.classify()
buy_classifier.print_stats()

False Positive at row=0: 
Predicted Class:  1.0
Actual Class: -1.0

True Positives: 2
False Positives: 1
True Negatives: 1
False Negatives: 0
Accuracy: 0.75
Precision: 0.67
Recall: 1.00
