In [21]:
import os
import numpy as np
from tqdm import tqdm
from collections import Counter
import matplotlib.pyplot as plt
from dataset import dataset
#from knn import knn
#from enn import enn
#from kmeans import kmeans

# All files were developed collaboratively

In [22]:
class knn:
    def __init__(self, data: dataset, prediction_type_flag: str, k_n=1, sigma=1.0, suppress_plots=True):
        '''
        - Set a variable equal to the tune and validation sets
        - instantiate self variables
        '''
        self.suppress_plots = suppress_plots
        self.k_n = k_n
        self.sigma = sigma
        self.tune_set = data.tune_set
        self.validate_set = data.validate_set
        self.prediction_type = prediction_type_flag
        self.predictions = []
        self.answers = []
        return
    def plot_loss(self, metrics: list, parameter: str, increment):
        # Extract the number of epochs and loss metrics
        metrics = np.array(metrics)
        epochs = np.arange(1, metrics.shape[0] + 1) * increment  # Assuming epochs start from 1
        loss1 = metrics[:, 0]  # First loss metric
        loss2 = metrics[:, 1]  # Second loss metric

        # Plotting
        plt.figure(figsize=(10, 6))
        plt.plot(epochs, loss1, label='Loss Metric 1', marker='o')
        plt.plot(epochs, loss2, label='Loss Metric 2', marker='o')

        # Adding labels and title
        plt.xlabel(f'{parameter} Value')
        plt.ylabel('Loss')
        plt.title(f'Loss Metrics vs. {parameter} value')
        plt.legend()
        plt.grid(True)

        # Show the plot
        plt.show()
        plt.close()
    def tune(self, epochs=15, k_n_increment=1, sigma_increment=1):
        # CONSIDER ADDING INCREMENT PARAMETER, WHERE THE PARAMETER DECIDES HOW MUCH EACH PARAMETER
        # IS INCREMENTED PER EPOCH. SELF.K_N AND SELF.SIGMA WOULD NEED TO INITIALLY BE SET TO THE
        # INCREMENT, AND IN THE FINAL CALCULATION WHEN CHOOSING THE INDICE THE SELF.K_N/SIGMA WOULD
        # NEED TO BE MULTIPLIED BY THE INCREMENT
        '''
        SIGMA IS PRIMARILY AFFECTING THE MSE, CONSIDER ONLY USING MSE TO DETERMINE SIGMA
        '''
        '''
        Use default parameters to predict the tune set using each set of 9 partitions as the model.
        Performance should be calculated and averaged across the ENTIRE set of models with the given
        hyperparameter. A hyperparameter is incremented, and predictions is re-run. This process
        repeats until the desired number of epochs are reached.
        '''
        k_n_scores = []
        sigma_scores = []
        self.k_n = k_n_increment
        self.sigma = sigma_increment
        for i in tqdm(range(epochs), desc="Tuning K_n..."):
            self.k_n += k_n_increment
            if (self.prediction_type == 'regression'):
                k_n_scores.append(self.regress(True))
            else:
                k_n_scores.append(self.classify(True))
        if (self.suppress_plots == False):
            self.plot_loss(k_n_scores, 'K_n', k_n_increment)
            

        if (self.prediction_type == 'regression'):    
            for i in tqdm(range(epochs), desc="Tuning sigma..."):
                self.sigma += sigma_increment
                sigma_scores.append(self.regress(True))
            if (self.suppress_plots == False):
                self.plot_loss(sigma_scores, 'Sigma', sigma_increment)

        k_n_scores = np.array(k_n_scores)
        if (self.prediction_type == 'regression'):
            best_k_n_epochs = np.argmin(k_n_scores, axis=0)
        else:
            best_k_n_epochs = np.argmax(k_n_scores, axis=0)
        self.k_n = (round(np.mean(best_k_n_epochs)) + 1) * k_n_increment
        print(f"Tuned k_n: {self.k_n}")
        if (self.prediction_type == 'regression'):
            # CURRENTLY IS ONLY USING MSE TO TUNE SIGMA
            sigma_scores = np.array(sigma_scores)
            best_sigma_epochs = np.argmin(sigma_scores, axis=0)
            self.sigma = (round(np.mean(best_sigma_epochs[0] + 1))) * sigma_increment
            print(f"Tuned sigma: {self.sigma}")
    def classify(self, tuning_flag=False, demo=False):
        '''
        classify holdout set repeat for each fold
        '''
        Loss_values = np.zeros((10, 2))
        predictions = []
        answers = []
        hold_out_fold = self.tune_set
        for fold_idx in tqdm(range(10), leave=False):
            if (tuning_flag == False):
                hold_out_fold = self.validate_set[fold_idx]
            model = np.concatenate([self.validate_set[i] for i in range(10) if i != fold_idx])
            #print(model.shape)
            #print(hold_out_fold.shape)

            for i, test_point in enumerate(hold_out_fold):
                if (test_point[0] != 'null'):
                    true_label = test_point[-1]
                    neighbor_indices = self.get_neighbors(model, test_point, self.k_n)
                    #print(f"Neighbor Indices:\n{neighbor_indices}")
                    nearest_neighbors = model[neighbor_indices]
                    neighbor_labels = model[neighbor_indices, -1]
                    #print(f"Neighbor Labels: {neighbor_labels}")
                    label_counts = Counter(neighbor_labels)
                    predicted_label = label_counts.most_common(1)[0][0]
                    if (demo and fold_idx == 0 and i == 0):
                        print(f"Point being classified:\n{test_point}")
                        print(f"Nearest neighbors:\n{nearest_neighbors}")
                        print(f"Predicted class:\n{predicted_label}")
                    predictions.append(float(predicted_label))
                    answers.append(true_label)

            self.predictions = np.array(predictions)
            #print(self.predictions)
            self.predictions = np.rint(self.predictions).astype(int).astype(str)
            self.answers = np.array(answers).astype(float)
            self.answers = np.rint(self.answers).astype(int).astype(str)
            #print(f"Predictions: {self.predictions}")
            #print(f"Answers: {self.answers}")
            Loss_values[fold_idx] = self.calculate_loss()
            predictions = []
            answers = []

        if tuning_flag:
            average_loss = np.mean(Loss_values, axis=0)
            return average_loss  
        else:
            print(f"Loss: {Loss_values}")
            return Loss_values
        
    def RBF(self, test_point, nearest_neighbors, sigma):
        neighbor_values = nearest_neighbors[:, -1]
        print(f"Neighbor Values:\n{neighbor_values}")
        distances = np.array([np.linalg.norm(test_point[:-1].astype(float) - neighbor[:-1].astype(float)) for neighbor in nearest_neighbors])
        print(f"Distances from the test point to each of the neighbors:\n{distances}")
        rbf_weights = np.exp(- (distances ** 2) / (2 * sigma ** 2))
        print(f"Weight applied to each point based on its distance from the test point:\n{rbf_weights}")
        #print(f"Should be equal to last indice of the nearest neighbors: {nearest_neighbors[:, -1]}")
        weighted_sum = np.sum(rbf_weights * nearest_neighbors[:, -1].astype(float))
        print(f"Respective weight * respective neighbor value, all summed together:\n{weighted_sum}")
        weight_total = np.sum(rbf_weights)
        print(f"Sum of the RBF weights:\n{weight_total}")
        predicted_value = weighted_sum / weight_total if weight_total != 0 else np.mean(neighbor_values.astype(float))
        print(f"Final predicted value (weighted sum of neighbor values/RBF weight sum):\n{predicted_value}")
    
    def regress(self, tuning_flag=False, demo=False):
        '''
        regress each hold out set repeat for each fold
        '''
        Loss_values = np.zeros((10, 2))  
        predictions = []
        answers = []
        hold_out_fold = self.tune_set
        for fold_idx in tqdm(range(10), leave=False):
            if (tuning_flag == False):
                hold_out_fold = self.validate_set[fold_idx]
            model = np.concatenate([self.validate_set[i] for i in range(10) if i != fold_idx])
            #print(model.shape)
            #print(hold_out_fold.shape)

            for i, test_point in enumerate(hold_out_fold):
                if (test_point[0] != 'null'):
                    true_label = test_point[-1]
                    neighbor_indices = self.get_neighbors(model, test_point, self.k_n)
                    #print(f"Neighbor Indices:\n{neighbor_indices}")
                    nearest_neighbors = model[neighbor_indices]
                    #print(f"Nearest Neighbors: {nearest_neighbors}")
                    neighbor_values = nearest_neighbors[:, -1]

                    distances = np.array([np.linalg.norm(test_point[:-1].astype(float) - neighbor[:-1].astype(float)) for neighbor in nearest_neighbors])
                
                    rbf_weights = np.exp(- (distances ** 2) / (2 * self.sigma ** 2))
                    #print(f"Should be equal to last indice of the nearest neighbors: {nearest_neighbors[:, -1]}")
                    weighted_sum = np.sum(rbf_weights * nearest_neighbors[:, -1].astype(float))
                    weight_total = np.sum(rbf_weights)

                    predicted_value = weighted_sum / weight_total if weight_total != 0 else np.mean(neighbor_values.astype(float))
                    if (demo and fold_idx == 0 and i == 0):
                        print(f"Point being regressed:\n{test_point}")
                        print(f"Nearest neighbors:\n{nearest_neighbors}")
                        print(f"Predicted value:\n{predicted_value}")
                    predictions.append(predicted_value)
                    answers.append(true_label)

            self.predictions = np.array(predictions)
            self.answers = np.array(answers)

            # Calculate loss for the current fold and store it
            Loss_values[fold_idx] = self.calculate_loss()

        if tuning_flag:
            average_loss = np.mean(Loss_values, axis=0)
            return average_loss  
        else:
            return Loss_values
    def calculate_loss(self):
            '''
            Classifiction: 0/1 loss, F1 score
            Regression: Mean squared error, Mean absolute

            '''
            loss = []
            if(self.prediction_type == "classification"):
                accuracy = np.mean(self.predictions == self.answers)
                loss.append(float(accuracy))

                unique_classes = np.unique(self.answers)
                f1_scores = []
                for cls in unique_classes:
                    true_positives = sum((self.predictions == cls) & (self.answers == cls))
                    predicted_positives = sum(self.predictions == cls)
                    actual_positives = sum(self.answers == cls)

                    precision = true_positives / predicted_positives if predicted_positives > 0 else 0
                    recall = true_positives / actual_positives if actual_positives > 0 else 0

                    if precision + recall > 0:
                        f1 = 2 * (precision * recall) / (precision + recall)
                    else:
                        f1 = 0
                    f1_scores.append(f1)

                loss.append(float(np.mean(f1_scores)))

            else:
                mse = np.mean(self.answers.astype(float) - self.predictions.astype(float)) ** 2
                loss.append(float(mse))

                mae = np.mean(np.abs(self.answers.astype(float) - self.predictions.astype(float)))
                loss.append(float(mae))
            return loss
    def euclidean_distance(self, point1: np, point2: np):
        # np.linalg.norm calculates the euclidean distances between two points
        #print(f"Point 1 type: {point1.shape}")
        #print(f"Point 2 type: {point2.shape}")
        return np.linalg.norm(point1 - point2)
    def get_neighbors(self, model: np, test_point: np, k_n: int):
        '''
        - Feed this function a NxN numpy array where the first dimension is num of examples and the second dimension is num of freatures
        - The second argument is the reference point
        - the third argument is the point that is being referenced for distances
        - The method returns the class/regression value of the k_n nearest neighbors
        '''
        #print(f"Model shape: {model.shape}")
        distances = np.zeros((model.shape[0]), dtype=float)
        #print(f"Distances Shape: {distances.shape}")
        for i, model_point in enumerate(model):
            # calculate euclidean distance
            # COULD ALWAYS SWAP THIS FUNCTION CALL FOR THE ONE LINER
            if (model_point[0] != "null"):
                #print(f"test point: {test_point}")
                #print(f"model point: {model_point}")
                distances[i] = self.euclidean_distance(test_point[:-1].astype(float), model_point[:-1].astype(float))
            else:
                distances[i] = 10000000
        # np.partitions moves the K_n smallest values in an np array to the front of the array. We then slice the array to get the k_n smallest values
        smallest_distances = np.partition(distances, k_n)[:k_n]
        #print(f"Smallest distances: {smallest_distances}")
        neighbor_indices = np.where(np.isin(distances, smallest_distances))[0]
        #print(f"Neighbor Indices:\n{neighbor_indices}")
        nearest_neighbors = model[neighbor_indices]
        #print(type(nearest_neighbors))
        # CURRENTLY RETURNS THE INDICES OF THE NEAREST NEIGHBORS
        return neighbor_indices

In [23]:
class enn:
    def __init__(self, data: dataset, prediction_type_flag: str, k_n=1, sigma=1.0, epsilon=1, suppress_plots=True):
        '''
        - Set a variable equal to the tune and validation sets
        - instantiate self variables
        '''
        self.suppress_plots = suppress_plots
        self.k_n = k_n
        self.sigma = sigma
        self.epslion = epsilon
        self.tune_set = data.tune_set
        self.validate_set = data.validate_set
        self.prediction_type = prediction_type_flag
        self.predictions = []
        self.answers = []
        self.reduced_models = []
    def plot_loss(self, metrics: list, parameter: str, increment):
        # Extract the number of epochs and loss metrics
        metrics = np.array(metrics)
        epochs = np.arange(1, metrics.shape[0] + 1) * increment  # Assuming epochs start from 1
        loss1 = metrics[:, 0]  # First loss metric
        loss2 = metrics[:, 1]  # Second loss metric

        # Plotting
        plt.figure(figsize=(10, 6))
        plt.plot(epochs, loss1, label='Loss Metric 1', marker='o')
        plt.plot(epochs, loss2, label='Loss Metric 2', marker='o')

        # Adding labels and title
        plt.xlabel(f'{parameter} Value')
        plt.ylabel('Loss')
        plt.title(f'Loss Metrics vs. {parameter} value')
        plt.legend()
        plt.grid(True)

        # Show the plot
        plt.show()
        plt.close()
    def tune(self, epochs=15, k_n_increment=1, sigma_increment=1, demo=False):
        '''
        for fold_idx, fold in enumerate(self.validate_set):
            print(f"Number of examples in OG data fold {fold_idx+1}: {len(fold)}")
        for fold_idx, fold in enumerate(reduced_models):
            print(f"Number of examples in reduced data fold {fold_idx+1}: {len(fold)}")
        '''
        if (demo):
            self.reduced_models = self.reduce_dataset(self.validate_set, demo=True)
        else:
            self.reduced_models = self.reduce_dataset(self.validate_set, demo=True)
        k_n_scores = []
        sigma_scores = []
        self.k_n = k_n_increment
        self.sigma = sigma_increment
        for i in tqdm(range(epochs), desc="Tuning K_n..."):
            self.k_n += k_n_increment
            if (self.prediction_type == 'regression'):
                k_n_scores.append(self.regress(True))
            else:
                k_n_scores.append(self.classify(True))
        if (self.suppress_plots == False):
            self.plot_loss(k_n_scores, 'K_n', k_n_increment)
            

        if (self.prediction_type == 'regression'):    
            for i in tqdm(range(epochs), desc="Tuning sigma..."):
                self.sigma += sigma_increment
                sigma_scores.append(self.regress(True))
            if (self.suppress_plots == False):
                self.plot_loss(sigma_scores, 'Sigma', sigma_increment)

        k_n_scores = np.array(k_n_scores)
        if (self.prediction_type == 'regression'):
            best_k_n_epochs = np.argmin(k_n_scores, axis=0)
        else:
            best_k_n_epochs = np.argmax(k_n_scores, axis=0)
        self.k_n = (round(np.mean(best_k_n_epochs)) + 1) * k_n_increment
        print(f"Tuned k_n: {self.k_n}")
        if (self.prediction_type == 'regression'):
            # CURRENTLY IS ONLY USING MSE TO TUNE SIGMA
            sigma_scores = np.array(sigma_scores)
            best_sigma_epochs = np.argmin(sigma_scores, axis=0)
            self.sigma = (round(np.mean(best_sigma_epochs[0] + 1))) * sigma_increment
            print(f"Tuned sigma: {self.sigma}")
    def reduce_dataset(self, initial_set: np, epsilon = 0.05, demo=False):
        reduced_models = []
        padded_folds = []

        for fold_idx in tqdm(range(10), desc="reducing dataset...", leave=False):
            removal_indices = []
            model = np.concatenate([initial_set[i] for i in range(10) if i != fold_idx])
            #print(f"Fold {fold_idx} Model Shape: {model.shape}")
            #print(hold_out_fold.shape)
            for test_point_idx, test_point in enumerate(model):
                if (test_point[0] != 'null'):
                    # Create a new array excluding the test point
                    self_classify_model = np.delete(model, test_point_idx, axis=0)
                    true_label = test_point[-1]
                    neighbor_indices = self.get_neighbors(self_classify_model, test_point, self.k_n)
                    #print(f"Neighbor Indices:\n{neighbor_indices}")
                    if (self.prediction_type == "classification"):
                        neighbor_labels = self_classify_model[neighbor_indices, -1]
                        #print(f"Neighbor Labels: {neighbor_labels}")
                        label_counts = Counter(neighbor_labels)
                        predicted_label = label_counts.most_common(1)[0][0]
                        if (predicted_label != true_label):
                            removal_indices.append(test_point_idx)
                    else:
                        nearest_neighbors = self_classify_model[neighbor_indices]
                        #print(f"Nearest Neighbors: {nearest_neighbors}")
                        neighbor_values = nearest_neighbors[:, -1]
                        distances = np.array([np.linalg.norm(test_point[:-1].astype(float) - neighbor[:-1].astype(float)) for neighbor in nearest_neighbors])
                        rbf_weights = np.exp(- (distances ** 2) / (2 * self.sigma ** 2))
                        #print(f"Should be equal to last indice of the nearest neighbors: {nearest_neighbors[:, -1]}")
                        weighted_sum = np.sum(rbf_weights * nearest_neighbors[:, -1].astype(float))
                        weight_total = np.sum(rbf_weights)

                        predicted_value = weighted_sum / weight_total if weight_total != 0 else np.mean(neighbor_values.astype(float))
                        if ((abs(float(predicted_value) - float(true_label)) <= epsilon * float(predicted_value)) == False):
                            removal_indices.append(test_point_idx)
            #print(f"Fold {fold_idx+1} Shape: {np.delete(model, removal_indices, axis=0).shape}")
            reduced_models.append(np.delete(model, removal_indices, axis=0))
        
        #print(reduced_models[0])
        max_rows = max(fold.shape[0] for fold in reduced_models)
        # Pad each array to have the same number of rows (max_rows)
        for fold in reduced_models:
            pad_width = max_rows - fold.shape[0]
            padded_fold = np.pad(fold, ((0, pad_width), (0, 0)), mode='constant', constant_values='null')
            padded_folds.append(padded_fold)
        # Stack the padded arrays into a 3D array
        padded_reduced_models = np.stack(padded_folds)
        print(padded_reduced_models.shape)
        if (demo == True):
            print(f"Example removed from original dataset:\n{model[removal_indices[0]]}")
            print(f"Does the example exist in the original dataset? - {np.any(np.all(initial_set == model[removal_indices[0]], axis=2))}")
            print(f"Does the example exist in the reduced dataset? - {np.any(np.all(padded_reduced_models == model[removal_indices[0]], axis=2))}")
        return padded_reduced_models
        #else: # regression


        #return
    def classify(self, tuning_flag=False):
        '''
        classify holdout set repeat for each fold
        '''
        Loss_values = np.zeros((10, 2))
        predictions = []
        answers = []
        hold_out_fold = self.tune_set
        for fold_idx in tqdm(range(10), leave=False):
            if (tuning_flag == False):
                hold_out_fold = self.validate_set[fold_idx]
            model = self.reduced_models[fold_idx]
            #print(model.shape)
            #print(hold_out_fold.shape)

            for test_point in hold_out_fold:
                if (test_point[0] != 'null'):
                    true_label = test_point[-1]
                    neighbor_indices = self.get_neighbors(model, test_point, self.k_n)
                    #print(f"Neighbor Indices:\n{neighbor_indices}")
                    neighbor_labels = model[neighbor_indices, -1]
                    #print(f"Neighbor Labels: {neighbor_labels}")
                    label_counts = Counter(neighbor_labels)
                    predicted_label = label_counts.most_common(1)[0][0]

                    predictions.append(float(predicted_label))
                    answers.append(true_label)

            self.predictions = np.array(predictions)
            self.predictions = np.rint(self.predictions).astype(int).astype(str)
            self.answers = np.array(answers).astype(float)
            self.answers = np.rint(self.answers).astype(int).astype(str)
            #print(f"Predictions: {self.predictions}")
            #print(f"Answers: {self.answers}")
            Loss_values[fold_idx] = self.calculate_loss()
            predictions = []
            answers = []

        if tuning_flag:
            average_loss = np.mean(Loss_values, axis=0)
            return average_loss  
        else:
            print(f"Loss: {Loss_values}")
            return Loss_values  
    def regress(self, tuning_flag=False):
        '''
        regress each hold out set repeat for each fold
        '''
        Loss_values = np.zeros((10, 2))  
        predictions = []
        answers = []
        hold_out_fold = self.tune_set
        for fold_idx in tqdm(range(10), leave=False):
            if (tuning_flag == False):
                hold_out_fold = self.validate_set[fold_idx]
            model = self.reduced_models[fold_idx]
            #print(model.shape)
            #print(hold_out_fold.shape)

            for test_point in hold_out_fold:
                if (test_point[0] != 'null'):
                    true_label = test_point[-1]
                    neighbor_indices = self.get_neighbors(model, test_point, self.k_n)
                    #print(f"Neighbor Indices:\n{neighbor_indices}")
                    nearest_neighbors = model[neighbor_indices]
                    nearest_neighbors = nearest_neighbors[~np.any(nearest_neighbors == 'null', axis=1)]
                    #print(f"Nearest Neighbors: {nearest_neighbors}")
                    neighbor_values = nearest_neighbors[:, -1]

                    distances = np.array([np.linalg.norm(test_point[:-1].astype(float) - neighbor[:-1].astype(float)) for neighbor in nearest_neighbors if neighbor[0] != 'null'])
                
                    rbf_weights = np.exp(- (distances ** 2) / (2 * self.sigma ** 2))
                    #print(f"Should be equal to last indice of the nearest neighbors: {nearest_neighbors[:, -1]}")
                    weighted_sum = np.sum(rbf_weights * nearest_neighbors[:, -1].astype(float))
                    weight_total = np.sum(rbf_weights)

                    predicted_value = weighted_sum / weight_total if weight_total != 0 else np.mean(neighbor_values.astype(float))

                    predictions.append(predicted_value)
                    answers.append(true_label)
                    
            self.predictions = np.array(predictions)
            self.answers = np.array(answers)

            # Calculate loss for the current fold and store it
            Loss_values[fold_idx] = self.calculate_loss()

        if tuning_flag:
            average_loss = np.mean(Loss_values, axis=0)
            return average_loss  
        else:
            return Loss_values
    def calculate_loss(self):
            '''
            Classifiction: 0/1 loss, F1 score
            Regression: Mean squared error, Mean absolute

            '''
            loss = []
            if(self.prediction_type == "classification"):
                accuracy = np.mean(self.predictions == self.answers)
                loss.append(float(accuracy))

                unique_classes = np.unique(self.answers)
                f1_scores = []
                for cls in unique_classes:
                    true_positives = sum((self.predictions == cls) & (self.answers == cls))
                    predicted_positives = sum(self.predictions == cls)
                    actual_positives = sum(self.answers == cls)

                    precision = true_positives / predicted_positives if predicted_positives > 0 else 0
                    recall = true_positives / actual_positives if actual_positives > 0 else 0

                    if precision + recall > 0:
                        f1 = 2 * (precision * recall) / (precision + recall)
                    else:
                        f1 = 0
                    f1_scores.append(f1)

                loss.append(float(np.mean(f1_scores)))

            else:
                mse = np.mean(self.answers.astype(float) - self.predictions.astype(float)) ** 2
                loss.append(float(mse))

                mae = np.mean(np.abs(self.answers.astype(float) - self.predictions.astype(float)))
                loss.append(float(mae))
            return loss
    def euclidean_distance(self, point1: np, point2: np):
        # np.linalg.norm calculates the euclidean distances between two points
        #print(f"Point 1 type: {point1.shape}")
        #print(f"Point 2 type: {point2.shape}")
        return np.linalg.norm(point1 - point2)
    def get_neighbors(self, model: np, test_point: np, k_n: int):
        '''
        - Feed this function a NxN numpy array where the first dimension is num of examples and the second dimension is num of freatures
        - The second argument is the reference point
        - the third argument is the point that is being referenced for distances
        - The method returns the class/regression value of the k_n nearest neighbors
        '''
        #print(f"Model shape: {model.shape}")
        distances = np.zeros((model.shape[0]), dtype=float)
        #print(f"Distances Shape: {distances.shape}")
        for i, model_point in enumerate(model):
            # calculate euclidean distance
            # COULD ALWAYS SWAP THIS FUNCTION CALL FOR THE ONE LINER
            if (model_point[-1] != "null"):
                #print(f"test point: {test_point}")
                #print(f"model point: {model_point}")
                distances[i] = self.euclidean_distance(test_point[:-1].astype(float), model_point[:-1].astype(float))
            else:
                distances[i] = float('inf')
        # np.partitions moves the K_n smallest values in an np array to the front of the array. We then slice the array to get the k_n smallest values
        #smallest_distances = np.partition(distances, k_n)[:k_n]
        #print(f"Smallest distances: {smallest_distances}")
        neighbor_indices = np.argsort(distances)[:k_n]
        #print(f"Neighbor Indices:\n{neighbor_indices}")
        nearest_neighbors = model[neighbor_indices]
        #print(type(nearest_neighbors))
        # CURRENTLY RETURNS THE INDICES OF THE NEAREST NEIGHBORS
        return neighbor_indices

In [24]:
class kmeans:
    def __init__(self, data: dataset, prediction_type_flag: str, k_c = 1, k_n = 1, sigma = 1.0, suppress_plots=True):
        '''
        - Set a variable equal to the tune and validation sets
        - instantiate self variables
        '''
        self.suppress_plots = suppress_plots
        self.k_n = k_n
        self.sigma = sigma
        self.k_c = k_c
        self.tune_set = data.tune_set
        self.validate_set = data.validate_set
        self.prediction_type = prediction_type_flag
        self.predictions = []
        self.answers = []
        self.centroids = []
    
    def plot_loss(self, metrics: list, parameter: str, increment):
        # Extract the number of epochs and loss metrics
        metrics = np.array(metrics)
        epochs = np.arange(1, metrics.shape[0] + 1) * increment  # Assuming epochs start from 1
        loss1 = metrics[:, 0]  # First loss metric
        loss2 = metrics[:, 1]  # Second loss metric

        # Plotting
        plt.figure(figsize=(10, 6))
        plt.plot(epochs, loss1, label='Loss Metric 1', marker='o')
        plt.plot(epochs, loss2, label='Loss Metric 2', marker='o')

        # Adding labels and title
        plt.xlabel(f'{parameter} Value')
        plt.ylabel('Loss')
        plt.title(f'Loss Metrics vs. {parameter} value')
        plt.legend()
        plt.grid(True)

        # Show the plot
        plt.show()
        plt.close()

    def tune(self, epochs=15, k_c_increment=1, k_n_increment=1, sigma_increment=1):
        '''
        Tune number of clusters (k_c), number of neighbors (k_n), and sigma (for regression).
        Performance is averaged across all 10 folds. This process repeats for a specified number
        of epochs with the hyperparameters incrementing on each epoch.
        '''

        # Initialize the tuning lists to store performance metrics
        k_c_scores = []
        k_n_scores = []
        sigma_scores = []
        
        # Initialize hyperparameters
        self.k_c = k_c_increment
        self.k_n = k_n_increment
        self.sigma = sigma_increment

        # Tune k_c (number of clusters)
        for i in tqdm(range(epochs), desc="Tuning K_c..."):
            self.k_c += k_c_increment
            self.cluster()  # Re-run clustering with updated k_c
            if self.prediction_type == 'regression':
                k_c_scores.append(self.regress(True))
            else:
                k_c_scores.append(self.classify(True))
        if (self.suppress_plots == False):
            self.plot_loss(k_c_scores, 'K_c', k_c_increment)
        
        # Tune k_n (number of neighbors)
        for i in tqdm(range(epochs), desc="Tuning K_n..."):
            self.k_n += k_n_increment
            if self.prediction_type == 'regression':
                k_n_scores.append(self.regress(True))
            else:
                k_n_scores.append(self.classify(True))
        if (self.suppress_plots == False):
            self.plot_loss(k_n_scores, 'K_n', k_n_increment)
        
        # Tune sigma (only for regression)
        if self.prediction_type == 'regression':
            for i in tqdm(range(epochs), desc="Tuning Sigma..."):
                self.sigma += sigma_increment
                sigma_scores.append(self.regress(True))
            if (self.suppress_plots == False):
                self.plot_loss(sigma_scores, 'Sigma', sigma_increment)

        
        k_c_scores = np.array(k_c_scores)
        if(self.prediction_type == 'classification'):
            best_k_c_epochs = np.argmax(k_c_scores, axis=0)
        else:
            best_k_c_epochs = np.argmin(k_c_scores,axis=0)
        self.k_c = (round(np.mean(best_k_c_epochs+1))) * k_c_increment
        print(f"Tuned k_c: {self.k_c}")

        k_n_scores = np.array(k_n_scores)
        if(self.prediction_type == 'classification'):
            best_k_n_epochs = np.argmax(k_n_scores, axis=0)
        else:
            best_k_n_epochs = np.argmin(k_n_scores,axis=0)
        self.k_n = (round(np.mean(best_k_n_epochs)) + 1) * k_n_increment
        print(f"Tuned k_n: {self.k_n}")

        if self.prediction_type == 'regression':
            sigma_scores = np.array(sigma_scores)
            best_sigma_epochs = np.argmin(sigma_scores, axis=0)
            self.sigma = (round(np.mean(best_sigma_epochs[0] + 1))) * sigma_increment
            print(f"Tuned sigma: {self.sigma}")

   
    def cluster(self, demo=False):
        
        centroids_list = []
        
        # Get into correct fold 
        for fold_idx in tqdm(range(10), leave=False): 
            
            model = np.concatenate([self.validate_set[i] for i in range(10) if i != fold_idx])
            model[model == 'null'] = np.nan 
                
            model = model.astype(float)

            
            model = model[~np.isnan(model).any(axis=1)]
            
            
            centroids = model[np.random.choice(model.shape[0], self.k_c, replace=False)]
            
            prev_centroids = np.copy(centroids)
            convergence_threshold = 0.05
            max_iterations = 50
            iteration = 0

            while iteration < max_iterations:
                
                distances = np.linalg.norm(model[:, np.newaxis] - centroids, axis=2)
                labels = np.argmin(distances, axis=1)

                if demo and fold_idx == 0:
                    print(f"Data point:\n{model[0]}\ndistances to centroids:\n{distances[0]}")
                    print(f"Data point {model[0]} assigned to cluster {labels[0]+1}\n\n")

                prev_centroids = centroids.copy()

                
                for i in range(self.k_c):
                    if np.any(labels == i): 
                        centroids[i] = np.nanmean(model[labels == i], axis=0)
                    else:
                        
                        centroids[i] = model[np.random.choice(model.shape[0])]

               
                relative_change = np.abs(centroids - prev_centroids) / (np.abs(prev_centroids) + 1e-10)
                
                if np.all(relative_change < convergence_threshold):
                    break

                iteration += 1

            if iteration == max_iterations:
                print("Warning: Maximum iterations reached without convergence.")
            
            
            centroids_list.append(centroids)


        self.centroids = np.array(centroids_list)
        #print(f"Final Centroids: {self.centroids}")  # Should be [10 x k_c x feature_count]


    
    def classify(self, tuning_flag = False, demo=False):
        '''
        classify holdout set repeat for each fold
        '''
        if (demo):
            self.cluster(demo=True)
        else:
            self.cluster()
        Loss_values = np.zeros((10, 2))
        predictions = []
        #print(f"self.k_c = {self.k_c}")
        answers = []
        hold_out_fold = self.tune_set
        for fold_idx in tqdm(range(10), leave=False):
            if (tuning_flag == False):
                hold_out_fold = self.validate_set[fold_idx]

            model = self.centroids[fold_idx]
            #print(model.shape)
            #print(hold_out_fold.shape)

            for test_point in hold_out_fold:
                if (test_point[0] != 'null'):
                    true_label = test_point[-1]
                    neighbor_indices = self.get_neighbors(model, test_point, self.k_n)
                    #print(f"Neighbor Indices:\n{neighbor_indices}")
                    neighbor_labels = model[neighbor_indices, -1]
                    #print(f"Neighbor Labels: {neighbor_labels}")
                    label_counts = Counter(neighbor_labels)
                    predicted_label = label_counts.most_common(1)[0][0]

                    predictions.append(predicted_label)
                    answers.append(true_label)

            self.predictions = np.array(predictions)
            self.predictions = np.rint(self.predictions).astype(int).astype(str)
            self.answers = np.array(answers).astype(float)
            self.answers = np.rint(self.answers).astype(int).astype(str)
        #print(f"Predictions: {self.predictions}")
        #print(f"Answers: {self.answers}")
            Loss_values[fold_idx] = self.calculate_loss()
            predictions = []
            answers = []
            #print(f"Loss Values: {Loss_values}")
        if tuning_flag:
            average_loss = np.mean(Loss_values, axis=0)
            return average_loss  
        else:
            print(f"Loss: {Loss_values}")
            return Loss_values   
        
    def regress(self, tuning_flag = False):
        self.cluster()
        predictions = []
        answers = []
        Loss_values = np.zeros((10, 2))  

        hold_out_fold = self.tune_set
        for fold_idx in tqdm(range(10), leave=False):
            if not tuning_flag:
                hold_out_fold = self.validate_set[fold_idx]

            model = self.centroids[fold_idx]

            for test_point in hold_out_fold:
                if test_point[0] != 'null':
                    true_label = test_point[-1]
                    neighbor_indices = self.get_neighbors(model, test_point, self.k_n)
                    nearest_neighbors = model[neighbor_indices]

                    distances = np.array([np.linalg.norm(test_point[:-1].astype(float) - neighbor[:-1].astype(float)) for neighbor in nearest_neighbors])
                    rbf_weights = np.exp(- (distances ** 2) / (2 * self.sigma ** 2))
                    weighted_sum = np.sum(rbf_weights * nearest_neighbors[:, -1].astype(float))
                    weight_total = np.sum(rbf_weights)

                    predicted_value = weighted_sum / weight_total if weight_total != 0 else np.mean(nearest_neighbors[:, -1].astype(float))

                    predictions.append(predicted_value)
                    answers.append(true_label)

            self.predictions = np.array(predictions)
            self.answers = np.array(answers)

            # Calculate loss for the current fold and store it
            Loss_values[fold_idx] = self.calculate_loss()

            
            predictions = []
            answers = []

        if tuning_flag:
            average_loss = np.mean(Loss_values, axis=0)
            return average_loss  
        else:
            return Loss_values


    def euclidean_distance(self, point1: np, point2: np):
        # np.linalg.norm calculates the euclidean distances between two points
        #print(f"Point 1 type: {point1.shape}")
        #print(f"Point 2 type: {point2.shape}")
        return np.linalg.norm(point1 - point2)
    def calculate_loss(self):
            '''
            Classifiction: 0/1 loss, F1 score
            Regression: Mean squared error, Mean absolute

            '''
            loss = []
            if(self.prediction_type == "classification"):
                #print(self.predictions)
                #print(f"Answers: {self.answers}")
                accuracy = np.mean(self.predictions == self.answers)
                loss.append(float(accuracy))

                unique_classes = np.unique(self.answers)
                f1_scores = []
                for cls in unique_classes:
                    true_positives = sum((self.predictions == cls) & (self.answers == cls))
                    predicted_positives = sum(self.predictions == cls)
                    actual_positives = sum(self.answers == cls)

                    precision = true_positives / predicted_positives if predicted_positives > 0 else 0
                    recall = true_positives / actual_positives if actual_positives > 0 else 0

                    if precision + recall > 0:
                        f1 = 2 * (precision * recall) / (precision + recall)
                    else:
                        f1 = 0
                    f1_scores.append(f1)

                loss.append(float(np.mean(f1_scores)))

            else:
                mse = np.mean(self.answers.astype(float) - self.predictions.astype(float)) ** 2
                loss.append(float(mse))

                mae = np.mean(np.abs(self.answers.astype(float) - self.predictions.astype(float)))
                loss.append(float(mae))
            return loss 
    def get_neighbors(self, model: np, test_point: np, k_n: int):
        '''
        - Feed this function a NxN numpy array where the first dimension is num of examples and the second dimension is num of freatures
        - The second argument is the reference point
        - the third argument is the point that is being referenced for distances
        - The method returns the class/regression value of the k_n nearest neighbors
        '''
        #print(f"Model shape: {model.shape}")
        

        distances = np.zeros(model.shape[0], dtype=float)
        #print(f"Distances Shape: {distances.shape}")
        for i, model_point in enumerate(model):
            # calculate euclidean distance
            # COULD ALWAYS SWAP THIS FUNCTION CALL FOR THE ONE LINER
            if (model_point[-1] != "null"):
                #print(f"test point: {test_point}")
                #print(f"model point: {model_point}")
                distances[i] = self.euclidean_distance(test_point[:-1].astype(float), model_point[:-1].astype(float))
            else:
                distances[i] = float('inf')
        # np.partitions moves the K_n smallest values in an np array to the front of the array. We then slice the array to get the k_n smallest values
        #smallest_distances = np.partition(distances, k_n)[:k_n]
        #print(f"Smallest distances: {smallest_distances}")
        neighbor_indices = np.argsort(distances)[:k_n]
        #print(f"Neighbor Indices:\n{neighbor_indices}")
        #print(type(nearest_neighbors))
        # CURRENTLY RETURNS THE INDICES OF THE NEAREST NEIGHBORS
        return neighbor_indices

# Pre-Processing

In [25]:
def process_all(user: str, shuffle_split: bool):
    abalone_data = dataset('/home/'+user+'/CSCI_447/Project_2/Datasets/abalone.data', False)
    cancer_data = dataset('/home/'+user+'/CSCI_447/Project_2/Datasets/breast-cancer-wisconsin.data', False)
    fire_data = dataset('/home/'+user+'/CSCI_447/Project_2/Datasets/forestfires.data', False)
    glass_data = dataset('/home/'+user+'/CSCI_447/Project_2/Datasets/glass.data', False)
    machine_data = dataset('/home/'+user+'/CSCI_447/Project_2/Datasets/machine.data', False)
    soybean_data = dataset('/home/'+user+'/CSCI_447/Project_2/Datasets/soybean-small.data', False)

    abalone_data.continuize()
    abalone_data.normalize()
    abalone_data.shuffle()
    abalone_data.sort('regression')
    abalone_data.split()
    abalone_data.fold()

    #cancer_data.continuize()
    cancer_data.shuffle()
    cancer_data.remove_attribute()
    cancer_data.impute()
    
    cancer_data.sort('classification')
    cancer_data.split()
    cancer_data.fold()

    fire_data.continuize()
    fire_data.normalize()
    fire_data.shuffle()
    fire_data.sort('regression')
    fire_data.split()
    fire_data.fold()

    glass_data.continuize()
    glass_data.remove_attribute()
    glass_data.shuffle()
    glass_data.sort('classification')
    glass_data.split()
    glass_data.fold()

    machine_data.continuize()
    machine_data.normalize()
    machine_data.shuffle()
    machine_data.sort('regression')
    machine_data.split()
    machine_data.fold()

    soybean_data.continuize()
    soybean_data.shuffle()
    soybean_data.sort('classification')
    soybean_data.split()
    soybean_data.fold()

    if (shuffle_split == True) :
        abalone_data.shuffle_splits()
        cancer_data.shuffle_splits()
        fire_data.shuffle_splits()
        glass_data.shuffle_splits()
        machine_data.shuffle_splits()
        soybean_data.shuffle_splits()

    abalone_data.save('abalone')
    cancer_data.save('cancer')
    fire_data.save('fire')
    glass_data.save('glass')
    machine_data.save('machine')
    soybean_data.save('soybean')

    return abalone_data, cancer_data, fire_data, glass_data, machine_data, soybean_data

In [26]:
abalone_data, cancer_data, fire_data, glass_data, machine_data, soybean_data = process_all('carlthedog3', True)

# K-Means Clustering Tuning + Prediction

In [27]:
glass_kmeans = kmeans(glass_data,'classification')
fire_kmeans = kmeans(fire_data, 'regression')

glass_kmeans.tune(k_c_increment=3)
fire_kmeans.tune()

glass_kmeans_results = glass_kmeans.classify();
fire_kmeans_results = fire_kmeans.regress();

Tuning K_c...: 100%|██████████| 15/15 [00:01<00:00,  7.92it/s]
Tuning K_n...: 100%|██████████| 15/15 [00:01<00:00,  8.37it/s]


Tuned k_c: 39
Tuned k_n: 2


Tuning K_c...: 100%|██████████| 15/15 [00:01<00:00,  8.59it/s]
Tuning K_n...: 100%|██████████| 15/15 [00:01<00:00,  7.53it/s]
Tuning Sigma...: 100%|██████████| 15/15 [00:02<00:00,  6.94it/s]


Tuned k_c: 8
Tuned k_n: 2
Tuned sigma: 9


                                      

Loss: [[0.6        0.46825397]
 [0.6        0.4993895 ]
 [0.68421053 0.57777778]
 [0.57894737 0.52478632]
 [0.68421053 0.4043956 ]
 [0.52631579 0.38290598]
 [0.68421053 0.5992674 ]
 [0.68421053 0.6       ]
 [0.68421053 0.53809524]
 [0.73684211 0.65836386]]


                                      

# KNN Tuning + Prediction

In [28]:
glass_knn = knn(glass_data, "classification")
fire_knn = knn(fire_data, 'regression')

glass_knn.tune()
fire_knn.tune()

glass_knn_results = glass_knn.classify();
fire_knn_results = fire_knn.regress();

Tuning K_n...: 100%|██████████| 15/15 [00:02<00:00,  5.42it/s]


Tuned k_n: 2


Tuning K_n...: 100%|██████████| 15/15 [00:16<00:00,  1.07s/it]
Tuning sigma...: 100%|██████████| 15/15 [00:16<00:00,  1.12s/it]


Tuned k_n: 7
Tuned sigma: 15


                                               

Loss: [[0.65       0.55677656]
 [0.6        0.5468254 ]
 [0.68421053 0.39722222]
 [0.63157895 0.54761905]
 [0.68421053 0.61313131]
 [0.63157895 0.51803752]
 [0.68421053 0.55835668]
 [0.73684211 0.6647619 ]
 [0.84210526 0.87518038]
 [0.57894737 0.4875    ]]


                                               

# Tuning set, Validation set (Validation set is split into 10 folds)

In [29]:
print(f"Soybean Tuning Set:\n{soybean_data.tune_set}")
print(f"Soybean Validation Set Shape:\n{soybean_data.validate_set.shape}")
print(f"Soybean Validation Set:\n{soybean_data.validate_set}")

Soybean Tuning Set:
[[0. 1. 2. 0. 0. 1. 1. 1. 1. 1. 1. 0. 0. 2. 2. 0. 0. 0. 1. 0. 1. 1. 0. 1.
  1. 0. 0. 3. 4. 0. 0. 0. 0. 0. 0. 2.]
 [6. 0. 2. 1. 0. 1. 1. 1. 0. 0. 1. 1. 0. 2. 2. 0. 0. 0. 1. 1. 3. 1. 1. 1.
  0. 0. 0. 0. 4. 0. 0. 0. 0. 0. 0. 0.]
 [2. 1. 2. 0. 0. 1. 1. 2. 0. 0. 1. 1. 0. 2. 2. 0. 0. 0. 1. 0. 1. 2. 0. 0.
  0. 0. 0. 3. 4. 0. 0. 0. 0. 0. 1. 3.]
 [1. 1. 2. 1. 0. 0. 1. 2. 1. 1. 1. 1. 0. 2. 2. 0. 0. 0. 1. 0. 2. 2. 0. 0.
  0. 0. 0. 3. 4. 0. 0. 0. 0. 0. 1. 3.]
 [3. 0. 0. 1. 0. 1. 2. 1. 0. 0. 1. 1. 0. 2. 2. 0. 0. 0. 1. 0. 0. 3. 0. 0.
  0. 2. 1. 0. 4. 0. 0. 0. 0. 0. 0. 1.]]
Soybean Validation Set Shape:
(10, 5, 36)
Soybean Validation Set:
[[['0.0' '1.0' '2.0' ... '0.0' '0.0' '2.0']
  ['4.0' '0.0' '0.0' ... '0.0' '0.0' '1.0']
  ['1.0' '1.0' '2.0' ... '0.0' '1.0' '3.0']
  ['2.0' '1.0' '1.0' ... '0.0' '1.0' '3.0']
  ['3.0' '0.0' '2.0' ... '0.0' '0.0' '0.0']]

 [['5.0' '0.0' '2.0' ... '0.0' '0.0' '0.0']
  ['3.0' '1.0' '1.0' ... '0.0' '1.0' '3.0']
  ['2.0' '1.0' '2.0' ... '0.0' '0.0' '

# Distance function demonstration

In [30]:
point_1 = np.array([1,2,3,4])
point_2 = np.array([4,5,6,7])
print(f"Distance function calculation (answer should be 6): {glass_knn.euclidean_distance(point_1, point_2)}")

Distance function calculation (answer should be 6): 6.0


# RBF kernel function demonstration

In [31]:
test_point = np.array([2.5, 3.0, 0.0]) # The zero in this test point is a placeholder
nearest_neighbors = np.array([
    [2.0, 3.5, 5.0], # Neighbor 1
    [3.0, 2.5, 4.0], # Neighbor 2
    [2.5, 3.0, 6.0], # Neighbor 3
    [3.5, 4.0, 3.0], # Neighbor 4
    [2.0, 2.0, 5.5]  # Neighbor 5
])
glass_knn.RBF(test_point, nearest_neighbors, 1)

Neighbor Values:
[5.  4.  6.  3.  5.5]
Distances from the test point to each of the neighbors:
[0.70710678 0.70710678 0.         1.41421356 1.11803399]
Weight applied to each point based on its distance from the test point:
[0.77880078 0.77880078 1.         0.36787944 0.53526143]
Respective weight * respective neighbor value, all summed together:
17.056783228011415
Sum of the RBF weights:
3.4607424358332426
Final predicted value (weighted sum of neighbor values/RBF weight sum):
4.928648561476854


# KNN classification demonstration

In [32]:
glass_knn.classify(demo=True)

 50%|█████     | 5/10 [00:00<00:00, 47.04it/s]

Point being classified:
['1.51' '13.3' '3.43' '1.43' '72.2' '0.51' '8.6' '0.0' '0.0' '2.0']
Nearest neighbors:
[['1.52' '13.4' '3.34' '1.23' '72.3' '0.6' '8.83' '0.0' '0.0' '7.0']
 ['1.51' '13.5' '3.41' '1.52' '72.0' '0.58' '8.79' '0.0' '0.0' '3.0']]
Predicted class:
7.0


                                               

Loss: [[0.65       0.55677656]
 [0.6        0.5468254 ]
 [0.68421053 0.39722222]
 [0.63157895 0.54761905]
 [0.68421053 0.61313131]
 [0.63157895 0.51803752]
 [0.68421053 0.55835668]
 [0.73684211 0.6647619 ]
 [0.84210526 0.87518038]
 [0.57894737 0.4875    ]]




array([[0.65      , 0.55677656],
       [0.6       , 0.5468254 ],
       [0.68421053, 0.39722222],
       [0.63157895, 0.54761905],
       [0.68421053, 0.61313131],
       [0.63157895, 0.51803752],
       [0.68421053, 0.55835668],
       [0.73684211, 0.6647619 ],
       [0.84210526, 0.87518038],
       [0.57894737, 0.4875    ]])

# KNN regression demonstration

In [33]:
fire_knn.regress(demo=True)

 10%|█         | 1/10 [00:00<00:01,  6.80it/s]

Point being regressed:
['7.0' '5.0' '21.0' '17.0' '92.5' '88.0' '698.' '7.1' '22.8' '40.0' '4.0'
 '0.0' '0.0']
Nearest neighbors:
[['2.0' '5.0' '19.0' '26.0' '87.5' '77.0' '694.' '5.0' '22.3' '46.0'
  '4.0' '0.0' '0.0']
 ['3.0' '4.0' '21.0' '14.0' '94.3' '85.1' '692.' '15.9' '19.8' '50.0'
  '5.4' '0.0' '0.0']
 ['4.0' '5.0' '21.0' '17.0' '92.5' '88.0' '698.' '7.1' '20.3' '45.0'
  '3.1' '0.0' '0.0']
 ['4.0' '5.0' '21.0' '14.0' '94.3' '85.1' '692.' '15.9' '17.7' '37.0'
  '3.6' '0.0' '0.0']
 ['5.0' '4.0' '21.0' '14.0' '94.3' '85.1' '692.' '15.9' '20.1' '47.0'
  '4.9' '0.0' '0.00']
 ['3.0' '4.0' '21.0' '18.0' '89.7' '90.0' '704.' '4.8' '22.8' '39.0'
  '3.6' '0.0' '0.0']
 ['7.0' '5.0' '21.0' '17.0' '92.5' '88.0' '698.' '7.1' '17.8' '51.0'
  '7.2' '0.0' '0.0']]
Predicted value:
0.0


                                               

array([[3.24216358e-03, 7.05520050e-02],
       [9.65833573e-04, 4.42470136e-02],
       [1.40996016e-03, 9.22302804e-02],
       [8.19652662e-04, 7.29918436e-02],
       [6.22432394e-04, 6.38273201e-02],
       [2.29998895e-04, 6.69965753e-02],
       [1.70142298e-04, 5.85838964e-02],
       [7.28963675e-05, 5.53878214e-02],
       [3.59777281e-05, 5.15026848e-02],
       [2.85659155e-06, 5.12306895e-02]])

In [34]:
glass_enn = enn(glass_data, "classification", k_n=glass_knn.k_n, sigma=glass_knn.sigma)
glass_enn.tune()

                                                                    

(10, 132, 10)
Example removed from original dataset:
['1.51' '13.3' '3.43' '1.43' '72.2' '0.51' '8.6' '0.0' '0.0' '2.0']
Does the example exist in the original dataset? - True
Does the example exist in the reduced dataset? - False


Tuning K_n...: 100%|██████████| 15/15 [00:01<00:00,  7.74it/s]

Tuned k_n: 1





# Point being edited out of ENN dataset

In [35]:

fire_enn = enn(fire_data, 'regression', k_n=fire_knn.k_n, sigma=fire_knn.sigma)
fire_enn.tune(demo=True)

# Code for printing whether the removed example is in the dataset
'''
print(f"Example removed from original dataset:\n{model[removal_indices[0]]}")
print(f"Does the example exist in the original dataset? - {np.any(np.all(initial_set == model[removal_indices[0]], axis=2))}")
print(f"Does the example exist in the reduced dataset? - {np.any(np.all(padded_reduced_models == model[removal_indices[0]], axis=2))}")
'''

                                                                    

(10, 107, 13)
Example removed from original dataset:
['7.0' '5.0' '21.0' '17.0' '92.5' '88.0' '698.' '7.1' '22.8' '40.0' '4.0'
 '0.0' '0.0']
Does the example exist in the original dataset? - True
Does the example exist in the reduced dataset? - False


Tuning K_n...: 100%|██████████| 15/15 [00:04<00:00,  3.37it/s]
Tuning sigma...: 100%|██████████| 15/15 [00:04<00:00,  3.23it/s]

Tuned k_n: 2
Tuned sigma: 1





'\nprint(f"Example removed from original dataset:\n{model[removal_indices[0]]}")\nprint(f"Does the example exist in the original dataset? - {np.any(np.all(initial_set == model[removal_indices[0]], axis=2))}")\nprint(f"Does the example exist in the reduced dataset? - {np.any(np.all(padded_reduced_models == model[removal_indices[0]], axis=2))}")\n'

In [36]:
fire_enn_results = fire_enn.regress();
glass_enn_results = glass_enn.classify();

                                              

Loss: [[0.6        0.45811966]
 [0.6        0.56746032]
 [0.73684211 0.56296296]
 [0.68421053 0.61050061]
 [0.57894737 0.53535354]
 [0.52631579 0.47125097]
 [0.57894737 0.38      ]
 [0.73684211 0.6167033 ]
 [0.78947368 0.72857143]
 [0.73684211 0.65836386]]




# Point being associated with a cluster for K-Means

In [37]:
glass_kmeans.classify(demo=True);

                                      

Data point:
[ 1.51 13.7   3.93  1.54 71.8   0.54  8.21  0.    0.15  2.  ]
distances to centroids:
[1.79824915 1.67263266 2.09234318 5.58875657 0.56284989 2.11520685
 1.43425939 1.83226636 6.69772349 2.02380829 1.75988636 1.92010416
 1.78468485 1.06094298 9.11172322 2.25386335 0.94270886 6.85979592
 1.69251292 1.42734018 6.20328945 2.00227371 2.24822152 2.10653744
 1.3065221  7.84554651 2.17726893 2.11397729 2.08124962 1.17162281
 3.46151701 2.12287541 1.69655533 0.71533209 2.49747873 4.5846592
 4.27773304 1.6522409  8.3587858 ]
Data point [ 1.51 13.7   3.93  1.54 71.8   0.54  8.21  0.    0.15  2.  ] assigned to cluster 5


Data point:
[ 1.51 13.7   3.93  1.54 71.8   0.54  8.21  0.    0.15  2.  ]
distances to centroids:
[1.92618867 1.39988839 2.09234318 5.58875657 0.75361352 2.08305331
 1.45313454 1.74477529 6.1995203  1.96329089 1.75988636 1.56846167
 2.05648838 1.10193265 7.76578715 2.24330114 0.94129344 6.73209853
 1.60491375 1.48873525 5.3031982  1.8939905  2.14172049 2.05327787
 1.

                                      

Loss: [[0.6        0.64803922]
 [0.6        0.4993895 ]
 [0.73684211 0.81565657]
 [0.68421053 0.5620915 ]
 [0.52631579 0.52136752]
 [0.63157895 0.55555556]
 [0.52631579 0.41558442]
 [0.63157895 0.59736264]
 [0.63157895 0.52161172]
 [0.68421053 0.53174603]]




# Average classification performance

In [38]:
print("Glass Classification Performance:")
print(f"KNN     - 0/1 Loss: {np.mean(glass_knn_results[:, 0])} F1 Score: {np.mean(glass_knn_results[:, 1])}")
print(f"ENN     - 0/1 Loss: {np.mean(glass_enn_results[:, 0])} F1 Score: {np.mean(glass_enn_results[:, 1])}")
print(f"K-Means - 0/1 Loss: {np.mean(glass_kmeans_results[:, 0])} F1 Score: {np.mean(glass_kmeans_results[:, 1])}")

Glass Classification Performance:
KNN     - 0/1 Loss: 0.6723684210526315 F1 Score: 0.5765411010558068
ENN     - 0/1 Loss: 0.6568421052631579 F1 Score: 0.5589286639286639
K-Means - 0/1 Loss: 0.6463157894736842 F1 Score: 0.5253235653235653


# Average regression performance

In [39]:
print("Fire Regresssion Performance:")
print(f"KNN     - MSE: {np.mean(fire_knn_results[:, 0])} MAE: {np.mean(fire_knn_results[:, 1])}")
print(f"ENN     - MSE: {np.mean(fire_enn_results[:, 0])} MAE: {np.mean(fire_enn_results[:, 1])}")
print(f"K-Means - MSE: {np.mean(fire_kmeans_results[:, 0])} MAE: {np.mean(fire_kmeans_results[:, 1])}")

Fire Regresssion Performance:
KNN     - MSE: 0.0007571914245353266 MAE: 0.06275501301324626
ENN     - MSE: 0.0015110256198391505 MAE: 0.03478500484367331
K-Means - MSE: 0.00339409430679491 MAE: 0.04929409933155525
