# Assignment 1, implementation of KNN

In [21]:
import numpy as np
import math
import time

## Part 1
Loading data into Python

In [22]:
#load iris data
from sklearn.datasets import load_iris
iris = load_iris()

In [23]:
#now importing ionosphere data

#34 features so usecols=np.arrange(34) take first 34 as column
X = np.genfromtxt("ionosphere.txt", delimiter=",",
usecols=np.arange(34))

#each label is of type int, features are not. Only labels will be generated
y = np.genfromtxt("ionosphere.txt", delimiter=",",
usecols=34, dtype='int')

## Part 2
Splitting the data into training and test sets

In [24]:
# split iris
from sklearn.model_selection import train_test_split
X_train_iris, X_test_iris, y_train_iris, y_test_iris = train_test_split(iris['data'],
iris['target'], random_state=0)

In [25]:
# split ionosphere
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X,
y, random_state=23) #random state 23 yileds lower false-p value

In [26]:
def knn_fit(features, labels):

    columns = features.shape[1] + 1
    length = len(features)
    labelled_samples = np.zeros((length,columns))
    for x in range(length):
        labelled_samples[x] = np.concatenate([features[x], [labels[x]]])
    return labelled_samples


# Part 3
Implementing the Nearest Neighbour method. 

Reasoning; the average difference between all 34 features in ionoshpere provides x co-ordinate (4 features for iris). The euclidean distance is calculated using x value and x* values.

In [27]:
"""takes two vectors sample[x,y] and neighbour[x,y]
   takes one scalar 'l' of type int, limiting number of attributes i.e. not including labels
"""
#calculates euclidean distance
def distance(x1, x2, num_of_features):
    distance = 0
    for i in range(num_of_features):
        distance += ((x1[i] - x2[i])**2)
    return math.sqrt(distance)
    

In [28]:
#finds distance from nearest neighbour 

def calc_nearest(distance_list):
    current_minimum = math.inf
    length = len(distance_list)
    index = math.inf
    for x in range(length):
        if current_minimum > distance_list[x]:
            current_minimum = distance_list[x]
            index = x
    return index #returns index of min distance

In [29]:
#predictions

def predict(test_sample, train_set, num_of_features):
    distance_list = []
    length = len(train_set)
    for x in range(length):
        #calls distance() and appends to distance_list
        distance_list.append(distance(test_sample, train_set[x], num_of_features))
    nearest_neighbour = calc_nearest(distance_list) #index of neareset neighbour in training set 
    predicted_label = train_set[nearest_neighbour][-1]
    return predicted_label

In [30]:
#ERROR RATE
def test_error_rate(predictions, actual_labels):
    return np.mean(predictions != actual_labels)

## Conformity Measure
the divide(a, b) function in the cell below returns the conformity score

* **0/x = 0**

* **x/0 = infinity**

* **0/0 = 0** returns an average false p-value of 0.04

* **0/0 = 1** the average false p-value for Iris improves from 0.04 to 0.01, hence I have chosen this. It is also mathematically sound in that x/x = x

In [44]:
def divide(numerator, denominator):

    if numerator and denominator == 0:
        return 1
    elif numerator == 0:
        return 0
    elif denominator == 0:
        return math.inf
    else:
        return numerator/denominator


## Calculate confirmity scores for entire training set
Calculate the scores for the whole training set and store them.

In [32]:
# CONFORMITY SCORES
def conformity(neighbours_set, num_features):

    length = len(neighbours_set)
    conformity_scores = []
    for x in range(length):
        n = 0
        distance_list = []
        distance_to_diff = 1
        diff_flag = 0
        distance_to_same = 1
        same_flag = 0
        conf_score = 0
        for i in range(length):
            if x == i:
                n = i #store the index of the neighbour being operated on
            distance_list.append(distance(neighbours_set[x], neighbours_set[i], num_features))
        
        current_minimum = math.inf
        index = 0 #index of nearest neighbour
        for j in range(length):
            if current_minimum > distance_list[j]:
                if j != n: #ensure not storing distance to itself
                    current_minimum = distance_list[j]
                    index = j
            
        if neighbours_set[index][-1] == neighbours_set[x][-1]:
            distance_to_same = current_minimum
            same_flag = 1

        else:
            distance_to_diff = current_minimum
            diff_flag = 1

        
        if same_flag == 1:
            current_minimum = math.inf
            for j in range(length):
                if current_minimum > distance_list[j]:
                    if j != n and neighbours_set[j][-1] != neighbours_set[x][-1]: #ensure labels aren't the same
                        current_minimum = distance_list[j]
            distance_to_diff = current_minimum

        
        if diff_flag == 1:
            current_minimum = math.inf
            for j in range(length):
                if current_minimum > distance_list[j]:
                    if j != n and neighbours_set[j][-1] == neighbours_set[x][-1]: #ensure labels are the same
                        current_minimum = distance_list[j]            
            distance_to_same = current_minimum

        conf_score = divide(distance_to_diff, distance_to_same)
        conformity_scores.append(conf_score)

    return conformity_scores

## Calculate p-value
Re-compute conformity score for single test_sample\[label\], the nearest neighbour of the same, and the nearest neighbour of different. 

Rank *test_sample\[label\]*

Return *rank  /  n +1*

In [33]:
def calculate_p_val(conformity_set, train_set, p_training_set):   
    #shape of training_set arr[item][features, features, label, conformity_score]
    
    #CALCULATE NEW CONFORMITY SCORE FOR ADDED LABEL
        #if p_training_set[x][-1] == p_training_set[-1][-1]
    
    length = len(conformity_set) #length of original set. Ignores test_item 
    conformity_score_for_label = 0
    num_features = len(train_set[0])-1
    
    distance_list = []
    distance_to_diff = 1
    diff_flag = 0
    distance_to_same = 1
    same_flag = 0
    conf_score = 0
    for i in range(length):
        distance_list.append(distance(train_set[i], p_training_set[-1], num_features))

    current_minimum = math.inf
    index = 0 #index of nearest neighbour
    for j in range(length):
        if current_minimum > distance_list[j]:
            current_minimum = distance_list[j]
            index = j

    if train_set[index][-1] == p_training_set[-1][-1]:
        distance_to_same = current_minimum
        same_flag = 1

    else:
        distance_to_diff = current_minimum
        diff_flag = 1

    index_to_same = 0
    index_to_diff = 0
    if same_flag == 1:
        index_to_same = index
        current_minimum = math.inf
        for j in range(length):
            if current_minimum > distance_list[j]:
                if train_set[j][-1] != p_training_set[-1][-1]: #ensure labels aren't the same
                    current_minimum = distance_list[j]
                    index_to_diff = j
        distance_to_diff = current_minimum


    if diff_flag == 1:
        index_to_diff = index
        current_minimum = math.inf
        for j in range(length):
            if current_minimum > distance_list[j]:
                if train_set[j][-1] == p_training_set[-1][-1]: #ensure labels are the same
                    current_minimum = distance_list[j]
                    index_to_same = j
        distance_to_same = current_minimum

    conf_score = divide(distance_to_diff, distance_to_same)
    #update conformity set
    
    rank = length+1
    for x in range(length):
        if conf_score <= conformity_set[x]:
            rank -= 1
    
    rank /= (length +1)
    return rank

## Calculate false p-value
Sum total p-values ignoring p_value of true label

In [41]:
# calculate false p value for each sample

def p_val(conformity_set, train_set, test_sample, labels_set,  true_label):
    average_p = 0
    
    l = len(labels_set)

    train_set_plus1 = len(train_set)+1
    
    false_p = 0
    for i in range(len(labels_set)):
        p_training_set = np.empty(train_set.shape)
        test_sample[-1]=labels_set[i]
        p_training_set = np.concatenate([train_set, [test_sample]])
        new_p = calculate_p_val(conformity_set, train_set, p_training_set)
        if true_label != labels_set[i]:
            false_p += new_p
    average_p = false_p / (l-1)
    return average_p #(per row of samples)

# MAIN

## IRIS

In [45]:
start = time.time()
#constants
num_of_features = len(X_test_iris[0])
num_of_samples = len(y_test_iris)
actual_labels = y_test_iris
#train 
iris_train_set = knn_fit(X_train_iris, y_train_iris)
iris_test_set = knn_fit(X_test_iris, y_test_iris)

#predict
predictions = []
for x in range(num_of_samples):
    predictions.append(int(predict(iris_test_set[x], iris_train_set, num_of_features)))


# SCORE
print('test error rate: '+str(test_error_rate(predictions, actual_labels))+'\n')

# Calculate conformity scores for iris_train_set
conformity_set = conformity(iris_train_set, num_of_features)


labels_set = [0,1,2]
iris_test_set_labels = iris_test_set
false_p_value =0
temp_p_value = 0

# for all predictions, append false p values for all test samples
for x in range(len(predictions)):
    temp_p_value += p_val(conformity_set, iris_train_set, iris_test_set[x], labels_set, actual_labels[x])
false_p_value = (temp_p_value / len(predictions))

print('average false p value '+str(false_p_value))
print('Iris ran in ',time.time() - start, ' seconds')

test error rate: 0.02631578947368421

average false p value 0.011178388448998608
Iris ran in  0.15470170974731445  seconds


## IONESPHERE

In [46]:
start = time.time()
#constants
num_of_features = len(X_test[0])
num_of_samples = len(y_test)
actual_labels = y_test
#train 
iono_train_set = knn_fit(X_train, y_train)
iono_test_set = knn_fit(X_test, y_test)

#predict
predictions = []
for x in range(num_of_samples):
    predictions.append(int(predict(iono_test_set[x], iono_train_set, num_of_features)))


# SCORE
print('test error rate: '+str(test_error_rate(predictions, actual_labels))+'\n')

# Calculate conformity scores for iono_train_set
conformity_set = conformity(iono_train_set, num_of_features)

labels_set = [1,-1]
false_p_value =0
temp_p_value = 0

# for all predictions, append false p values for all test samples
for x in range(len(predictions)):
    temp_p_value += p_val(conformity_set, iono_train_set, iono_test_set[x], labels_set, actual_labels[x])
false_p_value = (temp_p_value / len(predictions))

print('average false p value '+str(false_p_value))
print('Ionosphere ran in ',time.time() - start, ' seconds')


test error rate: 0.07954545454545454

average false p value 0.04739152892561988
Ionosphere ran in  2.905355453491211  seconds
