# Conformal prediction implementation

Here the 0/0 case is handled by setting the conformity score to zero. A 0/0 score can arise only when training datapoints with different labels lie at the same location as the test datapoint. In that situation the test datapoint very likely shares its label with one of them. Setting the conformity score to the lowest possible value gives all of these datapoints the same, most pessimistic rank and therefore the same p-value, which indicates that each of their labels is equally plausible for the test datapoint.
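The convention described above can be written as a small standalone helper. This is only a sketch: the name `conformity_score` and its two arguments are hypothetical and are not part of the notebook's functions.

```python
import math

def conformity_score(dist_diff, dist_same):
    """Ratio score: nearest different-class distance / nearest same-class distance.

    Hypothetical helper illustrating the zero-division convention only.
    """
    if dist_diff == 0 and dist_same == 0:
        return 0.0       # 0/0: scored as least conforming
    if dist_same == 0:
        return math.inf  # x/0 with x > 0: scored as most conforming
    return dist_diff / dist_same

print(conformity_score(0.0, 0.0))  # 0.0
print(conformity_score(2.0, 0.0))  # inf
print(conformity_score(1.0, 4.0))  # 0.25
```

The order of the checks matters: the 0/0 test has to come before the test for a zero denominator, otherwise the 0/0 case would be scored as infinity.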

In the implementation below some superfluous calculations are avoided, such as:
1. Repetitive calculation of distances for the training set on each iteration over the postulated labels:   
    This is handled by creating, once at the beginning, a distance matrix that holds for every training point the distance
    to the nearest point of the same label and the distance to the nearest point of a different label.
    On each iteration over the test datapoints a copy of the matrix is made, and a distance is updated only if the test
    datapoint is the nearest point of the same label or of a different label.
2. Calculation of ranks for all datapoints:   
    It is unnecessary to calculate the rank of every conformity score, as we only need the rank of the test datapoint. This is 
    achieved by incrementing the rank each time a training conformity score is found to be lower than or equal to the 
    calculated conformity score of the test datapoint for the postulated label under consideration.
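The rank shortcut in point 2 amounts to counting how many training scores do not exceed the test score. A minimal sketch follows, assuming the conformity scores are already available as plain numbers; `p_value_from_scores` is a hypothetical name used only for this illustration.

```python
import numpy as np

def p_value_from_scores(train_scores, test_score):
    """p-value of the test point without ranking every score.

    rank = 1 (the test point itself) + number of training scores <= test score.
    """
    train_scores = np.asarray(train_scores, dtype=float)
    rank = 1 + int(np.sum(train_scores <= test_score))
    return rank / (len(train_scores) + 1)

# Two of the four training scores (0.5 and 1.0) are <= 1.0, so rank = 3 and p = 3/5.
print(p_value_from_scores([0.5, 1.0, 2.0, 3.0], 1.0))  # 0.6
```

Counting ties in favour of the test point matches the pessimistic-rank convention used in this notebook.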

In [20]:
#Declaring all import statements
import numpy as np
import math

In [21]:
#Function to calculate and return Euclidean Distance between two points
#Input: arr1(numpy array), arr2(numpy array)
def distance(arr1,arr2):
    dist_sq=0
    for i in range(len(arr1)):
        dist_sq += (arr1[i] - arr2[i])**2
    dist = np.sqrt(dist_sq)
    return dist
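The same Euclidean distance can probably be computed without an explicit Python loop. The sketch below uses `np.linalg.norm`; the name `distance_vectorized` is hypothetical, and the function is meant as an equivalent alternative rather than a replacement for `distance` as used in the rest of the notebook.

```python
import numpy as np

def distance_vectorized(arr1, arr2):
    """Euclidean distance between two points using NumPy vector operations."""
    diff = np.asarray(arr1, dtype=float) - np.asarray(arr2, dtype=float)
    return float(np.linalg.norm(diff))

print(distance_vectorized([0.0, 0.0], [3.0, 4.0]))  # 5.0
```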

In [22]:
#Function to calculate p-values for each label for the sample passed as input
#Conformity score is calculated by the formula:
#(the distance to the nearest sample of a different class/the distance to the nearest sample of the same class)
#We pass X_train, y_train and the precomputed distance matrix of the training set along with the sample
#distances[:,0] holds the nearest different-class distances, distances[:,1] the nearest same-class distances
def pvalue(sample,distances,X_train,y_train):
    labels = np.unique(y_train)
    label_count = len(labels)
    scores = np.zeros(np.shape(y_train)[0]+1)
    pvalues = np.zeros(label_count)
    for i in range(len(labels)):
        #Considering labels[i] as the postulated label (the label value, not its index)
        postulated_label = labels[i]
        distances_run = np.copy(distances)
        for j in range(len(X_train)):
            dist = 0
            distance_to_nearest_diff = math.inf
            distance_to_nearest_same = math.inf
            if postulated_label == y_train[j]:
                dist = distance(sample,X_train[j])
                if dist <  distance_to_nearest_same :
                    distance_to_nearest_same = dist
            else:
                dist = distance(sample,X_train[j])
                if dist <  distance_to_nearest_diff :
                    distance_to_nearest_diff = dist      
            if distance_to_nearest_diff < distances_run[j][0]:
                distances_run[j][0] =   distance_to_nearest_diff
            if distance_to_nearest_same < distances_run[j][1]: 
                distances_run[j][1] =    distance_to_nearest_same       
        dist=0
        distance_to_nearest_diff = math.inf
        distance_to_nearest_same = math.inf
        for j in range(len(X_train)):
            if postulated_label == y_train[j]:
                dist = distance(sample,X_train[j])
                if dist <  distance_to_nearest_same :
                    distance_to_nearest_same = dist
            else:
                dist = distance(sample,X_train[j])
                if dist <  distance_to_nearest_diff :
                    distance_to_nearest_diff = dist
        #Store the sample's own nearest distances once, after the loop has finished
        distances_run[len(X_train)][0] = distance_to_nearest_diff
        distances_run[len(X_train)][1] = distance_to_nearest_same
        for j in range(len(X_train)+1):
            #Handling Zero Division errors: 0/0 is scored 0, x/0 with x > 0 is scored infinity
            if (distances_run[j][0] == 0) and (distances_run[j][1] == 0):
                scores[j] = 0
            elif distances_run[j][1] == 0 :
                scores[j] = math.inf
            elif distances_run[j][0] == 0 :
                scores[j] = 0
            else : 
                scores[j] = ( distances_run[j][0]/ distances_run[j][1])
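        #Rank of the sample: 1 + number of training scores that do not exceed the sample's score
        rank = 1
        sample_score = scores[len(X_train)]
        for l in range(len(X_train)):
            if sample_score >= scores[l]:
                rank += 1
        #The p-value is computed once, after all training scores have been compared
        pvalues[i] = rank/(len(X_train)+1)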
            
    return pvalues
        
#This function is called to return the conformal predictions
#It takes the sample(s) as input and returns a matrix of p-values, one row per sample and one column per postulated label
#It calls the pvalue function defined above to calculate the p-values
#Parameters to pass when calling the function: the set of new sample(s), X_train, y_train
def conformal_predictor(new_sample,X_train,y_train):
    #Column 0: distance to the nearest point of a different label; column 1: nearest point of the same label
    #The extra last row is reserved for the test sample
    distances = np.zeros((len(X_train)+1,2))
    for j in range(len(X_train)):
        dist = 0
        distance_to_nearest_diff = math.inf
        distance_to_nearest_same = math.inf
        for k in range(len(X_train)):
            if j != k:
                if y_train[j] == y_train[k]:
                    dist = distance(X_train[j],X_train[k])
                    if dist <  distance_to_nearest_same :
                        distance_to_nearest_same = dist
                else:
                    dist = distance(X_train[j],X_train[k])
                    if dist <  distance_to_nearest_diff :
                        distance_to_nearest_diff = dist
        distances[j][0] = distance_to_nearest_diff
        distances[j][1] = distance_to_nearest_same
    distances[len(X_train)][0] = math.inf
    distances[len(X_train)][1] = math.inf    
    
    length = len(new_sample)
    results = []
    for i in range(length):
        sample = new_sample[i]
        result = pvalue(sample,distances,X_train,y_train)
        results.append(result)
   

    return results
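The nested loops that build the initial distance matrix could also be expressed with NumPy broadcasting. The following is a sketch under assumptions, not the notebook's implementation: `nearest_distances` is a hypothetical name, it keeps the same column order as `distances` (column 0 for the nearest different-label point, column 1 for the nearest same-label point), and it omits the extra row reserved for the test sample. It holds all pairwise differences in memory at once, so it trades memory for speed.

```python
import numpy as np

def nearest_distances(X, y):
    """Return an (n, 2) array of nearest-neighbour distances per training point.

    Column 0: distance to the nearest point with a different label.
    Column 1: distance to the nearest other point with the same label.
    """
    X = np.asarray(X, dtype=float)
    y = np.asarray(y)
    # All pairwise Euclidean distances via broadcasting: shape (n, n).
    diffs = X[:, None, :] - X[None, :, :]
    pairwise = np.sqrt((diffs ** 2).sum(axis=2))
    same = y[:, None] == y[None, :]
    np.fill_diagonal(same, False)  # a point is not its own neighbour
    different = y[:, None] != y[None, :]
    # Mask out irrelevant pairs with infinity, then take the row-wise minimum.
    nearest_diff = np.where(different, pairwise, np.inf).min(axis=1)
    nearest_same = np.where(same, pairwise, np.inf).min(axis=1)
    return np.column_stack([nearest_diff, nearest_same])

# Illustrative one-dimensional data, made up for this example.
X_demo = np.array([[0.0], [1.0], [3.0], [7.0]])
y_demo = np.array([0, 0, 1, 1])
print(nearest_distances(X_demo, y_demo))
```

For the demo data the first point (0.0, label 0) has its nearest different-label neighbour at distance 3 and its nearest same-label neighbour at distance 1.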

# IRIS dataset prediction using Conformal Prediction implementation

In [23]:
#Loading Iris dataset
from sklearn.datasets import load_iris
iris_data = load_iris()

In [24]:
#Splitting the IRIS dataset into train and test data sets using random_state
from sklearn.model_selection import train_test_split
Iris_X_train, Iris_X_test, Iris_y_train, Iris_y_test = train_test_split(iris_data.data, iris_data.target,random_state=100)

In [25]:
#Calling conformal prediction function for the test set of IRIS dataset
cp_iris = conformal_predictor(Iris_X_test,Iris_X_train,Iris_y_train)
print("Matrix of p-values:\n")
print(cp_iris)
print("\n")

#Following loop calculates the total false p-value for predictions on the test dataset
p_error =0
count = 0
labels = np.unique(iris_data.target)
for i in range(len(Iris_y_test)):
    for j in range(len(labels)):
        if labels[j] != Iris_y_test[i]:
            p_error += cp_iris[i][j]
            count += 1 
            
# Calculating the average False p Value             
avg_false_p_value = p_error/count
print("The average false p-value for the Nearest Neighbour conformal predictor applied to the IRIS dataset:", avg_false_p_value )

Matrix of p-values:

[array([0.00884956, 0.00884956, 0.73451327]), array([0.74336283, 0.00884956, 0.00884956]), array([0.00884956, 0.00884956, 0.54867257]), array([0.71681416, 0.00884956, 0.00884956]), array([0.00884956, 0.00884956, 0.17699115]), array([0.00884956, 0.00884956, 0.38053097]), array([0.7079646 , 0.00884956, 0.00884956]), array([0.81415929, 0.00884956, 0.00884956]), array([0.00884956, 0.00884956, 0.59292035]), array([0.75221239, 0.00884956, 0.00884956]), array([0.88495575, 0.00884956, 0.00884956]), array([0.00884956, 0.00884956, 0.32743363]), array([0.74336283, 0.00884956, 0.00884956]), array([0.86725664, 0.00884956, 0.00884956]), array([0.00884956, 0.00884956, 0.47787611]), array([0.00884956, 0.46902655, 0.00884956]), array([0.00884956, 0.2920354 , 0.00884956]), array([0.00884956, 0.07079646, 0.02654867]), array([0.00884956, 0.00884956, 0.73451327]), array([0.00884956, 0.00884956, 0.59292035]), array([0.00884956, 0.00884956, 0.45132743]), array([0.69911504, 0.00884956, 0.

# Checking the validity of the conformal predictor predicting IRIS Dataset

In [26]:
#In this case while validating we are considering the label with highest p-value as the predicted label
validate = np.empty(len(Iris_y_test))
for i in range(len(Iris_y_test)):
    p = 0
    predicted_label = 0
    for j in range(len(labels)):
        if cp_iris[i][j] > p :
            p = cp_iris[i][j]
            predicted_label = j
    if Iris_y_test[i] == predicted_label:
        validate[i] = 1
    else:
        validate[i] = 0
print("The accuracy of the predictions of IRIS Dataset using Conformal Prediction is :", np.mean(validate)*100,"%")        
            

The accuracy of the predictions of IRIS Dataset using Conformal Prediction is : 97.36842105263158 %
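The cell above measures the accuracy of the label with the highest p-value. Validity in the conformal sense is usually stated for prediction sets instead: at significance level ε, the set of labels whose p-value exceeds ε should exclude the true label with frequency of roughly ε or less. A minimal sketch of such a check follows, assuming the p-values are arranged one row per sample and one column per label; `prediction_sets`, `error_rate`, `epsilon` and the demo numbers are hypothetical and are not taken from the runs in this notebook.

```python
import numpy as np

def prediction_sets(p_values, labels, epsilon):
    """For each sample, keep every label whose p-value exceeds epsilon."""
    p_values = np.asarray(p_values, dtype=float)
    labels = np.asarray(labels)
    return [labels[row > epsilon].tolist() for row in p_values]

def error_rate(p_values, labels, y_true, epsilon):
    """Fraction of samples whose prediction set misses the true label."""
    sets = prediction_sets(p_values, labels, epsilon)
    misses = [truth not in s for s, truth in zip(sets, y_true)]
    return float(np.mean(misses))

# Made-up p-values for three samples and three labels.
demo_p = [[0.80, 0.03, 0.01],
          [0.02, 0.40, 0.30],
          [0.01, 0.04, 0.02]]
demo_labels = [0, 1, 2]
demo_truth = [0, 2, 1]
print(prediction_sets(demo_p, demo_labels, 0.05))  # [[0], [1, 2], []]
print(error_rate(demo_p, demo_labels, demo_truth, 0.05))
```

In the demo the third prediction set is empty, so it counts as an error, giving an error rate of one in three.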


# Ionosphere dataset prediction using Conformal Prediction implementation

In [27]:
#Loading ionosphere data: columns 0-33 are the features, column 34 is the label
ion = np.genfromtxt("ionosphere.txt", delimiter=",")
data = ion[:,0:34]
target = ion[:,34]

In [28]:
#Splitting the Ionosphere dataset to train and test data sets using random_state
from sklearn.model_selection import train_test_split
Ion_X_train, Ion_X_test, Ion_y_train, Ion_y_test = train_test_split(data,target,random_state=100)

In [29]:
#Calling conformal prediction function for the test set of Ionosphere dataset
cp_ion = conformal_predictor(Ion_X_test,Ion_X_train,Ion_y_train)
p_error =0
count = 0
labels = np.unique(target)

#Following loop calculates the total false p-value for the predictions on test dataset
for i in range(len(Ion_y_test)):
    for j in range(len(labels)):
        if labels[j] != Ion_y_test[i]:
            p_error += cp_ion[i][j]
            count += 1
            
# Calculating the average False p Value              
avg_false_p_value = p_error/count
print("The average false p-value for the Nearest Neighbour conformal predictor applied to ionosphere.txt:", avg_false_p_value )

The average false p-value for the Nearest Neighbour conformal predictor applied to ionosphere.txt: 0.04773588154269979
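The double loop that accumulates the false p-values can be sketched with a boolean mask. This is an illustrative alternative under assumptions: `average_false_p_value` is a hypothetical name, and the demo p-values and the label values -1 and 1 are made up for the example.

```python
import numpy as np

def average_false_p_value(p_values, labels, y_true):
    """Mean p-value over all (sample, label) pairs where the label is not the true one."""
    p_values = np.asarray(p_values, dtype=float)
    labels = np.asarray(labels)
    y_true = np.asarray(y_true)
    # false_mask[i, j] is True when labels[j] is not the true label of sample i.
    false_mask = labels[None, :] != y_true[:, None]
    return float(p_values[false_mask].mean())

demo_p = np.array([[0.9, 0.1],
                   [0.2, 0.6]])
# The false p-values are 0.1 (sample 0) and 0.2 (sample 1).
print(average_false_p_value(demo_p, [-1, 1], [-1, 1]))
```

Because the mask compares label values rather than column indices, the sketch does not rely on the labels being 0, 1, 2, and so on.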


# Checking the validity of the conformal predictor predicting Ionosphere Dataset

In [30]:
#In this case while validating we are considering the label with highest p-value as the predicted label
validate = np.empty(len(Ion_y_test))
for i in range(len(Ion_y_test)):
    p = 0
    #Compare label values rather than column indices, since labels need not be 0..n-1
    predicted_label = labels[0]
    for j in range(len(labels)):
        if cp_ion[i][j] > p :
            p = cp_ion[i][j]
            predicted_label = labels[j]
    if Ion_y_test[i] == predicted_label:
        validate[i] = 1
    else:
        validate[i] = 0
print("The accuracy of the predictions of Ionosphere Dataset using Conformal Prediction is :", np.mean(validate)*100,"%")        
            

The accuracy of the predictions of Ionosphere Dataset using Conformal Prediction is : 63.63636363636363 %


# Experimenting with Conformal prediction with Conformity score = Distance to nearest sample of different class

In [31]:
#Function to calculate p-values for each label for sample passed as input
#conformity score is the distance to the nearest sample of a different class
#we pass the X_train, y_train and the default distance matrix for the test set along with the sample
def pvalue2(sample,distances,X_train,y_train):
    labels = np.unique(y_train)
    label_count = len(labels)
    scores = np.zeros(np.shape(y_train)[0]+1)
    pvalues = np.zeros(label_count)
    for i in range(len(labels)):
        #Considering labels[i] as the postulated label (the label value, not its index)
        postulated_label = labels[i]
        distances_run = np.copy(distances)
        for j in range(len(X_train)):
            dist = 0
            distance_to_nearest_diff = math.inf
            if postulated_label != y_train[j]:
                dist = distance(sample,X_train[j])
                if dist <  distance_to_nearest_diff :
                    distance_to_nearest_diff = dist      
            if distance_to_nearest_diff < distances_run[j][0]:
                distances_run[j][0] =   distance_to_nearest_diff    
        dist=0
        distance_to_nearest_diff = math.inf
        for j in range(len(X_train)):
            if postulated_label != y_train[j]:
                dist = distance(sample,X_train[j])
                if dist <  distance_to_nearest_diff :
                    distance_to_nearest_diff = dist
        #Store the sample's own nearest different-class distance once, after the loop
        distances_run[len(X_train)][0] = distance_to_nearest_diff
        #The score is the nearest different-class distance itself, taken as a 1-D array of scalars
        scores = distances_run[:,0]
        rank = 1
        sample_score = scores[len(X_train)]
        for l in range(len(X_train)):
            if sample_score >= scores[l]:
                rank += 1
        pvalues[i] = rank/(len(X_train)+1)
            
    return pvalues
        
#This function is called to return the conformal predictions
#It takes the sample(s) as input and returns a matrix of p-values, one row per sample and one column per postulated label
#It calls the pvalue2 function defined above to calculate the p-values
#Parameters to pass when calling the function: the set of new sample(s), X_train, y_train
def conformal_predictor2(new_sample,X_train,y_train):
    #Single column: distance to the nearest point of a different label; the last row is reserved for the test sample
    distances = np.zeros((len(X_train)+1,1))
    for j in range(len(X_train)):
        dist = 0
        distance_to_nearest_diff = math.inf
        for k in range(len(X_train)):
            if j != k:
                if y_train[j] != y_train[k]:
                    dist = distance(X_train[j],X_train[k])
                    if dist <  distance_to_nearest_diff :
                        distance_to_nearest_diff = dist
        distances[j][0] = distance_to_nearest_diff
    distances[len(X_train)][0] = math.inf    
    
    length = len(new_sample)
    results = []
    for i in range(length):
        sample = new_sample[i]
        result = pvalue2(sample,distances,X_train,y_train)
        results.append(result)
   

    return results

Checking the p-values for the Iris dataset

In [32]:
cp2_iris = conformal_predictor2(Iris_X_test,Iris_X_train,Iris_y_train)
print("Matrix of p-values:\n")
print(cp2_iris)
print("\n")

#Following loop calculates the total false p-value for predictions on the test dataset
p_error =0
count = 0
labels = np.unique(iris_data.target)
for i in range(len(Iris_y_test)):
    for j in range(len(labels)):
        if labels[j] != Iris_y_test[i]:
            p_error += cp2_iris[i][j]
            count += 1 
            
# Calculating the average False p Value             
avg_false_p_value = p_error/count
print("The average false p-value for the Nearest Neighbour conformal predictor applied to the IRIS dataset:", avg_false_p_value )

Matrix of p-values:

[array([0.01769912, 0.01769912, 0.36283186]), array([0.72566372, 0.03539823, 0.03539823]), array([0.16814159, 0.16814159, 0.97345133]), array([0.99115044, 0.08849558, 0.08849558]), array([0.01769912, 0.01769912, 0.07964602]), array([0.15044248, 0.15044248, 0.65486726]), array([0.97345133, 0.0619469 , 0.0619469 ]), array([0.72566372, 0.01769912, 0.01769912]), array([0.01769912, 0.01769912, 0.30088496]), array([0.71681416, 0.01769912, 0.01769912]), array([0.88495575, 0.01769912, 0.01769912]), array([0.04424779, 0.04424779, 0.38938053]), array([0.69911504, 0.01769912, 0.01769912]), array([0.82300885, 0.01769912, 0.01769912]), array([0.03539823, 0.03539823, 0.40707965]), array([0.01769912, 0.27433628, 0.01769912]), array([0.14159292, 0.56637168, 0.14159292]), array([0.04424779, 0.08849558, 0.04424779]), array([0.03539823, 0.03539823, 0.83185841]), array([0.01769912, 0.01769912, 0.38053097]), array([0.0619469 , 0.0619469 , 0.53982301]), array([0.71681416, 0.04424779, 0.

In [33]:
#In this case while validating we are considering the label with highest p-value as the predicted label
validate = np.empty(len(Iris_y_test))
for i in range(len(Iris_y_test)):
    p = 0
    predicted_label = 0
    for j in range(len(labels)):
        if cp2_iris[i][j] > p :
            p = cp2_iris[i][j]
            predicted_label = j
    if Iris_y_test[i] == predicted_label:
        validate[i] = 1
    else:
        validate[i] = 0
print("The accuracy of the predictions of IRIS Dataset using Conformal Prediction is :", np.mean(validate)*100,"%")        
            

The accuracy of the predictions of IRIS Dataset using Conformal Prediction is : 94.73684210526315 %


# Observations about the datasets

There are duplicate datapoints in the IRIS dataset, for example rows 101 and 142, which share both features and label:

In [34]:
iris_data.data[101]

array([5.8, 2.7, 5.1, 1.9])

In [35]:
iris_data.data[142]

array([5.8, 2.7, 5.1, 1.9])

In [36]:
iris_data.target[101]

2

In [37]:
iris_data.target[142]

2
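Duplicates like this one can also be located programmatically rather than by inspecting individual rows. The sketch below relies on `np.unique` with `axis=0` and `return_counts=True`; the name `duplicate_rows` and the small demo array are hypothetical, with the repeated row copied from the output above.

```python
import numpy as np

def duplicate_rows(X):
    """Return the distinct rows of X that occur more than once."""
    X = np.asarray(X)
    unique_rows, counts = np.unique(X, axis=0, return_counts=True)
    return unique_rows[counts > 1]

demo = np.array([[5.8, 2.7, 5.1, 1.9],
                 [6.3, 3.3, 6.0, 2.5],
                 [5.8, 2.7, 5.1, 1.9]])
print(duplicate_rows(demo))  # [[5.8 2.7 5.1 1.9]]
```

Duplicates matter for this conformity score because a same-label duplicate puts a zero in the denominator of the ratio.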

The second column of the ionosphere dataset is constant (every value is zero):

In [38]:
ion[:,1]

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0.
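A constant feature adds zero to every squared difference, so it has no effect on the Euclidean distances used here. A quick way to find such columns is sketched below; `constant_columns` is a hypothetical helper and the demo array is made up for illustration.

```python
import numpy as np

def constant_columns(X):
    """Return the indices of columns whose values never vary."""
    X = np.asarray(X, dtype=float)
    return np.flatnonzero(X.max(axis=0) == X.min(axis=0))

demo = np.array([[1.0, 0.0, 3.0],
                 [2.0, 0.0, 3.0],
                 [4.0, 0.0, 5.0]])
print(constant_columns(demo))  # [1]
```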

# Results at a glance


– the average false p-value for the Nearest Neighbour conformal predictor applied to the iris dataset: 0.010829063809967407

– the average false p-value for the Nearest Neighbour conformal predictor applied to ionosphere.txt:  0.04429235537190089

# Observations on method

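The implementation of conformal prediction is compute-intensive: the initial distance matrix requires a distance calculation for every pair of training points, and each test sample then requires further passes over the training set for every postulated label, so the running time grows with both the number of observations and the number of possible labels.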

There is not much difference in the accuracy of prediction on the Iris dataset between the two conformity scores: 97.37% with the ratio of distances versus 94.74% with the distance to the nearest sample of a different class.

# References

1. https://scikit-learn.org/stable/index.html    

2. https://numpy.org/doc/stable/

3. https://docs.python.org/3/library/math.html