## Imports & Constants

In [17]:
import numpy as np
import pandas as pd
import numpy.typing as npt
from abc import ABC, abstractmethod
from string import ascii_uppercase
from sklearn.naive_bayes import MultinomialNB, GaussianNB

from dataset_types import ReviewDataSet
from feature_generation import FeatureSetGenerator
from feature_normalization import FeatureSetNormalizer

# Input directories handed to `ReviewDataSet` in the Data section, one per
# review polarity. Both paths are relative to the notebook's working directory.
POSITIVE_REVIEWS_DIR = "./data/pos/"
NEGATIVE_REVIEWS_DIR = "./data/neg/"

## Data

In [2]:
# Load the review corpus from the positive and negative review directories.
# NOTE(review): how `ReviewDataSet.load()` labels the two directories is defined
# in `dataset_types` and is not visible here — confirm before relying on it.
dataset = ReviewDataSet([POSITIVE_REVIEWS_DIR, NEGATIVE_REVIEWS_DIR]).load()

In [3]:
# Sets A and B are produced by the same generator pipeline; they only diverge
# in the normalisation applied further down (TF vs. TF-IDF).
feature_set_a = (
    FeatureSetGenerator(dataset)
    .stem()
    .remove_punctuation()
    .remove_stopwords()
    .create_n_grams(1)
)

feature_set_b = (
    FeatureSetGenerator(dataset)
    .stem()
    .remove_punctuation()
    .remove_stopwords()
    .create_n_grams(1)
)

# Set C swaps `create_n_grams(1)` for `create_everygrams(2)`.
# NOTE(review): presumably unigrams plus bigrams — confirm in feature_generation.
feature_set_c = (
    FeatureSetGenerator(dataset)
    .stem()
    .remove_punctuation()
    .remove_stopwords()
    .create_everygrams(2)
)

# A -> TF normalisation; B and C -> TF-IDF normalisation.
norm_feature_set_a = FeatureSetNormalizer(feature_set_a).perform_tf_norm()
norm_feature_set_b = FeatureSetNormalizer(feature_set_b).perform_tf_idf_norm()
norm_feature_set_c = FeatureSetNormalizer(feature_set_c).perform_tf_idf_norm()

# The un-normalised feature sets are not referenced again; drop the names so
# the memory can be reclaimed.
del feature_set_a, feature_set_b, feature_set_c

In [4]:
# Split every normalised feature set into train / dev / test arrays.
# NOTE(review): "polarity" is presumably the label column and 0.3 a hold-out
# fraction — confirm against `split_into_train_dev_test_arrays`.
(
    X_train_a, y_train_a,
    X_dev_a, y_dev_a,
    X_test_a, y_test_a,
) = norm_feature_set_a.split_into_train_dev_test_arrays("polarity", 0.3)
(
    X_train_b, y_train_b,
    X_dev_b, y_dev_b,
    X_test_b, y_test_b,
) = norm_feature_set_b.split_into_train_dev_test_arrays("polarity", 0.3)
(
    X_train_c, y_train_c,
    X_dev_c, y_dev_c,
    X_test_c, y_test_c,
) = norm_feature_set_c.split_into_train_dev_test_arrays("polarity", 0.3)

# Freeing some memory: only the split arrays are used from here on.
del norm_feature_set_a, norm_feature_set_b, norm_feature_set_c

## Naive Bayes Classifier

Naive Bayes uses a 'bag of words' approach, where individual words constitute its features and the order of the words is ignored.
- The adjective 'Naive' connotes the assumption that the occurrences of features in a dataset are mutually independent.
    - In reality, this is a performance-damaging assumption

[Variants of Naive Bayes Classifiers](https://en.wikipedia.org/wiki/Naive_Bayes_classifier)
- With a `multinomial` event model, samples (feature vectors) represent the frequencies with which certain events
have been generated by a `multinomial` $(p_1, \dots, p_n)$, where $p_i$ is the probability that event $i$ occurs.
    - Or K such multinomials in the _multiclass_ case
    - This is the event model typically used for document classification (e.g., sentiment analysis)
        - Events can represent the occurrence of a word in a single document
        - In our case, events represent the TF-IDF of words in a document
- `Gaussian` distributions are typically used when dealing with continuous data
    - I gather `multinomial` is much better for NLP applications

### Helpful Definitions
**Prior Probability**: The probability of an event (e.g., a spam email) before the collection of new data.
- I.e., if prior observations are used to calculate the probability, we call it the prior probability

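**Conditional Likelihoods**: The probability of observing a feature given the class, $P(x_i \mid y)$.
- In the classifier below, these are estimated per class from the summed feature values of that class's training samples, with Lidstone smoothing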

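**Maximum a Posteriori**: The decision rule that predicts the class with the highest posterior probability, i.e. $\hat{y} = \arg\max_y \left[ \log P(y) + \sum_i x_i \log P(x_i \mid y) \right]$.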

In [5]:
class IClassifier(ABC):
    """Minimal interface shared by the classifiers defined in this notebook."""

    @abstractmethod
    def fit(self, data: npt.NDArray, labels: npt.NDArray):
        """
        Learns the model parameters from a training set.

        :param data: Feature matrix with one row per sample.
        :param labels: Class label of each row of `data`.
        """
        raise NotImplementedError

    @abstractmethod
    def predict(self, data: npt.NDArray) -> npt.NDArray:
        """
        Predicts a class for every sample.

        :param data: Feature matrix with one row per sample.
        :return: One predicted class per row of `data`.
        """
        raise NotImplementedError


class NaiveBayesClassifier(IClassifier):
    """
    Multinomial Naive Bayes classifier with Lidstone smoothing.

    After `fit` the following attributes are available:

    - `class_prior_probas`: shape (k,), the fraction of training samples per class.
    - `class_conditional_feature_likelihoods`: shape (k, num_features), the smoothed
      share of each feature within each class.
    """

    def __init__(self, k: int = 2, alpha: float = 1):
        """
        Instantiates a Multinomial Naive Bayes Classifier.

        :param k: The number of classes in the classification problem. Labels are
            expected to be the integers 0 .. k-1.
        :param alpha: A Lidstone smoothing parameter.
        """
        self.k = k
        self.alpha = alpha

    def fit(self, data: npt.NDArray, labels: npt.NDArray):
        """
        Estimates the class priors and the smoothed per-class feature likelihoods.

        :param data: Feature matrix of shape (num_samples, num_features).
        :param labels: Class index (0 .. k-1) of each row of `data`.
        :return: The fitted classifier itself, so calls can be chained.
        """
        # Accept lists / Series as well: a plain list compared with `==` would
        # yield a single bool instead of a per-sample mask.
        labels = np.asarray(labels)
        num_samples, num_features = data.shape

        # Allocate memory for the class prior probabilities and class conditional feature likelihoods.
        self.class_prior_probas = np.zeros(shape=(self.k,))
        self.class_conditional_feature_likelihoods = np.zeros(shape=(self.k, num_features))

        for cls in range(self.k):
            samples_of_class = data[labels == cls]
            class_sample_count = samples_of_class.shape[0]

            # Probability class cls occurs. P(Y)
            self.class_prior_probas[cls] = class_sample_count / num_samples

            # Per-feature totals over all samples of class `cls`. shape = (num_features, )
            cls_global_feature_totals = samples_of_class.sum(axis=0)

            # Compute the conditional probability for each feature given the class
            # (including Lidstone smoothing). P(X | Y)
            self.class_conditional_feature_likelihoods[cls, :] = (
                (cls_global_feature_totals + self.alpha) /
                (np.sum(cls_global_feature_totals) + self.alpha * num_features)
            )

        return self

    def _log_parameters(self) -> tuple:
        """Returns (log priors, log feature likelihoods) of the fitted model."""
        if not hasattr(self, "class_prior_probas"):
            raise AttributeError(
                "NaiveBayesClassifier is not fitted yet; call fit() before predicting."
            )

        # A class without training samples has prior 0, hence log prior -inf,
        # which simply means it is never predicted. Silence the log(0) warning.
        with np.errstate(divide="ignore"):
            log_class_priors = np.log(self.class_prior_probas)

        log_feature_likelihoods = np.log(self.class_conditional_feature_likelihoods)
        return log_class_priors, log_feature_likelihoods

    def predict(self, data: npt.NDArray) -> npt.NDArray:
        """
        Predicts the most probable class of every sample (vectorised).

        :param data: Feature matrix of shape (num_samples, num_features).
        :return: Integer array of shape (num_samples,) holding class indices.
        :raises AttributeError: If the classifier has not been fitted.
        """
        log_class_priors, log_feature_likelihoods = self._log_parameters()

        # Calculate the log probabilities of each class for each sample. y = b + Wx
        logits = log_class_priors + (data @ log_feature_likelihoods.T)

        # For each sample, get the index of the maximum logit (i.e., the predicted class)
        return np.argmax(logits, axis=1)

    def legacy_predict(self, data: npt.NDArray) -> npt.NDArray:
        """
        Loop-based reference implementation of `predict`.

        :param data: Feature matrix of shape (num_samples, num_features).
        :return: Integer array of shape (num_samples,) holding class indices.
        :raises AttributeError: If the classifier has not been fitted.
        """
        num_samples, _ = data.shape
        log_class_priors, log_feature_likelihoods = self._log_parameters()

        # Integer dtype so the result matches what `predict` returns.
        predictions = np.zeros(shape=(num_samples,), dtype=np.intp)

        for idx, sample in enumerate(data):
            # Log probabilities of each class for the current sample.
            logits = np.zeros(shape=(self.k,))

            for cls in range(self.k):
                # Weighted log likelihood of every feature given the class, summed to
                # the total log probability of the sample given the class.
                log_prob_features_given_class = np.sum(sample * log_feature_likelihoods[cls, :])

                # Add the log prior of the class itself to get the final log
                # probability for the class.
                logits[cls] = log_class_priors[cls] + log_prob_features_given_class

            predictions[idx] = np.argmax(logits)

        return predictions

## Evaluation & Comparison Classes

In [31]:
class ClassifierPerformanceSummary:
    """Bundles the accuracy, precision, recall and F1 score of one evaluation."""

    def __init__(self, accuracy: float, precision: float, recall: float, f1: float):
        self.accuracy, self.precision = accuracy, precision
        self.recall, self.f1 = recall, f1

    def __repr__(self) -> str:
        """Renders the four metrics as percentages, one per line."""
        pad = " " * 8
        labelled_metrics = (
            ("Accuracy:  ", self.accuracy),
            ("Precision: ", self.precision),
            ("Recall:    ", self.recall),
            ("F1:        ", self.f1),
        )
        rows = "".join(
            f"{pad}{label}{value*100:.4f}%\n" for label, value in labelled_metrics
        )
        return f"\n{rows}{pad}"

    def as_dict(self) -> dict:
        """Returns the metrics keyed by name, as raw fractions."""
        metric_names = ('accuracy', 'precision', 'recall', 'f1')
        return {name: getattr(self, name) for name in metric_names}

    def as_df(self) -> pd.DataFrame:
        """Returns the metrics as a single-row DataFrame with index 0."""
        return pd.DataFrame(data=self.as_dict(), index=[0])


class BinaryClassifierEvaluator:
    """
    Scores binary predictions (labels 0 / 1) against the ground truth.

    Class 1 is treated as the positive class. Every ratio metric returns 0.0
    instead of NaN when its denominator is zero (e.g. precision when nothing
    was predicted positive).
    """

    def __init__(self, ground_truth: npt.NDArray, predictions: npt.NDArray):
        """
        :param ground_truth: True label (0 or 1) of every sample.
        :param predictions: Predicted label (0 or 1) of every sample.
        :raises ValueError: If the two inputs do not have the same shape.
        """
        # Coerce to arrays: comparing a plain list with `==` would give a single
        # bool and silently produce an all-zero confusion matrix.
        self.predictions = np.asarray(predictions)
        self.ground_truth = np.asarray(ground_truth)

        # Guard against silent broadcasting, e.g. (n,) against (n, 1).
        if self.predictions.shape != self.ground_truth.shape:
            raise ValueError(
                "ground_truth and predictions must have the same shape, got "
                f"{self.ground_truth.shape} and {self.predictions.shape}."
            )

        # Layout:
        # [[ TP, FP ],
        #  [ FN, TN ]]
        self.confusion_matrix = self.produce_confusion_matrix()

    @staticmethod
    def _safe_ratio(numerator, denominator) -> float:
        """Returns numerator / denominator, or 0.0 when the denominator is zero."""
        if denominator == 0:
            return 0.0
        return float(numerator / denominator)

    def produce_confusion_matrix(self) -> npt.NDArray:
        """
        Counts true/false positives/negatives.

        :return: Integer array [[TP, FP], [FN, TN]].
        """
        predicted_positive = self.predictions == 1
        predicted_negative = self.predictions == 0
        actually_positive = self.ground_truth == 1
        actually_negative = self.ground_truth == 0

        return np.array(
            [
                # TP: predicted 1, truth 1.    FP: predicted 1, truth 0.
                [np.sum(predicted_positive & actually_positive),
                 np.sum(predicted_positive & actually_negative)],
                # FN: predicted 0, truth 1.    TN: predicted 0, truth 0.
                [np.sum(predicted_negative & actually_positive),
                 np.sum(predicted_negative & actually_negative)],
            ],
            dtype=int,
        )

    def calculate_accuracy(self) -> float:
        """
        Calculates (TP + TN) / Num Predictions.
        """
        correct = self.confusion_matrix[0, 0] + self.confusion_matrix[1, 1]
        return self._safe_ratio(correct, self.confusion_matrix.sum())

    def calculate_recall(self) -> float:
        """
        Calculates TP / (TP + FN)
        """
        return self._safe_ratio(self.confusion_matrix[0, 0], self.confusion_matrix[:, 0].sum())

    def calculate_precision(self) -> float:
        """
        Calculates TP / (TP + FP).
        """
        return self._safe_ratio(self.confusion_matrix[0, 0], self.confusion_matrix[0, :].sum())

    def calculate_f1(self) -> float:
        """
        Calculates the harmonic mean of precision and recall.
        """
        precision = self.calculate_precision()
        recall = self.calculate_recall()

        return self._safe_ratio(2 * (precision * recall), precision + recall)

    def get_summary(self) -> "ClassifierPerformanceSummary":
        """Bundles accuracy, precision, recall and F1 into a summary object."""
        return ClassifierPerformanceSummary(
            self.calculate_accuracy(),
            self.calculate_precision(),
            self.calculate_recall(),
            self.calculate_f1()
        )
    

class BaseComparator:
    """Base Comparator Class: holds the paired training sets used by comparators."""

    def __init__(self, X_trains: list[npt.NDArray], y_trains: list[npt.NDArray]):
        """
        Initialises a comparator. The length of X_trains must equal the length of y_trains.

        :param X_trains: a list of training datasets.
        :param y_trains: a list of training label sets; y_trains[i] labels X_trains[i].
        :raises ValueError: if the two lists differ in length.
        """
        # Enforce the documented precondition up front rather than failing (or
        # silently ignoring surplus entries) deep inside a comparison loop.
        if len(X_trains) != len(y_trains):
            raise ValueError(
                "X_trains and y_trains must have the same length, got "
                f"{len(X_trains)} and {len(y_trains)}."
            )

        self.X_trains = X_trains
        self.y_trains = y_trains


class FeatureSetComparator(BaseComparator):
    """Class for comparing one classifier type across several feature sets."""

    def train_and_evaluate(
            self,
            cls: type,
            X_train: npt.NDArray,
            y_train: npt.NDArray,
            X_test: npt.NDArray,
            y_test: npt.NDArray,
            hyperparams: dict
            ) -> dict:
        """
        Trains a fresh classifier and scores it on an evaluation split.

        :param cls: The classifier class (not an instance); it must provide
            `fit(X, y)` and `predict(X)`.
        :param X_train: Training features.
        :param y_train: Training labels.
        :param X_test: Evaluation features.
        :param y_test: Evaluation labels.
        :param hyperparams: Keyword arguments for the classifier constructor.
        :return: Dict with the keys 'accuracy', 'precision', 'recall' and 'f1'.
        """
        classifier = cls(**(hyperparams or {}))
        classifier.fit(X_train, y_train)

        predictions = classifier.predict(X_test)
        return BinaryClassifierEvaluator(y_test, predictions).get_summary().as_dict()

    def compare(
            self,
            classifier_cls: type,
            X_tests: list[npt.NDArray],
            y_tests: list[npt.NDArray],
            hyperparams: dict
            ) -> pd.DataFrame:
        """
        Evaluates `classifier_cls` on every stored feature set.

        :param classifier_cls: The classifier class to train on each feature set.
        :param X_tests: Evaluation features; X_tests[i] pairs with X_trains[i].
        :param y_tests: Evaluation labels; y_tests[i] pairs with y_trains[i].
        :param hyperparams: Keyword arguments for the classifier constructor.
        :return: One row per feature set, labelled "<ClassName> - Set <letter>",
            with one column per metric. Letters come from `ascii_uppercase`, so
            at most 26 feature sets are supported.
        """
        row_labels = []
        metric_records = []
        for i in range(len(self.X_trains)):
            row_labels.append(f"{classifier_cls.__name__} - Set {ascii_uppercase[i]}")
            metric_records.append(
                self.train_and_evaluate(
                    classifier_cls,
                    self.X_trains[i],
                    self.y_trains[i],
                    X_tests[i],
                    y_tests[i],
                    hyperparams
                )
            )

        # Build the table once from the collected records instead of assigning
        # dicts as columns of an empty frame and transposing afterwards.
        return pd.DataFrame(metric_records, index=row_labels)


class ClassifierComparator(BaseComparator):
    """Class for comparing classifiers across multiple feature sets."""

    def compare(
            self,
            classifier_classes: list[type],
            X_tests: list[npt.NDArray],
            y_tests: list[npt.NDArray]
            ) -> pd.DataFrame:
        """
        Evaluates every classifier class on every stored feature set.

        Each classifier is constructed with its default hyperparameters.

        :param classifier_classes: Classifier classes (not instances) providing
            `fit(X, y)` and `predict(X)`.
        :param X_tests: Evaluation features; X_tests[i] pairs with X_trains[i].
        :param y_tests: Evaluation labels; y_tests[i] pairs with y_trains[i].
        :return: The per-classifier tables of `FeatureSetComparator.compare`
            stacked row-wise, in the order of `classifier_classes`. An empty
            DataFrame if no classifier classes are given.
        """
        feature_set_comparator = FeatureSetComparator(self.X_trains, self.y_trains)

        per_classifier_tables = [
            feature_set_comparator.compare(cls, X_tests, y_tests, {})
            for cls in classifier_classes
        ]

        # pd.concat rejects an empty list, so keep the empty-input result explicit.
        if not per_classifier_tables:
            return pd.DataFrame()

        # Concatenate once rather than growing a DataFrame inside the loop.
        return pd.concat(per_classifier_tables, axis=0)

## Evaluating our Naive Bayes Classifier

### Comparing performance across feature sets

In [43]:
# Train one NaiveBayesClassifier (default hyperparameters) per feature set and
# score each on the matching dev split.
comparator = FeatureSetComparator(
    X_trains=[X_train_a, X_train_b, X_train_c],
    y_trains=[y_train_a, y_train_b, y_train_c],
)

naive_bayes_performance = comparator.compare(
    classifier_cls=NaiveBayesClassifier,
    X_tests=[X_dev_a, X_dev_b, X_dev_c],
    y_tests=[y_dev_a, y_dev_b, y_dev_c],
    hyperparams={},
)

naive_bayes_performance

Unnamed: 0,accuracy,precision,recall,f1
NaiveBayesClassifier - Set A,0.798333,0.894273,0.676667,0.770398
NaiveBayesClassifier - Set B,0.83,0.839041,0.816667,0.827703
NaiveBayesClassifier - Set C,0.856667,0.868966,0.84,0.854237


### Evaluating the model on the test split of the best feature set

In [46]:
# Refit on the training split of feature set C and score once on its test split.
classifier = NaiveBayesClassifier(k=2, alpha=1)
classifier.fit(X_train_c, y_train_c)

test_predictions = classifier.predict(X_test_c)
test_performance = BinaryClassifierEvaluator(y_test_c, test_predictions).get_summary().as_df()

# Plain string: the label has no placeholders, so no f-string is needed.
test_performance.index = ["NaiveBayesClassifier - Test Set C"]
test_performance

Unnamed: 0,accuracy,precision,recall,f1
NaiveBayesClassifier - Test Set C,0.868333,0.879725,0.853333,0.866328


## Evaluating Sklearn's Multinomial Naive Bayes Classifier

### Comparing performance across feature sets and models

In [42]:
# Evaluate sklearn's two Naive Bayes variants and the hand-written classifier on
# the dev split of every feature set.
comparator = ClassifierComparator(
    X_trains=[X_train_a, X_train_b, X_train_c],
    y_trains=[y_train_a, y_train_b, y_train_c],
)

classifier_performance = comparator.compare(
    classifier_classes=[MultinomialNB, GaussianNB, NaiveBayesClassifier],
    X_tests=[X_dev_a, X_dev_b, X_dev_c],
    y_tests=[y_dev_a, y_dev_b, y_dev_c],
)

classifier_performance

Unnamed: 0,accuracy,precision,recall,f1
MultinomialNB - Set A,0.798333,0.894273,0.676667,0.770398
MultinomialNB - Set B,0.83,0.839041,0.816667,0.827703
MultinomialNB - Set C,0.856667,0.868966,0.84,0.854237
GaussianNB - Set A,0.67,0.665584,0.683333,0.674342
GaussianNB - Set B,0.66,0.655844,0.673333,0.664474
GaussianNB - Set C,0.758333,0.760943,0.753333,0.757119
NaiveBayesClassifier - Set A,0.798333,0.894273,0.676667,0.770398
NaiveBayesClassifier - Set B,0.83,0.839041,0.816667,0.827703
NaiveBayesClassifier - Set C,0.856667,0.868966,0.84,0.854237


### Evaluating the best model on the test split of the best feature set

In [50]:
# Fit sklearn's MultinomialNB on the training split of feature set C and score
# it once on the test split.
mnb_classifier = MultinomialNB()
mnb_classifier.fit(X_train_c, y_train_c)

sklearn_predictions = mnb_classifier.predict(X_test_c)
sklearn_test_performance = BinaryClassifierEvaluator(y_test_c, sklearn_predictions).get_summary().as_df()

# Plain string: the label has no placeholders, so no f-string is needed.
sklearn_test_performance.index = ["MultinomialNB - Test Set C"]
sklearn_test_performance

Unnamed: 0,accuracy,precision,recall,f1
MultinomialNB - Test Set C,0.868333,0.879725,0.853333,0.866328
