Generating synthetic alphas: we can process any input data that contains open prices and generate a random variable (the synthetic alpha) with a desired correlation to the forward return over the next 5 time steps.

In [None]:
from generate_alpha.generate_alphav3 import process_data
import pandas as pd

df = pd.read_parquet('data/IBM-5min-2020-2023.parquet', engine='pyarrow')
# Ensure the index is in datetime format.
df.index = pd.to_datetime(df.index)
# Keep only bars between 09:30 and 16:00 (both ends inclusive by default).
df = df.between_time('09:30:00', '16:00:00')

# Returns are computed within each calendar day, so no value spans the gap
# between one day's last bar and the next day's first bar.
trading_day = df.index.date
open_by_day = df.groupby(trading_day)['Open']
# ret: return from the previous bar's open to this bar's open.
df['ret'] = open_by_day.pct_change()
# fwd_ret5: forward return from this bar's open to the open 5 bars later,
# i.e. Open[t+5] / Open[t] - 1 (NaN for the last 5 bars of each day).
# Note: pct_change(periods=-5) would give Open[t] / Open[t+5] - 1 instead,
# which has the opposite sign and the wrong base.
df['fwd_ret5'] = open_by_day.shift(-5) / df['Open'] - 1

df_new = df.drop(columns=['High', 'Low', 'Close', 'Volume'])
# Generate the synthetic alpha with the requested correlation to fwd_ret5 and
# save the result to data/IBM_SCC.csv (used by the split cell below).
df_final = process_data(df_new, name_to_save='data/IBM_SCC.csv', desired_correlation=0.1, save_to_csv=True)

If you would like to split your data into train, validation and test datasets, the code below does that (chronologically, 70/15/15) and saves a CSV file for each of the three.

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load the dataset written by process_data above (name_to_save='data/IBM_SCC.csv').
# At this point `df` is still the raw parquet frame from the previous cell
# (presumably without an 'Unnamed: 0' column), so re-read the saved file.
df = pd.read_csv('data/IBM_SCC.csv')

# The first CSV column presumably holds the saved timestamp index; pandas
# names an unnamed index column 'Unnamed: 0' when reading it back.
df['Unnamed: 0'] = pd.to_datetime(df['Unnamed: 0'])

# Filter out rows with the year 2020.
df = df[df['Unnamed: 0'].dt.year != 2020]

# 70/15/15 split. shuffle=False keeps the row order (presumably chronological),
# so validation and test rows follow the training rows in time.
train_data, x_data = train_test_split(df, train_size=0.7, shuffle=False)
validate_data, test_data = train_test_split(x_data, train_size=0.5, shuffle=False)

# Save the train, validation and test splits to separate CSV files.
train_data.to_csv('data/IBMtrain_data.csv', index=False)
test_data.to_csv('data/IBMtest_data.csv', index=False)
validate_data.to_csv('data/IBMvalidation_data.csv', index=False)

# Reinforcement Learning Model Training

The training process for our RL model is structured into the following key steps:

## 1. **Ray Instance Initialization**
Initialize the Ray framework to manage distributed training.

## 2. **Environment Registration**
Register our custom trading environment to make it accessible for Ray's RLlib.

## 3. **Data Path Setup**
Determine the paths for training and validation datasets.

## 4. **Training with PPO and Ray Tune (PBT)**
Leverage Proximal Policy Optimization (PPO) for training, and use Population Based Training (PBT) via Ray Tune for hyperparameter optimization.


In [None]:
import ray

# Start Ray from a clean state: stop any runtime left over from an earlier
# run of this notebook, then (re)start Ray for the training cells below.
if ray.is_initialized():
    ray.shutdown()
ray.init()

In [None]:
from gymnasium.wrappers import EnvCompatibility
from ray.tune.registry import register_env
from simpletrading_env import TradingEnv


def create_compatible_trading_env(env_config):
    """Env factory for RLlib: build a TradingEnv wrapped in EnvCompatibility.

    ``env_config`` is the config RLlib hands to registered env creators; it is
    forwarded unchanged to ``TradingEnv``.

    NOTE(review): the wrapper presumably adapts an older gym-style step/reset
    API to gymnasium's; EnvCompatibility is deprecated in newer gymnasium
    releases, so pin the gymnasium version accordingly — confirm.
    """
    trading_env = TradingEnv(env_config)
    compatible_env = EnvCompatibility(trading_env)
    return compatible_env


# Expose the factory under the name referenced by config["env"] in training.
register_env("wrapped_trading_env", create_compatible_trading_env)

In [None]:
import os

# Build absolute paths to the split files, anchored at the notebook's current
# working directory (presumably because Tune trials may run with a different
# cwd, so relative paths would not resolve there).
cwd = os.getcwd()
train_data_path, validation_data_path = (
    os.path.join(cwd, relative_path)
    for relative_path in ('data/IBMtrain_data.csv', 'data/IBMvalidation_data.csv')
)

In [None]:
from ray.tune.schedulers import PopulationBasedTraining
from ray import tune
from ray.rllib.algorithms.ppo import PPO
from custom_stopper import EarlyStoppingStopper
import numpy as np

# --- Population Based Training scheduler -----------------------------------
# Trials are ranked on episode_reward_mean (higher is better). The four
# hyperparameters in `hyperparam_mutations` are perturbed every 50 seconds of
# accumulated training time (time_attr="time_total_s").
pbt = PopulationBasedTraining(
    time_attr="time_total_s",
    metric="episode_reward_mean",
    mode="max",
    perturbation_interval=50.0,
    hyperparam_mutations={
        "lr": tune.loguniform(1e-4, 1e-3),
        "gamma": tune.uniform(0.95, 0.99),
        "clip_param": tune.uniform(0.1, 0.3),
        "kl_coeff": tune.uniform(0.2, 1.5),
    },
)


def _trading_env_config(data_filepath):
    """Return the TradingEnv env_config that reads from *data_filepath*.

    The training and evaluation environments differ only in their data file.
    """
    return {
        "data_filepath": data_filepath,
        "window_size": 5,
        "least_episode_size": 75,
    }


# --- PPO / RLlib configuration -----------------------------------------------
config = {
    "env": "wrapped_trading_env",
    "env_config": _trading_env_config(train_data_path),
    # Evaluate on the validation split, in parallel with training, with
    # exploration switched off.
    "evaluation_interval": 100,
    "evaluation_duration": 2,
    "evaluation_parallel_to_training": True,
    "evaluation_config": {
        "env": "wrapped_trading_env",
        "env_config": _trading_env_config(validation_data_path),
        "explore": False,
    },
    "batch_mode": "truncate_episodes",
    "rollout_fragment_length": 'auto',
    "evaluation_num_workers": 1,
    "always_attach_evaluation_results": True,
    # Resources: 3 rollout workers with 1 CPU each, torch backend, no GPU.
    "num_workers": 3,
    "num_cpus_per_worker": 1,
    "framework": 'torch',
    "num_gpus": 0,
    "shuffle_sequences": False,
    # NOTE(review): a zero coefficient removes the value-function loss from the
    # PPO objective, so the critic may never be trained — confirm intended.
    "vf_loss_coeff": 0,
    # Starting values; PBT mutates these four via hyperparam_mutations above.
    "lr": 0.0003,
    "gamma": 0.99,
    "clip_param": 0.2,
    "kl_coeff": 0.5,
    # Tune search-space entries, resolved per trial.
    "num_sgd_iter": tune.randint(25, 35),
    "sgd_minibatch_size": tune.sample_from(lambda _: np.random.randint(50, 150)),
    "train_batch_size": 2250,
    "model": {
        # An earlier variant used fcnet_hiddens=[64, 32].
        "fcnet_hiddens": [128, 64],
        "fcnet_activation": "relu",
        "use_lstm": False,
    },
    "log_level": "WARNING",
}

# Custom early stopping; see custom_stopper.EarlyStoppingStopper for the exact
# meaning of each argument.
stopper = EarlyStoppingStopper(patience=100, eval_patience=100, min_iterations=500)

# Launch 2 PPO trials under PBT, checkpointing every 10 iterations and at the end.
# Other options tried during development: resources_per_trial={"cpu": 1, "gpu": 0},
# reuse_actors=True, a custom local_dir, or a fixed stop={"episode_reward_mean": 0.15}.
analysis = tune.run(
    PPO,
    name="PPO_PBT_Trading",
    scheduler=pbt,
    stop=stopper,
    config=config,
    checkpoint_at_end=True,
    checkpoint_freq=10,
    num_samples=2,
    verbose=1,
)


We can quickly analyse the training results below. For in-depth analysis, I would recommend TensorBoard.

In [None]:
# One row per trial; nested metrics appear as "/"-separated columns such as
# 'evaluation/episode_reward_mean'.
df = analysis.results_df

# Best trial = highest episode_reward_mean in each trial's last reported result.
best_trial = analysis.get_best_trial("episode_reward_mean", "max", "last")
print(f"Best trial config: {best_trial.config}")
print("Best trial final reward:", best_trial.last_result["episode_reward_mean"])

# Evaluation rewards: for every trial, then for the best trial alone.
print(df['evaluation/episode_reward_mean'])
best_trial_evaluation_results = best_trial.last_result["evaluation"]
print(best_trial_evaluation_results["episode_reward_mean"])

# Best checkpoint of the best trial by episode_reward_mean. It is printed so its
# location is visible for the model-saving step. Add new cells below for any
# further analysis.
best_checkpoint = analysis.get_best_checkpoint(best_trial, metric="episode_reward_mean", mode="max")
print(best_checkpoint)

If the model is up to the mark, or you would like to use it for further testing or as a pre-trained model to improve further, save its checkpoint path and best-trial configuration to a JSON file.

In [None]:
import json


def _checkpoint_dir(checkpoint):
    """Return the local directory of a Ray Tune checkpoint object.

    Newer Ray releases expose the directory as ``checkpoint.path``. Older ones,
    whose repr looks like ``Checkpoint(local_path=...)``, presumably keep it in
    the private ``_local_path`` attribute. Both are tried in that order.

    Raises:
        ValueError: if no checkpoint was found or no directory can be determined.
    """
    if checkpoint is None:
        raise ValueError(
            "No checkpoint found for the best trial; check checkpoint_freq / "
            "checkpoint_at_end."
        )
    for attr in ("path", "_local_path"):
        directory = getattr(checkpoint, attr, None)
        if directory:
            return str(directory)
    raise ValueError(
        f"Could not determine a local directory for {checkpoint!r}; "
        "set checkpoint_path_to_save by hand."
    )


# Take the path from the best checkpoint found in the analysis cell, not a
# hard-coded path from another machine or run. That way the saved path and
# best_trial.config describe the same trial. To save a different checkpoint,
# assign its directory here instead, e.g.
# '/Users/<you>/ray_results/PPO_PBT_Trading/<trial_dir>/checkpoint_000290'.
checkpoint_path_to_save = _checkpoint_dir(best_checkpoint)

# Serialize before opening the file, so a serialization error cannot leave a
# truncated model_info1.json behind.
model_info_json = json.dumps({
    'checkpoint_path': checkpoint_path_to_save,
    'best_trial_config': best_trial.config
})
with open('model_info1.json', 'w', encoding='utf-8') as f:
    f.write(model_info_json)

Here comes the second part, where we can test any saved model JSON on any dataset, add further cells to analyse the results, and save the results to a JSON file.

In [None]:
import ray

# Restart Ray before the testing section: stop any runtime still running from
# the training cells, then start a fresh one.
if ray.is_initialized():
    ray.shutdown()
ray.init()

In [None]:
# NOTE(review): a local module named `test` can collide with CPython's stdlib
# `test` package depending on sys.path order; a more distinctive name is safer.
from test import test_trained_model

# Evaluate the model described by model_info1.json (checkpoint path + config)
# on the held-out test split for 50 episodes. Analyse the results or plot
# graphs in further cells as needed.
rewards, results = test_trained_model(
    model_info_path='model_info1.json',
    data_filepath='data/IBMtest_data.csv',
    num_episodes=50,
)

In [None]:
import json
import numpy as np


def default_serialize(o):
    """``json`` ``default=`` hook that converts NumPy values to built-in types.

    Args:
        o: An object the standard JSON encoder could not serialize.

    Returns:
        ``bool`` for NumPy booleans, ``int`` for NumPy integers, ``float`` for
        NumPy floats and a (nested) ``list`` for NumPy arrays.

    Raises:
        TypeError: for any other type, as ``json`` expects from a default hook.
    """
    # np.bool_ is not a subclass of np.integer, so it needs its own branch.
    if isinstance(o, np.bool_):
        return bool(o)
    if isinstance(o, np.integer):
        return int(o)
    if isinstance(o, np.floating):
        return float(o)
    if isinstance(o, np.ndarray):
        return o.tolist()
    raise TypeError(f"Object of type '{type(o).__name__}' is not JSON serializable")

# Serialize first, then write. json.dump streams chunks into a file that
# open(..., 'w') has already truncated, so an unserializable value would leave
# a partial, invalid resultsIBM.json. NumPy values are converted by default_serialize.
results_json = json.dumps(results, default=default_serialize)
with open('resultsIBM.json', 'w', encoding='utf-8') as f:
    f.write(results_json)