## Import Library

In [1]:
import os
import json
import argparse
import numpy as np
import pandas as pd
import pickle
from sklearn.utils import shuffle
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.neighbors import KNeighborsClassifier

## DataReader Class
The DataReader reads the train/validation file and the test file, keeping only the selected feature columns and one output column.
Both files must be Excel files. To make the DataReader read CSV files instead, replace {pd.read_excel} with {pd.read_csv}.

In [2]:
class DataReader():
    """Load the train/validation and test spreadsheets, restricted to the selected columns.

    Both files are read with ``pd.read_excel``; only the feature columns and the
    output column are loaded.
    """

    def __init__(self, data_dir, train_valid_file, test_file, feature_in, output_col, k_fold):
        """Record file locations and the column selection. No file is read here.

        Args:
            data_dir: directory that contains both data files.
            train_valid_file: name of the train/validation file inside ``data_dir``.
            test_file: name of the test file inside ``data_dir``.
            feature_in: feature column names, either a list of strings such as
                ``['A1', 'A2', 'A3']`` or one whitespace-separated string such as
                ``"A1 A2 A3"``.
            output_col: name of the output (target) column, a single string.
            k_fold: 1 for one train/validation split, > 1 for k-fold validation.
        """
        self.data_dir = data_dir
        self.train_valid_file = os.path.join(data_dir, train_valid_file)
        self.test_file = os.path.join(data_dir, test_file)
        ## Selected features. A whitespace-separated string is split into a list of names;
        ## any other non-list sequence is converted to a list so that it can be used
        ## directly as a DataFrame column selector.
        if isinstance(feature_in, str):
            feature_in = feature_in.split()
        elif not isinstance(feature_in, list):
            feature_in = list(feature_in)
        self.feature_in = feature_in
        ## Selected output column, a string is requested.
        self.output_col = output_col
        self.k_fold = k_fold
        self.usecols = list(feature_in) + [output_col]

    def get_train_valid_data(self, is_shuffle, split_ratio):
        """Read the train/validation file and prepare it for the configured validation mode.

        Args:
            is_shuffle: when true, rows are shuffled with a fixed seed and re-indexed from 0.
            split_ratio: fraction of rows used for training; only read when ``k_fold == 1``.

        Returns:
            ``k_fold == 1``: ``(train_valid_data, train_data, train_x, train_y,
            valid_data, valid_x, valid_y)``.
            ``k_fold > 1``: ``(train_valid_data, splits)`` where ``splits`` is the iterator
            returned by ``KFold(n_splits=k_fold).split(train_valid_data)``.

        Raises:
            ValueError: if ``k_fold`` is smaller than 1.
        """
        ## Reject an unusable k_fold before touching the file. Previously this case fell
        ## through both branches and returned None, which only failed later when the
        ## caller tried to unpack the result.
        if self.k_fold < 1:
            raise ValueError(f'k_fold must be >= 1, got {self.k_fold!r}')

        train_valid_data = pd.read_excel(self.train_valid_file, usecols=self.usecols)
        ## The classification Type of train_valid_data is in ascending order (author's note).
        ## Splitting it into training and validation sets without shuffling would give
        ## unbalanced sets.
        if is_shuffle:
            train_valid_data = shuffle(train_valid_data, random_state=0)
            train_valid_data.index = range(len(train_valid_data))

        ## k_fold == 1 means no k-fold validation: split_ratio decides where the data is cut
        ## into a training set (leading rows) and a validation set (remaining rows).
        if self.k_fold == 1:
            split_idx = int(train_valid_data.shape[0]*split_ratio)
            train_data, valid_data = train_valid_data[:split_idx], train_valid_data[split_idx:]
            train_x, train_y = train_data[self.feature_in], train_data[self.output_col]
            valid_x, valid_y = valid_data[self.feature_in], valid_data[self.output_col]
            return train_valid_data, train_data, train_x, train_y, valid_data, valid_x, valid_y

        ## k_fold > 1: return the (possibly shuffled) data together with the fold splits.
        kf = KFold(n_splits=self.k_fold)
        return train_valid_data, kf.split(train_valid_data)

    def get_test_data(self):
        """Read the test file and return ``(test_data, test_x, test_y)``."""
        test_data = pd.read_excel(self.test_file, usecols=self.usecols)
        test_x, test_y = test_data[self.feature_in], test_data[self.output_col]
        return test_data, test_x, test_y

## KNearestNeighbor Class

In [3]:
class KNN():
    """Wrapper around ``KNeighborsClassifier`` that adds printing/reporting helpers."""

    def __init__(self, n_neighbors, weights, power_param):
        """Build the classifier and keep the hyper-parameters readable on the wrapper."""
        self.n_neighbors, self.weights, self.power_param = n_neighbors, weights, power_param
        self.knn = KNeighborsClassifier(
            n_neighbors=n_neighbors,
            weights=weights,
            p=power_param,
        )

    def train(self, data_x, data_y):
        """Fit the wrapped classifier; the result of ``fit`` is handed back unchanged."""
        fitted = self.knn.fit(data_x, data_y)
        return fitted

    def evaluate(self, data_x, data_y):
        """Return the wrapped classifier's ``score`` (the accuracy) on the given data."""
        accuracy = self.knn.score(data_x, data_y)
        return accuracy

    def predict(self, data_x):
        """Return the wrapped classifier's predictions for ``data_x``."""
        predictions = self.knn.predict(data_x)
        return predictions

    @staticmethod
    def classification_report(y_true, y_pred):
        """Print a 4-digit text report and return the same report as a dictionary."""
        ## The text form is only for reading in the terminal.
        readable = classification_report(y_true, y_pred, digits=4)
        print (readable)
        ## The dictionary form feeds the written report; ``digits`` is not passed here
        ## because it is ignored when output_dict is True and values are not rounded.
        return classification_report(y_true, y_pred, output_dict=True)

    @staticmethod
    def confusion_matrix(y_true, y_pred, labels):
        """Print the confusion matrix and return it as a DataFrame.

        Rows are indexed by ``labels``; column ``pred_<label>`` holds the matrix
        column at that label's position in ``labels``.
        """
        matrix = confusion_matrix(y_true, y_pred)
        print(matrix)
        frame = pd.DataFrame(index=labels)
        for position, label in enumerate(labels):
            frame[f'pred_{label}'] = matrix[:, position]
        return frame

## Without K fold validation (k = 1)

In [4]:
def _report_split(knn, split_name, y_true, y_pred):
    """Print the banner, report and confusion matrix of one data split and return both tables.

    Returns:
        ``(report_df, cm_df)``: the classification report as a transposed DataFrame and
        the confusion matrix DataFrame produced by ``knn.confusion_matrix``.
    """
    title = f'#### {split_name} set ####'
    border = '#' * len(title)
    print (border)
    print (title)
    print (border)
    report = knn.classification_report(y_true, y_pred)
    ## The last three keys of the report dictionary are dropped so that only the class
    ## labels remain (presumably the accuracy / macro avg / weighted avg summary rows
    ## of sklearn's report -- confirm if the report layout changes).
    labels = [*report][:-3]
    report_df = pd.DataFrame(report).transpose()
    cm_df = knn.confusion_matrix(y_true, y_pred, labels)
    return report_df, cm_df


def no_k_fold(datareader, knn, output_dir, is_shuffle=True, split_ratio=0.7):
    """Train on one train/validation split, then report on train, valid and test sets.

    Side effects: writes ``knn.pickle`` (the pickled ``knn`` object) and ``result.xlsx``
    (data with predictions, classification report and confusion matrix for each split)
    into ``output_dir``, and prints the reports.

    Args:
        datareader: object providing ``get_train_valid_data`` and ``get_test_data``;
            it must be configured for a single split (7-tuple return).
        knn: object providing ``train``, ``predict``, ``classification_report`` and
            ``confusion_matrix``.
        output_dir: directory receiving the output files; created if it does not exist.
        is_shuffle: forwarded to ``get_train_valid_data`` (default keeps the former
            hard-coded ``True``).
        split_ratio: forwarded to ``get_train_valid_data`` (default keeps the former
            hard-coded ``0.7``).
    """
    ## data preparation
    _, train_data, train_x, train_y, valid_data, valid_x, valid_y = \
        datareader.get_train_valid_data(is_shuffle=is_shuffle, split_ratio=split_ratio)
    test_data, test_x, test_y = datareader.get_test_data()

    ## Make sure the output directory exists before any result is produced.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    ## classification
    ## train knn
    knn.train(train_x, train_y)
    with open(os.path.join(output_dir, 'knn.pickle'), 'wb') as f:
        pickle.dump(knn, f)

    splits = [
        ('train', train_data, train_x, train_y),
        ('valid', valid_data, valid_x, valid_y),
        ('test', test_data, test_x, test_y),
    ]

    ## predict
    predictions = [knn.predict(split_x) for _, _, split_x, _ in splits]

    ## evaluate
    sheets = []
    for position, ((name, data, _, y_true), y_pred) in enumerate(zip(splits, predictions)):
        if position:
            print ()  ## blank line between the printed sections
        report_df, cm_df = _report_split(knn, name, y_true, y_pred)
        ## assign() returns a new frame with the predictions appended as the last column,
        ## so the frames handed out by the datareader are not modified in place.
        sheets.append((name, data.assign(pred_Type=y_pred), report_df, cm_df))

    ## report to excel file
    with pd.ExcelWriter(os.path.join(output_dir, 'result.xlsx')) as writer:
        for name, data_with_pred, report_df, cm_df in sheets:
            data_with_pred.to_excel(writer, sheet_name=f'{name}_data')
            report_df.to_excel(writer, sheet_name=f'{name}_report')
            cm_df.to_excel(writer, sheet_name=f'{name}_cm')

## With K Fold validation (k > 1)

In [5]:
def k_fold_validation(datareader, knn, output_dir):
    """Run k-fold validation and write the per-fold and average accuracies to Excel.

    For every fold ``knn`` is re-trained on the fold's training rows and scored on the
    training rows, the validation rows and the (fixed) test set. The table, with a
    final ``avg`` row, is written to sheet ``accuracy_report`` of ``result.xlsx`` in
    ``output_dir``.

    Args:
        datareader: object providing ``get_train_valid_data`` (returning the data and an
            iterable of ``(train_idx, valid_idx)`` pairs), ``get_test_data``,
            ``feature_in`` and ``output_col``.
        knn: object providing ``train`` and ``evaluate``.
        output_dir: directory receiving ``result.xlsx``; created if it does not exist.

    Returns:
        ``(avg_train, avg_valid, avg_test)``: mean accuracy over all folds.

    Raises:
        ValueError: if the datareader yields no folds.
    """
    ## data preparation
    train_valid_data, folds = datareader.get_train_valid_data(is_shuffle=True, split_ratio=None)
    _, test_x, test_y = datareader.get_test_data()

    ## k-fold validation: collect one record per fold and build the table once at the end
    ## instead of growing a DataFrame cell by cell.
    records = []
    for i, (train_idx, valid_idx) in enumerate(folds):
        train_data, valid_data = train_valid_data.iloc[train_idx], train_valid_data.iloc[valid_idx]
        train_x, train_y = train_data[datareader.feature_in], train_data[datareader.output_col]
        valid_x, valid_y = valid_data[datareader.feature_in], valid_data[datareader.output_col]

        ## classification
        ## train knn
        knn.train(train_x, train_y)

        ## evaluate
        records.append({
            'k_fold': str(i+1),
            'train': knn.evaluate(train_x, train_y),
            'valid': knn.evaluate(valid_x, valid_y),
            'test': knn.evaluate(test_x, test_y),
        })

    ## Without any fold there is nothing to average (this used to surface as a NameError).
    if not records:
        raise ValueError('k-fold validation produced no folds')

    averages = {
        column: np.mean([record[column] for record in records])
        for column in ('train', 'valid', 'test')
    }

    ## report to excel file
    result = pd.DataFrame(
        records + [{'k_fold': 'avg', **averages}],
        columns=['k_fold', 'train', 'valid', 'test'],
    )
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with pd.ExcelWriter(os.path.join(output_dir, 'result.xlsx')) as writer:
        result.to_excel(writer, sheet_name='accuracy_report')

    return averages['train'], averages['valid'], averages['test']

## Configuration func. for args

In [7]:
def configuration(args):
    """Copy the run settings found on ``args`` into a plain dictionary.

    Each pair below is (key in the returned dictionary, attribute read from ``args``).
    """
    key_to_attribute = (
        ('data_dir', 'data_dir'),
        ('train_valid_file', 'train_valid_file'),
        ('test_file', 'test_file'),
        ('feature_input', 'feature_in'),
        ('output_column', 'output_col'),
        ('is_shuffle', 'is_shuffle'),
        ('k_fold', 'k_fold'),
        ('n_neighbors', 'n_neighbors'),
        ('weights', 'weights'),
        ('power_param', 'power_param'),
    )
    return {key: getattr(args, attribute) for key, attribute in key_to_attribute}

## Configuration func. for grid search.ipynb or any program

In [8]:
def configuration_grid_search(datareader, knn):
    """Build the settings dictionary from a datareader object and a knn object.

    ``is_shuffle`` is always reported as True; it is not read from either object.
    """
    config = dict(
        data_dir=datareader.data_dir,
        train_valid_file=datareader.train_valid_file,
        test_file=datareader.test_file,
        feature_input=datareader.feature_in,
        output_column=datareader.output_col,
        is_shuffle=True,
        k_fold=datareader.k_fold,
    )
    ## The classifier's hyper-parameters come last, in the same order as before.
    config.update(
        n_neighbors=knn.n_neighbors,
        weights=knn.weights,
        power_param=knn.power_param,
    )
    return config