In [1]:
import pandas as pd
import copy
import time
import numpy as np
import random
import pickle

from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_squared_error
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, balanced_accuracy_score

In [2]:
class JiXi:



    def __init__(self):
        self._initialise_objects()

        print('JiXi Initialised')



    def _initialise_objects(self):

        self.train_x = None
        self.train_y = None
        self.val_x = None
        self.val_y = None
        self.test_x = None
        self.test_y = None
        self.tuning_result = None
        self.model = None
        self.parameter_choices = None
        self.hyperparameters = None
        self.checked = None
        self.result = None
        self.tuning_result = None
        self.tuning_result_saving_address = None
        self.object_saving_address = None
        self._up_to = 0
        self._seed = 19421221
        self.best_score = 0
        self.best_combo = None
        self.best_clf = None
        self.clf_type = None

        self.regression_extra_output_columns = ['Train r2', 'Val r2', 'Test r2', 
            'Train RMSE', 'Val RMSE', 'Test RMSE', 'Train MAPE', 'Val MAPE', 'Test MAPE', 'Time']
        self.classification_extra_output_columns = ['Train accu', 'Val accu', 'Test accu', 
            'Train balanced_accu', 'Val balanced_accu', 'Test balanced_accu', 'Train f1', 'Val f1', 'Test f1', 
            'Train precision', 'Val precision', 'Test precision', 'Train recall', 'Val recall', 'Test recall', 'Time']

        

    def read_in_data(self, train_x, train_y, val_x, val_y, test_x, test_y):

        self.train_x = train_x
        print("Read in Train X data")

        self.train_y = train_y
        print("Read in Train x data")

        self.val_x = val_x
        print("Read in Val X data")

        self.val_y = val_y
        print("Read in Val y data")

        self.test_x = test_x
        print("Read in Test X data")

        self.test_y = test_y
        print("Read in Test y data")



    def read_in_model(self, model, type):

        assert type == 'Classification' or type == 'Regression'

        self.model = model
        self.clf_type = type

        print(f'Successfully read in model {self.model}, which is a {self.clf_type} model')



    def set_parameters(self, parameter_choices):
        
        self.parameter_choices = parameter_choices

        self.hyperparameters = list(parameter_choices.keys())

        self.n_items = [len(parameter_choices[key]) for key in self.hyperparameters]

        self._get_combinations()
        self._get_checked_and_result_array()
        self._setup_tuning_result_df()

        print("Successfully recorded parameter choices")



    def set_tuning_result_saving_address(self, address):
        
        self.tuning_result_saving_address = address
        print('Successfully set tuning output address')


    
    def _set_object_saving_address(self, address):
        
        self.object_saving_address = address
        print('Successfully set object output address')



    def _get_combinations(self):
        
        self.combos = [[]]
        for i in range(len(self.n_items)):

            tmp = copy.deepcopy(self.combos)
            self.combos = list()

            for x in tmp:

                for k in range(self.n_items[i]):
                    y = copy.deepcopy(x)
                    
                    y.append(k)

                    self.combos.append(y)



    def _get_checked_and_result_array(self):

        self.checked = np.zeros(shape=self.n_items)
        self.result = np.zeros(shape=self.n_items)



    def _setup_tuning_result_df(self):

        tune_result_columns = copy.deepcopy(self.hyperparameters)

        if self.clf_type == 'Classification':
            tune_result_columns.extend(self.classification_extra_output_columns)
        elif self.clf_type == 'Regression':
            tune_result_columns.extend(self.regression_extra_output_columns)

        self.tuning_result = pd.DataFrame({col:list() for col in tune_result_columns})


    
    def change_tuning_style(self, type, seed = None): #TODO
        
        if type == 'a':
            self.combos.sort()
            print('Changed tuning order to sorted')
        
        elif type == 'b':
            if seed:
                random.seed(seed)
            else:
                random.seed(self._seed)
            
            random.shuffle(self.combos)
            print('Changed tuning_order to randomised')
        
        elif type == 'c':
            pass

        elif type == 'd':
            pass


    
    def tune(self): #TODO
        
        for combo in self.combos:
            
            self._up_to += 1

            if not self.checked[tuple(combo)]:

                self._train_and_test_combo(combo)
            
            else:
                print(f'Already Trained and Tested combination {self._up_to}')
            


    def _train_and_test_combo(self, combo):
        """Fit the model for one hyperparameter combination and record its scores.

        The model is built as ``self.model(**params)``, fitted on the training
        data (only the fit is timed) and used to predict the train, validation
        and test sets. One row holding the hyperparameter values, the metrics
        (rounded to 4 decimals) and the fit time (rounded to 2 decimals) is
        added to ``self.tuning_result`` and written out through
        ``_save_tuning_result``. The unrounded validation r2 (regression) or
        validation accuracy (classification) is stored in ``self.result`` and
        used to keep track of the best combination.

        Parameters
        ----------
        combo : sequence of int
            For each entry of ``self.hyperparameters``, the index of the value
            to use within ``self.parameter_choices``.

        Raises
        ------
        ValueError
            If ``self.clf_type`` is neither 'Regression' nor 'Classification'.
        """
        if self.clf_type not in ('Regression', 'Classification'):
            raise ValueError(
                f"clf_type must be 'Regression' or 'Classification', got {self.clf_type!r}"
            )

        combo = tuple(combo)

        params = {
            name: self.parameter_choices[name][index]
            for name, index in zip(self.hyperparameters, combo)
        }

        # initialise object
        clf = self.model(**params)

        # time the fit only; prediction and scoring are not included
        start = time.time()
        clf.fit(self.train_x, self.train_y)
        time_used = time.time() - start

        # true targets and predictions for the three datasets
        splits = (
            ('Train', self.train_y, clf.predict(self.train_x)),
            ('Val', self.val_y, clf.predict(self.val_x)),
            ('Test', self.test_y, clf.predict(self.test_x)),
        )

        def rmse(y_true, y_pred):
            return np.sqrt(mean_squared_error(y_true, y_pred))

        def weighted(metric):
            # f1 / precision / recall are computed with average='weighted'
            return lambda y_true, y_pred: metric(y_true, y_pred, average='weighted')

        # (column suffix, metric function); the FIRST metric of each list is
        # the one used to rank combinations. The suffixes match the column
        # names declared in *_extra_output_columns.
        if self.clf_type == 'Regression':
            metrics = (
                ('r2', r2_score),
                ('RMSE', rmse),
                ('MAPE', mean_absolute_percentage_error),
            )
        else:
            metrics = (
                # accuracy is computed from labels and predictions for all
                # three datasets (an estimator's score() expects features)
                ('accu', accuracy_score),
                ('balanced_accu', balanced_accuracy_score),
                ('f1', weighted(f1_score)),
                ('precision', weighted(precision_score)),
                ('recall', weighted(recall_score)),
            )

        # build the one-row output; every value is wrapped in a one-element
        # list so that tuple/list-valued hyperparameters stay a single cell
        df_building_dict = {name: [value] for name, value in params.items()}

        scores = {}
        for metric_name, metric in metrics:
            for split_name, y_true, y_pred in splits:
                column = f'{split_name} {metric_name}'
                scores[column] = metric(y_true, y_pred)
                df_building_dict[column] = [np.round(scores[column], 4)]

        df_building_dict['Time'] = [np.round(time_used, 2)]

        val_score = scores[f'Val {metrics[0][0]}']

        new_row = pd.DataFrame(df_building_dict)

        # DataFrame.append no longer exists in pandas >= 2.0; an empty table
        # is replaced outright so that no empty frame takes part in the concat
        if self.tuning_result is None or self.tuning_result.empty:
            self.tuning_result = new_row
        else:
            self.tuning_result = pd.concat([self.tuning_result, new_row], ignore_index=True)
        self._save_tuning_result()

        # update best score stats; the first valid score always counts, so
        # that scores <= 0 (e.g. a negative r2) can still become the best
        is_first_valid = self.best_combo is None and not np.isnan(val_score)
        if is_first_valid or val_score > self.best_score:
            self.best_score = val_score
            self.best_clf = clf
            self.best_combo = combo

        # update internal governing arrays
        self.checked[combo] = 1
        self.result[combo] = val_score


        print(f'''Trained and Tested combination {self._up_to}, taking {np.round(time_used, 2)} seconds
        Current best combo: {self.best_combo} with val score {self.best_score}''')



    def _save_tuning_result(self):

        self.tuning_result.to_csv(self.tuning_result_saving_address, index=False)

    

    def read_in_tuning_result_df(self, address): 
        
        self.tuning_result = pd.read_csv(address)
        print(f"Successfully read in tuning result of {len(self.tuning_result)} rows")

        self._create_parameter_value_map_index()

        # read DataFrame data into internal governing DataFrames of JiXi
        for row in self.tuning_result.iterrows():
    
            combo = tuple([self.parameter_value_map_index[hyperparam][row[1][hyperparam]] for hyperparam in self.hyperparameters])
            
            self.checked[combo] = 1
            
            if self.clf_type == 'Regression':
                self.result[combo] = row[1]['Val r2']
            elif self.clf_type == 'Classification':
                self.result[combo] = row[1]['Val accu']
        
            # update best score stats
            if self.result[combo] > self.best_score: 
                self.best_score = self.result[combo]
                self.best_clf = None
                print(f"As new Best Combo {combo} is read in, best_clf is set to None")
                self.best_combo = combo


    
    def _create_parameter_value_map_index(self):
        self.parameter_value_map_index = dict()
        for key in self.parameter_choices.keys():
            tmp = dict()
            for i in range(len(self.parameter_choices[key])):
                tmp[self.parameter_choices[key][i]] = i
            self.parameter_value_map_index[key] = tmp
    


    def export_jixi(self, address):
        
        self._set_object_saving_address(address)

        object_save = copy.deepcopy(self)
        
        object_save.train_x = None
        object_save.train_y = None
        object_save.val_x = None
        object_save.val_y = None
        object_save.test_x = None
        object_save.test_y = None
        object_save._up_to = 0

        with open(f'{self.object_saving_address}.pickle', 'wb') as f:
            pickle.dump(object_save, f)

        print(f'Successfully exported JiXi object as {self.object_saving_address}')


    def view_best_combo_and_score(self):

        print(f'(Current) Best combo: {self.best_combo} with val score {self.best_score}')