In [None]:
import ray
from ray.rllib.algorithms.dqn import DQNConfig
from ray.tune.registry import register_env
from ray.rllib.env.env_context import EnvContext
import pandas as pd
import numpy as np
from gym_anytrading.envs import StocksEnv
from sklearn.preprocessing import MinMaxScaler
import os

# Initialize Ray (ignore_reinit_error lets this cell be re-run in a live kernel)
ray.init(ignore_reinit_error=True)

## 1. Data import and data scaling
df_train = pd.read_csv('data/df_train.csv', index_col=0)
df_train.index = pd.to_datetime(df_train.index)

df_test = pd.read_csv('data/df_test.csv', index_col=0)
df_test.index = pd.to_datetime(df_test.index)

# Removing duplicates in index (if any).
# .copy() gives independent frames so the column assignments below are safe.
df_train = df_train[~df_train.index.duplicated(keep='first')].copy()
df_test = df_test[~df_test.index.duplicated(keep='first')].copy()

# Features to use (the observation columns read by the environment)
features = ['rsi', 'ema', 'roc']

# Scaling the data.
# 'Price' is deliberately left unscaled: the environment uses it as the traded
# price, and StocksEnv divides by the last trade price when updating profit.
# Min-max scaling would map the lowest training price to exactly 0 and distort
# every profit ratio. All other columns are scaled to [0, 1]; the scaler is fit
# on the training set only and then applied to the test set.
scaled_columns = [col for col in df_train.columns if col != 'Price']
scaler = MinMaxScaler(feature_range=(0, 1))
df_train[scaled_columns] = pd.DataFrame(
    scaler.fit_transform(df_train[scaled_columns]),
    columns=scaled_columns,
    index=df_train.index,
)
df_test[scaled_columns] = pd.DataFrame(
    scaler.transform(df_test[scaled_columns]),
    columns=scaled_columns,
    index=df_test.index,
)

In [None]:
## 2. Create the custom environment
def add_signals(env):
    """Build the price series and observation matrix for a trading env.

    Rows are taken from ``env.df`` starting ``env.window_size`` rows before
    ``env.frame_bound[0]`` and ending just before ``env.frame_bound[1]``.

    Args:
        env: object exposing ``df`` (a DataFrame with a 'Price' column and the
            columns named in the module-level ``features`` list),
            ``window_size`` and ``frame_bound``.

    Returns:
        Tuple ``(prices, signal_features)`` of NumPy arrays: the sliced 'Price'
        values and the sliced ``features`` columns.
    """
    # Reach back window_size rows before the lower frame bound.
    row_window = slice(env.frame_bound[0] - env.window_size, env.frame_bound[1])
    frame = env.df

    price_values = frame.loc[:, 'Price'].to_numpy()
    feature_values = frame.loc[:, features].to_numpy()

    return price_values[row_window], feature_values[row_window]

class MyCustomEnv(StocksEnv):
    """StocksEnv variant whose price/feature extraction is done by add_signals."""

    def _process_data(self):
        # Delegate to the notebook-level helper, passing this env instance.
        return add_signals(self)

def my_env_creator(env_config: EnvContext):
    """Create the training environment registered with RLlib.

    Args:
        env_config: mapping of per-environment settings. Optional keys:
            ``"window_size"`` (default 12) and ``"frame_bound"`` (default
            ``(12, 440473)``). An empty or missing config reproduces the
            original hard-coded setup.

    Returns:
        A ``MyCustomEnv`` built on the module-level ``df_train`` frame.
    """
    # Tolerate None so the creator can also be called by hand.
    settings = env_config or {}
    window_size = settings.get("window_size", 12)
    # tuple(): a frame_bound supplied through a config may arrive as a list.
    frame_bound = tuple(settings.get("frame_bound", (12, 440473)))
    return MyCustomEnv(df=df_train, window_size=window_size, frame_bound=frame_bound)


# Make the environment available to RLlib under the name used in the config.
register_env("custom_stock_env", my_env_creator)

In [None]:
## 3. Build and Train with RLlib
# DQN configuration for the registered "custom_stock_env" environment.
config = (
    DQNConfig()
    .environment("custom_stock_env", env_config={})
    .framework("torch")  # You can change this to "tf" if using TensorFlow
    # 63 parallel env runners, each stepping 4 environment copies.
    # NOTE(review): 63 presumably targets a 64-core machine - adjust to your hardware.
    .env_runners(num_env_runners=63, num_envs_per_env_runner=4)
    .training(lr=1e-3, gamma=0.99, train_batch_size=8000)  # Larger batch size for faster training
    .resources(num_gpus=1)  # Request one GPU for training
)

In [None]:
from pathlib import Path

# Experiment name (passed to Ray Tune as the run name).
model_name = 'model_1'

# Results directory, given relative to the working directory and resolved
# to an absolute path.
relative_path = 'results'
absolute_path = Path(relative_path).resolve()

In [None]:
# Run the DQN experiment through Ray Tune (logging, checkpointing, stop criterion)
tune_analysis = ray.tune.run(
    "DQN",  # The name of the algorithm
    config=config.to_dict(),  # Convert config to dict
    storage_path=str(absolute_path),  # Root results directory, passed as a plain string
    name=model_name,  # Experiment name; results are written to <storage_path>/<name>
    stop={"timesteps_total": 450000},  # Stop after 450k timesteps
    checkpoint_freq=1,  # Save a checkpoint after every iteration
    checkpoint_at_end=True  # Save the final model
)

# You can now launch TensorBoard and check the logs
# tensorboard --logdir results/model_1