In [1]:
import numpy as np

training_spam = np.loadtxt(open("data/training_spam.csv"), delimiter=",").astype(int)
testing_spam = np.loadtxt(open("data/testing_spam.csv"), delimiter=",").astype(int)

In [2]:
import numpy as np


class NaiveBayesClassifier:
    def __init__(self, training_data, testing_data):
        self.training_data = training_data
        self.testing_data = testing_data
        self.prior_normal_probs = {}
    
    
    def probability_calc(self):
        label_arr = self.training_data[:, 0]
        # Initialise dictionaries for storing prior probabilities and feature probabilities
        prior_probs = {}
        all_feature_probs = {}
        # Laplace smoothing factor to avoid division by zero
        laplace_alpha = 1
        unique_labels = np.unique(label_arr)
        
        for label in unique_labels:
            # Count occurrences of each class label to calculate prior probabilities
            label_count = np.sum(label == label_arr)
            prior_probs[label] = np.log(label_count / len(label_arr))
            self.prior_normal_probs[label] = label_count / len(label_arr)
            probabilities = np.array([])
    
            # Initialise dictionary for storing probabilities of each feature given the class
            feature_probs = {}
            # Extract rows corresponding to the current class
            features = self.training_data[self.training_data[:, 0] == label][:, 1:]
    
            for feature in range(np.shape(features[0])[0]):  # Loop through all features 
                
                # Laplace smoothing to remove issues with zero frequency features
                prob = np.log((np.sum(features[:, feature]) + laplace_alpha) / (len(features) + laplace_alpha * 2))  
                
                probabilities = np.append(probabilities, prob) 
            
            all_feature_probs[label] = probabilities  
            
        return prior_probs, all_feature_probs



    def testing(self, prior_probs, all_feature_probs, test_element):
        label_probs = {}
        for label in np.unique(self.training_data[:, 0]):
            probabilities = np.array([])
            
            for feature in range(len(test_element)):  # Skip label in test_element
                if test_element[feature] == 1:  # Consider feature if present
                    feature_prob = all_feature_probs[label][feature]  
                    final_prob = prior_probs[label] + feature_prob
                    probabilities = np.append(probabilities, final_prob)
            
            # Calculate class probability by multiplying prior with product of feature probabilities
            label_probs[label] = np.sum(probabilities)

        return label_probs

    
    def probability_normalisation(self, log_probs):
        # Converts log probabilities to normalised probabilities 
        e_x = np.exp(log_probs - np.max(log_probs))
        return e_x / e_x.sum(axis=0)  


    def predict(self):
        prior_probs, all_feature_probs = self.probability_calc()
        normalised_probs = []

        for test_element in self.testing_data:
            log_probs = self.testing(prior_probs, all_feature_probs, test_element)
            normalised_prob = self.probability_normalisation(np.array(list(log_probs.values())))
            normalised_probs.append(normalised_prob)

        normalised_probs = np.array(normalised_probs)
        
        # Find the indices of the max values in each row
        max_indices = np.argmax(normalised_probs, axis=1)
        
        # Initialise an array to hold the transformed values
        transformed_probabilities = np.zeros_like(max_indices, dtype=float)
        
        # Iterate over each row
        for i, index in enumerate(max_indices):
            if index == 0:  # If the max value comes from the second column
                transformed_probabilities[i] = 1 - normalised_probs[i, index]
            else:  # If the max value comes from the first column
                transformed_probabilities[i] = normalised_probs[i, index]

        return transformed_probabilities, self.prior_normal_probs

In [23]:
class LogisticRegressionClassifier:
    def __init__(self, training_data, training_cycles, learning_rate, lr_bias, buffer_divisor, lambda_penalty):
        self.training_data = training_data
        self.training_cycles = training_cycles
        self.learning_rate = learning_rate
        self.lr_bias = lr_bias
        self.buffer_divisor = buffer_divisor
        self.lambda_penalty = lambda_penalty
    

    def train(self, features, constant, training_features, training_labels):
        errors = [] 
        train_count = training_features.shape[1] 

        for cycle in range(self.training_cycles):
            
            # Calculate the linear combination of the features multiplied by their respected weights plus the constant. Feature array transpose needed for multiplication 
            logistic_input = np.dot(features.T, training_features) + constant
            probability = self.sigmoid_function(logistic_input)
            
            # Feature gradient decent using Lasso regularisation to add a penalty
            feature_gradient = (1 / train_count) * np.dot(training_features, (probability - training_labels).T)
            feature_gradient += (self.lambda_penalty / train_count) * np.sign(features)

            # Constant gradient decent 
            constant_gradient = (1 / train_count) * np.sum(probability - training_labels)

            # Update the values of the features and constant weightings
            features = features - self.learning_rate * feature_gradient
            constant = constant - self.learning_rate * constant_gradient

        return features, constant


    
    def sigmoid_function(self, logistic_input):
        
        # Predict the probability that logistic_input belongs to class 1. Clip the input to prevent overflow of small numbers.
        logistic_input = np.clip(logistic_input, -1000, 1000) 
        probabilities = 1 / (1 + np.exp(-logistic_input))
        
        return probabilities


    
    def classify(self, features, constant, test_data):
        # Create array for prediction same length as the amount of test_data elements
        test_data_count = test_data.shape[1]  

        # Compute logistic predictions using trained features weight and trained constant 
        probabilities = self.sigmoid_function(np.dot(features.T, test_data) + constant)

        return probabilities
    

    
    def predict(self, test_data):

        # Split the training data 
        training_labels = self.training_data[:, 0].astype(int)
        training_features = self.training_data[:, 1:].astype(int)
        
        # Change the shape of the feature and training arrays so they can be used for matrix math calculations 
        training_features = training_features.T
        training_labels = training_labels.reshape(1, -1)
        
        # Train the model using gradient descent
        default_features = np.zeros((training_features.shape[0], 1))
        default_constant = 0
        features, constant = self.train(default_features, default_constant, training_features, training_labels)
        
        # Make predictions on the test data
        lr_probabilities = self.classify(features, constant, test_data.T)
        
        # Instantiate the bayes classifier, run predict and return probabilities 
        naive_bayes = NaiveBayesClassifier(self.training_data, test_data)
        bayes_probabilities, prior_probabilities = naive_bayes.predict()

        # Combining the probabilities 
        same_side_condition = ((lr_probabilities > 0.5) & (bayes_probabilities > 0.5)) | ((lr_probabilities < 0.5) & (bayes_probabilities < 0.5))
        average_values = (lr_probabilities * self.lr_bias + bayes_probabilities) / 2
        combined_probabilities = np.where(same_side_condition, average_values, lr_probabilities)
        
        # Using prior probs to add a bias within the buffer to 0.5
        max_key = max(prior_probabilities, key=prior_probabilities.get)
        max_value = prior_probabilities[max_key]
        buffer = max_value - 0.5
        
        # Adjusting decision thresholds based on the buffer
        upper_threshold = 0.5 + buffer / self.buffer_divisor
        lower_threshold = 0.5 - buffer / self.buffer_divisor

        print(buffer, (buffer / self.buffer_divisor))
        
        # Condition to check if probabilities are decisively above or below the adjusted thresholds
        buffer_condition = (combined_probabilities > upper_threshold) | (combined_probabilities < lower_threshold)
        
        # Keep lr_probabilities where buffer_condition is true, otherwise, use a default probability
        combined_probabilities_buf = np.where(buffer_condition, combined_probabilities, max_value)
        
        # Generating predictions based on the adjusted combined_probabilities
        predictions = (combined_probabilities_buf > 0.5).astype(int)

        probabilities = [bayes_probabilities, lr_probabilities, combined_probabilities, combined_probabilities_buf]

        return predictions, probabilities

# 10000, 0.335, 0.6, 0.6, -0.4)
# 9000, 0.32, 0.5, 0.58, -0.38)

def create_classifier():
    classifier = LogisticRegressionClassifier(training_spam, 10000, 0.335, 0.6, 0.6, -0.4)
    return classifier

classifier = create_classifier()
predictions, probabilities = classifier.predict(testing_spam[:, 1:])

for probability in probabilities:
    predictions = (probability > 0.5).astype(int)
    accuracy = np.count_nonzero(predictions == testing_spam[:, 0])/testing_spam[:, 0].shape[0]
    print(accuracy)

0.10933333333333328 0.18222222222222215
0.896
0.938
0.936
0.95


In [4]:
# This is the class I used to tune my logistic - bayes classifier. There are 6 different hyperparameters that I tuned 

class HyperparameterOptimiser:
    def __init__(self, data, training_cycle_params, learning_rate_params, lr_bias_params, buffer_divisor_params, lambda_params, data_split_count, data_test_ratio, data_split_toggle):
        self.data = data
        self.training_cycle_params = training_cycle_params
        self.learning_rate_params = learning_rate_params
        self.lr_bias_params = lr_bias_params
        self.buffer_divisor_params = buffer_divisor_params
        self.lambda_params = lambda_params
        self.data_split_count = data_split_count
        self.data_test_ratio = data_test_ratio
        self.optimal_params = {}
        self.best_accuracy = 0
        self.data_split_toggle = data_split_toggle

    # Grid search to loop through all my hyperparameters 
    
    def grid_search(self):
        best_accuracy = 0
        optimised_parameters = {'learning_rate': None, 'training_cycles': None}
        
        for learning_rate in self.learning_rate_params:
            print('Learning Rate: ', learning_rate)
            for cycles in self.training_cycle_params:
                print('Cycle: ', cycles)
                for lr_bias in self.lr_bias_params:
                    print('lr_bias: ', lr_bias)
                    for buffer_divisor in self.buffer_divisor_params:
                        print('buffer_divisor: ', buffer_divisor)
                        for lambda_ in self.lambda_params:
                            print('lambda_: ', lambda_)
 
                            self.data_run(learning_rate, cycles, lr_bias, buffer_divisor, lambda_)

    
    
    def data_run(self, learning_rate, cycles, lr_bias, buffer_divisor, lambda_):

        if self.data_split_toggle == False:
            test_data = testing_spam
            train_data = training_spam
            classifier = LogisticRegressionClassifier(train_data, int(cycles), learning_rate, lr_bias, buffer_divisor, lambda_)
            predictions = classifier.predict(test_data[:, 1:])[0]
            accuracy = np.count_nonzero(predictions == test_data[:, 0])/test_data[:, 0].shape[0]
        else:
            dataset_size = self.data.shape[0]
            test_size = int(dataset_size * self.data_test_ratio)
            split_accuracies = np.array([])
            
            for _ in range(self.data_split_count):
                np.random.shuffle(data)
                test_data = self.data[:test_size]
                train_data = self.data[test_size:]
            
                classifier = LogisticRegressionClassifier(train_data, int(cycles), learning_rate, lr_bias, buffer_divisor, lambda_)
                predictions = classifier.predict(test_data[:, 1:])[0]
                accuracy = np.count_nonzero(predictions == test_data[:, 0])/test_data[:, 0].shape[0]

                split_accuracies = np.append(split_accuracies, accuracy)
                print('Split ' + str(_), 'Accuracy: ', accuracy)
            
            print('Average accuracy: ', np.mean(split_accuracies))
            print('\n')
            accuracy = np.mean(split_accuracies)
        
        if accuracy > self.best_accuracy:
            self.best_accuracy = accuracy
            self.optimal_params['learning_rate'] = learning_rate
            self.optimal_params['training_cycles'] = cycles
            self.optimal_params['lr_bias'] = lr_bias
            self.optimal_params['buffer_divisor'] = buffer_divisor
            self.optimal_params['lambda_'] = lambda_
            print('\n')
            print(f"New best accuracy: {self.best_accuracy:.4f} with Cycles={cycles}, LR={learning_rate}, LR Bias={lr_bias}, Buffer Divisor={buffer_divisor}, Lambda={lambda_}")
            print('\n')

    

    def run(self):
        optimised_parameters = self.grid_search()
        print("Optimised Parameters: ", self.optimal_params)
        print("Best Accuracy: ", self.best_accuracy)




data = np.genfromtxt(open("data/all_spam_data.csv"), delimiter=",", encoding='utf-8-sig')

hyperparameter_optimiser = HyperparameterOptimiser( 
            data, 
            training_cycle_params = np.arange(9000, 11000, 500).tolist(), 
            learning_rate_params = np.arange(0.32, 0.345, 0.005).tolist(), 
            lr_bias_params = np.arange(0.5, 0.7, 0.02).tolist(), 
            buffer_divisor_params = np.arange(0.5, 0.7, 0.02).tolist(), 
            lambda_params = np.arange(-0.5, -0.3, 0.02).tolist(), 
            data_split_count = 10, 
            data_test_ratio = 0.33,
            data_split_toggle = False )

# hyperparameter_optimiser.run()

# New best accuracy: 0.9500 with Cycles=10000, LR=0.335, LR Bias=0.6, Buffer Divisor=0.6, Lambda=-0.40000000000000013