In [None]:
#Last edited/used: 5/11/2023

#Purpose: look at performances of models

In [None]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from scipy.stats import chisquare
from sklearn.preprocessing import minmax_scale

from matplotlib.font_manager import FontProperties

font = {'family' : 'Helvetica',
        'size'   : 12}

plt.rc('font', **font)

In [None]:
# performance metrics & functions

def mda(df):
    mda = 0
    for idx in range(len(df)):
        if df['binary_pred'].iloc[idx] == df['TrueLabel'].iloc[idx]:
            mda += 1
    mda = mda / len(df)
    return round(mda, 4)


def get_net_profit(df):
    net = 0
    for idx in range(len(df)):
        if df['binary_pred'].iloc[idx] == df['TrueLabel'].iloc[idx]:
            net += df['Profit'].iloc[idx]
        else:
            net += -df['Profit'].iloc[idx]
    return round(net, 2)


def get_binary_prediction(forecasted, threshold=0.5):
    binary_pred = []
#     for predicted in forecasted.iloc[:,0]:
    for predicted in forecasted:
        if predicted > threshold:
            binary_pred.append(1)
        else:
            binary_pred.append(0)
    return {'binary_pred':binary_pred}


def get_profit_thr_df(predictions, profits, image_type):
    thresh = list(np.arange(0.0, 1.0, 0.001))
    net_profit_list = []
    
    max_net = -100.00
    max_net_thresh = 0.0
    for thr in thresh:
        pred_dict = get_binary_prediction(predictions, threshold=thr)
        df = pd.DataFrame.from_dict(pred_dict)
        df = pd.concat([df, profit_], axis=1)
        net = get_net_profit(df)
        net_profit_list.append(net)
        if net > max_net:
            max_net = net
            max_net_thresh = thr
    
    return max_net, max_net_thresh, net_profit_list

def get_log_return(df):
    logs = []
    for idx in range(len(df)):
        pt = df['FutureValue'].iloc[idx]
        pt_1 = df['CurrentValue'].iloc[idx]
        if df['binary_pred'].iloc[idx] == 0: # if long (buy)
#             print(pt, pt_1)
            gross_return = pt / pt_1
#             print('gross return', gross_return)
        else: # if short (sell)
            gross_return = pt_1 / pt
#             print('gross return for short', gross_return)
        logs.append(np.log(gross_return))
    return np.array(logs)

# It is custom for the risk free return to use the 10 Year Treasury Note,
# but as it has been low for long time, often 0 is used.

# https://www.bankrate.com/banking/savings/bank-of-america-savings-rates/
# bank of america savings interest rate = 0.01% 
def sharpe_ratio(log_returns, interst_rate = 0):
#     https://www.learnpythonwithrune.org/how-to-calculate-sharpe-ratio-with-pandas-and-numpy/
    # visualize the log-return of the portfolio
    fig, ax = plt.subplots()
    log_df = pd.DataFrame(log_returns)
    log_df.hist(bins=50, ax=ax)
    # daily sharpe ratio
    sharpe_ratio = (log_returns.mean() - interst_rate) / log_returns.std()
    # annualized sharpe ratio
    asr = sharpe_ratio*252**.5
    
    return sharpe_ratio, asr


def get_confusion(df):
     ### Confusion Matrix
    ConfusionMatrixDisplay.from_predictions(df['TrueLabel'][:], df['binary_pred'][:])
    tn, fp, fn, tp = confusion_matrix(df['TrueLabel'][:], df['binary_pred'][:]).ravel()
    print('tn: {}, fp: {}, fn: {}, tp: {}'.format(tn, fp, fn, tp))

    # Sensitivity, hit rate, recall, or true positive rate
    TPR = tp/(tp+fn)
    # Specificity or true negative rate
    TNR = tn/(tn+fp) 

    # Overall accuracy
    ACC = (tp+tn)/(tp+fp+fn+tn)

    plt.show()

    sr, asr = sharpe_ratio(get_log_return(df))
    
    return TPR, TNR, ACC, sr, asr


def get_chi_square(df):
    # Statistical Test: Chi-Squared Statistics 
    # Used to determine whether the confusion matrix of a classifier 
    # is statistically significant, or merely white noise

    # A chi-square test is a statistical test used to compare observed results with expected results. 
    # The purpose of this test is to determine if a difference between observed data and expected data 
    # is due to chance, or if it is due to a relationship between the variables you are studying.

    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.chisquare.html
    # Calculate a one-way chi-square test.
    # The chi-square test tests the null hypothesis that the categorical data has the given frequencies.
    # By setting axis=None, the test is applied to all data in the array, which is equivalent to applying the test to the flattened array.

    observed = [df['binary_pred'].values.tolist().count(0), df['binary_pred'].values.tolist().count(1)]
    expected = [df['TrueLabel'].values.tolist().count(0), df['TrueLabel'].values.tolist().count(1)]
    print('observed: ',observed)
    print('expected: ', expected)
    print(chisquare(observed, expected))
    
    
# def evaluate_results(prediction, horizon, profit, THRESH):
def evaluate_results(prediction, profit, THRESH):
    predictions = prediction
#     predictions = prediction[["predicted"]]
    # Drop Index
    profit_ = profit.reset_index(drop=True)
    
    
    # Set threshold to compare with true labels
    pred_dict = get_binary_prediction(predictions, threshold=THRESH) # POSITION 
    df = pd.DataFrame.from_dict(pred_dict)
    df['TrueLabel'] = profit_['TrueLabel']
    df['FutureValue'] = profit_['FutureValue']
    df['CurrentValue'] = profit_['CurrentValue']
    df['Profit'] = profit_['Profit']
    
    ### Confusion Matrix
    TPR, TNR, ACC, sr, asr = get_confusion(df)
    
    # Results
    print("Net Profit: ", get_net_profit(df))
    print("MDA: ", mda(df)*100)
    print("True Positive Rate :", TPR*100)
    print("True Negative Rate :", TNR*100)
    print("Accuracy :", ACC)
    print("Sharpe Ratio :", sr)
    print("Annualized Sharpe Ratio :", asr)
    
    get_chi_square(df)
    

In [None]:
# Load data here

#POSITION
# horizon = 'POSITION'
# horizon = 'SWING'
horizon = 'SCALP'

# {'LONG': 0, 'SHORT': 1}
if horizon == "SWING":
    vote = [0] #LONG (SWING)
    profit_from = '2018-04-07 00:00:00'
elif horizon == "POSITION":
    vote = [1] #SHORT
    profit_from = '2018-04-15 00:00:00'
elif horizon == "SCALP":
    vote = [1] #SHORT (POSITION, SCALP)
    profit_from = '2018-03-23 08:00:00'
    
    
# get profit chart from test date -> 
# 2018_04_15_00_00_00 # POSITION
# 2018_04_07_00_00_00 # SWING
# 2018_04_07_00_00_00 # SCALP


# test = pd.read_csv('../data/results_POSITION/true_pred_ARIMA_POSITION_1D.csv')
arima = pd.read_csv('../data/results_{0}/true_pred_ARIMA_{0}_1D.csv'.format(horizon))
cnn1d = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_1D_1D.csv'.format(horizon))
gaf = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_GAF.csv'.format(horizon))
gaf_agg = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_GAF_AGG.csv'.format(horizon))
cs = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_CS.csv'.format(horizon))
ti = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_TI.csv'.format(horizon))
gaf_ST = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_ST_GAF.csv'.format(horizon))
gaf_agg_ST = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_ST_GAF_AGG.csv'.format(horizon))
cs_ST = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_ST_CS.csv'.format(horizon))
ti_ST = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_ST_TI.csv'.format(horizon))


# gaf_cut = pd.read_csv('../data/results_{0}/true_pred_2dCNN_{0}_CUT_GAF.csv'.format(horizon))

# Load Profit Chart
profit_chart = pd.read_csv('../data/{}_profit_chart.csv'.format(horizon))[["PredictionDate", "CurrentValue", "FutureValue", "TrueLabel", "Profit"]]

# observed:  [16, 204]
# expected:  [127, 93]

# chisquare([2000, 400], [1400, 1000])

# normalize predictions of 2D CNN
predictions_arima = minmax_scale(arima[["predicted"]], feature_range=(0,1))
predictions_cnn1d = minmax_scale(cnn1d[["predicted"]], feature_range=(0,1))
predictions_gaf = minmax_scale(gaf[["predicted"]], feature_range=(0,1))
predictions_gaf_agg = minmax_scale(gaf_agg[["predicted"]], feature_range=(0,1))
predictions_cs = minmax_scale(cs[["predicted"]], feature_range=(0,1))
predictions_ti = minmax_scale(ti[["predicted"]], feature_range=(0,1))
predictions_gaf_ST = minmax_scale(gaf_ST[["predicted"]], feature_range=(0,1))
predictions_gaf_agg_ST = minmax_scale(gaf_agg_ST[["predicted"]], feature_range=(0,1))
predictions_cs_ST = minmax_scale(cs_ST[["predicted"]], feature_range=(0,1))
predictions_ti_ST = minmax_scale(ti_ST[["predicted"]], feature_range=(0,1))

# predictions_gaf = minmax_scale(gaf_cut[["predicted"]], feature_range=(0,1))

In [None]:
#Visualize Prediction Frequency

# Prediction Frequency Histograms
fig, axs = plt.subplots(5, 2, figsize=(20,17))
axs[0, 0].hist(predictions_arima, bins=20)
axs[0, 0].set_title('ARIMA')
axs[0, 1].hist(predictions_cnn1d, bins=20)
axs[0, 1].set_title('1D CNN')
axs[1, 0].hist(predictions_gaf, bins=20)
axs[1, 0].set_title('GAF')
axs[1, 1].hist(predictions_gaf_agg, bins=20)
axs[1, 1].set_title('GAF AGG')
axs[2, 0].hist(predictions_cs, bins=20)
axs[2, 0].set_title('CS')
axs[2, 1].hist(predictions_ti, bins=20)
axs[2, 1].set_title('TI')
axs[3, 0].hist(predictions_gaf_ST, bins=20)
axs[3, 0].set_title('GAF ST')
axs[3, 1].hist(predictions_gaf_agg_ST, bins=20)
axs[3, 1].set_title('GAF AGG ST')
axs[4, 0].hist(predictions_cs_ST, bins=20)
axs[4, 0].set_title('CS ST')
axs[4, 1].hist(predictions_ti_ST, bins=20)
axs[4, 1].set_title('TI ST')

# fig.suptitle('Prdiction Frequency', fontsize=12)
# for ax in axs.flat:
#     ax.set(xlabel='predictions', ylabel='frequency')

# # Hide x labels and tick labels for top plots and y ticks for right plots.
# for ax in axs.flat:
#     ax.label_outer()

fig.savefig('./{}_prediction_freq.png'.format(horizon),bbox_inches='tight')

In [None]:
#1. Choose threshold with a subset of test data 

In [None]:
# subset of test data

mask = (profit_chart.PredictionDate >= profit_from) & (profit_chart.PredictionDate  < '2019')
profit_ = profit_chart.loc[mask]
profit_ = profit_.reset_index(drop=True)

profit_evaluate = profit_chart[profit_chart.PredictionDate >= '2019']

In [None]:
# Net Profit by threshold
# Profits by Thresholds on a Subset of test data

In [None]:
# true labels for threshold test subset
true_0 = list(profit_.TrueLabel).count(0)
true_1 = list(profit_.TrueLabel).count(1)
print('number of labels 0, 1:', true_0, true_1)
print('total number of test set:', len(profit_))


# true labels for evaluation test subset
true_0_eval = list(profit_evaluate.TrueLabel).count(0)
true_1_eval = list(profit_evaluate.TrueLabel).count(1)
print('number of labels 0, 1:', true_0_eval, true_1_eval)
print('total number of test set:', len(profit_evaluate))

# profit_ = profit_[profit_['PredictionDate'] < '2019-01-01 00:00:00']
# majority vote (set up here)
# majority_vote = pd.DataFrame(majority_vote * len(profit_), columns=['binary_pred'])
# majority_vote.head()
majority_vote = vote * len(profit_)

In [None]:
# get profits of each model to determine threshold

arima_max_net, arima_max_net_thresh, arima_lst = get_profit_thr_df(predictions_arima[:len(profit_)], profit_, 'arima')
cnn1dmax_net, cnn1d_max_net_thresh, cnn1d_lst = get_profit_thr_df(predictions_cnn1d[:len(profit_)], profit_, 'cnn1d')
gaf_max_net, gaf_max_net_thresh, gaf_lst = get_profit_thr_df(predictions_gaf[:len(profit_)], profit_, 'gaf')
gaf_agg_max_net, gaf_agg_max_net_thresh, gaf_agg_lst = get_profit_thr_df(predictions_gaf_agg[:len(profit_)], profit_, 'gaf_agg')
cs_max_net, cs_max_net_thresh, cs_lst = get_profit_thr_df(predictions_cs[:len(profit_)], profit_, 'CS')
ti_max_net, ti_max_net_thresh, ti_lst = get_profit_thr_df(predictions_ti[:len(profit_)], profit_, 'TI')
gaf_ST_max_net, gaf_ST_max_net_thresh, gaf_ST_lst = get_profit_thr_df(predictions_gaf_ST[:len(profit_)], profit_, 'gaf_ST')
gaf_agg_ST_max_net, gaf_agg_ST_max_net_thresh, gaf_agg_ST_lst = get_profit_thr_df(predictions_gaf_agg_ST[:len(profit_)], profit_, 'gaf_agg_ST')
cs_ST_max_net, cs_ST_max_net_thresh, cs_ST_lst = get_profit_thr_df(predictions_cs_ST[:len(profit_)], profit_, 'CS_ST')
ti_ST_max_net, ti_ST_max_net_thresh, ti_ST_lst = get_profit_thr_df(predictions_ti_ST[:len(profit_)], profit_, 'TI_ST')
mv_max_net, mv_max_net_thresh, mv_lst = get_profit_thr_df(majority_vote, profit_, 'mv')

In [None]:
threshes = [arima_max_net_thresh, cnn1d_max_net_thresh, gaf_max_net_thresh, gaf_agg_max_net_thresh, cs_max_net_thresh,
           ti_max_net_thresh, gaf_ST_max_net_thresh, gaf_agg_ST_max_net_thresh, cs_ST_max_net_thresh, ti_ST_max_net_thresh,
           mv_max_net_thresh]

nets = [arima_max_net, cnn1dmax_net, gaf_max_net, gaf_agg_max_net, cs_max_net, ti_max_net, gaf_ST_max_net, gaf_agg_ST_max_net,
       cs_ST_max_net, ti_ST_max_net, mv_max_net]

models = ['arima', 'cnn1d', 'gaf', 'gaf_agg', 'cs', 'ti', 'gaf_ST', 'gaf_agg_ST',
       'cs_ST', 'ti_ST', 'majority vote']


In [None]:
profit_thr_df = pd.DataFrame({'arima': arima_lst, 'cnn1d':cnn1d_lst, 'gaf': gaf_lst, 'gaf_agg':gaf_agg_lst,
                             'cs': cs_lst, 'ti': ti_lst, 'majority': mv_lst}, index=list(np.arange(0.0, 1.0, 0.001)))

lines = profit_thr_df.plot.line(title='Profit by Threshold', xlabel='Threshold', ylabel='Net Profit')
lines.figure.savefig('./{}_profit_thresh_1.png'.format(horizon),bbox_inches='tight')


In [None]:
profit_thr_df = pd.DataFrame({'arima': arima_lst, 'cnn1d':cnn1d_lst, 'gaf_ST': gaf_ST_lst, 'gaf_agg_ST':gaf_agg_ST_lst,
                             'cs_ST': cs_ST_lst, 'ti_ST': ti_ST_lst, 'majority': mv_lst}, index=list(np.arange(0.0, 1.0, 0.001)))

lines = profit_thr_df.plot.line(title='Profit by Threshold', xlabel='Threshold', ylabel='Net Profit')
lines.figure.savefig('./{}_profit_thresh_2.png'.format(horizon),bbox_inches='tight')


In [None]:
# max net profit possible
round(sum(profit_evaluate['Profit']), 2)

In [None]:
for i in range(len(threshes)):
    print('{} threshold:'.format(models[i]), threshes[i])


In [None]:
evaluate_results(vote * len(profit_evaluate), profit_evaluate, mv_max_net_thresh)

In [None]:
evaluate_results(cs_ST['predicted'][len(profit_):], profit_evaluate, cs_ST_max_net_thresh)

In [None]:
profit_evaluate['Profit_logged'] = np.log(profit_evaluate['Profit'] + 0.00001)

In [None]:
plt.hist(profit_evaluate['Profit'], bins=20)

plt.xlabel('Profits')
plt.ylabel('Frequency')
plt.title('Profit Frequency for {}'.format(horizon))
plt.savefig('./{}_profit_dist.png'.format(horizon),bbox_inches='tight')

In [None]:
plt.hist(profit_evaluate['Profit_logged'], bins=20)

plt.xlabel('Logged Profits')
plt.ylabel('Frequency')
plt.title('Profit Frequency for {}'.format(horizon))
plt.savefig('./{}_logged_profit_dist.png'.format(horizon),bbox_inches='tight')

In [None]:
med_logged = np.median(profit_evaluate['Profit_logged'])

In [None]:
one_std = np.var(profit_evaluate['Profit_logged'])

In [None]:
lower_cutoff = med_logged - one_std
upper_cutoff = med_logged + one_std

In [None]:
lower_cutoff

In [None]:
upper_cutoff

In [None]:
# 0 - 1 is low
print('low: ', len(profit_evaluate[profit_evaluate['Profit_logged'] < lower_cutoff]))

# 1 - 3 is mid
mask_cutoff = (profit_evaluate.Profit_logged >= lower_cutoff) & (profit_evaluate.Profit_logged  <= upper_cutoff)
print('mid: ', len(profit_evaluate[mask_cutoff]))

# greater than 3 is high
print('high: ', len(profit_evaluate[profit_evaluate['Profit_logged'] > upper_cutoff]))

In [None]:
def get_profit_segments(profit_evaluate):
    profit_segments = []
    df = profit_evaluate.copy()
    med_logged = np.median(df['Profit_logged'])
    one_std = np.var(df['Profit_logged'])
    lower_cutoff = med_logged - one_std
    upper_cutoff = -2.0 #med_logged + one_std
    
    for profit in df['Profit_logged']:
        if profit < lower_cutoff:
            segment = 'low'
        elif profit > upper_cutoff:
            segment = 'high'
        else:
            segment = 'mid'

        profit_segments.append(segment)
    
    print('low: ', profit_segments.count('low'))
    print('mid: ', profit_segments.count('mid'))
    print('high: ', profit_segments.count('high'))
    df["segments"] = profit_segments
    
    return df

In [None]:
with_seg = get_profit_segments(profit_evaluate)

In [None]:
with_seg

In [None]:
def eval_segments(profit, predicted, THRESH):

    correct = []
    wrong = []
#     predicted_labels = get_binary_prediction(arima['predicted'][len(profit_):], threshold=0.7)
    predicted_labels = get_binary_prediction(predicted, threshold=THRESH)
    eval_ =  profit.reset_index(drop=True)

    for i in range(len(eval_)):
        # have to choose a threshold first and change predicted from probability to labels
        if eval_['TrueLabel'].iloc[i] == predicted_labels['binary_pred'][i]:
            correct.append(eval_['segments'].iloc[i])
        else:
            wrong.append(eval_['segments'].iloc[i])
            
    dict_segments = {'correct':correct, 'wrong':wrong}
    high_seg = dict_segments['correct'].count('high') / (dict_segments['correct'].count('high') + dict_segments['wrong'].count('high'))
    mid_seg = dict_segments['correct'].count('mid') / (dict_segments['correct'].count('mid') + dict_segments['wrong'].count('mid'))
    low_seg = dict_segments['correct'].count('low') / (dict_segments['correct'].count('low') + dict_segments['wrong'].count('low'))
    
    print('Proportion Classified Correctly for each segment\n')
    print('high: ', round(high_seg,4))
    print('mid: ', round(mid_seg,4))
    print('low: ', round(low_seg,4))


In [None]:
eval_segments(with_seg, arima['predicted'][len(profit_):], arima_max_net_thresh)

In [None]:
eval_segments(with_seg, cnn1d['predicted'][len(profit_):], cnn1d_max_net_thresh)

In [None]:
eval_segments(with_seg, gaf['predicted'][len(profit_):], gaf_max_net_thresh)

In [None]:
eval_segments(with_seg, gaf_agg['predicted'][len(profit_):], gaf_agg_max_net_thresh)

In [None]:
eval_segments(with_seg, cs['predicted'][len(profit_):], cs_max_net_thresh)

In [None]:
eval_segments(with_seg, ti['predicted'][len(profit_):], ti_max_net_thresh)

In [None]:
eval_segments(with_seg, gaf_ST['predicted'][len(profit_):], gaf_ST_max_net_thresh)

In [None]:
eval_segments(with_seg, gaf_agg_ST['predicted'][len(profit_):], gaf_agg_ST_max_net_thresh)

In [None]:
eval_segments(with_seg, cs_ST['predicted'][len(profit_):], cs_ST_max_net_thresh)

In [None]:
eval_segments(with_seg, ti_ST['predicted'][len(profit_):], ti_ST_max_net_thresh)

In [None]:
sum(profit_evaluate[profit_evaluate.TrueLabel == 1].Profit)

In [None]:
sum(profit_evaluate[profit_evaluate.TrueLabel == 0].Profit)

In [None]:
# # Statistical Test: Comparisons of Models using Accuracy CI test

# # Assume given two models f1 and f2, w/ estimated accuracies a1 
# # (estimated on test set with size n1) and a2 (estimated on test set with size n2)
# # Conclude accuracy f1 > f2 or accuracy f1 – accracy f2 > 0 , if CI for a1-a2 is entirely to the right of 0
# # If interval contains 0, no difference

# # alpha = 0.05, two-sided Z = 1.96
# ci_lower = (a1 - a2) - 1.96*np.sqrt(((a1*(1-a1))/n1) + ((a2*(1-a2))/n2))
# ci_upper = (a1 - a2) + 1.96*np.sqrt(((a1*(1-a1))/n1) + ((a2*(1-a2))/n2))

# print("CI interval :", ci_lower, ci_upper)