In [None]:
import torch
import numpy as np
import pandas as pd
import gymnasium as gym
from gymnasium.envs.registration import register
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import BaseCallback
import matplotlib.pyplot as plt
import gym_env_sharpe as env_sharpe


# Load the three input tables; the file names are resolved relative to the
# notebook's working directory.
df_result, open_price, close_price = (
    pd.read_csv(csv_name)
    for csv_name in ("whole_data.csv", "open_price.csv", "close_price.csv")
)

In [None]:
# One float32 array per column of df_result, passed through np.nan_to_num
# (NaN entries become 0.0).
data = [
    np.nan_to_num(df_result.iloc[:, col_idx].to_numpy(dtype=np.float32))
    for col_idx in range(df_result.shape[1])
]

# 60% / 20% / 20% split over the list built above.
# NOTE(review): `data` holds one entry per *column* of df_result, so these
# slices partition columns rather than rows -- confirm this is the intended
# split axis.
train_end = int(0.6*len(data))
val_end = int(0.8*len(data))

training_data = data[:train_end]
validation_data = data[train_end:val_end]
test_data = data[val_end:]

In [None]:
def grid_search(train_data, val_data, n_steps, batch_size, clip_range, vf_coef,
                learning_rate=1, total_timesteps=10_000, n_envs=10):
    """Train PPO with one hyper-parameter combination and score it on val_data.

    Args:
        train_data: data handed to the 'PO-v0' training environments.
        val_data: data handed to the 'PO-v0' evaluation environment.
        n_steps: PPO rollout length; also forwarded to the environment constructor.
        batch_size: PPO mini-batch size.
        clip_range: PPO clipping parameter.
        vf_coef: PPO value-function loss coefficient.
        learning_rate: optimizer learning rate passed to PPO.
            NOTE(review): the default of 1 preserves this notebook's existing
            behavior, but it is far above SB3's PPO default (3e-4) -- confirm
            that it is intentional.
        total_timesteps: number of environment steps to train for.
        n_envs: number of parallel training environments.

    Returns:
        (val_budgets, eval_result, params): the budget recorded at every
        evaluation step, the yield rate computed by evaluate_model, and the
        dict of the four searched hyper-parameters.
    """
    params = {'n_steps':n_steps, 'batch_size':batch_size, 'clip_range':clip_range, 'vf_coef':vf_coef}

    vec_env = make_vec_env(lambda : gym.make('PO-v0', data=train_data, n_steps=n_steps), n_envs=n_envs)
    try:
        model = PPO("MlpPolicy", vec_env, verbose=1, learning_rate=learning_rate, **params,
                    device="cuda" if torch.cuda.is_available() else "cpu")
        model.learn(total_timesteps=total_timesteps, callback=env_sharpe.CustomCallback())
    finally:
        # The grid search calls this function many times; release the training
        # environments instead of leaving them open for the whole search.
        vec_env.close()

    val_env = gym.make('PO-v0', data=val_data, n_steps=n_steps)
    try:
        eval_result, val_budgets = evaluate_model(model, val_env, val_data)
    finally:
        val_env.close()

    return val_budgets, eval_result, params

# Compute a model's yield rate using only the first and the last recorded budgets
def evaluate_model(model, env, data):
    """Roll the model through env for len(data) steps and compute the yield rate.

    Args:
        model: object exposing predict(obs) -> (action, state).
        env: environment whose reset() returns (obs, info) and whose step()
            returns (obs, reward, terminated, truncated, info); info must
            contain the key 'current_budget'.
        data: only its length is used, as the number of steps to run.

    Returns:
        (yield_rate, val_budgets): val_budgets is the 'current_budget' recorded
        after every step; yield_rate is (last - first) / first over that list.
        If an episode ends before len(data) steps, the environment is reset and
        recording continues across the episode boundary.

    Raises:
        IndexError: if data is empty, because no budget is recorded.
    """
    obs, _ = env.reset()
    val_budgets = []

    for _ in range(len(data)):
        action, _ = model.predict(obs)
        obs, _, terminated, truncated, info = env.step(action)
        val_budgets.append(info['current_budget'])
        # An episode can end by termination OR truncation; either way the
        # environment has to be reset before it is stepped again.
        if terminated or truncated:
            obs, _ = env.reset()

    first_budget = val_budgets[0]
    yield_rate = (val_budgets[-1] - first_budget) / first_budget

    return yield_rate, val_budgets

def plotting_result(budgets, **params):
    """Show a single line plot of budgets against their index.

    Args:
        budgets: sequence of budget values to plot.
        **params: must provide 'n_steps', 'batch_size', 'clip_range' and
            'vf_coef'; their values make up the figure title.
    """
    plt.figure(figsize=(6,4))
    x_axis = list(range(len(budgets)))
    plt.plot(x_axis, budgets)
    plt.xlabel("timestamps")
    plt.ylabel("budgets")
    title_keys = ('n_steps', 'batch_size', 'clip_range', 'vf_coef')
    plt.title("{}, {}, {}, {}".format(*(params[key] for key in title_keys)))
    plt.show()


In [None]:
# Hyper-parameter grids to sweep
clip_ranges = [0.15, 0.2, 0.25, 0.3]  # larger values allow bigger policy updates (default=0.2)
n_steps = [16, 32, 48, 64]
# Mini-batch sizes (default=64).
# NOTE(review): grid_search trains with n_envs=10, so the rollout size is
# n_steps * 10; it is not divisible by every batch size listed here
# (e.g. 16 * 10 % 64 == 32) -- confirm whether that is acceptable.
batch_sizes = [32, 64, 128]
vf_coefs = [0.5, 0.55, 0.6, 0.65, 0.7]  # value-function loss coefficient (default=0.5)

# Leaderboard of the six best runs, kept sorted best-first in three parallel lists.
best_result = [-float('inf')] * 6
best_params = [None] * 6
best_budgets = [None] * 6

for clip_range in clip_ranges:
    for n_step in n_steps:
        for batch_size in batch_sizes:
            for vf_coef in vf_coefs:
                val_budgets, eval_result, params = grid_search(
                    train_data=training_data, val_data=validation_data,
                    n_steps=n_step, batch_size=batch_size,
                    clip_range=clip_range, vf_coef=vf_coef)

                # First leaderboard slot that this run strictly beats (None if it beats none).
                rank = next((slot for slot, score in enumerate(best_result)
                             if eval_result > score), None)
                if rank is None:
                    continue

                # Insert at that slot and drop the last entry so each list keeps six items.
                for board, entry in ((best_result, eval_result),
                                     (best_params, params),
                                     (best_budgets, val_budgets)):
                    board.insert(rank, entry)
                    board.pop()

for rank, (top_params, top_score) in enumerate(zip(best_params, best_result)):
    print(f"Rank {rank} best_parameters : {top_params} and corresponding best_result : {top_score}")

In [None]:
# Plot the validation budget curves of the six best parameter sets on a 2x3 grid
param_lst = best_params
fig, axs = plt.subplots(2,3, figsize=(6,4))
for i, (param, eval_result, result_budget) in enumerate(zip(param_lst, best_result, best_budgets)):
    # Panels are filled row by row: ranks 0-2 on the top row, 3-5 on the bottom row.
    panel = axs[divmod(i, 3)]
    panel.plot(list(range(len(result_budget))), result_budget)
    panel.set_title(
        "{}, {}, {}, {} : {:.2f}".format(param['n_steps'], param['batch_size'],
                                         param['clip_range'], param['vf_coef'], eval_result),
        fontsize=8)

plt.subplots_adjust(top=0.8, hspace=0.8, wspace=0.4)
fig.suptitle("Validation Performance for top 6 parameters", fontsize=12)
plt.show()

In [None]:
# Retrain on the combined train + validation data and check the performance on the test data

def test_perform(train_data, val_data, n_steps, batch_size, clip_range, vf_coef):
    """Train PPO on train_data and evaluate it on val_data.

    This runs the same train-then-evaluate pipeline as grid_search, so it
    delegates to that function instead of repeating it; only the order of the
    returned values differs.

    Args:
        train_data: data used for the training environments.
        val_data: data used for the evaluation environment (the held-out set
            when this is called for the final check).
        n_steps, batch_size, clip_range, vf_coef: PPO hyper-parameters,
            forwarded unchanged to grid_search.

    Returns:
        (eval_result, val_budgets): the yield rate and the per-step budgets
        recorded during evaluation.
    """
    val_budgets, eval_result, _ = grid_search(train_data, val_data, n_steps,
                                              batch_size, clip_range, vf_coef)

    return eval_result, val_budgets

In [None]:
# Rebuild the per-column arrays and split them 80% / 20%.
# NOTE: this rebinds `training_data` to the first 80% of `data` (the portion
# previously divided into training and validation); `validation_data` from
# the earlier split cell is left untouched.
data = [
    np.nan_to_num(df_result.iloc[:, col_idx].to_numpy(dtype=np.float32))
    for col_idx in range(df_result.shape[1])
]

holdout_start = int(0.8*len(data))

training_data = data[:holdout_start]
test_data = data[holdout_start:]

In [None]:
# Retrain each of the six best parameter sets on training_data and plot the
# budget curve it achieves on the held-out test data.
param_lst = best_params

fig, axs = plt.subplots(2,3, figsize=(6,4))

for i in range(6):
    param = param_lst[i]
    # Evaluate on test_data: validation_data is part of the rebound
    # training_data at this point, so scoring on it would measure performance
    # on data the model was trained on.
    eval_result, result_budget = test_perform(training_data, test_data, **param)
    timesteps = list(range(len(result_budget)))
    # Panels are filled row by row: ranks 0-2 on the top row, 3-5 on the bottom row.
    ax = axs[divmod(i, 3)]
    ax.plot(timesteps, result_budget)
    ax.set_title("{}, {}, {}, {} : {:.2f}".format(param['n_steps'], param['batch_size'], param['clip_range'], param['vf_coef'], eval_result), fontsize=8)

plt.subplots_adjust(top=0.8, hspace=0.8, wspace=0.4)
fig.suptitle("Test Performance for top 6 parameters", fontsize=12)
plt.show()