# __Assignment 1__

***

### Imports

In [1]:
import numpy as np
import math
import time
import heapq
import copy
from itertools import count
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

### Constants

In [2]:
RANDOM_STATE = 0 # this can be altered to provide different accuracy, see Dataset.split() (try 18 for best results)

In [3]:
"""Constants for the different conformity measures, to be used when instantiating a new Conformal object.
   See Conformal class.
   
"""

DIFFERENT_CLASS = 1
ONE_OVER_SAME_CLASS = 2
DIFFERENT_OVER_SAME_CLASS = 3

***

### Dataset Class

In [4]:
"""Class that represents the Dataset that will be used for predictions.

"""

class Dataset:
    
    def __init__(self, name, samples, labels):
        """Takes a name for the dataset, an array of samples and an array of their corresponding labels. 
        
        """
        
        self.name = name
        self.samples = samples
        self.labels = labels
        self.dataset = self.fit(samples, labels)
        self.label_set = self.extract_labels()   
        self.test_set, self.training_set, self.actual_test_labels = self.split()
        
    def __repr__(self):
        data = ""
        for sample in self.dataset:
           data += str(sample) + "\n"
        return data
        
    def extract_labels(self):
        """Return a set of all unique labels for this dataset.
        
        """

        labels = set()

        for i in range(len(self.dataset)):
            if not self.dataset[i][-1] in labels:
                labels.add(self.dataset[i][-1])

        return labels
    
    def get_name(self):
        return self.name
    
    def get_actual_test_labels(self):
        return self.actual_test_labels
        
    def get_label_set(self):
        return self.label_set
    
    def get_test_set(self):
        return self.test_set
    
    def get_training_set(self):
        return self.training_set
    
    def fit(self, samples, labels):
        """Takes an array of samples and an array of their correspondiong labels.
           Returns a new array where each element is the concatenation of a sample and its corresponding label.
           
        """

        features = samples.shape[1] + 1
        labelled_samples = np.zeros((len(samples), features))

        for i in range(len(labelled_samples)):
            sample = samples[i]
            label = [labels[i]]
            labelled_samples[i] = np.concatenate([sample,label])

        return labelled_samples
    
    def split(self):
        """Split the dataset into a test set and a training set.
            Return an array for the test set, an array for the training set and an array for the test set labels.
            
        """
        
        training_samples, test_samples, training_labels, test_labels = train_test_split(self.samples, self.labels, random_state=RANDOM_STATE)
        test_set = self.fit(test_samples, test_labels)
        training_set = self.fit(training_samples, training_labels)
        return test_set, training_set, test_labels

***

### NearestNeighbours Class

In [5]:
"""Class that represents the Nearest Neighbours model.

"""

class NearestNeighbours:
    
    def __init__(self, dataset, neighbours=1):
        """Takes a Dataset object and an optional parameter for number of neighbours, which is 1 by default. 
        
        """        
        
        # Make sure user can't choose less than 1 nearest neighbour
        if neighbours < 1:
            self.kNeighbours = 1
        else:
            self.kNeighbours = neighbours
            
        self.dataset = dataset
        self.test_set = self.get_test_set()
        self.training_set = self.get_training_set()
        self.predictions = None
        self.tiebreaker = count()
        
    def get_dataset(self):
        return self.dataset
    
    def get_dataset_name(self):
        return self.dataset.get_name()
    
    def get_test_set(self):
        return self.dataset.get_test_set()
    
    def get_training_set(self):
        return self.dataset.get_training_set()
        
    def get_neighbours(self):
        return self.kNeighbours
    
    def find_knn(self):
        nearest_neighbours = np.zeros((len(self.training_set), self.kNeighbours))
        print(str(nearest_neighbours))
    
    def actual_labels(self):
        return self.dataset.get_actual_test_labels()
    
    def euclidean(self, x, y):
        """Takes two vectors as parameters and returns the Euclidean distance between those two points.

        """

        ed = 0

        for i in range(len(x)):
            ed += (x[i] - y[i])**2

        return np.sqrt(ed)
    
    def find_nearest_neighbour(self, test_sample, training_set):
        """Takes a test sample and a training set and finds the K nearest neighbours to the test sample in the training set.
           The K nearest neighbours are analysed for their labels and the majority label is the one that will be predicted for the test sample.
           In case of ties, the first label is chosen.
           Returns the nearest labelled sample and the distance of this sample to the test sample.

        """

        # Make sure that K is not larger than the training set
        if self.kNeighbours > len(training_set):
            k = len(training_set)
        else:
            k = self.kNeighbours
            
        neighbours = []
        heapq.heapify(neighbours)
        nearest_neighbours = np.zeros((k, 3), dtype=object)
        nearest_labels = []    

        for i in range(len(training_set)):
            ed = self.euclidean(test_sample, training_set[i][:-1])
            entry = [ed]
            entry.append(next(self.tiebreaker)) 
            entry.append(training_set[i])
            heapq.heappush(neighbours, entry)        
            
        for i in range(k):
            entry = heapq.heappop(neighbours)
            nearest_neighbours[i] = entry
            nearest_labels.append(nearest_neighbours[i][2][-1])
        
        (values, counts) = np.unique(nearest_labels, return_counts=True)
        index = np.argmax(counts)
        
        closest_labelled_sample = nearest_neighbours[index][2]
        closest_distance = nearest_neighbours[index][0]
        
        return closest_labelled_sample, closest_distance    
    
    def predict_labels(self):
        """Predicts the label for each sample in the test set using the nearest neighbour algorithm.
           Stores an array of all the predicted labels in the instance variable self.predictions.
           
        """
        
        self.predictions = np.zeros((len(self.test_set)))

        for i in range(len(self.test_set)):
            test_sample = self.test_set[i][:-1]
            closest_labelled_sample = self.find_nearest_neighbour(test_sample, self.training_set)
            closest_label = closest_labelled_sample[0][-1:]
            self.predictions[i] = closest_label

    def score(self):
        """Compares the predicted labels with the actual labels and prints the accuracy rate and error rate of the model.
           
        """
        
        dataset_name = self.get_dataset_name()
        score = 1 - np.mean(self.predictions == self.actual_labels())
        errors = np.size(self.predictions) - np.count_nonzero(self.predictions == self.actual_labels())
        print("Number of errors in " + dataset_name + " predictions is: " + str(errors) + " out of " + str(len(self.predictions)))
        print("The error rate for the " + dataset_name + " predictions is: " + str(score) + "\n")
        

***

### Conformal Class

In [6]:
"""Class that represents the Conformal prediction model.
   This is a subclass of the NearestNeighbours class.
   
"""

class Conformal(NearestNeighbours):
    
    def __init__(self, dataset, conformity_measure=DIFFERENT_OVER_SAME_CLASS):
        """Takes a Dataset object and an optional parameter for the conformity measure, 
           which is DIFFERENT_OVER_SAME_CLASS by default. See Constants at top for more options.
        
        """  
        
        super().__init__(dataset, neighbours=1)
        self.conformity_measure = conformity_measure
        self.tiebreaker = count()
        self.training_set_scores = self.nn_training_set()

    def label_set(self):
        """Return the set of possible labels in this dataset.
        
        """
        
        return self.dataset.get_label_set()
    
    def separate_classes(self, test_sample):
        """Takes a test sample and returns two arrays, same_class and different_class.
           same_class contains all labelled samples from training set with the same label as test_sample.
           different_class contains all labelled samples from training set with a different label from test_sample.
           
        """
        same_class = []
        different_class = []

        for i in range(len(self.training_set)):
            if self.training_set[i][-1] == test_sample[-1]:
                same_class.append(self.training_set[i])
            else:
                different_class.append(self.training_set[i])
        
        # Make sure the test sample isn't kept in the same_class array.        
        for i in range(len(same_class)):
            if np.array_equal(same_class[i], test_sample):
                same_class = np.delete(same_class, i, 0)
                break

        same_class = np.array(same_class)   
        different_class = np.array(different_class)

        return same_class, different_class
    
    def conf_score(self, nearest_diff_distance, nearest_same_distance):
        """Takes two distances and calculates the conformity score based on the current
           conformity measure being used.
        
        """
        
        if self.conformity_measure == DIFFERENT_CLASS:
            conformity_score = nearest_diff_distance
        elif self.conformity_measure == ONE_OVER_SAME_CLASS and nearest_same_distance != 0:
            conformity_score = 1 / nearest_same_distance                  
        elif nearest_diff_distance == 0 and nearest_same_distance == 0:
            conformity_score = 0              
        elif nearest_same_distance == 0:
            conformity_score = math.inf    
        else:
            conformity_score = nearest_diff_distance / nearest_same_distance                       
            
        return conformity_score    
    
    def nn_training_set(self):
        """Compares each training sample with the rest of the training set and calculates its conformity score.
           Return an array of labelled training samples along with their conformity scores.
           Make the array length 1 more than training set length and leave last index blank which will be 
           used to temporarily store a test sample when it is being measured against the training set - 
           see self.nn_test_sample.
           
        """

        scores = np.zeros(len(self.training_set) + 1, dtype=object)

        for i in range(len(self.training_set)):
            labelled_sample = self.training_set[i]
            same_class, different_class = self.separate_classes(labelled_sample)         

            nn_different_class, nn_different_distance = self.find_nearest_neighbour(labelled_sample[:-1], different_class)
            nn_same_class, nn_same_distance = self.find_nearest_neighbour(labelled_sample[:-1], same_class)                    

            conformity_score = self.conf_score(nn_different_distance, nn_same_distance)

            entry = [conformity_score]
            entry.append(next(self.tiebreaker)) 
            entry.append(labelled_sample)
            entry.append(nn_different_distance)
            entry.append(nn_same_distance)
            scores[i] = entry

        return scores

    def nn_test_sample(self, sample):
        """Takes a test sample and finds the nearest distance between this and a sample in the training set with
           the same label, as well as the nearest distance to a sample in the training set with a different label.
           Makes a copy of the training set scores array created in self.nn_training_set. In each iteration check if
           the euclidean distance calculated is less than the distance already stored for that particular training sample,
           if so this will only be updated in the copied array.
           Return an array with the training set and this specific test sample along with the conformity scores for each sample.
           
        """
        
        test_sample = sample[:-1]
        test_label = sample[-1]
        nearest_same_distance = math.inf
        nearest_diff_distance = math.inf
        train_set_scores = copy.deepcopy(self.training_set_scores)
        changed = False
        
        for i in range(len(train_set_scores) - 1):
            training_sample = train_set_scores[i][2][:-1]
            training_label = train_set_scores[i][2][-1]            
            ed = self.euclidean(test_sample, training_sample)
            
            if (test_label == training_label):
                if ed <= nearest_same_distance:
                    nearest_same_distance = ed            
                if ed <= train_set_scores[i][4]:
                    train_set_scores[i][4] = ed
                    changed = True
            else:
                if ed <= nearest_diff_distance:
                    nearest_diff_distance = ed
                if ed <= train_set_scores[i][3]:
                    train_set_scores[i][3] = ed
                    changed = True
            
            '''
               Checks if either of the distances for this training sample have been changed, if so, the conformity
               score will be recalculated
            '''
            if changed:
                train_set_scores[i][0] = self.conf_score(train_set_scores[i][3], train_set_scores[i][4])
                changed = False
            
        conformity_score = self.conf_score(nearest_diff_distance, nearest_same_distance)

        entry = [conformity_score]
        entry.append(next(self.tiebreaker))
        entry.append(sample)
        entry.append(nearest_diff_distance)
        entry.append(nearest_same_distance)
        test = np.array([entry], object)
        train_set_scores[-1] = entry

        return train_set_scores  
    
    def pval(self, test_sample, scores_list):
        """Takes a test sample and the training set conformity scores created in self.nn_test_sample, and calculates 
           the P Value for this test sample.
           Converts the array of scores into a heap. The heap stores each labelled training sample with the conformity 
           score used as the means of prioritising the data. Pop will always remove the smallest conformity score first. 
           Pop each entry until test sample is found.
           Return the P Value for this specific test sample.
        
        """

        rank = 0
        scores = copy.deepcopy(scores_list)
        scores = scores.tolist()
        num_of_samples = len(scores)
        heapq.heapify(scores)

        for i in range(len(scores_list)):
            sample = heapq.heappop(scores)

            if np.array_equal(sample[2], test_sample):
                rank = i + 1
                break

        p_value = rank / num_of_samples
        
        return p_value   
    
    def false_pvalues(self):
        """Calculates the average false P Values for each test sample.
           Iterate through each sample in the test set and for each possible label calculates the P Value. This is
           used to calculate the false P Value and then the average false P Value.
           Return an array with the average false P Value for each test sample.
        
        """

        p_val = -math.inf
        count = 0    
        false_pval_list = np.zeros((len(self.test_set), 1))
        
        for test_sample in self.test_set:
            p_val = -math.inf
            false_pval = 0
            real_label = test_sample[-1]

            for label in self.label_set():
                sample = np.concatenate((test_sample[:-1], [label]))
                scores = self.nn_test_sample(sample)
                current_pv = self.pval(sample, scores)

                if current_pv >= p_val:
                    p_val = current_pv
                if label != real_label:
                    false_pval += current_pv

            avg_false_pval = false_pval / (len(self.label_set()) - 1)
            false_pval_list[count] = avg_false_pval
            count += 1                

        return false_pval_list  
    
    def avg_false_pval(self):
        """Calculates the average false P Values for each test sample and then returns the average of all these values.
           Returns an integer for the average false P Value.
           
        """
        
        false_pvals = self.false_pvalues()
        
        total_false_pvals = 0
        
        for pval in false_pvals:
            total_false_pvals += pval[0]
            
        avg_false_pval = total_false_pvals / len(false_pvals)
        
        return avg_false_pval
    
    def score(self):
        """Evaluates the accuracy of the Conformal Prediction model and prints out a message with the average false P Value.
        
        """
        
        start = time.time()
        
        dataset_name = self.get_dataset_name()
        score = self.avg_false_pval()

        print("Average False P Value in " + dataset_name + " conformal predictions is: " + str(score))
        print("(This took " + str(time.time() - start) + " seconds to calculate.)\n\n")

***

### Load and split the datasets

In [7]:
iris = load_iris() # load iris data

X_ion = np.genfromtxt("ionosphere.txt", delimiter=",", usecols=np.arange(34)) # extract samples from ionosphere
y_ion = np.genfromtxt("ionosphere.txt", delimiter=",", usecols=34, dtype='int') # extract labels from ionosphere

In [8]:
iris_dataset = Dataset("Iris", iris.data, iris.target)
ion_dataset = Dataset("Ionosphere", X_ion, y_ion)

***

# NEAREST NEIGHBOUR

***

### Instantiate the NearestNeighbours models for both datasets

In [9]:
iris_knn = NearestNeighbours(iris_dataset)
ion_knn = NearestNeighbours(ion_dataset)

***

### Predict the labels for each test set

In [10]:
iris_knn.predict_labels()
ion_knn.predict_labels()

***

### Score the accuracy and error rates of predictions on Nearest Neighbour model

In [11]:
iris_knn.score()

Number of errors in Iris predictions is: 1 out of 38
The error rate for the Iris predictions is: 0.02631578947368418



In [12]:
ion_knn.score()

Number of errors in Ionosphere predictions is: 13 out of 88
The error rate for the Ionosphere predictions is: 0.1477272727272727



***

### Change the value of K to see what difference this will make

#### 3 Neighbours

In [13]:
iris_knn = NearestNeighbours(iris_dataset, 3)
iris_knn.predict_labels()
iris_knn.score()

Number of errors in Iris predictions is: 1 out of 38
The error rate for the Iris predictions is: 0.02631578947368418



In [14]:
ion_knn = NearestNeighbours(ion_dataset, 3)
ion_knn.predict_labels()
ion_knn.score()

Number of errors in Ionosphere predictions is: 12 out of 88
The error rate for the Ionosphere predictions is: 0.13636363636363635



#### 5 Neighbours

In [15]:
iris_knn = NearestNeighbours(iris_dataset, 5)
iris_knn.predict_labels()
iris_knn.score()

Number of errors in Iris predictions is: 1 out of 38
The error rate for the Iris predictions is: 0.02631578947368418



In [16]:
ion_knn = NearestNeighbours(ion_dataset, 5)
ion_knn.predict_labels()
ion_knn.score()

Number of errors in Ionosphere predictions is: 13 out of 88
The error rate for the Ionosphere predictions is: 0.1477272727272727



#### 9 Neighbours

In [17]:
iris_knn = NearestNeighbours(iris_dataset, 9)
iris_knn.predict_labels()
iris_knn.score()

Number of errors in Iris predictions is: 1 out of 38
The error rate for the Iris predictions is: 0.02631578947368418



In [18]:
ion_knn = NearestNeighbours(ion_dataset, 9)
ion_knn.predict_labels()
ion_knn.score()

Number of errors in Ionosphere predictions is: 14 out of 88
The error rate for the Ionosphere predictions is: 0.15909090909090906



#### 30 Neighbours

In [19]:
iris_knn = NearestNeighbours(iris_dataset, 30)
iris_knn.predict_labels()
iris_knn.score()

Number of errors in Iris predictions is: 1 out of 38
The error rate for the Iris predictions is: 0.02631578947368418



In [20]:
ion_knn = NearestNeighbours(ion_dataset, 30)
ion_knn.predict_labels()
ion_knn.score()

Number of errors in Ionosphere predictions is: 12 out of 88
The error rate for the Ionosphere predictions is: 0.13636363636363635



#### 100 Neighbours

In [21]:
iris_knn = NearestNeighbours(iris_dataset, 100)
iris_knn.predict_labels()
iris_knn.score()

Number of errors in Iris predictions is: 2 out of 38
The error rate for the Iris predictions is: 0.052631578947368474



In [22]:
ion_knn = NearestNeighbours(ion_dataset, 100)
ion_knn.predict_labels()
ion_knn.score()

Number of errors in Ionosphere predictions is: 9 out of 88
The error rate for the Ionosphere predictions is: 0.10227272727272729



***

#### Conclusion on KNN model

By playing with the value of K we can see that the accuracy of the model varies. In some cases there were no differences, particularly with the Iris dataset. This could be because it is relatively small and so there is less chance of finding anamolous data to skew our results.

But looking at the Ionosphere dataset we can see that our predictions are less accurate when using 9 neighbours, more accurate when using 30 neighbours and even more accurate when using 100 neighbours. Having said that, using 100 neighbours for the Iris dataset returns a worse score.

The accuracy is also dependent on the random state we choose in the train_test_split method. Please alter the value of the RANDOM_STATE constant at the top and restart and run all cells. A random state of 18 provides a more favourable score and returns no errors for the Iris predictions.

***

# CONFORMAL PREDICTION

***

### Instantiate the Conformal objects for both datasets and for each conformity measure

#### Iris

In [23]:
iris_conformal1 = Conformal(iris_dataset, conformity_measure=DIFFERENT_CLASS)
iris_conformal2 = Conformal(iris_dataset, conformity_measure=ONE_OVER_SAME_CLASS)
iris_conformal3 = Conformal(iris_dataset, conformity_measure=DIFFERENT_OVER_SAME_CLASS)

#### Ionosphere

In [24]:
ion_conformal1 = Conformal(ion_dataset, conformity_measure=DIFFERENT_CLASS)
ion_conformal2 = Conformal(ion_dataset, conformity_measure=ONE_OVER_SAME_CLASS)
ion_conformal3 = Conformal(ion_dataset, conformity_measure=DIFFERENT_OVER_SAME_CLASS)

***

### Score the conformal predictions for each dataset and for each conformity measure

#### Iris

In [25]:
iris_conformal1.score() # DIFFERENT_CLASS
iris_conformal2.score() # ONE_OVER_SAME_CLASS
iris_conformal3.score() # DIFFERENT_OVER_SAME_CLASS

Average False P Value in Iris conformal predictions is: 0.04448067070330696
(This took 0.2781543731689453 seconds to calculate.)


Average False P Value in Iris conformal predictions is: 0.02619934792734048
(This took 0.2974226474761963 seconds to calculate.)


Average False P Value in Iris conformal predictions is: 0.011294829995342344
(This took 0.30892515182495117 seconds to calculate.)




#### Ionosphere

In [27]:
ion_conformal1.score() # DIFFERENT_CLASS
ion_conformal2.score() # ONE_OVER_SAME_CLASS
ion_conformal3.score() # DIFFERENT_OVER_SAME_CLASS

Average False P Value in Ionosphere conformal predictions is: 0.36664944903581276
(This took 1.4453871250152588 seconds to calculate.)


Average False P Value in Ionosphere conformal predictions is: 0.21651170798898073
(This took 1.5119688510894775 seconds to calculate.)


Average False P Value in Ionosphere conformal predictions is: 0.06099345730027552
(This took 1.5075788497924805 seconds to calculate.)




***

#### Conclusion on Conformal Prediction model

For both datasets we can see that using the default conformity measure (i.e. the distance to the nearest sample of a different class over the distance to the nearest sample of the same class) produces a better result than the other conformity measures.

The accuracy is also dependent on the random state we choose in the train_test_split method. Please alter the value of the RANDOM_STATE constant at the top and restart and run all cells. A random state of 18 provides a more favourable score.

I suppose there is an element of luck in regards to the test and training sets that the random state generates and it just so happens that using a state of 18 provides a more favourable test/training set for making accurate predictions than a state of 0 generates.

***