In [1]:
#!/usr/bin/env python

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error



In [2]:
class TradingStrategy:
    """Moving-average / price-window trading rule with a built-in back-test.

    Every constructor argument is stored verbatim in an upper-case attribute
    and read by ``getMyPosition`` and ``range_so_far``. The mutable state
    (``currentPos`` and ``yesterday_sign``) is created by ``predict``, so
    ``getMyPosition`` can only be called on its own after that state exists.
    The number of instruments is taken from the price matrix itself.
    """

    def __init__(self, short_term, long_term, price_range, amp_window, change_holding, amp_lo_threshold, amp_hi_threshold, mse_threshold_1, slope_threshold_1, price_change_threshold, mse_threshold_2, slope_threshold_2):
        """Store the strategy parameters.

        short_term / long_term: window lengths of the two trailing means.
        price_range: length of the recent price window used for the decision.
        amp_window: window length used by ``range_so_far``.
        change_holding: amount divided by the current price to size a trade.
        The remaining arguments are thresholds used in ``getMyPosition``.
        """
        self.SHORT_TERM = short_term
        self.LONG_TERM = long_term
        self.PRICE_RANGE = price_range
        self.AMP_WINDOW = amp_window
        self.CHANGE_HOLDING = change_holding
        self.AMP_LO_THRESHOLD = amp_lo_threshold
        self.AMP_HI_THRESHOLD = amp_hi_threshold
        self.MSE_THRESHOLD_1 = mse_threshold_1
        self.SLOPE_THRESHOLD_1 = slope_threshold_1
        self.PRICE_CHANGE_THRESHOLD = price_change_threshold
        self.MSE_THRESHOLD_2 = mse_threshold_2
        self.SLOPE_THRESHOLD_2 = slope_threshold_2

    def getMyPosition(self, prcSoFar):
        """Update and return ``self.currentPos`` from the price history so far.

        prcSoFar is indexed ``[stock, day]`` and needs at least PRICE_RANGE
        days of history (the code indexes ``[-PRICE_RANGE]`` and regresses on
        PRICE_RANGE - 1 daily changes). The returned array is the same object
        as ``self.currentPos``, not a copy.
        """
        n_stocks = prcSoFar.shape[0]
        currentPrices = prcSoFar[:, -1]  # price of last day

        amp = self.range_so_far(prcSoFar)

        # The regressor input 0..PRICE_RANGE-2 is identical for every stock.
        day_index = np.arange(self.PRICE_RANGE - 1).reshape(-1, 1)

        for stock in range(n_stocks):
            single_stock_data = prcSoFar[stock]

            # Use short term and long term average to determine sign
            long_mean = single_stock_data[-self.LONG_TERM:].mean()
            short_mean = single_stock_data[-self.SHORT_TERM:].mean()
            today_sign = np.sign(short_mean - long_mean)

            # Use a price window to make decision.
            # n_day_diff is "oldest price in the window minus latest price".
            window = single_stock_data[-self.PRICE_RANGE:]
            n_day_diff = single_stock_data[-self.PRICE_RANGE] - single_stock_data[-1]
            n_day_range = np.max(window) - np.min(window)

            # Fit a line through the daily changes and measure how well it fits.
            n_day_gap = np.diff(window)
            LR = LinearRegression(n_jobs=-1).fit(day_index, n_day_gap.reshape(-1, 1))
            n_day_mse = mean_squared_error(n_day_gap, LR.predict(day_index))
            slope = LR.coef_[0][0]

            position = self.currentPos[stock]

            # Python precedence makes the first test "(A and B) or C".
            if (
                position * n_day_range * np.sign(n_day_diff) > np.abs(position * currentPrices[stock]) * self.PRICE_CHANGE_THRESHOLD
                and n_day_mse > np.abs(n_day_diff * self.MSE_THRESHOLD_2)
            ) or (slope * position < 0 and np.abs(slope) > self.SLOPE_THRESHOLD_2):
                # Close the position entirely.
                self.currentPos[stock] = 0

            elif np.abs(n_day_diff) <= amp[stock] / self.AMP_LO_THRESHOLD or (
                n_day_mse > np.abs(n_day_diff * self.MSE_THRESHOLD_1) and np.abs(slope) < self.SLOPE_THRESHOLD_1
            ):
                # Leave the position unchanged.
                pass

            elif np.abs(n_day_diff) >= amp[stock] / self.AMP_HI_THRESHOLD:
                # Trade against the moving-average sign.
                # NOTE(review): floor division rounds negative sizes away from zero.
                value = today_sign * self.CHANGE_HOLDING
                self.currentPos[stock] -= value // currentPrices[stock]

            else:
                # Trade with the moving-average sign.
                value = today_sign * self.CHANGE_HOLDING
                self.currentPos[stock] += value // currentPrices[stock]

            self.yesterday_sign[stock] = today_sign

        return self.currentPos

    def range_so_far(self, data):
        """Return, per row of ``data``, max minus min over the last AMP_WINDOW entries.

        The result is a plain list with one entry per row of ``data``.
        """
        amp = []
        for single_stock_data in data:
            recent = single_stock_data[-self.AMP_WINDOW:]
            amp.append(np.max(recent) - np.min(recent))
        return amp

    def predict(self, prcHist, period_start_date):
        """Back-test the strategy for 250 steps and return ``mean(P&L) - 0.1 * std(P&L)``.

        prcHist is indexed ``[stock, day]``. Step ``t`` runs from
        period_start_date to period_start_date + 249 and sees the columns
        ``[:t]``. Positions and sign state are reset at the start of each call.

        NOTE(review): slicing past the last column does not raise; it just
        returns the full history again, so steps beyond the data reuse the
        final day. Confirm the callers stay inside the available range.
        """
        nInst = prcHist.shape[0]

        commRate = 0.0010
        dlrPosLimit = 10000

        self.currentPos = np.zeros(nInst)
        self.yesterday_sign = np.zeros(nInst)

        cash = 0
        curPos = np.zeros(nInst)
        value = 0
        todayPLL = []
        for t in range(period_start_date, period_start_date + 250):
            prcHistSoFar = prcHist[:, :t]
            newPosOrig = self.getMyPosition(prcHistSoFar)
            curPrices = prcHistSoFar[:, -1]
            # Clip each position to the whole-share equivalent of the dollar limit.
            posLimits = np.array([int(x) for x in dlrPosLimit / curPrices])
            clipPos = np.clip(newPosOrig, -posLimits, posLimits)
            newPos = np.trunc(clipPos)
            deltaPos = newPos - curPos
            dvolume = np.sum(curPrices * np.abs(deltaPos))
            comm = dvolume * commRate
            cash -= curPrices.dot(deltaPos) + comm
            curPos = np.array(newPos)
            posValue = curPos.dot(curPrices)
            todayPL = cash + posValue - value
            todayPLL.append(todayPL)
            value = cash + posValue

        pll = np.array(todayPLL)
        return np.mean(pll) - 0.1 * np.std(pll)

In [3]:
def loadPrices(fn):
    """Read a whitespace-delimited, header-less table and return its values transposed.

    Side effect: sets the module globals ``nt`` and ``nInst`` to the number of
    rows and columns of the file respectively.
    """
    global nt, nInst
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    df = pd.read_csv(fn, sep=r'\s+', header=None, index_col=None)
    nt, nInst = df.values.shape
    return (df.values).T

# Price matrix used by the later cells; loadPrices returns the file's values transposed.
data = loadPrices('./data/prices.txt')

In [4]:
# 05/07/2023





import pandas as pd
import copy
import time
import numpy as np
import pickle

from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_squared_error
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, balanced_accuracy_score





class JiaoCheng:



    def __init__(self):
        """ Initialise class """
        self._initialise_objects()

        print('JiaoCheng Initialised')



    def _initialise_objects(self):
        """ Helper to initialise objects """

        self.train_x = None
        self.train_y = None
        self.val_x = None
        self.val_y = None
        self.test_x = None
        self.test_y = None
        self.tuning_result = None
        self.model = None
        self.parameter_choices = None
        self.hyperparameters = None
        self.feature_n_ningxiang_score_dict = None
        self.non_tuneable_parameter_choices = list()
        self._feature_combo_n_index_map = None
        self.checked = None
        self.result = None
        self.tuning_result_saving_address = None
        self.object_saving_address = None
        self._up_to = 0
        self._tune_features = False
        self._seed = 19210216
        self.best_score = -np.inf
        self.best_combo = None
        self.best_clf = None
        self.clf_type = None
        self.combos = None
        self.n_items = None
        self.hyperparameter_tuning_order = None
        self._tuning_order_map_hp = None
        self._parameter_value_map_index = None
        self._total_combos = None
        self._tune_features = False
        self.hyperparameter_default_values = None
        self.best_model_saving_address = None

        self.regression_extra_output_columns = ['Train r2', 'Val r2', 'Test r2', 
            'Train RMSE', 'Val RMSE', 'Test RMSE', 'Train MAPE', 'Val MAPE', 'Test MAPE', 'Time']
        self.classification_extra_output_columns = ['Train accu', 'Val accu', 'Test accu', 
            'Train balanced_accu', 'Val balanced_accu', 'Test balanced_accu', 'Train f1', 'Val f1', 'Test f1', 
            'Train precision', 'Val precision', 'Test precision', 'Train recall', 'Val recall', 'Test recall', 'Time']

        

    def read_in_data(self, data):
        """ Reads in train validate test data for tuning """

        self.data = data



    def read_in_model(self, model, type):
        """ Reads in underlying model object for tuning, and also read in what type of model it is """

        assert type == 'Classification' or type == 'Regression' # check

        # record
        self.model = model
        self.clf_type = type 

        print(f'Successfully read in model {self.model}, which is a {self.clf_type} model')



    def set_hyperparameters(self, parameter_choices):
        """ Input hyperparameter choices """

        self.parameter_choices = parameter_choices
        self._sort_hyperparameter_choices()

        self.param_value_reverse_map = {param:{self.parameter_choices[param][j]:j for j in range(len(self.parameter_choices[param]))} for param in self.parameter_choices}

        self.hyperparameters = list(parameter_choices.keys())

        # automatically calculate how many different values in each hyperparameter
        self.n_items = [len(parameter_choices[key]) for key in self.hyperparameters]
        self._total_combos = np.prod(self.n_items)

        # automatically calculate all combinations and setup checked and result arrays and tuning result dataframe
        self._get_combinations()
        self._get_checked_and_result_array()
        self._setup_tuning_result_df()

        print("Successfully recorded hyperparameter choices")



    def _sort_hyperparameter_choices(self):
        """ Helper to ensure all hyperparameter choice values are in order from lowest to highest """

        for key in self.parameter_choices:
            tmp = copy.deepcopy(list(self.parameter_choices[key]))
            tmp.sort()
            self.parameter_choices[key] = tuple(tmp)

    

    def _get_combinations(self):
        """ Helper to calculate all combinations """

        ##ALGORITHM

        # recursively append values to get every combination in ordinal/numerical form
        self.combos = [[]]
        for i in range(len(self.n_items)):

            tmp = copy.deepcopy(self.combos)
            self.combos = list()

            for x in tmp:

                for k in range(self.n_items[i]):
                    y = copy.deepcopy(x)
                    
                    y.append(k)

                    self.combos.append(y)



    def _get_checked_and_result_array(self):
        """ Helper to set up checked and result array """

        self.checked = np.zeros(shape=self.n_items)
        self.result = np.zeros(shape=self.n_items)



    def _setup_tuning_result_df(self):
        """ Helper to set up tuning result dataframe """

        tune_result_columns = copy.deepcopy(self.hyperparameters)

        if self._tune_features == True:
            tune_result_columns.append('feature combo ningxiang score')

        # Different set of metric columns for different types of models
        if self.clf_type == 'Classification':
            tune_result_columns.extend(self.classification_extra_output_columns)
        elif self.clf_type == 'Regression':
            tune_result_columns.extend(self.regression_extra_output_columns)

        self.tuning_result = pd.DataFrame({col:list() for col in tune_result_columns})



    def set_non_tuneable_hyperparameters(self, non_tuneable_hyperparameter_choice):
        """ Input Non tuneable hyperparameter choice """

        if type(non_tuneable_hyperparameter_choice) is not dict:
            raise TypeError('non_tuneable_hyeprparameters_choice must be dict, please try again')
            
        
        for nthp in non_tuneable_hyperparameter_choice:
            if type(non_tuneable_hyperparameter_choice[nthp]) in (set, list, tuple, dict):
                raise TypeError('non_tuneable_hyperparameters_choice must not be of array-like type')
                

        self.non_tuneable_parameter_choices = non_tuneable_hyperparameter_choice

        print("Successfully recorded non_tuneable_hyperparameter choices")



    def set_features(self, ningxiang_output):
        """ Input features """

        if type(ningxiang_output) is not dict:
            raise TypeError("Please ensure NingXiang output is a dict")
            
        
        if not self.hyperparameters:
            raise AttributeError("Missing hyperparameter choices, please run .set_hyperparameters() first")
            
        
        for feature in list(ningxiang_output.keys())[-1]:
            if feature not in list(self.train_x.columns):
                raise ValueError(f'feature {feature} in ningxiang output is not in train_x. Please try again')
                
            if feature not in list(self.val_x.columns):
                raise ValueError(f'feature {feature} in ningxiang output is not in val_x. Please try again')
                
            if feature not in list(self.test_x.columns):
                raise ValueError(f'feature {feature} in ningxiang output is not in test_x. Please try again')
                
        
        # sort ningxiang just for safety, and store up
        ningxiang_output_sorted = self._sort_features(ningxiang_output)
        self.feature_n_ningxiang_score_dict = ningxiang_output_sorted

        # activate this switch
        self._tune_features = True

        # update previous internal structures based on first set of hyperparameter choices
        ##here used numbers instead of tuples as the values in parameter_choices; thus need another mapping to get map back to the features
        self.parameter_choices['features'] = tuple([i for i in range(len(ningxiang_output_sorted))])
        self._feature_combo_n_index_map = {i: list(ningxiang_output_sorted.keys())[i] for i in range(len(ningxiang_output_sorted))}

        self.param_value_reverse_map = {param:{self.parameter_choices[param][j]:j for j in range(len(self.parameter_choices[param]))} for param in self.parameter_choices}
        
        self.hyperparameters = list(self.parameter_choices.keys())

        # automatically calculate how many different values in each hyperparameter
        self.n_items = [len(self.parameter_choices[key]) for key in self.hyperparameters]
        self._total_combos = np.prod(self.n_items)

        # automatically calculate all combinations and setup checked and result arrays and tuning result dataframe
        self._get_combinations()
        self._get_checked_and_result_array()
        self._setup_tuning_result_df()

        print("Successfully recorded tuneable feature combination choices and updated relevant internal structures")


    
    def _sort_features(self, ningxiang_output):
        """ Helper for sorting features based on NingXiang values (input dict output dict) """

        ningxiang_output_list = [(key, ningxiang_output[key]) for key in ningxiang_output]

        ningxiang_output_list.sort(key = lambda x:x[1])

        ningxiang_output_sorted = {x[0]:x[1] for x in ningxiang_output_list}

        return ningxiang_output_sorted


    
    def set_tuning_order(self, order):
        """ Input sorting order """
        
        if type(order) is not list:
            raise TypeError("order must be a list, please try agian")
            
        
        if self.hyperparameters == False:
            raise AttributeError('Please run set_hyperparameters() first')
            
        
        if 'features' in self.hyperparameters:
            if self._tune_features == False:
                raise AttributeError('Please run set_features() first')
                
        
        for hp in order:
            if hp not in self.hyperparameters:
                raise ValueError(f'Feature {hp} is not in self.hyperparameters which was set by set_hyperparameters(); consider reinitiating JiaoCheng or double checking input')
                

        self.hyperparameter_tuning_order = order
        self._tuning_order_map_hp = {self.hyperparameters[i]:i for i in range(len(self.hyperparameters))}
    

    
    def set_hyperparameter_default_values(self, default_values):
        """ Input default values for hyperparameters """

        if type(default_values) is not dict:
            raise TypeError("default_values must be a dict, please try agian")
            
        
        if self.hyperparameters == False:
            raise AttributeError('Please run set_hyperparameters() first')
            
        
        if 'features' in self.hyperparameters:
            if self._tune_features == False:
                raise AttributeError('Please run set_features() first')

        
        for hp in default_values:
            if hp not in self.hyperparameters:
                raise ValueError(f'Feature {hp} is not in self.hyperparameter which was set by set_hyperparameters(); consider reinitiating JiaoCheng or double checking input')
                
            if default_values[hp] not in self.parameter_choices[hp]:
                raise ValueError(f'{default_values[hp]} is not a value to try out in self.hyperparameter which was set by set_hyperparameters(). consider reinitiating JiaoCheng or double checking input')
                

        self.hyperparameter_default_values = default_values


        
    def tune(self, key_stats_only = False): #TODO
        """ Begin tuning """
            

        if self.model is None:
            raise AttributeError(" Missing model, please run .read_in_model() ")
            
        
        if self.combos is None:
            raise AttributeError("Missing hyperparameter choices, please run .set_hyperparameters() first")
            

        if self.tuning_result_saving_address is None:
            raise AttributeError("Missing tuning result csv saving address, please run .set_tuning_result_saving_address() first")

        self.key_stats_only = key_stats_only
        
        
        starting_hp_combo = [self.param_value_reverse_map[hp][self.hyperparameter_default_values[hp]] for hp in self.hyperparameters] # setup starting combination
        print('\nDefault combo:', starting_hp_combo, '\n')

        round = 1
        continue_tuning = 1 # continuously loop through features until converge (combo stays same after a full round)
        while continue_tuning:
            print("\nROUND", round)

            # first store previous round's best combo/the starting combo before each round; for comparison at the end
            last_round_starting_hp_combo = copy.deepcopy(starting_hp_combo)

            for hp in self.hyperparameter_tuning_order: # tune each hp in order
                print("\nRound", round, '\nHyperparameter:', hp, f'(index: {self._tuning_order_map_hp[hp]})', '\n')

                last_hyperparameter_best_hp_combo = copy.deepcopy(starting_hp_combo) # store last iteration's best combo

                combo = list(copy.deepcopy(starting_hp_combo)) # tune the root combo
                combo[self._tuning_order_map_hp[hp]] = 0

                for i in range(self.n_items[self._tuning_order_map_hp[hp]]):
                
                    if not self.checked[tuple(combo)]:
                        self._up_to += 1
                        self._train_and_test_combo(combo)
                    else:
                        self._check_already_trained_best_score(combo)
                      
                    combo[self._tuning_order_map_hp[hp]] += 1 
                
                starting_hp_combo = copy.deepcopy(self.best_combo) # take the best combo after this hyperparameter has been tuned
                
                if starting_hp_combo == last_hyperparameter_best_hp_combo:
                    print('\nBest combo after this hyperparameter:', starting_hp_combo, ', NOT UPDATED SINCE LAST HYPERPARAMETER\n')
                else:
                    print('\nBest combo after this hyperparameter:', starting_hp_combo, ', UPDATED SINCE LAST HYPERPARAMETER\n')
            
            round += 1
            
            if starting_hp_combo == last_round_starting_hp_combo: # if after this full round best combo hasn't moved, then can terminate
                continue_tuning = 0
        

        # Display final information
        self.view_best_combo_and_score()
            
    

    def _eval_combo(self, df_building_dict, train_pred, val_pred, test_pred):

        if self.clf_type == 'Regression':

            train_score = val_score = test_score = train_rmse = val_rmse = test_rmse = train_mape = val_mape = test_mape = 0

            try:
                train_score = r2_score(self.train_y, train_pred)
            except:
                pass
            try:
                val_score = r2_score(self.val_y, val_pred)
            except:
                pass
            try:
                test_score = r2_score(self.test_y, test_pred)
            except:
                pass
            
            try:
                train_rmse = np.sqrt(mean_squared_error(self.train_y, train_pred))
            except:
                pass
            try:
                val_rmse = np.sqrt(mean_squared_error(self.val_y, val_pred))
            except:
                pass
            try:
                test_rmse = np.sqrt(mean_squared_error(self.test_y, test_pred))
            except:
                pass

            if self.key_stats_only == False:
                try:
                    train_mape = mean_absolute_percentage_error(self.train_y, train_pred)
                except:
                    pass
                try:
                    val_mape = mean_absolute_percentage_error(self.val_y, val_pred)
                except:
                    pass
                try:
                    test_mape = mean_absolute_percentage_error(self.test_y, test_pred)
                except:
                    pass
            
            df_building_dict['Train r2'] = [np.round(train_score, 6)]
            df_building_dict['Val r2'] = [np.round(val_score, 6)]
            df_building_dict['Test r2'] = [np.round(test_score, 6)]
            df_building_dict['Train RMSE'] = [np.round(train_rmse, 6)]
            df_building_dict['Val RMSE'] = [np.round(val_rmse, 6)]
            df_building_dict['Test RMSE'] = [np.round(test_rmse, 6)]
            
            if self.key_stats_only == False:
                df_building_dict['Train MAPE'] = [np.round(train_mape, 6)]
                df_building_dict['Val MAPE'] = [np.round(val_mape, 6)]
                df_building_dict['Test MAPE'] = [np.round(test_mape, 6)]

        
        elif self.clf_type == 'Classification':

            train_score = val_score = test_score = train_bal_accu = val_bal_accu = test_bal_accu = train_f1 = val_f1 = test_f1 = \
                train_precision = val_precision = test_precision = train_recall = val_recall = test_recall = 0

            try:    
                train_score = accuracy_score(self.train_y, train_pred)
            except:
                pass
            try:
                val_score = accuracy_score(self.val_y, val_pred)
            except:
                pass
            try:
                test_score = accuracy_score(self.test_y, test_pred)
            except:
                pass

            try:
                train_bal_accu = balanced_accuracy_score(self.train_y, train_pred)
            except:
                pass
            try:
                val_bal_accu = balanced_accuracy_score(self.val_y, val_pred)
            except:
                pass
            try:
                test_bal_accu = balanced_accuracy_score(self.test_y, test_pred)
            except:
                pass
            
            try:
                train_f1 = f1_score(self.train_y, train_pred, average='weighted')
            except:
                pass
            try:
                val_f1 = f1_score(self.val_y, val_pred, average='weighted')
            except:
                pass
            try:
                test_f1 = f1_score(self.test_y, test_pred, average='weighted')
            except:
                pass
            
            try:
                train_precision = precision_score(self.train_y, train_pred, average='weighted')
            except:
                pass
            try:
                val_precision = precision_score(self.val_y, val_pred, average='weighted')
            except:
                pass
            try:
                test_precision = precision_score(self.test_y, test_pred, average='weighted')
            except:
                pass

            try:
                train_recall = recall_score(self.train_y, train_pred, average='weighted')
            except:
                pass
            try:
                val_recall = recall_score(self.val_y, val_pred, average='weighted')
            except:
                pass
            try:
                test_recall = recall_score(self.test_y, test_pred, average='weighted')
            except:
                pass

            df_building_dict['Train accu'] = [np.round(train_score, 6)]
            df_building_dict['Val accu'] = [np.round(val_score, 6)]
            df_building_dict['Test accu'] = [np.round(test_score, 6)]
            df_building_dict['Train balanced_accuracy'] = [np.round(train_bal_accu, 6)]
            df_building_dict['Val balanced_accuracy'] = [np.round(val_bal_accu, 6)]
            df_building_dict['Test balanced_accuracy'] = [np.round(test_bal_accu, 6)]
            df_building_dict['Train f1'] = [np.round(train_f1, 6)]
            df_building_dict['Val f1'] = [np.round(val_f1, 6)]
            df_building_dict['Test f1'] = [np.round(test_f1, 6)]
            df_building_dict['Train precision'] = [np.round(train_precision, 6)]
            df_building_dict['Val precision'] = [np.round(val_precision, 6)]
            df_building_dict['Test precision'] = [np.round(test_precision, 6)]
            df_building_dict['Train recall'] = [np.round(train_recall, 6)]
            df_building_dict['Val recall'] = [np.round(val_recall, 6)]
            df_building_dict['Test recall'] = [np.round(test_recall, 6)]

        return df_building_dict, val_score, test_score
    


    def _train_and_test_combo(self, combo):
        """ Helper to train and test each combination as part of tune() """

        combo = tuple(combo)
        
        params = {self.hyperparameters[i]:self.parameter_choices[self.hyperparameters[i]][combo[i]] for i in range(len(self.hyperparameters))}
        
        
        # initialise object
        clf = self.model(**params)

        start = time.time()
        score_500 = clf.predict(self.data, 500)
        score_250 = clf.predict(self.data, 250)

        val_score = score_500

        CV_score_list = []
        for day in [100, 200, 300, 400]:  
            CV_score_list.append(clf.predict(self.data, day)) 
        cv_score = np.mean(CV_score_list)     
        end = time.time()

        # build output dictionary and save result
        df_building_dict = params
        df_building_dict['Val r2'] = [val_score]
        df_building_dict['score_250'] = [score_250]
        df_building_dict['score_500'] = [score_500]
        df_building_dict['cv_score'] = [cv_score]


        tmp = pd.DataFrame(df_building_dict)


        self.tuning_result = pd.concat([self.tuning_result, tmp])
        self.tuning_result.index = range(len(self.tuning_result))
        self._save_tuning_result()

        # update best score stats
        if val_score > self.best_score: 
            self.best_score = val_score
            self.best_clf = clf
            self.best_combo = combo

        # update internal governing DataFrames
        self.checked[combo] = 1
        self.result[combo] = val_score

        self._up_to += 1

        print(f'''Trained and Tested combination {self._up_to} of {self._total_combos}: {combo}, taking time {np.round(end-start, 4)} seconds to get score of {np.round(val_score,4)}
        Current best combo: {self.best_combo} with val score {np.round(self.best_score, 4)}''')


    def _check_already_trained_best_score(self, combo):
        """ Helper for checking whether an already trained combo is best score """
        
        combo = tuple(combo)
        
        # update best score stats
        if self.result[combo] > self.best_score: 
            self.best_score = self.result[combo]
            self.best_clf = None
            print(f"As new Best Combo {combo} was read in, best_clf is set to None")
            self.best_combo = combo

        print(f'''Already Trained and Tested combination {combo}, which had val score of {np.round(self.result[combo],4)}
        Current best combo: {self.best_combo} with val score {np.round(self.best_score, 4)}. 
        Has trained {self._up_to} of {self._total_combos} combinations so far''')



    def _save_tuning_result(self):
        """ Helper to export tuning result csv """

        tuning_result_saving_address_strip = self.tuning_result_saving_address.split('.csv')[0]

        self.tuning_result.to_csv(f'{tuning_result_saving_address_strip}.csv', index=False)


    
    def view_best_combo_and_score(self):
        """ View best combination and its validation score """
        
        print('Max Score: \n', self.best_score)

        if self.clf_type == 'Classification':
            max_val_id = self.tuning_result['Val accu'].idxmax()
            print('Max Test Score: \n', self.tuning_result.iloc[max_val_id]['Test accu'])
            
        elif self.clf_type == 'Regression':
            max_val_id = self.tuning_result['Val r2'].idxmax()
            print('Max Test Score: \n', self.tuning_result.iloc[max_val_id]['Test r2'])

        print('Max Combo Index: \n', self.best_combo, 'out of', self.n_items, '(note best combo is 0-indexed)')

        final_combo = {self.hyperparameters[i]:self.parameter_choices[self.hyperparameters[i]][self.best_combo[i]] for i in range(len(self.hyperparameters))}
        print('Max Combo Hyperparamer Combination: \n', final_combo)

        if self._tune_features:
            print('Max Combo Features: \n', self._feature_combo_n_index_map[self.best_combo[-1]])

        print('% Combos Checked:', int(sum(self.checked.reshape((np.prod(self.n_items))))), 'out of', np.prod(self.n_items), 'which is', f'{np.mean(self.checked).round(8)*100}%')

    

    def read_in_tuning_result_df(self, address): 
        """ Read in tuning result csv and read data into checked and result arrays """

        BOOL_MAP = {'1': True, '0': False, '1.0': True, '0.0': False, True: True, False: False, 'True': True, 'False': False, 1: True, 0: False, 1.0: True, 0.0: False}

        if self.parameter_choices is None:
            raise AttributeError("Missing parameter_choices to build _parameter_value_map_index, please run set_hyperparameters() first")

        if self.clf_type is None:
            raise AttributeError('Missing clf_type. Please run .read_in_model() first.')

        self.tuning_result = pd.read_csv(address)

        self._up_to = 0

        self._create_parameter_value_map_index()

        # read DataFrame data into internal governing DataFrames of JiaoCheng
        for row in self.tuning_result.iterrows():

            try:
                self._up_to += 1
        
                combo = list()
                for hyperparam in self.hyperparameters:
                    if hyperparam == 'features':
                        
                        # reverse two dicts
                        index_n_feature_combo_map = {self._feature_combo_n_index_map[key]:key for key in self._feature_combo_n_index_map}
                        # special input
                        combo.append(index_n_feature_combo_map[tuple(self._str_to_list(row[1]['features']))])
                        
                    else:
                        if type(self.parameter_choices[hyperparam][0]) is bool:
                            combo.append(self._parameter_value_map_index[hyperparam][BOOL_MAP[row[1][hyperparam]]])
                        else:
                            combo.append(self._parameter_value_map_index[hyperparam][row[1][hyperparam]])

                combo = tuple(combo)
                
                self.checked[combo] = 1
                
                if self.clf_type == 'Regression':
                    self.result[combo] = row[1]['Val r2']
                elif self.clf_type == 'Classification':
                    self.result[combo] = row[1]['Val accu']

            except Exception as e:
                print(f"Error message: {str(e)}")
                print('Error Importing this Row:', row)

        print(f"Successfully read in tuning result of {len(self.tuning_result)} rows")



    def _str_to_list(self, string):
        """ Helper to convert string to list"""

        out = list()
        for feature in string.split(', '):
            out.append(feature.strip('[').strip(']').strip("'"))
        
        return out


    
    def _create_parameter_value_map_index(self):
        """ Helper to create parameter-value index map """

        self._parameter_value_map_index = dict()
        for key in self.parameter_choices.keys():
            tmp = dict()
            for i in range(len(self.parameter_choices[key])):
                tmp[self.parameter_choices[key][i]] = i
            self._parameter_value_map_index[key] = tmp
    


    def set_tuning_result_saving_address(self, address):
        """ Record the location that the tuning result should be saved to.

        The value is stored unchanged on `self.tuning_result_saving_address`;
        nothing is validated or written here.
        """

        confirmation = 'Successfully set tuning output address'

        self.tuning_result_saving_address = address
        print(confirmation)


    
    def set_best_model_saving_address(self, address):
        """ Record the location that the best model should be saved to.

        The value is stored unchanged on `self.best_model_saving_address`;
        nothing is validated or written here.
        """

        confirmation = 'Successfully set best model output address'

        self.best_model_saving_address = address
        print(confirmation)

    

    def _save_best_model(self):
        """ Helper to save best model as a pickle """

        best_model_saving_address_split = self.best_model_saving_address.split('.pickle')[0]

        with open(f'{best_model_saving_address_split}.pickle', 'wb') as f:
            pickle.dump(self.best_clf, f)

In [7]:
# Build the tuner and hand it the data and the model class to tune.
tuner = JiaoCheng()
tuner.read_in_data(data)
tuner.read_in_model(TradingStrategy, 'Regression')

# Candidate values for each TradingStrategy constructor argument.
parameter_choices = {
    'short_term': (2, 3, 4, 5, 6),
    'long_term': (15, 20, 25, 30),
    'price_range': (3, 4, 5, 6, 7),
    'amp_window': (75, 100, 150, 200),
    'change_holding': (250, 500, 1000, 2000, 4000),
    'amp_lo_threshold': (7.5, 10, 12.5, 15),
    'amp_hi_threshold': (0.5, 1, 1.5, 2, 2.5),
    'mse_threshold_1': (0.05, 0.15, 0.25, 0.35),
    'slope_threshold_1': (0.05, 0.1, 0.2, 0.4, 0.8),
    'mse_threshold_2': (0.005, 0.01, 0.02, 0.04),
    'slope_threshold_2': (0.5, 1, 2, 4, 8),
    'price_change_threshold': (0.005, 0.01, 0.025, 0.05, 0.1),
}

tuner.set_hyperparameters(parameter_choices)

# Order in which the hyperparameters are visited by the tuner.
tuner.set_tuning_order(['price_range', 'short_term', 'long_term', 'amp_window', 'change_holding',
                        'amp_lo_threshold', 'amp_hi_threshold', 'price_change_threshold',
                        'mse_threshold_2', 'slope_threshold_2', 'mse_threshold_1', 'slope_threshold_1'])

# Starting value for every hyperparameter; each one appears in its list of choices above.
tuner.set_hyperparameter_default_values({
    'short_term': 2,
    'long_term': 15,
    'price_range': 5,
    'amp_window': 75,
    'change_holding': 500,
    'amp_lo_threshold': 12.5,
    'amp_hi_threshold': 0.5,
    'mse_threshold_1': 0.25,
    'slope_threshold_1': 0.2,
    'mse_threshold_2': 0.02,
    'slope_threshold_2': 2,
    'price_change_threshold': 0.025,
})

# One location is used both to resume from and to save to.
TUNING_RESULT_PATH = './tuning/TradingStrategy_500.csv'

# Resuming from an earlier run is best-effort: a missing or unreadable file
# must not stop the notebook, but the reason is reported instead of being
# silently discarded. `except Exception` (rather than a bare `except`) lets
# KeyboardInterrupt through so the cell can still be interrupted.
try:
    tuner.read_in_tuning_result_df(TUNING_RESULT_PATH)
except FileNotFoundError:
    print(f'No previous tuning result found at {TUNING_RESULT_PATH}; starting from scratch')
except Exception as e:
    print(f'Could not load previous tuning result from {TUNING_RESULT_PATH}: {e!r}')

tuner.set_tuning_result_saving_address(TUNING_RESULT_PATH)


JiaoCheng Initialised
Successfully read in model <class '__main__.TradingStrategy'>, which is a Regression model
Successfully recorded hyperparameter choices
Successfully set tuning output address


In [8]:
# Start the search with the tuner configured above. In the recorded output
# below, each evaluated combination takes roughly 50-55 seconds.
tuner.tune()


Default combo: [0, 0, 2, 0, 1, 2, 0, 2, 2, 2, 2, 2] 


ROUND 1

Round 1 
Hyperparameter: price_range (index: 2) 

Trained and Tested combination 2 of 80000000: (0, 0, 0, 0, 1, 2, 0, 2, 2, 2, 2, 2), taking time 54.9465 seconds to get score of 27.5062
        Current best combo: (0, 0, 0, 0, 1, 2, 0, 2, 2, 2, 2, 2) with val score 27.5062
Trained and Tested combination 4 of 80000000: (0, 0, 1, 0, 1, 2, 0, 2, 2, 2, 2, 2), taking time 51.8458 seconds to get score of 25.665
        Current best combo: (0, 0, 0, 0, 1, 2, 0, 2, 2, 2, 2, 2) with val score 27.5062
Trained and Tested combination 6 of 80000000: (0, 0, 2, 0, 1, 2, 0, 2, 2, 2, 2, 2), taking time 51.7049 seconds to get score of 44.3252
        Current best combo: (0, 0, 2, 0, 1, 2, 0, 2, 2, 2, 2, 2) with val score 44.3252
Trained and Tested combination 8 of 80000000: (0, 0, 3, 0, 1, 2, 0, 2, 2, 2, 2, 2), taking time 51.8737 seconds to get score of 29.4093
        Current best combo: (0, 0, 2, 0, 1, 2, 0, 2, 2, 2, 2, 2) with val scor