In [1]:
#importing necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt #for visualization purposes, otherwise skip this line
from collections import Counter
from sklearn.neighbors import KNeighborsClassifier, NearestNeighbors
from sklearn.model_selection import cross_validate, cross_val_score, cross_val_predict
from sklearn import metrics
import sys
if not sys.warnoptions:
    import warnings
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import RadiusNeighborsClassifier

In [1]:
#loading data (here, file 'example_data.csv' with separator ';'); one file per supply chain phase, e.g. one file for handling, one file for preconditioning, etc.
#the data should have the following columns, in this order: 'deviation reason', 'deviation measure', 'sensor role', 'absolute setpoint deviation',
#'slope of two most recent temperature measurements', 'average setpoint deviation within the last hour',
#'membership degree in a fuzzy set "transportation and/or storage" with regard to the previous physical handling point',
#'membership degree in a fuzzy set "transportation and/or storage" with regard to the next physical handling point'

#the first two columns are the target variables of the two prediction settings (deviation reason and corrective measure); the remaining columns are the predictor features
df = pd.read_csv('example_data.csv', sep = ';')
df.head(1)

Implementing the voting

In [2]:
#optimal radius values, i.e. first cut points minimizing entropy, for deviation reason ('rr') and corrective measure ('rm')
#(as an example, 'rr' is used in the rest of this code; replacing it with 'rm' allows for making predictions for
#corrective measures, provided that the corrective measure column of 'example_data.csv' is used as the target variable)
#NOTE(review): the two self-assignments below are placeholders; each raises NameError unless the name has already been
#defined (e.g. in a cell not shown here) -- replace the right-hand sides with the radius values obtained from entropy minimization
rr = rr #calculated in the course of entropy minimization procedure for deviation reasons
rm = rm #calculated in the course of entropy minimization procedure for corrective measures

In [3]:
#creating a dataframe to which we will write voting results; can be used to back-engineer the voting process and understand the reasons for a specific prediction
#columns, in positional order (later cells address them with .iloc):
#0 'actual' (true/actual label), 1 'knn' (k-NN prediction), 2 'knn_rad' (radius neighbor prediction), 3 'dt' (decision tree prediction),
#4 '1nn' (1-NN prediction), 5 'final1' (final prediction of the first round), 6 'knn2' (second major k-NN prediction),
#7 'knn_rad2' (second major radius neighbor prediction), 8 '1nn2' (second major 1-NN prediction),
#9 'final2' (final prediction of the second round), 10 'finalfinal' (final second-best prediction), 11 'note' (for inspection purposes)
rad_test_r = pd.DataFrame(columns = ('actual', 'knn', 'knn_rad', 'dt', '1nn', 'final1', 'knn2', 'knn_rad2', '1nn2', 'final2', 'finalfinal', 'note'))

In [5]:
#defining functions for 1st and 2nd most common class and removal of the first prediction (returns a new list)
def first(lst):
    """Return the most frequent element of ``lst``.

    An empty ``lst`` yields the sentinel string 'first_empty'.
    When several elements share the highest count, the winner is whichever
    of them ``max`` meets first while iterating over ``set(lst)``.
    """
    if len(lst) == 0:
        return 'first_empty'
    distinct_values = set(lst)
    return max(distinct_values, key=lst.count)
def second(lst):
    """Return the second most frequent element of ``lst``.

    The most frequent element is determined first and every occurrence of it
    is discarded; the most frequent element of what is left is returned.
    The sentinel string 'second_empty' is returned when ``lst`` is empty or
    when it holds only one distinct value.
    """
    if len(lst) == 0:
        return 'second_empty'
    top = max(set(lst), key=lst.count)
    rest = [item for item in lst if item != top]
    if len(rest) == 0:
        return 'second_empty'
    return max(set(rest), key=rest.count)
def removal(lst, pred):
    """Return a new list holding the elements of ``lst`` that differ from ``pred``.

    The input list is left untouched; the relative order of the kept
    elements is preserved.
    """
    kept = []
    for item in lst:
        if item != pred:
            kept.append(item)
    return kept

In [6]:
#voting procedure for the first round (step)

#initializing classifiers
#NOTE(review): n_neighbors and rr are not assigned in this cell; they must already exist in the kernel
#NOTE(review): no random_state is passed to the decision tree, so repeated runs may resolve equally good splits differently
knn = KNeighborsClassifier(n_neighbors = n_neighbors) #n_neighbors is calculated either in CV or LOOCV procedure with the help of grid or random hyperparameter search
dt = DecisionTreeClassifier(min_samples_leaf = n_neighbors) #min_samples_leaf corresponds to the optimal value of k neighbors to restrict overfitting of decision trees
knnRr = RadiusNeighborsClassifier(radius = rr) #knnRm = RadiusNeighborsClassifier(radius = rm) for corrective measures


def _second_best(neighbour_labels, first_pred, tag):
    """Return (second-best prediction, note fragment) for one neighbour-based classifier.

    neighbour_labels: labels of the neighbours the classifier looked at.
    first_pred: the classifier's first-best prediction; all its occurrences are removed.
    tag: prefix written to the note when the two leading remaining classes are tied.

    The prediction is 'null' when no other class remains, the string
    '<a> vs <b>' when the two leading remaining classes have equal counts,
    and otherwise the most frequent remaining class. The note fragment is
    empty unless there is a tie.
    """
    remaining = removal(neighbour_labels, first_pred)
    if len(remaining) == 0:
        return 'null', ''
    most_common = first(remaining)
    runner_up = second(remaining)
    if remaining.count(most_common) == remaining.count(runner_up):
        tie = str(most_common) + ' vs ' + str(runner_up)
        return tie, tag + tie
    return most_common, ''


def _first_round_vote(knn_pred, knnRr_pred, dt_pred):
    """Majority vote of the three classifiers; 'disagr' when all three predictions differ."""
    if knn_pred == knnRr_pred or knn_pred == dt_pred:
        return knn_pred
    if knnRr_pred == dt_pred:
        return dt_pred
    #at this point no two predictions coincide, so the former 'missing' fallback could never be reached
    return 'disagr'


#running a voting procedure with LOOCV
X = df.iloc[:, 2:].reset_index(drop = True) #predictor features (identical for every fold, so built once)
y = list(df.iloc[:, 0]) #target labels (first column)
records = [] #one dict per left-out observation; turned into a dataframe once after the loop
for i in range(len(df)):
    #selecting training observations and singling out a test observation
    query = X.iloc[[i]] #one-row dataframe, so the feature names match those seen during fit
    obs_y = y[i]
    X_new = X.drop(X.index[i]).reset_index(drop = True)
    y_new = y[:i] + y[i + 1:]

    #fitting classifiers and making first-best predictions
    knn.fit(X_new, y_new)
    knnRr.fit(X_new, y_new)
    dt.fit(X_new, y_new)
    note = 'result: '
    neigh = knnRr.radius_neighbors(query) #(distances, indices) of the training points within the radius
    neighKNN = knn.kneighbors(query) #(distances, indices) of the k nearest training points
    ind_list = list(neighKNN[1][0])
    oneNN = y_new[ind_list[0]] #label of the first neighbour returned by kneighbors
    knn_pred = knn.predict(query)[0]
    dt_pred = dt.predict(query)[0]

    #training observations that fall into the same decision tree leaf as the test observation
    #(not used by the vote itself; kept available for inspection)
    dt_partitions = dt.apply(X_new)
    dt_partition_obs = dt.apply(query)[0]
    ind = [j for j, leaf in enumerate(dt_partitions) if leaf == dt_partition_obs]
    df_partition = X_new[X_new.index.isin(ind)]

    #making second-best predictions
    #first, for k-NN
    knn_list = [y_new[a] for a in ind_list] #list with labels of k neighbors
    knn_pred2, fragment = _second_best(knn_list, knn_pred, 'knn')
    note += fragment

    #then, for radius neighbors classifier (which cannot predict when no training point lies within the radius)
    if len(neigh[0][0]) == 0:
        knnRr_pred = 'null'
        knnRr_pred2 = 'null'
        final1 = 'null'
    else:
        knnRr_pred = knnRr.predict(query)[0]
        knnRr_list = [y_new[a] for a in list(neigh[1][0])]
        knnRr_pred2, fragment = _second_best(knnRr_list, knnRr_pred, 'knnr')
        note += fragment
        final1 = _first_round_vote(knn_pred, knnRr_pred, dt_pred)

    #finally, for 1-NN classifier: label of the closest of the k neighbors whose label differs from the 1-NN prediction
    ind_list_new = [a for a in ind_list if y_new[a] != oneNN]
    if len(ind_list_new) == 0:
        oneNN2 = 'null'
    else:
        oneNN2 = y_new[ind_list_new[0]]

    #collecting predictions for the results dataframe
    records.append({'actual': obs_y, 'knn': knn_pred, 'knn_rad': knnRr_pred, 'dt': dt_pred,
                    'final1': final1, '1nn': oneNN, 'knn2': knn_pred2, 'knn_rad2': knnRr_pred2,
                    '1nn2': oneNN2, 'note': note})

#building the results in one go: DataFrame.append was removed in pandas 2.0 and was quadratic when called per row;
#the column layout of the previously created rad_test_r is kept, and re-running this cell no longer duplicates rows
rad_test_r = pd.DataFrame(records, columns = rad_test_r.columns)

In [19]:
#defining functions for extracting classes from strings written to the created dataframe with prediction/voting results
def f_class(string):
    """Return the first class of a tie string such as '1.0 vs 2.0' as a float.

    The text before the first ' vs ' separator is converted as a whole, so
    multi-digit, negative and fractional class labels are parsed correctly
    (reading only the first character turned '12.0 vs 3.0' into 1.0).
    A string without the separator is converted as a whole.

    Raises ValueError when that text is not a valid number.
    """
    return float(string.split(' vs ', 1)[0])
def s_class(string):
    """Return the second class of a tie string such as '1.0 vs 2.0' as a float.

    The text after the last ' vs ' separator is converted as a whole, so
    multi-digit, negative and fractional class labels are parsed correctly
    (reading a single trailing character turned '1.0 vs 12.0' into 2.0 and
    '1 vs 2.5' into 5.0). A string without the separator is converted as a whole.

    Raises ValueError when that text is not a valid number.
    """
    return float(string.rsplit(' vs ', 1)[-1])

Different scenarios for the second best prediction (SBP)

In [6]:
#accuracy of k-NN (first or second best prediction is correct)
#positions used: 0 = 'actual', 1 = 'knn', 6 = 'knn2'
knn_acc = 0
n_rows = len(rad_test_r)
for row in range(n_rows):
    actual, knn_first, knn_second = (rad_test_r.iloc[row, col] for col in (0, 1, 6))
    if knn_first == actual:
        knn_acc += 1
    elif knn_second == actual:
        knn_acc += 1
print(knn_acc/n_rows)

In [5]:
#inclusive accuracy calculation strategy (considering avoided erroneous predictions)
#accuracy of Hk-NN
#positions used: 0 = 'actual', 1 = 'knn', 5 = 'final1', 6 = 'knn2'
dfd = rad_test_r.copy()
fin_acc = 0
for i in range(len(dfd)):
    act = dfd.iloc[i, 0]
    knn1 = dfd.iloc[i, 1]
    final1 = dfd.iloc[i, 5]
    knn2_raw = dfd.iloc[i, 6]

    #candidate second-best k-NN classes; an EMPTY list means "no second-best prediction possible"
    #(the former sentinel [0] was indistinguishable from a genuine class label 0)
    if final1 != knn1: #knn1 does NOT coincide with the prediction in the first round
        knn2 = [knn1] #knn2 is assigned the value that failed in the first round, but still remains majority
    elif type(knn2_raw) != str: #VOTED in the first round, knn1 majority class removed, single remaining class (perfect case)
        knn2 = [knn2_raw]
    elif knn2_raw == 'null': #knn1 majority class was the only class
        knn2 = []
    else: #competition: string of the form '<a> vs <b>'
        knn2 = [f_class(knn2_raw), s_class(knn2_raw)]

    second_hit = any(candidate == act for candidate in knn2)
    if type(final1) != str: #a class was predicted in the first round (exact type check kept on purpose)
        if len(knn2) == 0:
            hit = final1 == act or act != knn1
        else:
            hit = final1 == act or second_hit
    elif final1 == 'null' or final1 == 'disagr': #no first-round prediction
        if len(knn2) == 0: #both are empty or no unity
            hit = act != knn1
        else: #an avoided wrong knn1 prediction counts as well
            hit = act != knn1 or second_hit
    else: #any other marker string is not counted
        hit = False
    if hit:
        fin_acc += 1
#an empty results dataframe yields nan instead of a ZeroDivisionError
print(fin_acc/len(dfd) if len(dfd) else float('nan'))

In [4]:
#constrained accuracy calculation strategy (NOT considering avoided erroneous predictions)
#accuracy for Hk-NN
#positions used: 0 = 'actual', 1 = 'knn', 5 = 'final1', 6 = 'knn2'
dfd = rad_test_r.copy()
fin_acc = 0
for i in range(len(dfd)):
    act = dfd.iloc[i, 0]
    knn1 = dfd.iloc[i, 1]
    final1 = dfd.iloc[i, 5]
    knn2_raw = dfd.iloc[i, 6]

    #candidate second-best k-NN classes; an EMPTY list means "no second-best prediction possible"
    #(the former sentinel [0] was indistinguishable from a genuine class label 0)
    if final1 != knn1: #knn1 does NOT coincide with the prediction in the first round
        knn2 = [knn1] #knn2 is assigned the value that failed in the first round, but still remains majority
    elif type(knn2_raw) != str: #VOTED in the first round, knn1 majority class removed, single remaining class (perfect case)
        knn2 = [knn2_raw]
    elif knn2_raw == 'null': #knn1 majority class was the only class
        knn2 = []
    else: #competition: string of the form '<a> vs <b>'
        knn2 = [f_class(knn2_raw), s_class(knn2_raw)]

    second_hit = any(candidate == act for candidate in knn2)
    if type(final1) != str: #a class was predicted in the first round (exact type check kept on purpose)
        hit = final1 == act or second_hit
    elif final1 == 'null' or final1 == 'disagr': #no first-round prediction: only a correct second-best class counts
        hit = second_hit
    else: #any other marker string is not counted
        hit = False
    if hit:
        fin_acc += 1
#an empty results dataframe yields nan instead of a ZeroDivisionError
print(fin_acc/len(dfd) if len(dfd) else float('nan'))