## Load NumPy, pandas and time

In [1]:
import numpy as np
import pandas as pd
import time
from collections import Counter #added python standard library: collections

In [2]:
def create_bins(df,nobins=10,bintype='equal-width'):
    cdf=df.copy()
    numerical_cdf=df.copy()
    for i in ['ID','CLASS']:
        if i in cdf: numerical_cdf=numerical_cdf.drop([i], axis=1)
    numerical_cdf=numerical_cdf.select_dtypes(include=['number'])
    binfunc=[pd.cut,pd.qcut]
    binning={}

    if bintype=="equal-width": a=binfunc[0]
    elif bintype=="equal-size": a=binfunc[1]
    else: a=binfunc[0]

    for i in numerical_cdf:
        res,bins=a(cdf[i],nobins,retbins=True,labels=False,duplicates='drop')
        bins[0]=-np.inf
        bins[-1]=np.inf
        binning[i]=bins
        cdf[i]=res.astype('category')
    return cdf,binning

def apply_bins(df,binning):
    cdf=df.copy()
    numerical_cdf=df.copy()
    for i in ['ID','CLASS']:
        numerical_cdf=numerical_cdf.drop([i], axis=1)
    numerical_cdf=numerical_cdf.select_dtypes(include=['number'])

    for i in numerical_cdf:
        bins=binning[i]
        res,bins=pd.cut(cdf[i],bins,retbins=True,labels=False,duplicates='drop')
        cdf[i]=res.astype('category')
    return cdf


def create_imputation(anneal_train_df):
    anneal_train_df_copy = anneal_train_df.copy()
    imputation_template = {}
    
    df_cols = anneal_train_df_copy.loc[:, ~anneal_train_df_copy.columns.isin(['ID', 'CLASS'])]
    numerical_columns = df_cols.select_dtypes(include=['number']).columns
    non_numerical_columns = df_cols.select_dtypes(include=['category', 'object']).columns

    anneal_train_df_numerical = anneal_train_df_copy.loc[:, numerical_columns]
    for column in anneal_train_df_numerical.columns:
        non_missing_col_data = anneal_train_df_numerical.loc[:,column]
        if(non_missing_col_data.dropna().shape[0] > 0):
            mean_value = non_missing_col_data.mean()
        else:
            mean_value = 0
            
        anneal_train_df_copy[column] = anneal_train_df_copy[column].fillna(mean_value)
        imputation_template[column] = mean_value

    anneal_train_df_categorical = anneal_train_df_copy.loc[:, non_numerical_columns]
    for column in anneal_train_df_categorical.columns:
        non_missing_col_data = anneal_train_df_categorical[column]
        if(non_missing_col_data.dropna().shape[0] > 0):
            mode_value = non_missing_col_data.mode()[0]
        else:
            mode_value = ""
        
        anneal_train_df_copy[column] = anneal_train_df_copy[column].fillna(mode_value)
        imputation_template[column] = mode_value
            
        
    return anneal_train_df_copy, imputation_template

def apply_imputation(anneal_test_df, imputation):
    anneal_test_df_copy = anneal_test_df.copy()
    df_cols = anneal_test_df_copy.loc[:, ~anneal_test_df_copy.columns.isin(['ID', 'CLASS'])]
    
    for column in df_cols.columns:
        anneal_test_df_copy[column] = anneal_test_df_copy[column].fillna(imputation[column])

    return anneal_test_df_copy
    


def create_normalization(file_df,normalizationtype):
    normalization = {}
    new_file_df = file_df.copy()
    
    if (normalizationtype == "minmax"):
        for col_name in file_df:
            if (col_name != "ID" and col_name != "CLASS"):
                min_col = file_df[col_name].min()
                max_col = file_df[col_name].max()
                normalization[col_name] = ("minmax", min_col,max_col)
                new_file_df[col_name] = file_df[col_name].apply(lambda x:(x-min_col)/(max_col-min_col))
        
    elif (normalizationtype == "zscore"):
        for col_name in file_df:
            if (col_name != "ID" and col_name != "CLASS"):
                mean_col = file_df[col_name].mean()
                std_col = file_df[col_name].std()
                normalization[col_name] = ("zscore", mean_col,std_col)
                new_file_df[col_name] = file_df[col_name].apply(lambda x:(x-mean_col)/std_col)
    
    return (new_file_df, normalization)

def apply_normalization(file_df,normalization):
    
    new_file_df = file_df.copy()
    
    for col_name in normalization:
        
        if (normalization[col_name][0] == "minmax"):
            min_col = normalization[col_name][1]
            max_col = normalization[col_name][2]
            new_file_df[col_name] = file_df[col_name].apply(lambda x:(x-min_col)/(max_col-min_col))
        
        elif (normalization[col_name][0] == "zscore"):
            mean_col = normalization[col_name][1]
            std_col = normalization[col_name][2]
            new_file_df[col_name] = file_df[col_name].apply(lambda x:(x-mean_col)/std_col)
    
    return (new_file_df)


def create_one_hot(df_file):
    one_hot_df = df_file.copy()
    one_hot_dict = {}
    for col_name in df_file:
        if (col_name != "CLASS" and col_name != "ID"):
            col = df_file[col_name]
            col_hot = pd.get_dummies(col,prefix = col_name, dtype = "float")
            one_hot_df = pd.concat([one_hot_df, col_hot], axis=1)
            one_hot_df = one_hot_df.drop(col_name, axis=1)
            one_hot_dict[col_name] = set(col)
    
    return (one_hot_df,one_hot_dict)


def apply_one_hot(df_file, one_hot): 
    
    #Gives zero column for categories who doesnt exist on the test df
    
    hot_df_file = df_file.copy()

    for col_name, col_unique_values  in one_hot.items():
        col = df_file[col_name]
        cat = col.astype(pd.api.types.CategoricalDtype(categories= col_unique_values))
        col_hot = pd.get_dummies(cat, prefix = col_name, dtype = "float")
        hot_df_file = pd.concat([hot_df_file, col_hot], axis=1)
        hot_df_file = hot_df_file.drop(col_name, axis=1)
                                    
    return (hot_df_file)

def accuracy(pred_df, label):
    
    score = 0
    for row in range(len(pred_df)):
        pred_row = pred_df.iloc[row,:]
        if pred_row.idxmax() == label[row]:
            score = score + 1
            
    return (score/len(pred_df))


def brier_score(df_file, correctlabels):
    correctlabels_df = pd.DataFrame(correctlabels)
    correctlabels_df = correctlabels_df.astype(pd.api.types.CategoricalDtype(categories= df_file.columns))
    correctlabels_hot = pd.get_dummies(correctlabels_df)

    score = []
    for row in range(np.size(df_file,0)):
        for col in range(np.size(df_file,1)):
            score_each_element = df_file.iloc[row,col]- correctlabels_hot.iloc[row,col]
            score_each_element = np.power(score_each_element, 2)
            score.append(score_each_element)

    brier_score = sum(score)/np.size(df_file,0)
    
    return(brier_score)


def TP_AND_FP(predictions_decoded, correct_labels, positive_label):
    """Takes decoded predictions and labels"""
 
    true_positives = 0
    false_positives = 0
    for prediction, label in zip(predictions_decoded.tolist(), correct_labels):
        if prediction == 1:
            if(label == positive_label):
                true_positives += 1
            else:
                false_positives += 1
           
    num_positives = sum([1 for label in correct_labels if label == positive_label])
    num_negatives = sum([1 for label in correct_labels if label != positive_label])
   
    return true_positives/num_positives, false_positives/num_negatives

def auc_calc_np(tp_fp_dict):
    unique_values = sorted( list(set([value for key, value in tp_fp_dict.items()])) )
    tpr = np.array([value[0] for value in unique_values])
    fpr = np.array([value[1] for value in unique_values])
 
    area = np.trapz(tpr, fpr)
    return area

def binary_auc(predictions, correct_labels, positive_label='A', num_thresholds=10):
    """preprocesses the data into binary predictions, positives vs rest(negatives)"""
    tp_fp_dict = {}
    without_boundaries = np.linspace(0, 1, num_thresholds)
    for threshold in without_boundaries:
        predictions_copy = predictions.copy()
        binary_positive_pred = predictions_copy.apply(lambda x: 1 if x[positive_label] >= threshold else 0, axis=1)
 
        tp_fp_dict[threshold] = TP_AND_FP(binary_positive_pred, correct_labels, positive_label)
 
    return auc_calc_np(tp_fp_dict)
 
def auc(predictions, correct_labels, num_thresholds=100):
    correct_labels = correct_labels.tolist() 
    label_frequencies = {x:correct_labels.count(x)/len(correct_labels) for x in correct_labels}
 
    total_auc = 0
    for label, label_frequency in label_frequencies.items():
        auc = binary_auc(predictions, correct_labels, positive_label=label, num_thresholds=num_thresholds)
        total_auc += auc*label_frequency
 
    return total_auc

## 1. Define the class kNN

In [4]:
class kNN:
    
    def __init__(self):
        
        self.imputation = None
        self.normalization = None 
        self.one_hot = None 
        self.labels = None 
        self.training_labels = None 
        self.training_data = None 
        self.training_time = None 
        
    def fit(self, dataframe, normtype = "minmax"): 
        
        self.training_labels = pd.Series(dataframe["CLASS"], dtype ="category")
        self.labels = self.training_labels.cat.categories
        cleaned_df = dataframe.drop(["CLASS" ,"ID"] , axis=1)  
        self.imputation = create_imputation(cleaned_df)
        self.normalization = create_normalization(self.imputation[0],normtype)
        self.training_data = self.normalization[0].values

    
    def predict(self, df, k):

        def get_nearest_neighbor_predictions(x_test, k = 5):
            row_index_distance_list = []

            for row_index,row_sample in enumerate(self.training_data):
                dist = np.linalg.norm(row_sample-x_test)
                row_index_distance_list.append((row_index,dist))
  
            row_index_distance_list.sort(key=lambda x:x[1])
 
            row_index_list = [i[0] for i in row_index_distance_list]

            k_near_index = row_index_list[0:k]

            k_near_classes = self.training_labels[k_near_index]
            
            k_counter = Counter(k_near_classes)

            len_k_near_classes = len(k_near_classes)
            
            test_class_prob = {y: k_counter.get(y, 0)/len_k_near_classes for y in self.labels}

            return(test_class_prob)
        
        df_noClassID = df.drop(["CLASS" ,"ID"], axis=1)
        
        test_imp = apply_imputation(df_noClassID,self.imputation[1])
        
        test_norm = apply_normalization(test_imp,self.normalization[1])

        test_data = test_norm.values
        
        df_prob = pd.DataFrame(columns=self.labels)
        
        for index ,row_test in enumerate(test_data):
    
            row_prob = get_nearest_neighbor_predictions(row_test , k)
            
            df_prob.loc[index] = row_prob
        
        return(df_prob)

In [5]:

glass_train_df = pd.read_csv("glass_train.txt")

glass_test_df = pd.read_csv("glass_test.txt")

knn_model = kNN()

t0 = time.perf_counter()
knn_model.fit(glass_train_df)
print("Training time: {0:.2f} s.".format(time.perf_counter()-t0))

test_labels = glass_test_df["CLASS"]

k_values = [1,3,5,7,9]
results = np.empty((len(k_values),3))

for i in range(len(k_values)):
    t0 = time.perf_counter()
    predictions = knn_model.predict(glass_test_df,k=k_values[i])
    print("Testing time (k={0}): {1:.2f} s.".format(k_values[i],time.perf_counter()-t0))
    results[i] = [accuracy(predictions,test_labels),brier_score(predictions,test_labels),
                  auc(predictions,test_labels)] # Assuming that you have defined auc - remove otherwise

results = pd.DataFrame(results,index=k_values,columns=["Accuracy","Brier score","AUC"])

results


Training time: 0.06 s.
Testing time (k=1): 0.78 s.
Testing time (k=3): 0.91 s.
Testing time (k=5): 0.72 s.
Testing time (k=7): 0.42 s.
Testing time (k=9): 0.49 s.


Unnamed: 0,Accuracy,Brier score,AUC
1,0.747664,0.504673,0.7582
3,0.663551,0.488058,0.813829
5,0.579439,0.474019,0.833424
7,0.598131,0.470723,0.834465
9,0.616822,0.483674,0.828734


In [6]:
train_labels = glass_train_df["CLASS"]
predictions = knn_model.predict(glass_train_df,k=1)
print("Accuracy on training set (k=1): {0:.2f}".format(accuracy(predictions,train_labels)))
print("AUC on training set (k=1): {0:.2f}".format(auc(predictions,train_labels)))
print("Brier score on training set (k=1): {0:.2f}".format(brier_score(predictions,train_labels)))


Accuracy on training set (k=1): 1.00
AUC on training set (k=1): 1.00
Brier score on training set (k=1): 0.00


### Comment on assumptions, things that do not work properly, etc.
