In [None]:
novel_class = 1 #choose which class to exclude

'''Import data and create novel class'''
import numpy as np
import sys

X_features = np.load("X.npy")
Y_features = np.load("Y.npy")

'''Display all classes'''
categories = np.unique(Y_features)
print ("Classes: %s" %' '.join(map(str, categories)))
# Drop the novel class by label VALUE rather than by position: the later cells
# compare labels against novel_class by value, so an index-based np.delete would
# remove the wrong class whenever the labels are not exactly 0..n.
categories = categories[categories != novel_class]
print ("Novel class: %s" %(novel_class))
print ("Classes after removal of novel: %s" %' '.join(map(str, categories)))

In [None]:
X_novelties = np.load("X.npy")
# Every sample loaded above is labelled with the novel class.
Y_novelties = np.array([novel_class] * len(X_novelties))
# NOTE(review): X_dataset / Y_dataset are not defined in any cell visible here;
# confirm which arrays hold the known-class data before running this cell.
X_features = np.concatenate((X_dataset, X_novelties))
Y_features = np.concatenate((Y_dataset, Y_novelties))

In [None]:
'''Split the known classes into test sets'''
from sklearn.model_selection import train_test_split

# Hold out 20% of the data, then cut that hold-out in half:
# *_TEK feeds the known-only test set, *_TEM is the mixed test set.
# Both splits are stratified on the labels and use the same seed.
X_TR, X_TE, Y_TR, Y_TE = train_test_split(
    X_features, Y_features, test_size=0.2, random_state=555, stratify=Y_features)
X_TEK, X_TEM, Y_TEK, Y_TEM = train_test_split(
    X_TE, Y_TE, test_size=0.5, random_state=555, stratify=Y_TE)

# Known-only test set: keep every row whose label is not the novel class.
known_test_rows = [row for row in range(len(Y_TEK)) if Y_TEK[row] != novel_class]
Y_test_known = [Y_TEK[row] for row in known_test_rows]
X_test_known = [X_TEK[row] for row in known_test_rows]

# Training set: same filter, so the classifier never sees the novel class.
known_train_rows = [row for row in range(len(Y_TR)) if Y_TR[row] != novel_class]
Y_train = [Y_TR[row] for row in known_train_rows]
X_train = [X_TR[row] for row in known_train_rows]

# The mixed test set is used unfiltered (novel samples included).
Y_test_mix, X_test_mix = Y_TEM, X_TEM

In [None]:
'''2nd Run: Take away novel data from mixed dataset'''
# Positions in the mixed split whose label is not the novel class.
kept_mix_rows = [row for row in range(len(Y_TEM)) if Y_TEM[row] != novel_class]
Y_test_mix = [Y_TEM[row] for row in kept_mix_rows]
X_test_mix = [X_TEM[row] for row in kept_mix_rows]

In [None]:
'''Train and fit an GP'''
from sklearn.multiclass import OneVsRestClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
import time

# Time construction + fitting of the one-vs-rest Gaussian process classifier.
start = time.time()
gp_kernel = 1.0 * RBF(length_scale=1.0)
gp_estimator = GaussianProcessClassifier(kernel=gp_kernel, random_state=555)
clf = OneVsRestClassifier(gp_estimator)
clf.fit(X_train,Y_train)
end = time.time()
print(end - start)

'''Predicting test data and calculating probabilites'''
# Time hard predictions and class probabilities for both test sets.
start = time.time()
Y_pred_known, Y_score_known = clf.predict(X_test_known), clf.predict_proba(X_test_known)
Y_PM, Y_score_mix = clf.predict(X_test_mix), clf.predict_proba(X_test_mix)
end = time.time()
print(end - start)

In [None]:
'''Binarize labels'''
from sklearn.preprocessing import label_binarize

n_classes = len(categories)

# Mixed ground truth gets one column per class, novel included.
Y_TEM = label_binarize(Y_test_mix, classes=range(n_classes+1)) #classes: including novel

# Everything else is encoded over the known classes only (novel skipped).
# The input tuple is built before any name is rebound, so each call sees the
# un-binarized labels.
Y_test_known, Y_train, Y_pred_known, Y_PM = [
    label_binarize(raw_labels, classes=categories)
    for raw_labels in (Y_test_known, Y_train, Y_pred_known, Y_PM)
]

In [None]:
'''Rearranging the novel class to the end of the Y_test_mix-label'''
# For each one-hot row: columns before the novel one, columns after it,
# and finally the novel column itself.
Y_TEM_R = [
    list(row[:novel_class]) + list(row[(novel_class+1):]) + [row[novel_class]]
    for row in Y_TEM
]
Y_test_mix = np.array(Y_TEM_R)

In [None]:
'''Deciding thresholds for each class'''
# Highest predicted probability per sample. These do not depend on the class
# being processed, so they are computed once instead of once per class.
max_score_known = [max(Y_score_known[j]) for j in range(len(Y_pred_known))]
max_score_mix = [max(Y_score_mix[j]) for j in range(len(Y_PM))]
mean_known = np.mean(max_score_known)

threshold_array = []
for i in range(len(categories)): #going through all but the novel class
    # NOTE(review): nothing in this loop body depends on i, so every class
    # receives the same threshold -- confirm whether per-class thresholds
    # were intended.
    mean_mix = np.mean(max_score_mix)
    sorted_mixed = np.sort(max_score_mix)
    times = 0
    threshold = 0
    if mean_known < mean_mix:
        # NOTE(review): the wording compares mean_mix against mean_known
        # (the known-only mean), not against "mixed".
        print ('Mean with novels is higher than mixed: %f vs %f' %(mean_mix, mean_known))
    # Discard the lowest-scoring mixed samples one at a time until the mean of
    # the remaining scores is no longer below the known-set mean. The last
    # discarded score becomes the threshold.
    while sorted_mixed.size > 0 and mean_mix < mean_known:
        threshold = sorted_mixed[0]
        sorted_mixed = sorted_mixed[1:]
        times += 1
        if sorted_mixed.size > 0:
            # Skip the mean of an empty array (would be NaN with a warning);
            # the loop guard ends the search once nothing is left.
            mean_mix = np.mean(sorted_mixed)
    print ('The threshold will be %f and we will consider %d data as novel' %(threshold, times))
    threshold_array.append(threshold)

In [None]:
'''Creating a novel class and moving all novel data to that class'''
novel_list = []
max_scores_known = []

# One-hot row for the novel class: a zero per known class, then a trailing one.
novel_class_array = [0] * len(categories) + [1]

Y_pred_mix = []
number_of_novels = []
for i in range(len(Y_PM)): #going through all predicted data
    i_class = Y_PM[i].argmax(axis=0)
    top_score = max(Y_score_mix[i])
    if top_score <= threshold_array[i_class]: #checking if it's under the threshold
        # Low confidence: relabel as novel and remember the predicted column.
        Y_pred_mix.append(novel_class_array)
        novel_list.append([top_score, Y_test_mix[i]])
        number_of_novels.append(i_class)
    else:
        # Confident: keep the prediction, padded with a 0 in the novel column.
        padded_prediction = np.concatenate((Y_PM[i], [0]))
        Y_pred_mix.append(padded_prediction)
        max_scores_known.append([top_score, Y_test_mix[i]])

In [None]:
'''Printing number of removed data for each class'''
Y_pred_mix = np.array(Y_pred_mix)
from collections import Counter

# number_of_novels holds column indices of the known-class encoding; indices at
# or above novel_class are shifted by one when printed.
# NOTE(review): this maps back to the original label only if labels are 0..n.
countering = Counter(number_of_novels)
for key, value in countering.items():
    if key < novel_class:
        print (key, value)
    else:
        print (key+1, value)

'''Calculating percentiles'''
# Number of mixed-test rows whose true class is the novel one (column n_classes).
SUM_NOVEL = sum(1 for row in Y_test_mix if row.argmax(axis=0) == n_classes)

# A sort of an `unassigned` list used to sit here; that name is never defined in
# the visible cells and its result was unused, so it only raised NameError.

novel_list = sorted(novel_list, key=lambda x: x[0])
max_scores_known = sorted(max_scores_known, key=lambda x: x[0])

# Build a new list instead of extending novel_list in place, so re-running this
# cell does not keep appending the known samples to novel_list.
chosen_list = novel_list + max_scores_known

def listed_novels(chosen_list): #for evaluating datasets with a few novelties
    """Print the 1-based position of every novel sample in ``chosen_list``.

    Each entry is indexed as ``entry[1]`` and must support ``argmax(axis=0)``;
    an entry counts as novel when that argmax equals the global ``n_classes``.
    Scanning stops as soon as the running count equals the global ``SUM_NOVEL``.
    """
    found = 0
    for position, entry in enumerate(chosen_list, start=1):
        if entry[1].argmax(axis=0) == n_classes: #if it's a novel class
            found += 1
            print ('Novelties: %d, Data %d' %(found, position))
        if found == SUM_NOVEL:
            return

def percentiles_calc(chosen_list): #for evaluating datasets with a great amount of novelties
    """Print the precision reached at the 10/25/50/75/100th percentile of novelties.

    ``chosen_list`` is scanned in order. Each entry is indexed as ``entry[1]``
    and must support ``argmax(axis=0)``; an entry is novel when that argmax
    equals the global ``n_classes``. A checkpoint is reported when the running
    count of novel entries reaches its target, a rounded fraction of the global
    ``SUM_NOVEL``. The reported precision is
    ``novel entries seen / entries scanned * 100``.

    Checkpoints are only evaluated when a novel entry is found, and all
    checkpoints reached by that entry are reported together. This keeps the
    output unchanged when the targets are distinct, and avoids skipped or
    late checkpoints when several targets coincide (small ``SUM_NOVEL``).
    """
    checkpoints = [
        ('10th', round(SUM_NOVEL*0.1, 0)),
        ('25th', round(SUM_NOVEL*0.25, 0)),
        ('50th', round(SUM_NOVEL*0.50, 0)),
        ('75th', round(SUM_NOVEL*0.75, 0)),
        ('100th', SUM_NOVEL),
    ]
    next_checkpoint = 0
    no_of_novelties = 0
    for no_of_data in range(len(chosen_list)):
        if chosen_list[no_of_data][1].argmax(axis=0) != n_classes: #not a novel class
            continue
        no_of_novelties += 1
        while (next_checkpoint < len(checkpoints)
               and no_of_novelties >= checkpoints[next_checkpoint][1]):
            label = checkpoints[next_checkpoint][0]
            precision = float(no_of_novelties)/(no_of_data+1)*100
            print('The precision at the %s percentile: %f' % (label, precision))
            next_checkpoint += 1
        if next_checkpoint == len(checkpoints): #all data
            break

print ('The predicted classes first:')
percentiles_calc(chosen_list)

print ('Small dataset, predicted classes first:')
# The per-novelty listing is implemented by listed_novels; no function named
# percentiles_list is defined.
listed_novels(chosen_list)

In [None]:
'''Evaluation metrics'''

'''Overall Prediction accuracy'''
print ('Overall prediction accuracy:')
from sklearn.metrics import accuracy_score
overall_accuracy_known = accuracy_score(Y_test_known, Y_pred_known)
overall_accuracy_mix = accuracy_score(Y_test_mix, Y_pred_mix)
print (overall_accuracy_known)
print (overall_accuracy_mix)

'''Classification report'''
print ('Classification report:')
from sklearn.metrics import classification_report
# Column names must follow the column order of the indicator matrices: the known
# classes in `categories` order, and for the mixed set the novel class as the
# final column. A joined string of range(...) mislabels columns whenever
# novel_class is not the last label and yields too many names for 10+ classes.
known_target_names = [str(label) for label in categories]
mix_target_names = known_target_names + [str(novel_class)]
print (classification_report(Y_test_known, Y_pred_known, target_names=known_target_names))
print (classification_report(Y_test_mix, Y_pred_mix, target_names=mix_target_names))

'''Confusion matrices - one with number of data and one normalized'''
import itertools
from sklearn.metrics import confusion_matrix
%matplotlib notebook
import matplotlib.pyplot as plt

def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    """Print a confusion matrix and draw it on the current matplotlib figure.

    Parameters
    ----------
    cm : 2-D numpy array; rows are labelled 'True label', columns 'Predicted label'.
    classes : sequence of tick labels, one per row/column.
    normalize : if True, each row is divided by its row sum before display.
    title : title placed above the plot.
    cmap : matplotlib colormap used by ``imshow``.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # A row with no samples would divide by zero and fill the row with NaN;
        # dividing by 1 instead leaves such a row as all zeros.
        row_totals[row_totals == 0] = 1
        cm = cm.astype('float') / row_totals
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half the maximum get white text so the value stays readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
            horizontalalignment="center",
                color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Lay out after the axis labels exist so they are included in the fit.
    plt.tight_layout()
    
# Compute confusion matrix
# Pin the label set to every column index (known classes plus the trailing novel
# column) so the matrix always has one row/column per tick label, even when a
# class is missing from both the truth and the predictions.
cnf_matrix = confusion_matrix(Y_test_mix.argmax(axis=1), Y_pred_mix.argmax(axis=1),
                              labels=list(range(n_classes+1)))
np.set_printoptions(precision=2)

# Tick labels follow the column order: known classes in `categories` order, then
# the novel class. A joined string of range(...) mislabels the columns whenever
# novel_class is not the last label and breaks for 10+ classes.
confusion_tick_labels = [str(label) for label in categories] + [str(novel_class)]

# Plot non-normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=confusion_tick_labels, title='Confusion matrix, without normalization')

# Plot normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=confusion_tick_labels, normalize=True, title='Normalized confusion matrix')