KNN Implementation description:
Initially I split the Implementation into multiple functions, these are: dataSplit, getDistances, labelSelection, applyToTestSet, and predictionAccuracy.
These functions are called by the user when required to progress the application of a KNN.

Imports for the functions include numpy and math, numpy is used to define the arrays and infinty.
Math is used to floor a value and to square root a value.
Both of these uses could have been implemented without imports however this would probabily reduce efficiency and was not specified as an additional task

Notes on my implementation:
I made several functions that allow a user to split data to training and test data, create a KNN (Variable size of K) and obtain a conformity measure, P-value and False-P-Value. I applied this to the Iris dataset and ionosphere

Structure: Imports -> KNN functions -> Conformal Predictor functions -> Tests on Iris dataset -> Tests on Ionosphere dataset

# Imports

In [1]:
#imports
import numpy as np
import math

# KNN functions

The first function: dataSplit, has the same purpose as train_test_split.

In [2]:
#define the data split function
#inputs: data = the input data
#labels = labels associated with the input data
#ratio = % to go to testSet
#shuffleVar = the seed to set random to
#outputs: array of: trainingSet, testSet, trainingLabels, testLabels
def dataSplit(data,labels,ratio,shuffleVar):
    #take input data and ratio in order to find the size of the test and training datasets
    testSize = math.floor(ratio*data.shape[0])
    trainSize = data.shape[0]-testSize
    
    #set the random seed, then shuffle an array of indexs to move around the order of the input data
    np.random.seed(shuffleVar)
    indexs = np.arange(data.shape[0])
    np.random.shuffle(indexs)
    #create 2 new variables to store the new data in with its new order
    dataSet = np.zeros(data.shape)
    labelSet = np.zeros(data.shape[0])
    #loop through all data applying the new index
    for i in range(trainSize+testSize):
        dataSet[i] = data[indexs[i]]
        labelSet[i] = labels[indexs[i]]
    data=dataSet
    labels=labelSet
    #create arrays to store the separated labels and data 
    trainingSet = np.zeros((trainSize,data.shape[1]))
    testSet = np.zeros((testSize,data.shape[1]))
    trainingLabels = np.zeros(trainSize)
    testLabels = np.zeros(testSize)
    #loop through and split the data
    for i in range(trainSize+testSize):
        if i<testSize:
            testSet[i] = data[i]
            testLabels[i] = labels[i]
        else:
            trainingSet[i-testSize] = data[i]
            trainingLabels[i-testSize] = labels[i]
    #output the array of: trainingSet, testSet, trainingLabels, testLabels
    return [trainingSet,testSet,trainingLabels,testLabels]

The second function getDistances takes the input of a test value, the training set and the number k of nearest neighbors

Note on the computational complexity. This function is where the majority of the computation will occur, this being the case it is where computational complexity is a primary concern. There are multiple nested for loops within this function, the first 2 for loops have a complexity of "size of the training dataset"*"number of dimensions the data has" The second set of for loops has "size of the training dataset"*k. As k and "number of dimensions the data has" are relitively small complexity can be sumerised by: O("size of the training dataset").

In [3]:
#define the distance funtion
#x is the test value and y are the training values, k=K
#return the smallest distance and its index in y
#Computational complexity: this uses 2 for loops going through all of the training dataset and then calculates the distance based on the number of dimensions the dataset has. This function will do the majority of the computational work
#There are ways to reduce computational complexity with memory however this introduce added complexity into the code
def getDistances(x,y,k):
    distances = np.zeros((y.shape[0],2))
    #get the distance from each of the points in y to the point x
    for j in range(y.shape[0]):
        #calculate the distance based on the number of dimensions (e.g. iris has 4 so the loop will run 4 times)
        for i in range(x.shape[0]):
            #find the square of each differnce
            distances[j][0] += (x[i]-y[j][i])**2
        #root the value to find the distance
        distances[j] = np.array([math.sqrt(distances[j][0]),j])
    #define arrays minimumDistance (the output), and tempDistance (used to store a value temparerily)
    minimumDistance = np.full((k,2),np.inf)
    tempDistance = np.full((2),0)
    #loop through all of the distances
    for i in range(distances.shape[0]):
        #if any of the distances are lower than the largest distance stored allow the distances stored to be updated
        if distances[i][0]<minimumDistance[k-1][0]:
            #loop through all minimum distances stored
            for j in range(k):
                #if the new duistance is smaller than the smallest stored replace. Use a tempary array to enable switching
                if distances[i][0]<minimumDistance[j][0]:
                    tempDistance = np.ndarray.copy(minimumDistance[j])
                    minimumDistance[j] = distances[i]
                    if tempDistance[0] ==np.inf:
                        j=k
                    else:
                        distances[i][0] = tempDistance[0]
                        distances[i][1] = tempDistance[1]
    
    
    return minimumDistance

The third function labelSelection takes the output from the previous function and assigns it a label based upon a voting method

In [4]:
#Define the label selection function
#inputs are:
#x=output from getDistances
#y=the example datasetlabels
#k=k nearest neighbors
def labelSelection(x, y, k):
    #create array to hold votes
    labelSet = np.zeros(np.unique(y).shape[0])
    #loop through all labels which were close to 
    for i in range(k):
        #get index of the distance from x to get the relevent label from y to add one to label set
        labelSet[np.where(np.unique(y)== y[int(x[i][1])])] += 1
    #return the value found in array of all unique labels in position voted on
    return np.unique(y)[np.where(labelSet == np.amax(labelSet))[0][0]]

This function combines the previous 2 functions over an entire dataset, the rational behind separating the functions is that the user may sometimes wish to analyse a single datapoint 

Note on complexity: This function calls getDistances within another for loop. This increases the computational complexity significantly to O("size of the training dataset"*"size of the test dataset").

As "size of the training dataset" = data * m, and "size of the test dataset" = data * (1-m). big O can be simplified (at the cost of trueness) to O(Data ** 2), where data is the size of the dataset before splitting

In [5]:
#define applyToTestSet function
#inputs: x=trainingSet, y=traningLabels, z=testSet, k=K
def applyToTestSet(x,y,z,k):
    #create an array to store the output list of predicted labels
    predictedLabels = np.zeros(z.shape[0])
    #loop through all of the test datapoints and obtain a label prediction
    for i in range(z.shape[0]):
        distancesForZi = getDistances(z[i], x, k)
        predictedLabels[i] = labelSelection(distancesForZi,y,k)
    return predictedLabels

The final function defines the accuracy: It loops through all of the predicted labels and compares them to their true values

In [6]:
#accuracy function
#inputs x=the predictd labels and y=the test labels
def predictionAccuracy(x,y):
    numberCorrect = 0
    for i in range(x.shape[0]):
        if x[i]==y[i]:
            numberCorrect +=1
    return numberCorrect/x.shape[0]

# Conformal Predictor functions

The ConformityPredictor function has a computational complexity of O(otherData ** 2)

In [7]:
#first function: conforityPredictor
#the inputs include: currentData = the datapoint you want a conformity measure for
#currentLabel = the corresponding label for the current data point
#otherData = the other datapoints to compare your current data to
#otherLabels = corresponding labels for the other Data points
def conformityPredictor(currentData, currentLabel,otherData, otherLabels):
    #this uses the same method of calculating distances as the function getDistances
    #Note: Maybe the code could have been secioned off to reduce the complexity of the code, this was not done as both functions were already complete
    distances = np.zeros((otherData.shape[0],2))
    for i in range(otherData.shape[0]):
        #calculate the distance based on the number of dimensions (e.g. iris has 4 so the loop will run 4 times)
        for j in range(currentData.shape[0]):
            #find the square of each differnce
            distances[i][0] += (currentData[j]-otherData[i][j])**2
        #root the value to find the distance
        distances[i] = np.array([math.sqrt(distances[i][0]),i])
    #vars to hold the value of the minimum distance to the value
    distanceToSameClass = np.inf
    distanceToDifClass = np.inf
    
    #go through all of the distances and select the smallest distance
    for i in range(otherData.shape[0]):
        if otherLabels[i] == currentLabel and distances[i][0] < distanceToSameClass:
            distanceToSameClass = distances[i][0]
        elif distances[i][0] < distanceToDifClass:
            distanceToDifClass = distances[i][0]
    #following if statements avoid the problem of dividing by 0 and ensure that the corret value is returned
    if distanceToSameClass == 0:
        return 0
    elif distanceToDifClass == 0:
        return np.inf
    else:
        return distanceToDifClass/distanceToSameClass

In [8]:
#get p value for a point
#inputs: testDataPoint = the datapoint you want a p value for
#currentLabel = the corresponding label for the current data point
#otherData = the other datapoints to compare your current data to
#otherLabels = corresponding labels for the other Data points
def pValues(testDatapoint, predictedLabel,trainingData, trainingLabels):
    fullDataArray = np.concatenate(([testDatapoint], trainingData))
    fullLabelArray = np.concatenate(([predictedLabel], trainingLabels))
    conformityScores = np.zeros(fullLabelArray.shape[0])
    #obtain an array of all conformity scores
    #get the first value
    currentData = fullDataArray[0]
    currentLabel = fullLabelArray[0]
    otherData = fullDataArray[:][1:]
    otherLabels = fullLabelArray[:][1:]
    conformityScores[0] = conformityPredictor(currentData,currentLabel, otherData, otherLabels)
    #get the other values
    for i in range(fullDataArray.shape[0])[1:]:
        currentData = fullDataArray[i]
        currentLabel = fullLabelArray[i]
        otherData = np.concatenate(([fullDataArray[i-1]],fullDataArray[:][i+2:]))
        otherLabels = np.concatenate(([fullLabelArray[i-1]],fullLabelArray[:][i+2:]))
        conformityScores[i] = conformityPredictor(currentData,currentLabel, otherData, otherLabels)
    #set the var to hold the pValue 
    pValue = 0
    #loop through all of the conformity scores and if the conformity score is higher or equal to another then add 1 to pVal
    #use of >= allows for Modified competition ranking, compared to Standard competition ranking which is achieved by >
    for i in range(conformityScores.shape[0])[1:]:
        if conformityScores[0] >= conformityScores[i]:
            pValue += 1
    pValue = pValue/conformityScores.shape[0]
    return pValue

In [9]:
#get p values for an array of points
#inputs: testDataPoint = the datapoints you want a p values for
#currentLabel = the corresponding labels for the test datapoints
#otherData = the other datapoints to compare your current data to
#otherLabels = corresponding labels for the other Data points
def pValuesForGroup(testDatapoints, predictedLabels,trainingData, trainingLabels):
    pValuesArray = np.zeros(testDatapoints.shape[0])
    #loop through all points
    for i in range(testDatapoints.shape[0]):
        #fetch all of the p values for each test data point
        pValuesArray[i] = pValues(testDatapoints[i],predictedLabels[i], trainingData, trainingLabels)
    return pValuesArray

In [10]:
#get the false p value
#inputs: pValues = an array of p values
#predicted labels = the predicted labels which correspond to the p values
#true labels = the true labels which correspond to the p values
def falsePValues(pValues, predictedLabels, trueLabels):
    #loop through all values
    falsePredictions = 0
    sumPVals = 0
    #loop through all p values
    for i in range(pValues.shape[0]):
        #if the predicted label was false then sum add up the the p values, then divide by the number of false predictions
        if predictedLabels[i] == trueLabels[i]:
            falsePredictions += 1
            sumPVals += pValues[i]
    return sumPVals/falsePredictions

# Tests on iris dataset

In [11]:
#load the iris dataset
from sklearn.datasets import load_iris
iris = load_iris()
#alt method
#irisData = np.genfromtxt("iris_data.txt")
#irisTarget = np.genfromtxt("iris_target.txt")

In [12]:
splitData = dataSplit(iris['data'],iris['target'],0.25,10)

# 1NN

In [13]:
predictedLabels = applyToTestSet(splitData[0],splitData[2],splitData[1], 1)
print(predictedLabels)

[1. 2. 0. 1. 0. 1. 2. 1. 0. 1. 1. 2. 1. 0. 0. 2. 1. 0. 0. 0. 2. 2. 2. 0.
 1. 0. 1. 1. 1. 2. 1. 1. 2. 2. 2. 0. 2.]


In [14]:
print(splitData[3])
print(predictionAccuracy(predictedLabels,splitData[3]))

[1. 2. 0. 1. 0. 1. 1. 1. 0. 1. 1. 2. 1. 0. 0. 2. 1. 0. 0. 0. 2. 2. 2. 0.
 1. 0. 1. 1. 1. 2. 1. 1. 2. 2. 2. 0. 2.]
0.972972972972973


# Average False P Value for 1NN applied to iris.txt

In [15]:
arrayOfPValues = pValuesForGroup(splitData[1], predictedLabels, splitData[0], splitData[2])
print(arrayOfPValues)

[0.87719298 0.26315789 0.92982456 0.81578947 0.59649123 0.40350877
 0.11403509 0.5        0.51754386 0.16666667 0.87719298 0.02631579
 0.59649123 0.11403509 0.73684211 0.83333333 0.28947368 0.6754386
 0.10526316 0.16666667 0.20175439 0.94736842 0.61403509 0.51754386
 0.34210526 0.22807018 0.37719298 0.60526316 0.22807018 0.78070175
 0.11403509 0.8245614  0.60526316 0.16666667 0.77192982 0.59649123
 0.19298246]


In [16]:
print(falsePValues(arrayOfPValues, predictedLabels, splitData[3]))

0.48903508771929816


# 3NN

In [17]:
predictedLabels = applyToTestSet(splitData[0],splitData[2],splitData[1], 3)
print(predictedLabels)

[1. 2. 0. 1. 0. 1. 2. 1. 0. 1. 1. 2. 1. 0. 0. 2. 1. 0. 0. 0. 2. 2. 2. 0.
 1. 0. 1. 1. 1. 2. 1. 1. 2. 2. 2. 0. 2.]


In [18]:
print(splitData[3])
print(predictionAccuracy(predictedLabels,splitData[3]))

[1. 2. 0. 1. 0. 1. 1. 1. 0. 1. 1. 2. 1. 0. 0. 2. 1. 0. 0. 0. 2. 2. 2. 0.
 1. 0. 1. 1. 1. 2. 1. 1. 2. 2. 2. 0. 2.]
0.972972972972973


# Tests on ionosphere dataset

In [19]:
#import the ionosphere
X = np.genfromtxt("ionosphere.txt", delimiter=",")
#seperate the labels from data
ionosphereData = X[:,0:33]
ionosphereLabels = X[:,34]

In [20]:
splitData = dataSplit(ionosphereData,ionosphereLabels,0.25,10)

# 1NN

In [21]:
predictedLabels = applyToTestSet(splitData[0],splitData[2],splitData[1], 1)
print(predictedLabels)

[-1.  1. -1.  1.  1.  1.  1. -1.  1. -1.  1. -1.  1. -1.  1.  1.  1.  1.
  1.  1.  1. -1.  1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.
  1.  1.  1.  1.  1.  1. -1.  1.  1. -1.  1.  1.  1. -1.  1.  1.  1.  1.
  1.  1.  1.  1.  1.  1. -1. -1.  1.  1.  1.  1.  1.  1.  1. -1. -1.  1.
 -1.  1.  1. -1.  1. -1.  1. -1.  1. -1.  1.  1.  1.  1.  1.]


In [22]:
print(splitData[3])
print(predictionAccuracy(predictedLabels,splitData[3]))

[-1.  1. -1.  1. -1.  1.  1. -1.  1. -1.  1.  1.  1. -1. -1. -1.  1.  1.
 -1.  1.  1. -1.  1.  1.  1.  1.  1.  1. -1.  1.  1. -1.  1.  1.  1.  1.
  1.  1.  1.  1.  1.  1. -1.  1.  1. -1.  1.  1. -1. -1.  1.  1.  1.  1.
  1.  1.  1. -1. -1.  1. -1. -1.  1. -1.  1.  1.  1.  1.  1. -1. -1. -1.
 -1. -1.  1. -1. -1. -1.  1. -1. -1. -1.  1.  1. -1.  1.  1.]
0.8275862068965517


# Average False P Value for 1NN applied to ionosphere.txt

In [None]:
arrayOfPValues = pValuesForGroup(splitData[1], predictedLabels, splitData[0], splitData[2])
print(arrayOfPValues)

In [None]:
print(falsePValues(arrayOfPValues, predictedLabels, splitData[3]))

# 3NN

In [None]:
predictedLabels = applyToTestSet(splitData[0],splitData[2],splitData[1], 3)
print(predictedLabels)

In [None]:
print(splitData[3])
print(predictionAccuracy(predictedLabels,splitData[3]))