In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.metrics import accuracy_score, f1_score
from itertools import product
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from sklearn.model_selection import GridSearchCV
import os
from keras.models import Sequential
from keras.layers import Dense
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split




# DATA

先從總資料抽20%(每年抽等比例的等分)並把這20%隨機分成10%的train跟10%的test(同時上升跟下降的樣本數要一樣)，用這20%的資料去train出最佳hyper parameters，最後在比較用各模型的最佳hyper parameters來跑全資料，並比較各模型的表現(f1, accuracy)。 

In [2]:
def combined_df_for_model(df_price, df_tech):
    df_price['diff'] = df_price['close'].diff().dropna()
    df_price = df_price[df_price['diff'] != 0]
    df_price['diff'] = df_price['diff'].apply(lambda x : -1 if x<0 else 1)
    df_price_diff = df_price['diff']
    df_tech.Date = pd.to_datetime(df_tech.Date, format = '%Y/%m/%d')
    df_tech = df_tech.set_index('Date')
    df_merge = pd.merge(df_price_diff, df_tech, left_index=True, right_index=True, how='inner')
    return df_merge

def spilit_for_par(df):

    df['Year'] = df.index.year
    samples_per_year = df.groupby('Year').size()

    training = []
    testing = []

    for year, days in zip(df['Year'].unique(), samples_per_year):
        d = int(days*0.1)
        year_data = df[df['Year'] == year]
        
        up_samples = year_data[year_data['diff'] == 1].sample(d)
        up_training = up_samples.sample(n = int(len(up_samples)/2))
        up_testing = up_samples.copy().drop(up_training.index)
        
        down_samples = year_data[year_data['diff'] == -1].sample(d)
        down_training = down_samples.sample(n = int(len(down_samples)/2))
        down_testing = down_samples.copy().drop(down_training.index)
        
        training.append(pd.concat([up_training, down_training]))
        testing.append(pd.concat([up_testing, down_testing]))

    final_training = pd.concat(training)
    final_testing = pd.concat(testing)

    final_training = final_training.drop('Year', axis=1)
    final_testing = final_testing.drop('Year', axis=1)
    return final_training, final_testing

def spilit_for_off(df):

    df['Year'] = df.index.year
    training = []
    testing = []

    for year in df['Year'].unique():
        year_data = df[df['Year'] == year]
        
        up_samples = year_data[year_data['diff'] == 1]
        up_training = up_samples.sample(frac = 0.5)
        up_testing = up_samples.copy().drop(up_training.index)
        
        down_samples = year_data[year_data['diff'] == -1]
        down_training = down_samples.sample(frac = 0.5)
        down_testing = down_samples.copy().drop(down_training.index)
        
        training.append(pd.concat([up_training, down_training]))
        testing.append(pd.concat([up_testing, down_testing]))

    final_training = pd.concat(training)
    final_testing = pd.concat(testing)

    final_training = final_training.drop('Year', axis=1)
    final_testing = final_testing.drop('Year', axis=1)
    return final_training, final_testing

# Models

In [3]:
# ANN
# def ann(X_train, y_train, X_test, y_test, par = []):

#     # parameters 
#     if len(par) == 0:
#         n = list(np.arange(10, 101, 10))
#         mc = list(np.arange(0.1, 1, 0.1))
#         ep = list(np.arange(1000, 10001, 1000))
#     else : 
#         n = [par[0]]
#         mc = [par[1]]
#         ep = [par[2]]

#     lr = 0.1

#     score_ann = pd.DataFrame(columns = ['hidden layer neurons', 'momentum constant', 'epochs', 'accuracy', 'f1 score'])
#     for combine in product(n, mc, ep):
#         n = combine[0]
#         mc = combine[1]
#         ep = combine[2]

#         input_size = 10  
#         hidden_size = n  
#         output_size = 1  

#         model = models.Sequential([
#             layers.Dense(hidden_size, activation = 'tanh', input_shape = (input_size,)),
#             layers.Dense(output_size, activation = 'sigmoid')
#         ])

#         model.compile(optimizer = tf.keras.optimizers.SGD(learning_rate=lr, momentum = mc),
#                     loss='binary_crossentropy',
#                     metrics=['accuracy'])

#         model.fit(X_train, y_train, epochs=int(ep), batch_size = 32, validation_split = 0) # 不確定 batch_size 跟 vs 要設多少，就先用default

#         y_pred = model.predict(X_test)
#         y_pred_binary = pd.Series((y_pred >= 0.5).astype(int).ravel()).apply(lambda x : 1 if x>= 0.5 else -1)

#         accuracy = accuracy_score(y_test, y_pred_binary)
#         f1 = f1_score(y_test, y_pred_binary)
#         score_ann.loc[len(score_ann)] = [n, mc, ep, accuracy, f1]
#         #print(f"Accuracy : {accuracy} /// F1 score : {f1}")
#     return score_ann

def ann(X_train, y_train, X_test, y_test, par=[]):
    if len(par) == 0:
        n = list(np.arange(10, 101, 10))
        mc = list(np.arange(0.1, 1, 0.1))
        ep = list(np.arange(1000, 10001, 1000))
    else:
        n = [par[0]]
        mc = [par[1]]
        ep = [par[2]]

    lr = 0.1

    score_ann = pd.DataFrame(columns=['hidden layer neurons', 'momentum constant', 'epochs', 'accuracy', 'f1 score'])
    for combine in product(n, mc, ep):
        n_val = combine[0]
        mc_val = combine[1]
        ep_val = combine[2]
        
        model = MLPClassifier(
            hidden_layer_sizes=(int(n_val),),  # 只有一層 hidden layer
            activation='tanh',  # tanh 作為激活函數
            solver='sgd',  # 隨機梯度下降優化器
            learning_rate_init=lr,
            momentum=mc_val,
            max_iter=int(ep_val),
            random_state=42  # 設定隨機種子以保證結果可重現
        )

        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)

        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        score_ann.loc[len(score_ann)] = [n_val, mc_val, ep_val, accuracy, f1]
    return score_ann
    
# SVM (polynomial)
def svm_p(X_train, y_train, X_test, y_test, par = []):
    # parameters 
    if len(par) == 0:
        degree = [1, 2, 3, 4]
        reg_para = [0.5, 1, 5, 10, 100] 
    else:
        degree = [par[0]]
        reg_para = [par[1]] 

    score_svm_p = pd.DataFrame(columns = ['Degree of kernel function', 'Regularization parameter', 'accuracy', 'f1 score'])

    for combine in product(degree ,reg_para):
        d = combine[0]
        c = combine[1]
        poly_svm = svm.SVC(kernel='poly', degree=int(d), C=c)
        poly_svm.fit(X_train, y_train)
        y_pred_poly = poly_svm.predict(X_test)

        accuracy = accuracy_score(y_test, y_pred_poly)
        f1 = f1_score(y_test, y_pred_poly)

        score_svm_p.loc[len(score_svm_p)] = [d, c, accuracy, f1]
    return score_svm_p

# SVM (radial basis)
def svm_r(X_train, y_train, X_test, y_test, par = []):
    # parameters 
    if len(par) == 0:
        gamma = list(np.arange(0.5, 5.1, 0.5)) + [10] # 這邊paper超怪，他寫這樣總共有10個，幹但明明就11個，大概是typo
        reg_para = [0.5, 1, 5, 10] 
    else:
        gamma = [par[0]]
        reg_para = [par[1]]
    
    score_svm_r = pd.DataFrame(columns = ['Gamma in kernel function', 'Regularization parameter', 'accuracy', 'f1 score'])
    for combine in product(gamma ,reg_para):
        g = combine[0]
        c = combine[1]
        rbf_svm = svm.SVC(kernel='rbf', C = c, gamma = float(g))
        rbf_svm.fit(X_train, y_train)
        y_pred_rbf = rbf_svm.predict(X_test)

        accuracy = accuracy_score(y_test, y_pred_rbf)
        f1 = f1_score(y_test, y_pred_rbf)

        score_svm_r.loc[len(score_svm_r)] = [g, c, accuracy, f1]
    return score_svm_r

# RF
def rf(X_train, y_train, X_test, y_test, par = []):
    # parameters
    if len(par)==0:
        ntree = list(np.arange(10, 201, 10))
    else:
        ntree = [par[0]]
    score_rf = pd.DataFrame(columns = ['n', 'accuracy', 'f1 score'])
    for n in ntree:
        
        random_forest = RandomForestClassifier(n_estimators = int(n), random_state = 5278)

        random_forest.fit(X_train, y_train)

        y_pred_rf = random_forest.predict(X_test)

        accuracy = accuracy_score(y_test, y_pred_rf)
        f1 = f1_score(y_test, y_pred_rf)
        score_rf.loc[len(score_rf)] = [n, accuracy, f1]
    return score_rf

# NB
def nb(X_train, y_train, X_test, y_test, flag):
    if flag == 1:
        naive_bayes = GaussianNB() # for 連續特徵
    elif flag == 2:
        naive_bayes = BernoulliNB() # for 離散特徵
    
    naive_bayes.fit(X_train, y_train)

    y_pred_nb = naive_bayes.predict(X_test)

    accuracy_nb = accuracy_score(y_test, y_pred_nb)
    f1_nb = f1_score(y_test, y_pred_nb)
    return [accuracy_nb, f1_nb]

In [4]:
param_grid_ann = {
    'hidden_size': list(np.arange(10, 101, 10)),
    'momentum': list(np.arange(0.1, 1, 0.1)),
    'epochs': list(np.arange(1000, 10001, 1000))
}


param_grid_svm_p = {
    'degree': [1, 2, 3, 4],
    'C': [0.5, 1, 5, 10, 100]
}

param_grid_svm_r = {
    'gamma': list(np.arange(0.5, 5.1, 0.5)) + [10],
    'C': [0.5, 1, 5, 10]
}

param_grid_rf = {
    'n_estimators': list(np.arange(10, 201, 10))
}


# Running

In [5]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.metrics import accuracy_score, f1_score
from itertools import product
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from sklearn.model_selection import GridSearchCV
import os
from keras.models import Sequential
from keras.layers import Dense
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
df_raw = pd.read_excel('../data/data.xlsx', index_col = 0, header=1)
df_raw.dropna(inplace=True)
df_raw.index = pd.to_datetime(df_raw.index)
col_name = ['open', 'high', 'low', 'close', 'vol']
df_TX = df_raw.iloc[:,:5].set_axis(col_name, axis = 1)
df_0050 = df_raw.iloc[:,5:10].set_axis(col_name, axis = 1)
df_2330 = df_raw.iloc[:,10:15].set_axis(col_name, axis = 1)
df_2881 = df_raw.iloc[:,15:20].set_axis(col_name, axis = 1)

data_input = '../data/tidy'
data_output_par = '../data/output/parameters'
data_output_off = '../data/output/official'
data_output_off_ra = '../data/output/official_random_split'

for file in os.listdir(data_input):
    print(file)
    df_tech = pd.read_csv(os.path.join(data_input, file))

    # com
    if '0050' in file:
        df_price = df_0050.copy()
    elif '2330' in file:
        df_price = df_2330.copy()
    elif '2881' in file:
        df_price = df_2881.copy()
    elif 'TX' in file:
        df_price = df_TX.copy()
    
    # flag for nb
    if file.split('_')[1] == '1':
        flag = 1
    elif file.split('_')[1] == '2':
        flag = 2

    df_all = combined_df_for_model(df_price, df_tech)
    # for training parameters
    training_par, testing_par = spilit_for_par(df_all)
    X_train_par = training_par.iloc[:, 1:]
    y_train_par = training_par.iloc[:, 0]
    X_test_par = testing_par.iloc[:, 1:]
    y_test_par = testing_par.iloc[:, 0]

    # for official parameters
    training, testing = spilit_for_off(df_all)
    X_train = training.iloc[:, 1:]
    y_train = training.iloc[:, 0]
    X_test = testing.iloc[:, 1:]
    y_test = testing.iloc[:, 0]

    # for randomly split
    X_train_ra, X_test_ra, y_train_ra, y_test_ra = train_test_split(df_all.iloc[:, 1:], df_all.iloc[:, 0], test_size=0.5, random_state=5278)

    df_ann_par = ann(X_train_par, y_train_par, X_test_par, y_test_par)
    df_ann_par.to_csv(os.path.join(data_output_par, file.replace('.csv', '') + '_ann.csv'), index = False)
    ann_temp = df_ann_par[df_ann_par['accuracy'] == max(df_ann_par['accuracy'])]
    ann_par = [ann_temp['hidden layer neurons'].values[0], ann_temp['momentum constant'].values[0], ann_temp['epochs'].values[0]]
    df_ann = ann(X_train, y_train, X_test, y_test, ann_par)
    df_ann.to_csv(os.path.join(data_output_off, file.replace('.csv', '') + '_ann.csv'), index = False)
    df_ann_ra = ann(X_train_ra, y_train_ra, X_test_ra, y_test_ra, ann_par)
    df_ann_ra.to_csv(os.path.join(data_output_off_ra, file.replace('.csv', '') + '_ann.csv'), index = False)

    df_svm_p_par = svm_p(X_train_par, y_train_par, X_test_par, y_test_par)
    df_svm_p_par.to_csv(os.path.join(data_output_par, file.replace('.csv', '') + '_svm_p.csv'), index = False)
    svm_p_temp = df_svm_p_par[df_svm_p_par['accuracy'] == max(df_svm_p_par['accuracy'])]
    svm_p_par = [svm_p_temp['Degree of kernel function'].values[0], svm_p_temp['Regularization parameter'].values[0]]
    df_svm_p = svm_p(X_train, y_train, X_test, y_test, svm_p_par)
    df_svm_p.to_csv(os.path.join(data_output_off, file.replace('.csv', '') + '_svm_p.csv'), index = False)
    df_svm_p_ra = svm_p(X_train_ra, y_train_ra, X_test_ra, y_test_ra, svm_p_par)
    df_svm_p_ra.to_csv(os.path.join(data_output_off_ra, file.replace('.csv', '') + '_svm_p.csv'), index = False)

    df_svm_r_par = svm_r(X_train_par, y_train_par, X_test_par, y_test_par)
    df_svm_r_par.to_csv(os.path.join(data_output_par, file.replace('.csv', '') + '_svm_r.csv'), index = False)
    svm_r_temp = df_svm_r_par[df_svm_r_par['accuracy'] == max(df_svm_r_par['accuracy'])]
    svm_r_par = [svm_r_temp['Gamma in kernel function'].values[0], svm_r_temp['Regularization parameter'].values[0]]
    df_svm_r = svm_r(X_train, y_train, X_test, y_test, svm_r_par)
    df_svm_r.to_csv(os.path.join(data_output_off, file.replace('.csv', '') + '_svm_r.csv'), index = False)
    df_svm_r_ra = svm_r(X_train_ra, y_train_ra, X_test_ra, y_test_ra, svm_r_par)
    df_svm_r_ra.to_csv(os.path.join(data_output_off_ra, file.replace('.csv', '') + '_svm_r.csv'), index = False)

    df_rf_par = rf(X_train_par, y_train_par, X_test_par, y_test_par)
    df_rf_par.to_csv(os.path.join(data_output_par, file.replace('.csv', '') + '_rf.csv'), index = False)
    rf_temp = df_rf_par[df_rf_par['accuracy'] == max(df_rf_par['accuracy'])]
    rf_par = [rf_temp['n'].values[0]]
    df_rf = rf(X_train, y_train, X_test, y_test, rf_par)
    df_rf.to_csv(os.path.join(data_output_off, file.replace('.csv', '') + '_rf.csv'), index = False)
    df_rf_ra = rf(X_train_ra, y_train_ra, X_test_ra, y_test_ra, rf_par)
    df_rf_ra.to_csv(os.path.join(data_output_off_ra, file.replace('.csv', '') + '_rf.csv'), index = False)

    df_nb = pd.DataFrame(nb(X_train, y_train, X_test, y_test, flag)) # nb no parameters
    df_nb.to_csv(os.path.join(data_output_off, file.replace('.csv', '') + '_nb.csv'), index = False)
    df_nb_ra = pd.DataFrame(nb(X_train_ra, y_train_ra, X_test_ra, y_test_ra, flag)) # nb no parameters
    df_nb_ra.to_csv(os.path.join(data_output_off_ra, file.replace('.csv', '') + '_nb.csv'), index = False)
    

method_1_0050.csv


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_price['diff'] = df_price['diff'].apply(lambda x : -1 if x<0 else 1)


method_1_2330.csv


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_price['diff'] = df_price['diff'].apply(lambda x : -1 if x<0 else 1)


method_1_2881.csv


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_price['diff'] = df_price['diff'].apply(lambda x : -1 if x<0 else 1)


method_1_TX.csv
method_2_0050.csv


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_price['diff'] = df_price['diff'].apply(lambda x : -1 if x<0 else 1)


method_2_2330.csv


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_price['diff'] = df_price['diff'].apply(lambda x : -1 if x<0 else 1)


method_2_2881.csv


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_price['diff'] = df_price['diff'].apply(lambda x : -1 if x<0 else 1)


method_2_TX.csv
