In [None]:
from ARIMA_LSTM_FUNCTIONS_REAL import *

# S&P 500

In [None]:
# Name of the run; it selects the sub-folder that the cells below read from and write to.
SENS = 'RANDOM_SEARCH_1'
# Relative folder of this run, reused under ./Results, ./Best_Models and ./Hyperparameter_tunning.
PATH = f'S&P500/Sensitivity Analysis/{SENS}'

## ARIMA

In [None]:
# Load the ^GSPC series used by the ARIMA step, starting at 1996-01-17.
# NOTE(review): get_data_ARIMA comes from the star import; its return schema is not visible here.
df_SnP_ARIMA = get_data_ARIMA(index_symbol="^GSPC", start="1996-01-17")

### Hyperparameter Tuning Set-up

In [None]:
# Window sizes (in bars) handed to main_arima below.
lookback_bars, validation_bars, testing_bars = 1000, 250, 250

# Upper bounds for the ARIMA orders handed to main_arima below.
p_max, q_max = 6, 6

### ARIMA

In [None]:
# ARIMA residuals are cached on disk because fitting is the expensive step of this notebook.
arima_resids_dir = f'./Results/{PATH}/Visualisations/'
arima_resids_file = f'{arima_resids_dir}ARIMA_resids.csv'

if os.path.exists(arima_resids_file):
    # The cache holds a single column; squeeze it to a Series so a cache hit yields
    # the same kind of object as a fresh run (read_csv alone returns a DataFrame).
    ARIMA_resids = pd.read_csv(arima_resids_file).squeeze('columns')
    print(f'ARIMA resids results already exits')
else:
    model_predictions_SnP, train_data_SnP, test_validation_data_SnP = main_arima(
        p_max, q_max, df_SnP_ARIMA, lookback_bars, validation_bars, testing_bars
    )
    # Stitch the per-window frames together and take actual minus predicted close.
    # NOTE(review): assumes model_predictions_SnP lines up element-wise with the concatenated 'Close' — confirm in main_arima.
    df_test_SnP_ARIMA = pd.concat(test_validation_data_SnP, axis=0)
    ARIMA_resids = df_test_SnP_ARIMA['Close'] - model_predictions_SnP

    # exist_ok avoids the separate existence check (and the race between check and creation).
    os.makedirs(arima_resids_dir, exist_ok=True)
    ARIMA_resids.to_csv(arima_resids_file, index=False)

# S&P 500

### Downloading data

In [None]:
# Download the index (^GSPC) and the volatility index (^VIX).
# NOTE(review): get_data's date range and columns are not visible here — confirm they line up with the ARIMA residuals.
df_SnP = get_data(index_symbol="^GSPC")
df_ViX = get_data(index_symbol='^VIX')

# Add the extra feature columns to the S&P frame.
# Series assignment: pandas aligns the VIX closes on df_SnP's index.
df_SnP['Vix_Close'] = df_ViX['Close']
# The trailing .dropna() has no lasting effect: the column assignment re-aligns on
# df_SnP's index, so the first row is NaN again.
df_SnP['stock_returns'] = df_SnP['Close'].pct_change().dropna()
# .values bypasses index alignment, so ARIMA_resids must have exactly len(df_SnP) rows in the same order.
df_SnP['ARIMA_residuals'] = ARIMA_resids.values

### Data preparation

In [None]:
# Data preparation
# Cast the whole frame to a float32 matrix; columns are addressed by position below.
values = df_SnP.values.astype('float32')

# NOTE(review): positional column picks — presumably Close (3), Volume (5), Vix_Close (6) and
# ARIMA_residuals (8); this depends on the column order returned by get_data — TODO confirm.
values_X = values[:, [3,5,6,8]] 
# The target is column 3, which is also the first feature column.
values_Y = values[:, 3]

# Scale features and target to [0, 1] with separate scalers, so predictions can be
# mapped back later with scaler_Y alone.
# NOTE(review): both scalers are fitted on the full series, i.e. including the bars that
# are later used for validation and testing.
scaler_X = MinMaxScaler(feature_range=(0, 1))
scaler_Y = MinMaxScaler(feature_range=(0, 1))

S_C_X = scaler_X.fit_transform(values_X)
S_C_Y = scaler_Y.fit_transform(values_Y.reshape(-1,1))

# Wrap the scaled arrays in DataFrames (default integer index and column labels).
S_C_X = pd.DataFrame(S_C_X)
S_C_Y = pd.DataFrame(S_C_Y)

# One dataset per value of the third argument (7, 14, 21). The 7 and 14 variants skip the
# first 14 / 7 rows — presumably so that all three datasets cover the same bars; confirm
# against create_dataset.
scaled_values_X_7, scaled_values_Y_7 = create_dataset(S_C_X[14:], S_C_Y[14:], 7)
scaled_values_X_14, scaled_values_Y_14 = create_dataset(S_C_X[7:], S_C_Y[7:], 14)
scaled_values_X_21, scaled_values_Y_21 = create_dataset(S_C_X, S_C_Y, 21)

# Look up the (X, Y) pair by that third argument.
datasets = {
    7: (scaled_values_X_7, scaled_values_Y_7), 
    14: (scaled_values_X_14, scaled_values_Y_14),
    21: (scaled_values_X_21, scaled_values_Y_21)
}

# Hyperparameter Tuning SET-UP

In [None]:
# Training settings handed to the (currently commented-out) HyperparameterTuner.
epochs, patience = 100, 10

# Window sizes (in bars) handed to the tuning, selection and prediction helpers.
lookback_bars, validation_bars, testing_bars = 1000, 250, 250

# Trial count handed to the tuning / selection helpers.
NUM_TRIAL = 20

# Cost rate handed to transaction_cost(); 0.2 %. Previously tried: (0.1/100).
TRANSACTION_COST = 0.2 / 100

# Hyperparameter tuning

In [None]:
# FUNC_CLASS_TNR = HyperparameterTuner(datasets, lookback_bars, validation_bars, testing_bars, PATH, epochs, patience, NUM_TRIAL)

# TUNER = FUNC_CLASS_TNR.hyperparameter_tuning()

# Long Only

In [None]:
# Getting the best model for the Long-Only strategy based on the IR2 metric

# df_LO = find_best_model_LO(datasets, lookback_bars, validation_bars, testing_bars, PATH, scaler_Y, TRANSACTION_COST, NUM_TRIAL) #change bars

In [None]:
# df_LO.to_csv(f'./Best_Models/{PATH}/df_copy_LO.csv', index=False)
# Load the Long-Only selection table saved by the (commented-out) line above in an earlier run.
DF_COPY_LO = pd.read_csv(f'./Best_Models/{PATH}/df_copy_LO.csv')

In [None]:
# For every window id (`id_`) pick the candidate model whose training and validation
# IR_2 values are closest to each other.
grouped = DF_COPY_LO.groupby('id_')

# Collected per window id
id_list = []
model_list = []

# NOTE(review): these two weights are not used by the selection below; they are kept
# only in case other cells read them.
train_validation_weight = 1
loss_weight = 0.8

for id_, group in grouped:
    # Absolute train/validation gap; rows whose validation IR_2 is exactly 0 are excluded.
    # The score lives in a local Series so the groupby slice is never written to.
    ir2_gap = (group['IR_2_validation_value'] - group['IR_2_train_value']).abs()
    ir2_gap = ir2_gap.where(group['IR_2_validation_value'] != 0)

    # Take the row with the LOWEST gap. When no row has a usable score, fall back to
    # 'model_0' explicitly, instead of relying on a bare `except:` that would also
    # hide unrelated errors such as a missing column.
    if ir2_gap.notna().any():
        best_model_row = group.loc[ir2_gap.idxmin(), 'model_num']
    else:
        best_model_row = 'model_0'

    id_list.append(id_)
    model_list.append(best_model_row)

# One row per window id
result_df_LO = pd.DataFrame({'id_': id_list, 'model_num': model_list})

# Sort numerically (so 'id_10' comes after 'id_2'), then restore the 'id_<n>' labels.
result_df_LO['id_'] = result_df_LO['id_'].str.replace('id_', '', regex=False).astype(int)
result_df_LO = result_df_LO.sort_values(by='id_').reset_index(drop=True)
result_df_LO['id_'] = 'id_' + result_df_LO['id_'].astype(str)

print(result_df_LO)

### Copy the selected best models (Long-Only)

In [None]:
DATASETS = datasets
stock_name = PATH

# One entry per window: start offsets from lookback_bars up to the end of the
# 21-key dataset minus the test bars, in steps of validation_bars.
ranges = list(range(lookback_bars, len(DATASETS[21][0]) - testing_bars, validation_bars))
for i in range(len(ranges)):

    if path.exists(f'./Best_Models/{stock_name}/Long-Only/model_ID_{i}.h5'):
        # Already copied in an earlier run.
        print(f'[SECTION_1 --> id_{i}] path exits')

    else:
        # Drop the first six characters of the model name (e.g. the 'model_' prefix) to get its number.
        best_hyperparameters_index = result_df_LO['model_num'][i][6:]

        src_path = f'./Hyperparameter_tunning/{stock_name}/HP_Grid_Search_{i}/model_GS_{best_hyperparameters_index}.h5'
        pre_dst_path = f'./Best_Models/{stock_name}/Long-Only'
        os.makedirs(pre_dst_path, exist_ok=True)
        dst_path = pre_dst_path + f'/model_ID_{i}.h5'
        # The file is copied; the tuning output stays in place.
        shutil.copy(src_path, dst_path)

        print(f'File Moved --> ID_{i}')

    print(f"id_{i}")

# PREDICT

In [None]:
# Predict with the Long-Only models; returns two sequences (targets, predictions) that are concatenated in the next cell.
# NOTE(review): predict_LSTM is a project helper — presumably it loads the models copied to Best_Models/<PATH>/Long-Only; confirm.
TEST_Y_LO, test_model_predictions_LO = predict_LSTM(datasets, lookback_bars, validation_bars, testing_bars, f'{PATH}/Long-Only')

In [None]:
# Join the pieces returned by predict_LSTM into one continuous array each.
# NOTE: TEST_Y_LO is overwritten here, so re-running this cell requires re-running the prediction cell first.
TEST_Y_LO = np.concatenate(TEST_Y_LO)
y_pred_test_LO = np.concatenate(test_model_predictions_LO)

In [None]:
# Undo the [0, 1] scaling with the target scaler and flatten to 1-D.
# The targets are reshaped to a column first; the predictions are passed as they are.
inv_y_test_LO = scaler_Y.inverse_transform(TEST_Y_LO.reshape(-1,1)).flatten()
inv_y_pred_test_LO = scaler_Y.inverse_transform(y_pred_test_LO).flatten()

In [None]:
# Row offset into df_SnP at which the test values start (used to slice dates and returns below).
# NOTE(review): 21 is presumably the longest window length (key 21 in `datasets`) — TODO confirm.
num = lookback_bars+validation_bars+21

In [None]:
# Dates of the test window: as many index entries as there are test values, starting at `num`.
test_dates_LO = df_SnP.index[num:num + len(inv_y_test_LO)]

# Actual and predicted values, each paired with those dates.
df_inv_y_test_LO = pd.DataFrame({'Date': test_dates_LO, 'inv_y__test': inv_y_test_LO})
df_inv_y_pred_test_LO = pd.DataFrame({'Date': test_dates_LO, 'inv_y_pred_test': inv_y_pred_test_LO})

In [None]:
# Error between actual and predicted values after inverse scaling (mse comes from the star import).
mse(df_inv_y_test_LO['inv_y__test'], df_inv_y_pred_test_LO['inv_y_pred_test'])

In [None]:
# Plot the Long-Only actual vs. predicted series (fig_LSTM is a project helper; what it draws is not visible here).
fig_LSTM(df_index=df_SnP, df_test=df_inv_y_test_LO, df_predictions=df_inv_y_pred_test_LO)

# Performance Metrics

In [None]:
# Long-only signal: 1 when the prediction for the next row exceeds the current actual value, else 0.
# shift(-1) leaves NaN in the last row; NaN compares False, so the last position is 0.
position_LO = np.where(df_inv_y_pred_test_LO['inv_y_pred_test'].shift(-1)>df_inv_y_test_LO['inv_y__test'],1,0)

In [None]:
# Start the Long-Only equity-curve frame from the return and close columns.
# .copy() makes it an independent frame, so adding a column does not raise
# pandas' SettingWithCopyWarning.
df_Equity_Curve = df_SnP[['stock_returns', 'Close']].copy()
df_Equity_Curve['return'] = df_SnP['stock_returns']

In [None]:
# Keep only the rows of the test window (positional slice, one row per test value).
# .copy() gives the following cells an independent frame to add columns to, which
# avoids SettingWithCopyWarning.
# NOTE: this cell re-slices its own output when run twice — re-run the cell that
# creates df_Equity_Curve first.
df_Equity_Curve = df_Equity_Curve.iloc[num:len(inv_y_test_LO)+num].copy()

In [None]:
# Close-to-close returns recomputed inside the test window, stored under two names
# (strat_return and bnh_return).
# The first row is NaN; .dropna() does not change that, because the column assignment
# re-aligns on the frame's index.
df_Equity_Curve['strat_return'] = df_Equity_Curve['Close'].pct_change().dropna()
df_Equity_Curve['bnh_return'] = df_Equity_Curve['Close'].pct_change().dropna()

In [None]:
# Attach the signal; position_LO is a plain array, so it must have one entry per row.
df_Equity_Curve['position'] = position_LO
# Apply trading costs at the configured rate.
# NOTE(review): transaction_cost is a project helper — which columns it adjusts is not visible here.
df_Equity_Curve = transaction_cost(df_Equity_Curve, TRANSACTION_COST) 

In [None]:
# Strategy curve: each row's return times the position of the previous row,
# with missing values treated as 0, compounded from 1.
df_Equity_Curve["strategy"] = (
    df_Equity_Curve["strat_return"]
    .mul(df_Equity_Curve['position'].shift(1))
    .fillna(0)
    .add(1)
    .cumprod()
)

# Benchmark curve: the same compounding applied to the unweighted returns.
df_Equity_Curve['buy_n_hold'] = df_Equity_Curve['bnh_return'].fillna(0).add(1).cumprod()

In [None]:
# Plot the Long-Only equity frame (fig_strategies is a project helper; what it draws is not visible here).
fig_strategies(df_Equity_Curve)

In [None]:
# Number of rows per position value.
df_Equity_Curve['position'].value_counts()

# PERFORMANCE METRICS

In [None]:
# Report for the buy-and-hold curve.
# NOTE(review): wyniki (Polish for "results") is a project helper; the metrics it reports are not visible here.
wyniki(np.array(df_Equity_Curve['buy_n_hold'].values), 'Equity_Curve_BuyAndHold')

In [None]:
# Same report for the Long-Only strategy curve.
wyniki(np.array(df_Equity_Curve['strategy'].values), 'Equity_Curve_strategii')

In [None]:
# Strategy vs. buy-and-hold.
# NOTE(review): porownanie (Polish for "comparison") is a project helper; its output is not visible here.
porownanie(np.array(df_Equity_Curve['strategy'].values), np.array(df_Equity_Curve['buy_n_hold'].values))

In [None]:
# Output location. Note that it is built from SENS_SENS, not from the run name in SENS / PATH.
SENS_SENS = 'SENS_0A'
PATH_SNES = f'S&P500/Sensitivity Analysis/{SENS_SENS}'
STOCK_NAME_FOLDER = f'./Results/{PATH_SNES}/Visualisations'

# Create the folder when it is missing, then store the Long-Only equity frame (index included).
os.makedirs(STOCK_NAME_FOLDER, exist_ok=True)
df_Equity_Curve.to_csv(f'{STOCK_NAME_FOLDER}/df_EC_LO_MAIN.csv')

# Long Short

In [None]:
# Getting the best model for the Long-Short strategy based on the IR2 metric

# df_LS = find_best_model_LS(datasets, lookback_bars, validation_bars, testing_bars, PATH, scaler_Y, TRANSACTION_COST, NUM_TRIAL) #change bars

In [None]:
# df_LS.to_csv(f'./Best_Models/{PATH}/df_copy_LS.csv', index=False)
# Load the Long-Short selection table saved by the (commented-out) line above in an earlier run.
DF_COPY_LS = pd.read_csv(f'./Best_Models/{PATH}/df_copy_LS.csv')

In [None]:
# For every window id (`id_`) pick the candidate model whose training and validation
# IR_2 values are closest to each other.
grouped = DF_COPY_LS.groupby('id_')

# Collected per window id
id_list = []
model_list = []

# NOTE(review): these two weights are not used by the selection below; they are kept
# only in case other cells read them.
train_validation_weight = 1
loss_weight = 0.8

for id_, group in grouped:
    # Absolute train/validation gap; rows whose validation IR_2 is exactly 0 are excluded.
    # The score lives in a local Series so the groupby slice is never written to.
    ir2_gap = (group['IR_2_validation_value'] - group['IR_2_train_value']).abs()
    ir2_gap = ir2_gap.where(group['IR_2_validation_value'] != 0)

    # Take the row with the LOWEST gap. When no row has a usable score, fall back to
    # 'model_0' explicitly, instead of relying on a bare `except:` that would also
    # hide unrelated errors such as a missing column.
    if ir2_gap.notna().any():
        best_model_row = group.loc[ir2_gap.idxmin(), 'model_num']
    else:
        best_model_row = 'model_0'

    id_list.append(id_)
    model_list.append(best_model_row)

# One row per window id
result_df_LS = pd.DataFrame({'id_': id_list, 'model_num': model_list})

# Sort numerically (so 'id_10' comes after 'id_2'), then restore the 'id_<n>' labels.
result_df_LS['id_'] = result_df_LS['id_'].str.replace('id_', '', regex=False).astype(int)
result_df_LS = result_df_LS.sort_values(by='id_').reset_index(drop=True)
result_df_LS['id_'] = 'id_' + result_df_LS['id_'].astype(str)

print(result_df_LS)

### Copy the selected best models (Long-Short)

In [None]:
DATASETS = datasets
stock_name = PATH

# One entry per window: start offsets from lookback_bars up to the end of the
# 21-key dataset minus the test bars, in steps of validation_bars.
ranges = list(range(lookback_bars, len(DATASETS[21][0]) - testing_bars, validation_bars))
for i in range(len(ranges)):

    if path.exists(f'./Best_Models/{stock_name}/Long-Short/model_ID_{i}.h5'):
        # Already copied in an earlier run.
        print(f'[SECTION_1 --> id_{i}] path exits')

    else:
        # Drop the first six characters of the model name (e.g. the 'model_' prefix) to get its number.
        best_hyperparameters_index = result_df_LS['model_num'][i][6:]

        src_path = f'./Hyperparameter_tunning/{stock_name}/HP_Grid_Search_{i}/model_GS_{best_hyperparameters_index}.h5'
        pre_dst_path = f'./Best_Models/{stock_name}/Long-Short'
        os.makedirs(pre_dst_path, exist_ok=True)
        dst_path = pre_dst_path + f'/model_ID_{i}.h5'
        # The file is copied; the tuning output stays in place.
        shutil.copy(src_path, dst_path)

        print(f'File Moved --> ID_{i}')

    print(f"id_{i}")

# PREDICT

In [None]:
# Predict with the Long-Short models; returns two sequences (targets, predictions) that are concatenated in the next cell.
# NOTE(review): predict_LSTM is a project helper — presumably it loads the models copied to Best_Models/<PATH>/Long-Short; confirm.
TEST_Y_LS, test_model_predictions_LS = predict_LSTM(datasets, lookback_bars, validation_bars, testing_bars, f'{PATH}/Long-Short')

In [None]:
# Join the pieces returned by predict_LSTM into one continuous array each.
# NOTE: TEST_Y_LS is overwritten here, so re-running this cell requires re-running the prediction cell first.
TEST_Y_LS = np.concatenate(TEST_Y_LS)
y_pred_test_LS = np.concatenate(test_model_predictions_LS)

In [None]:
# Undo the [0, 1] scaling with the target scaler.
# The targets are reshaped to a column first; the predictions are passed as they are.
inv_y_test_LS = scaler_Y.inverse_transform(TEST_Y_LS.reshape(-1,1))
inv_y_pred_test_LS = scaler_Y.inverse_transform(y_pred_test_LS)

In [None]:
# Flatten both arrays to 1-D so they can be used as DataFrame columns below.
inv_y_test_LS = inv_y_test_LS.flatten()
inv_y_pred_test_LS = inv_y_pred_test_LS.flatten()

In [None]:
# Row offset into df_SnP at which the test values start (used to slice dates and returns below).
# NOTE(review): 21 is presumably the longest window length (key 21 in `datasets`) — TODO confirm.
num = lookback_bars+validation_bars+21

In [None]:
# Dates of the test window: as many index entries as there are test values, starting at `num`.
test_dates_LS = df_SnP.index[num:num + len(inv_y_test_LS)]

# Actual and predicted values, each paired with those dates.
df_inv_y_test_LS = pd.DataFrame({'Date': test_dates_LS, 'inv_y__test': inv_y_test_LS})
df_inv_y_pred_test_LS = pd.DataFrame({'Date': test_dates_LS, 'inv_y_pred_test': inv_y_pred_test_LS})

In [None]:
# Error between actual and predicted values after inverse scaling (mse comes from the star import).
mse(df_inv_y_test_LS['inv_y__test'], df_inv_y_pred_test_LS['inv_y_pred_test'])

In [None]:
# Plot the Long-Short actual vs. predicted series (fig_LSTM is a project helper; what it draws is not visible here).
fig_LSTM(df_index=df_SnP, df_test=df_inv_y_test_LS, df_predictions=df_inv_y_pred_test_LS)

# Performance Metrics

In [None]:
# Long/short signal: +1 when the prediction for the next row exceeds the current actual value, else -1.
# shift(-1) leaves NaN in the last row; NaN compares False, so the last position is -1.
position_LS = np.where(df_inv_y_pred_test_LS['inv_y_pred_test'].shift(-1)>df_inv_y_test_LS['inv_y__test'],1,-1)

In [None]:
# Start the Long-Short equity-curve frame from the return and close columns.
# .copy() makes it an independent frame, so adding a column does not raise
# pandas' SettingWithCopyWarning.
df_Equity_Curve_LS = df_SnP[['stock_returns', 'Close']].copy()
df_Equity_Curve_LS['return'] = df_SnP['stock_returns']

In [None]:
# Keep only the rows of the test window (positional slice, one row per test value).
# .copy() gives the following cells an independent frame to add columns to, which
# avoids SettingWithCopyWarning.
# NOTE: this cell re-slices its own output when run twice — re-run the cell that
# creates df_Equity_Curve_LS first.
df_Equity_Curve_LS = df_Equity_Curve_LS.iloc[num:len(inv_y_test_LS)+num].copy()

In [None]:
# Close-to-close returns recomputed inside the test window, stored under two names
# (strat_return and bnh_return).
# The first row is NaN; .dropna() does not change that, because the column assignment
# re-aligns on the frame's index.
df_Equity_Curve_LS['strat_return'] = df_Equity_Curve_LS['Close'].pct_change().dropna()
df_Equity_Curve_LS['bnh_return'] = df_Equity_Curve_LS['Close'].pct_change().dropna()

In [None]:
# Attach the signal; position_LS is a plain array, so it must have one entry per row.
df_Equity_Curve_LS['position'] = position_LS
# Apply trading costs at the configured rate.
# NOTE(review): transaction_cost is a project helper — which columns it adjusts is not visible here.
df_Equity_Curve_LS = transaction_cost(df_Equity_Curve_LS, TRANSACTION_COST) 

In [None]:
# Strategy curve: each row's return times the position of the previous row,
# with missing values treated as 0, compounded from 1.
df_Equity_Curve_LS["strategy"] = (
    df_Equity_Curve_LS["strat_return"]
    .mul(df_Equity_Curve_LS['position'].shift(1))
    .fillna(0)
    .add(1)
    .cumprod()
)

# Benchmark curve, built from 'bnh_return' exactly as in the Long-Only section.
# The previous version compounded 'stock_returns', whose first row still carries the
# return from the bar before the test window; the benchmark therefore started one
# return ahead of the strategy and differed from the Long-Only benchmark.
df_Equity_Curve_LS['buy_n_hold'] = df_Equity_Curve_LS['bnh_return'].fillna(0).add(1).cumprod()

In [None]:
# Plot the Long-Short equity frame (fig_strategies is a project helper; what it draws is not visible here).
fig_strategies(df_Equity_Curve_LS)

In [None]:
# Number of rows per position value.
df_Equity_Curve_LS['position'].value_counts()

# PERFORMANCE METRICS

In [None]:
# Report for the buy-and-hold curve.
# NOTE(review): wyniki (Polish for "results") is a project helper; the metrics it reports are not visible here.
wyniki(np.array(df_Equity_Curve_LS['buy_n_hold'].values), 'Equity_Curve_BuyAndHold')

In [None]:
# Same report for the Long-Short strategy curve.
wyniki(np.array(df_Equity_Curve_LS['strategy'].values), 'Equity_Curve_strategii')

In [None]:
# Strategy vs. buy-and-hold.
# NOTE(review): porownanie (Polish for "comparison") is a project helper; its output is not visible here.
porownanie(np.array(df_Equity_Curve_LS['strategy'].values), np.array(df_Equity_Curve_LS['buy_n_hold'].values))

### TO CSV - SAVE

In [None]:
# Output location. Note that it is built from SENS_SENS, not from the run name in SENS / PATH.
SENS_SENS = 'SENS_0A'
PATH_SNES = f'S&P500/Sensitivity Analysis/{SENS_SENS}'
STOCK_NAME_FOLDER = f'./Results/{PATH_SNES}/Visualisations'

# Create the folder when it is missing, then store the Long-Short equity frame (index included).
os.makedirs(STOCK_NAME_FOLDER, exist_ok=True)
df_Equity_Curve_LS.to_csv(f'{STOCK_NAME_FOLDER}/df_EC_LS_MAIN.csv')