In [434]:
import os
import random

### Data Class

In [435]:
class Data:
    def __init__(self, id: int, class_: str | None, abstract: str) -> None:
        self.id: int = id
        self.class_: str = class_
        self.abstract: str = abstract

    def get_type(self) -> str:
        '''Returns: "test" | "train"'''
        return "train" if self.class_ else "test"

### DataModel Class

In [436]:
class DataModel:
    def __init__(self, data_path: str) -> None:
        self.data: list[Data] = []
        self.vocabulary_size: int = 0
        if not os.path.exists(data_path):
            self.data = []
        else:
            with open(data_path, 'r') as f:
                lines = f.readlines()
            for line in lines[1:]:
                line_split = line.strip().replace('"', '').split(',')
                if len(line_split) == 3:
                    id, class_, abstract = line_split
                    data = Data(id, class_, abstract)
                    self.data.append(data)
                else:
                    id, abstract = line_split
                    data = Data(id, None, abstract)
                    self.data.append(data)
        self.vocabulary_size = self.get_vocabulary_size()
    

    def get_vocabulary_size(self) -> int:
        '''Returns the number of unique words in the dataset.'''
        vocabulary = set()
        for data in self.data:
            words = data.abstract.split()
            for word in words:
                vocabulary.add(word)
        self.vocabulary_size = len(vocabulary)
        return self.vocabulary_size


    def split_model(self, proportion: float) -> 'DataModel':
        '''Splits this dataset into two data sets based on the proportion of the current data to be in the new split.'''
        split_data = DataModel('')
        # random.shuffle(self.data)
        split_index = int(len(self.data) * proportion)
        split_data.data = self.data[:split_index]
        self.data = self.data[split_index:]
        self.vocabulary_size = self.get_vocabulary_size()
        return split_data

### Basic Naive Bayes Classifier

In [437]:
class StandardNaiveBayes:
    def __init__(self) -> None:
        self.training_data: DataModel = DataModel(
            data_path=os.path.join(os.path.join("data", "trg.csv"))
        )
        self.testing_data: DataModel = DataModel(
            data_path=os.path.join(os.path.join("data", "tst.csv"))
        )
        self.classes = [c for c in set([data.class_ for data in self.training_data.data])]
        self.class_counts = self.get_class_counts()
        self.class_probabilities = [count / len(self.training_data.data) for count in self.class_counts]
        self.word_counts = self.get_word_counts()


    def run_test_data(self, fileout: str, type_: str = "test") -> None:
        with open(fileout, 'w') as f:
            f.write("id,class\n")
            if type_ == "test":
                for data in self.testing_data.data:
                    f.write(f"{data.id},{self.classify_abstract(data.abstract)}\n")
            elif type_ == "train":
                for data in self.training_data.data:
                    f.write(f"{data.id},{self.classify_abstract(data.abstract)}\n")
            else:
                raise ValueError("Invalid type_ argument. Must be 'test', 'validation', or 'train'.")


    def get_word_probability(self, word: str, class_index: int) -> float:
        '''
        p(class|word) = p(word|class) * p(class) / p(word)
        '''
        word_count = self.word_counts[class_index].get(word, 0)
        class_count = self.class_counts[class_index]
        word_in_class = word_count / class_count
        class_probability = self.class_probabilities[class_index]
        word_in_data = sum([self.word_counts[i].get(word, 0) for i in range(len(self.classes))]) / len(self.training_data.data)
        if word_in_data == 0:
            return 0
        return word_in_class * class_probability / word_in_data


    def test_classify_abstract(self, abstract: str) -> str:
        abstract_words = abstract.split()
        class_probabilities = []
        for i in range(len(self.classes)):
            cur_class_probability = self.class_probabilities[i]
            cur_class_probability = 1
            for word in abstract_words:
                cur_word_probability = self.get_word_probability(word, i)
                if cur_word_probability == 0:
                    continue
                cur_class_probability *= cur_word_probability
                print(f"WORD: {word} | CLASS: {self.classes[i]} | PROB: {cur_class_probability}")
            class_probabilities.append(cur_class_probability)
        max_class = self.classes[class_probabilities.index(max(class_probabilities))]
        print(f"Final Probabilities: {class_probabilities}")
        return max_class


    def classify_abstract(self, abstract: str) -> str:
        '''Classifies the abstract into one of the classes. Returns the class. Uses the Naive Bayesian Classifier algorithm.'''
        abstract_words = abstract.split()
        class_probabilities = []
        for i in range(len(self.classes)):
            cur_class_probability = self.class_probabilities[i]
            cur_class_probability = 1
            for word in abstract_words:
                cur_word_probability = self.get_word_probability(word, i)
                if cur_word_probability == 0:
                    continue
                cur_class_probability *= cur_word_probability
            class_probabilities.append(cur_class_probability)
        max_class = self.classes[class_probabilities.index(max(class_probabilities))]
        return max_class


    def get_class_counts(self) -> list[int]:
        '''Returns the count of each class in the training data. Match the order of the classes with the order of self.classes (classes[i] -> class_counts[i])'''
        class_counts = [0] * len(self.classes)
        for data in self.training_data.data:
            class_counts[self.classes.index(data.class_)] += 1
        return class_counts
    

    def get_word_counts(self) -> list[dict[str, int]]:
        '''Returns the count of each word in each class. Match the order of the classes with the order of self.classes (classes[i] -> word_counts[i])'''
        word_counts = [{} for _ in range(len(self.classes))]
        for data in self.training_data.data:
            for word in data.abstract.split():
                if word not in word_counts[self.classes.index(data.class_)]:
                    word_counts[self.classes.index(data.class_)][word] = 1
                else:
                    word_counts[self.classes.index(data.class_)][word] += 1
        return word_counts

### Main Routine

In [438]:
if __name__ == "__main__":
    '''

    classifier = StandardNaiveBayes()
    classifier.run_test_data("output.csv", type_="test")

    -->    Accuracy Obtained: 0.800 (Kaggle)
    '''

# Discussion of Standard Naive Bayesian Classifier
...

# Improvements 
...
- Add-1 LaPlace Smoothing
- Multinomial Naive Bayes

In [439]:
class ImprovedNaiveBayes:
    def __init__(self, validation_data_split: float, alpha: float) -> None:
        self.training_data: DataModel = DataModel(
            data_path=os.path.join(os.path.join("data", "trg.csv"))
        )
        self.testing_data: DataModel = DataModel(
            data_path=os.path.join(os.path.join("data", "tst.csv"))
        )
        self.validation_data: DataModel = self.training_data.split_model(validation_data_split)
        print(f"Size of training data: {len(self.training_data.data)}")
        print(f"Size of validation data: {len(self.validation_data.data)}")
        print(f"Size of testing data: {len(self.testing_data.data)}")
        print(f"Vocab size of training data: {self.training_data.vocabulary_size}")


        self.alpha = alpha
        self.classes = [c for c in set([data.class_ for data in self.training_data.data])]
        self.class_counts = self.get_class_counts()
        self.word_counts = self.get_word_counts() # Change: Implemented Add-One Laplace Smoothing
        # self.word_counts: list[dict[str, int]] = [{} for _ in range(len(self.classes))]
        # self.class_counts: list[int] = [0] * len(self.classes)
        self.vocab_size = 0
        # self.estimate_parameters()
        self.class_probabilities = [count / len(self.training_data.data) for count in self.class_counts]


    def estimate_parameters(self) -> None:
        for data in self.training_data.data:
            class_index = self.classes.index(data.class_)
            self.class_counts[class_index] += 1
            for word in data.abstract.split():
                # Update word counts for each class
                self.word_counts[class_index][word] = self.word_counts[class_index].get(word, 0) + 1
                # Update vocabulary size if word is totally new (for all classes)
                for i in range(len(self.classes)):
                    if word not in self.word_counts[i]:
                        self.vocab_size += 1
                        i = len(self.classes) # Break the loop

        # Smoothing and parameter calculation
        for class_index in range(len(self.classes)):
            class_total_with_smoothing = self.class_counts[class_index] + self.alpha * self.vocab_size
            for word in self.word_counts[class_index]:
                self.word_counts[class_index][word] = (self.word_counts[class_index][word] + self.alpha) / class_total_with_smoothing


    def get_validation_accuracy(self) -> float:
        correct = 0
        for data in self.validation_data.data:
            predicted_class = self.classify_abstract(data.abstract)
            if predicted_class == data.class_:
                correct += 1
        return correct / len(self.validation_data.data)


    def run_test_data(self, fileout: str, type_: str = "test") -> None:
        with open(fileout, 'w') as f:
            f.write("id,class\n")
            if type_ == "test":
                for data in self.testing_data.data:
                    f.write(f"{data.id},{self.classify_abstract(data.abstract)}\n")
            elif type_ == "train":
                for data in self.training_data.data:
                    f.write(f"{data.id},{self.classify_abstract(data.abstract)}\n")
            else:
                raise ValueError("Invalid type_ argument. Must be 'test', 'validation', or 'train'.")


    def get_word_probability(self, word: str, class_index: int) -> float:
        '''
        p(class|word) = p(word|class) * p(class) / p(word)
        '''
        word_count = self.word_counts[class_index].get(word, 0) + 1 # Add-One Laplace Smoothing
        class_count = self.class_counts[class_index]
        word_in_class = word_count / class_count
        class_probability = self.class_probabilities[class_index]
        word_in_data = sum([self.word_counts[i].get(word, 0) for i in range(len(self.classes))]) / len(self.training_data.data) + 1
        return word_in_class * class_probability / word_in_data


    def test_classify_abstract(self, abstract: str) -> str:
        abstract_words = abstract.split()
        class_probabilities = []
        for i in range(len(self.classes)):
            cur_class_probability = self.class_probabilities[i]
            cur_class_probability = 1
            for word in abstract_words:
                cur_word_probability = self.get_word_probability(word, i)
                if cur_word_probability == 0:
                    continue
                cur_class_probability *= cur_word_probability
                print(f"WORD: {word} | CLASS: {self.classes[i]} | PROB: {cur_class_probability}")
            class_probabilities.append(cur_class_probability)
        max_class = self.classes[class_probabilities.index(max(class_probabilities))]
        print(f"Final Probabilities: {class_probabilities}")
        return max_class


    def classify_abstract(self, abstract: str) -> str:
        '''Classifies the abstract into one of the classes. Returns the class. Uses the Naive Bayesian Classifier algorithm.'''
        abstract_words = abstract.split()
        class_probabilities = []
        for i in range(len(self.classes)):
            cur_class_probability = self.class_probabilities[i]
            cur_class_probability = 1
            for word in abstract_words:
                cur_word_probability = self.get_word_probability(word, i)
                if cur_word_probability == 0:
                    continue
                cur_class_probability *= cur_word_probability
            class_probabilities.append(cur_class_probability)
        max_class = self.classes[class_probabilities.index(max(class_probabilities))]
        return max_class


    def get_class_counts(self) -> list[int]:
        '''Returns the count of each class in the training data. Match the order of the classes with the order of self.classes (classes[i] -> class_counts[i])'''
        class_counts = [0] * len(self.classes)
        for data in self.training_data.data:
            class_counts[self.classes.index(data.class_)] += 1
        return class_counts
    

    def get_word_counts(self) -> list[dict[str, int]]:
        '''Returns the count of each word in each class. Match the order of the classes with the order of self.classes (classes[i] -> word_counts[i])'''
        word_counts = [{} for _ in range(len(self.classes))]
        for data in self.training_data.data:
            for word in data.abstract.split():
                if word not in word_counts[self.classes.index(data.class_)]:
                    word_counts[self.classes.index(data.class_)][word] = 1
                else:
                    word_counts[self.classes.index(data.class_)][word] += 1
        return word_counts
    

    def save(self):
        '''Saves the word counts to a txt file'''
        with open("word_counts.txt", 'w') as f:
            for i in range(len(self.classes)):
                f.write('-'*100 + '\n')
                f.write(f"Class: {self.classes[i]}\n")
                f.write(f"Class Count: {self.class_counts[i]}\n")
                f.write(f"Class Probability: {self.class_probabilities[i]}\n\n")
                for word, count in self.word_counts[i].items():
                    f.write(f"{word}: {count}\n")


In [440]:
if __name__ == "__main__":
    classifier = ImprovedNaiveBayes(
        validation_data_split=0.5,
        alpha = 0.1
    )
    # classifier.run_test_data("new_output.csv", type_="test")
    print("Accuracy: ", classifier.get_validation_accuracy())
    classifier.save()

Size of training data: 2800
Size of validation data: 1200
Size of testing data: 1000
Vocab size of training data: 25911
Accuracy:  0.7891666666666667
