In [1]:
import ray
import re

# Compare the numeric version components: as plain strings, "2.10.0" sorts
# before "2.2.0", so a lexicographic check would reject newer releases.
_ray_version = tuple(int(num) for num in re.findall(r"\d+", ray.__version__)[:3])
assert _ray_version >= (2, 2, 0), "Please install ray 2.2.0 by doing 'pip install ray[rllib] ray[tune] lz4' , lz4 is for population based tuning"

#Importing the libraries
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

# matplotlib.use('Agg')
import ray.rllib.algorithms.ppo as ppo
import ray.rllib.algorithms.a2c as a2c
import ray.rllib.algorithms.a3c as a3c
import ray.rllib.algorithms.td3 as td3
import ray.rllib.algorithms.ddpg as ddpg
import ray.rllib.algorithms.appo as appo
import datetime
%matplotlib inline
from finrl import config
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader
from finrl.meta.preprocessor.preprocessors import FeatureEngineer, data_split
from finrl.meta.env_stock_trading.env_stocktrading_np import StockTradingEnv as StockTradingEnv_numpy 
from finrl.meta.data_processor import DataProcessor
from finrl.plot import backtest_stats, backtest_plot, get_daily_return, get_baseline
import ray
from pprint import pprint
# from ray.rllib.algorithms.ppo import ppo
# from ray.rllib.algorithms.ddpg import ddpg
# from ray.rllib.algorithms.a2c import a2c
# from ray.rllib.algorithms.ddpg import ddpg,td3
# from ray.rllib.algorithms import ddpg
import ray.rllib.algorithms.ppo as ppo
# from ray.rllib.algorithms.sac import sac
import sys
sys.path.append("../FinRL-Library")
import os
import itertools
from ray import tune
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.optuna import OptunaSearch
from ray.tune.schedulers import ASHAScheduler
from ray.tune.registry import register_env
from ray import air
from ray.air import session
import time
import psutil
# Total physical memory as reported by psutil.
psutil_memory_in_bytes = psutil.virtual_memory().total
# Replace Ray's private memory-detection helper so it returns the psutil figure.
# NOTE(review): presumably a workaround for Ray mis-detecting available memory
# in this environment -- confirm it is still needed; `_private` is not a stable API.
ray._private.utils.get_system_memory = lambda: psutil_memory_in_bytes
from typing import Dict, Optional, Any



  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# @Author: Astarag Mohapatra

import ray

import re

# Compare the numeric version components: as plain strings, "10.0.0" sorts
# before "2.0.0", so a lexicographic check gives wrong answers for
# multi-digit components.
_ray_version = tuple(int(num) for num in re.findall(r"\d+", ray.__version__)[:3])
assert (
    _ray_version > (2, 0, 0)
), "Please install ray 2.2.0 by doing 'pip install ray[rllib] ray[tune] lz4' , lz4 is for population based tuning"
from pprint import pprint

from ray import tune
from ray.tune.search import ConcurrencyLimiter
from ray.rllib.algorithms import Algorithm
from ray.tune import register_env

from ray.air import RunConfig, FailureConfig, ScalingConfig
from ray.tune.tune_config import TuneConfig
from ray.air.config import CheckpointConfig
from ray.tune.callback import Callback

import psutil

# Total physical memory as reported by psutil.
psutil_memory_in_bytes = psutil.virtual_memory().total
# Replace Ray's private memory-detection helper so it returns the psutil figure.
# NOTE(review): presumably a workaround for Ray mis-detecting available memory
# in this environment -- confirm it is still needed; `_private` is not a stable API.
ray._private.utils.get_system_memory = lambda: psutil_memory_in_bytes
from typing import Dict, Optional, Any, List, Union


class DRLlibv2:
    """
    It instantiates RLlib model with Ray tune functionality
    Params
    -------------------------------------
    trainable: Union[str, Any]
        Algorithm name (e.g. "PPO") or any Trainable class that takes config
        as parameter. The value is handed to ``tune.Tuner`` unchanged.
    params: dict
        hyperparameters dictionary. It is copied; the caller's dictionary is
        not modified.
    train_env:
        Training environment instance. When given, it is registered under
        ``train_env_name``.
    train_env_name: str
        Name of the training environment
    run_name: str
        tune run name
    framework: str
        "torch" or "tf" for tensorflow
    local_dir: str
         to save the results and tensorboard plots
    num_workers: int
        number of workers
    search_alg
        search algorithm used to pick hyperparameter configurations
    concurrent_trials: int
         Number of concurrent hyperparameters trial to run. A non-zero value
         wraps ``search_alg`` in a ``ConcurrencyLimiter``.
    num_samples: int
         Number of samples of hyperparameters config to run
    scheduler:
        Stopping suboptimal trials
    log_level: str = "WARN"
        Verbosity, e.g. "DEBUG"
    num_gpus: Union[float, int] = 0
        GPUs for trial
    num_cpus: Union[float, int] = 2
        CPUs for rollout collection
    dataframe_save: str
        Saving the tune results
    metric: str
        Metric for hyperparameter optimization in Bayesian Methods
    mode: str
        Maximize or Minimize the metric
    max_failures: int
        Number of failures to TuneError
    training_iterations: int
         Number of times session.report() is called
    checkpoint_num_to_keep: int
        Number of checkpoints to keep
    checkpoint_freq: int
        Checkpoint freq wrt training iterations
    reuse_actors: bool
        Reuse actors for tuning
    callbacks:
        callbacks integration for ray tune

    It has the following methods:
    Methods
    -------------------------------------
        train_tune_model: It takes in the params dictionary and fits in sklearn style to our trainable class
        restore_agent: It restores previously errored or stopped trials or experiments
        infer_results: It returns the results dataframe and trial informations
        get_test_agent: It returns the testing agent for inference

    Example
    ---------------------------------------
    def sample_ppo_params():
        return {
            "entropy_coeff": tune.loguniform(0.00000001, 0.1),
            "lr": tune.loguniform(5e-5, 0.001),
            "sgd_minibatch_size": tune.choice([ 32, 64, 128, 256, 512]),
            "lambda": tune.choice([0.1,0.3,0.5,0.7,0.9,1.0]),
        }
        #Tree Parzen Estimator
    optuna_search = OptunaSearch(
        metric="episode_reward_mean",
        mode="max")
    drl_agent = DRLlibv2(
        trainable="PPO",
        train_env=env(train_env_config),
        train_env_name="StockTrading_train",
        framework="torch",
        num_workers=1,
        log_level="DEBUG",
        run_name = 'test',
        local_dir = "test",
        params = sample_ppo_params(),
        num_samples = 1,
        num_gpus=1,
        training_iterations=10,
        search_alg = optuna_search,
        checkpoint_freq=5
    )
    #Tune or train the model
    res = drl_agent.train_tune_model()

    #Get the tune results
    results_df, best_result = drl_agent.infer_results()

    #Get the best testing agent
    test_agent = drl_agent.get_test_agent(test_env_instance,'StockTrading_testenv')
    """

    def __init__(
        self,
        trainable: Union[str, Any],
        params: dict,
        train_env=None,
        train_env_name: str = "",
        run_name: str = "tune_run",
        framework: str = "torch",
        local_dir: str = "tune_results",
        num_workers: int = 1,
        search_alg=None,
        concurrent_trials: int = 0,
        num_samples: int = 0,
        scheduler=None,
        log_level: str = "WARN",
        num_gpus: Union[float, int] = 0,
        num_cpus: Union[float, int] = 2,
        dataframe_save: str = "tune.csv",
        metric: str = "episode_reward_mean",
        mode: Union[str, List[str]] = "max",
        max_failures: int = 0,
        training_iterations: int = 100,
        checkpoint_num_to_keep: Union[None, int] = None,
        checkpoint_freq: int = 0,
        reuse_actors: bool = False,
        callbacks: Optional[List["Callback"]] = None,
    ):
        """Store the tuning settings and build the RLlib config dictionary."""
        # The registered creator ignores the config it is called with and
        # always hands back the single instance supplied by the caller.
        if train_env is not None:
            register_env(train_env_name, lambda config: train_env)

        # Work on a shallow copy so the keys added below do not leak into the
        # dictionary owned by the caller.
        self.params = dict(params)
        self.params["framework"] = framework
        self.params["log_level"] = log_level
        self.params["num_gpus"] = num_gpus
        self.params["num_workers"] = num_workers
        self.params["env"] = train_env_name

        self.run_name = run_name
        self.local_dir = local_dir
        self.search_alg = search_alg
        if concurrent_trials != 0:
            self.search_alg = ConcurrencyLimiter(
                self.search_alg, max_concurrent=concurrent_trials
            )
        self.scheduler = scheduler
        self.num_samples = num_samples
        # The trainable is stored exactly as given. An earlier
        # `self.trainable.upper()` call discarded its result and therefore
        # never changed the name; it was removed rather than "activated"
        # because the name is case-sensitive downstream.
        # NOTE(review): some RLlib names appear to be mixed-case (e.g.
        # "SimpleQ"), so forcing upper case would break them -- confirm.
        self.trainable = trainable
        self.num_cpus = num_cpus
        self.num_gpus = num_gpus
        self.dataframe_save = dataframe_save
        self.metric = metric
        self.mode = mode
        self.max_failures = max_failures
        self.training_iterations = training_iterations
        self.checkpoint_freq = checkpoint_freq
        self.checkpoint_num_to_keep = checkpoint_num_to_keep
        self.reuse_actors = reuse_actors
        self.callbacks = callbacks

    def train_tune_model(self):
        """
        Tuning and training the model
        Returns the results object (also stored on ``self.results``)
        """
        # ignore_reinit_error lets this be called repeatedly in one kernel.
        ray.init(
            num_cpus=self.num_cpus, num_gpus=self.num_gpus, ignore_reinit_error=True
        )

        tuner = tune.Tuner(
            self.trainable,
            param_space=self.params,
            tune_config=TuneConfig(
                search_alg=self.search_alg,
                num_samples=self.num_samples,
                metric=self.metric,
                mode=self.mode,
                reuse_actors=self.reuse_actors,
            ),
            run_config=RunConfig(
                name=self.run_name,
                local_dir=self.local_dir,
                callbacks=self.callbacks,
                failure_config=FailureConfig(
                    max_failures=self.max_failures, fail_fast=False
                ),
                # Every trial stops after this many reported iterations.
                stop={"training_iteration": self.training_iterations},
                checkpoint_config=CheckpointConfig(
                    num_to_keep=self.checkpoint_num_to_keep,
                    checkpoint_score_attribute=self.metric,
                    checkpoint_score_order=self.mode,
                    checkpoint_frequency=self.checkpoint_freq,
                    checkpoint_at_end=True,
                ),
                verbose=3,
            ),
        )

        self.results = tuner.fit()
        return self.results

    def infer_results(self, to_dataframe: Optional[str] = None, mode: str = "a"):
        """
        Get tune results in a dataframe and best results object

        Params
        -------------------------------------
        to_dataframe: str
            CSV path to write to; falls back to ``dataframe_save`` from the
            constructor when omitted.
        mode: str
            File mode passed to ``DataFrame.to_csv`` ("a" appends).

        Returns ``(results_df, best_result)``. Requires that
        ``train_tune_model`` or ``restore_agent`` has been run first.
        """
        results_df = self.results.get_dataframe()

        if to_dataframe is None:
            to_dataframe = self.dataframe_save

        results_df.to_csv(to_dataframe, mode=mode)

        best_result = self.results.get_best_result()

        return results_df, best_result

    def restore_agent(
        self,
        checkpoint_path: str = "",
        restore_search: bool = False,
        resume_unfinished: bool = True,
        resume_errored: bool = False,
        restart_errored: bool = False,
    ):
        """
        Restore errored or stopped trials

        Params
        -------------------------------------
        checkpoint_path: str
            Directory of the experiment to restore. When empty, the directory
            of this run (``local_dir``/``run_name``) is used.
        restore_search: bool
            Currently unused.
        resume_unfinished, resume_errored, restart_errored: bool
            Forwarded to ``tune.Tuner.restore``.

        Returns the results object of the resumed run (also stored on
        ``self.results``).
        """
        if checkpoint_path == "":
            import os

            # Tuner.restore works on the experiment directory, not on a single
            # trial checkpoint, and must also work when no results object
            # exists yet (e.g. after a crashed run in a fresh kernel).
            checkpoint_path = os.path.join(self.local_dir, self.run_name)

        restored_agent = tune.Tuner.restore(
            checkpoint_path,
            restart_errored=restart_errored,
            resume_unfinished=resume_unfinished,
            resume_errored=resume_errored,
        )
        print(restored_agent)
        self.results = restored_agent.fit()

        return self.results

    def get_test_agent(self, test_env, test_env_name: str, checkpoint=None):
        """
        Get test agent

        Registers ``test_env`` under ``test_env_name`` (when an instance is
        given) and rebuilds the algorithm from ``checkpoint``. Without an
        explicit checkpoint, the checkpoint of the best result in
        ``self.results`` is used.
        """
        if test_env is not None:
            register_env(test_env_name, lambda config: test_env)

        if checkpoint is None:
            checkpoint = self.results.get_best_result().checkpoint

        testing_agent = Algorithm.from_checkpoint(checkpoint)

        return testing_agent


## BUILDING THE ENVIRONMENT

* Let's build an environment to test our algorithms

In [2]:
def get_train_env(
    start_date,
    end_date,
    ticker_list,
    data_source,
    time_interval,
    technical_indicator_list,
    env,
    model_name,
    if_vix=True,
    **kwargs,
):
    """Download and preprocess market data, returning a training env config.

    The data is fetched and processed through ``DataProcessor`` (download,
    clean, add technical indicators, optionally add VIX) and converted to
    arrays. ``env`` and ``model_name`` are accepted but not used here;
    ``kwargs`` are forwarded to ``DataProcessor``.

    Returns a dict with ``price_array``, ``tech_array``, ``turbulence_array``
    and ``if_train`` set to True.
    """
    processor = DataProcessor(data_source, **kwargs)

    raw_frame = processor.download_data(
        ticker_list, start_date, end_date, time_interval
    )
    cleaned_frame = processor.clean_data(raw_frame)
    feature_frame = processor.add_technical_indicator(
        cleaned_frame, technical_indicator_list
    )
    if if_vix:
        feature_frame = processor.add_vix(feature_frame)

    prices, indicators, turbulence = processor.df_to_array(feature_frame, if_vix)

    return {
        'price_array': prices,
        'tech_array': indicators,
        'turbulence_array': turbulence,
        'if_train': True,
    }

def calculate_sharpe(episode_reward:list):
    """Return the annualised Sharpe ratio of a sequence of reward values.

    The step-to-step percentage change of ``episode_reward`` is treated as a
    daily return and the ratio mean/std is scaled by sqrt(252).

    Returns 0 when the ratio is undefined: fewer than three values (the
    standard deviation is NaN), constant returns (standard deviation 0), or
    non-finite returns such as those produced by a zero reward.
    """
    perf_data = pd.DataFrame(data=episode_reward, columns=['reward'])
    daily_return = perf_data['reward'].pct_change(1)

    volatility = daily_return.std()
    # NaN != 0 is True, so a plain "!= 0" check would let NaN through.
    if not np.isfinite(volatility) or volatility == 0:
        return 0

    sharpe = (252**0.5) * daily_return.mean() / volatility
    return sharpe if np.isfinite(sharpe) else 0

def get_test_config(
    start_date,
    end_date,
    ticker_list,
    data_source,
    time_interval,
    technical_indicator_list,
    env,
    model_name,
    if_vix=True,
    **kwargs,
):
    """Download and preprocess market data, returning a test env config.

    Runs the same ``DataProcessor`` pipeline as the training helper
    (download, clean, technical indicators, optional VIX, array conversion).
    ``env`` and ``model_name`` are accepted but not used here; ``kwargs`` are
    forwarded to ``DataProcessor``.

    Returns a dict with ``price_array``, ``tech_array``, ``turbulence_array``
    and ``if_train`` set to False.
    """
    dp = DataProcessor(data_source, **kwargs)

    frame = dp.download_data(ticker_list, start_date, end_date, time_interval)
    frame = dp.clean_data(frame)
    frame = dp.add_technical_indicator(frame, technical_indicator_list)
    if if_vix:
        frame = dp.add_vix(frame)

    arrays = dp.df_to_array(frame, if_vix)
    price_array, tech_array, turbulence_array = arrays

    test_env_config = dict(
        price_array=price_array,
        tech_array=tech_array,
        turbulence_array=turbulence_array,
        if_train=False,
    )
    return test_env_config

In [3]:
# Date windows (YYYY-MM-DD) for the three data splits.
TRAIN_START_DATE, TRAIN_END_DATE = "2014-01-01", "2019-07-30"
VAL_START_DATE, VAL_END_DATE = "2019-08-01", "2021-07-30"
TEST_START_DATE, TEST_END_DATE = "2021-08-01", "2023-02-01"

In [4]:
from finrl.config_tickers import DOW_30_TICKER
# Asset universe and the indicator columns computed for each ticker.
ticker_list = DOW_30_TICKER
technical_indicator_list = config.INDICATORS

# Data provider and bar size handed to the data helpers.
data_source, time_interval = 'yahoofinance', '1D'

# Algorithm name used as the trainable, and the environment class.
model_name = 'PPO'
env = StockTradingEnv_numpy

In [11]:
import warnings 
# Silence FutureWarning messages for the rest of the session.
warnings.filterwarnings("ignore", category=FutureWarning)

# The training data runs from the start of the training window to the end of
# the validation window (VAL_END_DATE, not TRAIN_END_DATE).
train_env_config = get_train_env(
    start_date=TRAIN_START_DATE,
    end_date=VAL_END_DATE,
    ticker_list=ticker_list,
    data_source=data_source,
    time_interval=time_interval,
    technical_indicator_list=technical_indicator_list,
    env=env,
    model_name=model_name,
)

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%********

In [12]:
# from ray.tune.registry import register_env
from ray.tune import register_env
env_name = 'StockTrading_train_env'


def reg_env(config):
    """Build an environment instance from the given config dictionary."""
    return env(config)


# The registered creator is called with one argument (the env config), so it
# must accept it; a zero-argument lambda raises a TypeError when the
# environment is created. The argument is ignored here because the
# environment is always built from `train_env_config`.
register_env(env_name, lambda config: env(train_env_config))

train_env_instance = env(train_env_config)

In [13]:
%load_ext autoreload
%autoreload 2

from drllibv2 import DRLlibv2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## GETTING HYPERPARAMETERS

A default hyperparameter configuration can be generated from each algorithm's `Config` class (for example `PPOConfig().to_dict()`).

In [25]:
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.algorithms.a2c import A2CConfig
from ray.rllib.algorithms.td3 import TD3Config 
from ray.rllib.algorithms.dqn import DQNConfig 

# Display PPO's default configuration as a dictionary; its keys are the
# hyperparameters that can be overridden or tuned.
PPOConfig().to_dict()

{'extra_python_environs_for_driver': {},
 'extra_python_environs_for_worker': {},
 'num_gpus': 0,
 'num_cpus_per_worker': 1,
 'num_gpus_per_worker': 0,
 '_fake_gpus': False,
 'custom_resources_per_worker': {},
 'placement_strategy': 'PACK',
 'eager_tracing': False,
 'eager_max_retraces': 20,
 'tf_session_args': {'intra_op_parallelism_threads': 2,
  'inter_op_parallelism_threads': 2,
  'gpu_options': {'allow_growth': True},
  'log_device_placement': False,
  'device_count': {'CPU': 1},
  'allow_soft_placement': True},
 'local_tf_session_args': {'intra_op_parallelism_threads': 8,
  'inter_op_parallelism_threads': 8},
 'env': None,
 'env_config': {},
 'observation_space': None,
 'action_space': None,
 'env_task_fn': None,
 'render_env': False,
 'clip_rewards': None,
 'normalize_actions': True,
 'clip_actions': False,
 'disable_env_checking': False,
 'num_envs_per_worker': 1,
 'sample_collector': ray.rllib.evaluation.collectors.simple_list_collector.SimpleListCollector,
 'sample_async': Fa

## ONLY TRAINING WITHOUT TUNING

* To train a model without tuning, pass fixed (hard-coded) hyperparameter values instead of a search space.

### ALSO, INCREASE THE NUMBER OF TRAINING ITERATIONS AND SET NUM SAMPLES = 1, AS YOU HAVE ONLY ONE SET OF HYPERPARAMETERS

In [8]:
def sample_ppo_params():
    """Return one fixed PPO hyperparameter set (no search space)."""
    fixed_params = {}
    fixed_params["entropy_coeff"] = 1e-7
    fixed_params["lr"] = 5e-5
    fixed_params["sgd_minibatch_size"] = 64
    fixed_params["lambda"] = 0.9
    fixed_params["framework"] = 'torch'
    return fixed_params


# Plain training run: a single sample of the fixed hyperparameters, with no
# search algorithm and no scheduler.
drl_agent = DRLlibv2(
    trainable=model_name,
    params=sample_ppo_params(),
    train_env=env(train_env_config),
    train_env_name="StockTrading_train",
    run_name='FINRL_TEST',
    local_dir="FINRL_TEST",
    framework="torch",
    num_workers=1,
    num_gpus=1,
    num_samples=1,
    log_level="WARN",
    training_iterations=5,
    checkpoint_freq=5,
)

## TUNING WITH MLP

* Under the `model` key, `fcnet_hiddens` configures the fully connected (MLP) network: one entry per hidden layer, giving that layer's size.

### ALSO, INCREASE THE NUMBER OF TRAINING ITERATIONS AND NUM SAMPLES

* Increasing the training iterations increases the number of training loops, and num samples is the number of hyperparameter configurations to try.
* Search algorithm picks the hyperparameter configuration based on Bayesian methods or Random methods. You can find more info [here](https://docs.ray.io/en/latest/tune/api/suggestion.html#tune-search-alg)

* Scheduler stops inferior trials. You can find more info [here](https://docs.ray.io/en/latest/tune/api/schedulers.html)

In [None]:

def sample_ppo_params():
    """Return the PPO search space for an MLP policy with two hidden layers of 256 units."""
    return {
        "entropy_coeff": tune.loguniform(0.00000001, 0.1),
        "lr": tune.loguniform(5e-5, 0.001),
        "sgd_minibatch_size": tune.choice([32, 64, 128, 256, 512]),
        "lambda": tune.choice([0.1, 0.3, 0.5, 0.7, 0.9, 1.0]),
        "framework": 'torch',
        'model': {
            'fcnet_hiddens': [256, 256]
        },
    }


search_alg = OptunaSearch(metric="episode_reward_mean", mode="max")

# metric/mode are deliberately not set on the scheduler: DRLlibv2 forwards its
# own metric and mode (defaults "episode_reward_mean" / "max") through
# TuneConfig, and Tune raises a ValueError when the scheduler was already
# instantiated with its own metric and mode as well.
scheduler_ = ASHAScheduler(
    max_t=5,
    grace_period=1,
    reduction_factor=2,
)

drl_agent = DRLlibv2(
    trainable=model_name,
    train_env=env(train_env_config),
    train_env_name="StockTrading_train",
    framework="torch",
    num_workers=1,
    log_level="WARN",
    run_name='FINRL_TEST',
    local_dir="FINRL_TEST",
    params=sample_ppo_params(),
    num_samples=1,
    num_gpus=1,
    training_iterations=5,
    checkpoint_freq=5,
    scheduler=scheduler_,
    search_alg=search_alg,
)

## TUNING WITH LSTM

* To tune with LSTM, just pass `use_lstm` = True, and tune the lstm_cell_size

In [None]:
def sample_ppo_params():
    """Return the PPO search space for an LSTM policy; the LSTM cell size is tuned too."""
    return {
        "entropy_coeff": tune.loguniform(0.00000001, 0.1),
        "lr": tune.loguniform(5e-5, 0.001),
        "sgd_minibatch_size": tune.choice([32, 64, 128, 256, 512]),
        "lambda": tune.choice([0.1, 0.3, 0.5, 0.7, 0.9, 1.0]),
        "framework": 'torch',
        'model': {
            'use_lstm': True,
            'lstm_cell_size': tune.choice([128, 256, 512])
        },
    }


search_alg = OptunaSearch(metric="episode_reward_mean", mode="max")

# metric/mode are deliberately not set on the scheduler: DRLlibv2 forwards its
# own metric and mode (defaults "episode_reward_mean" / "max") through
# TuneConfig, and Tune raises a ValueError when the scheduler was already
# instantiated with its own metric and mode as well.
scheduler_ = ASHAScheduler(
    max_t=5,
    grace_period=1,
    reduction_factor=2,
)

drl_agent = DRLlibv2(
    trainable=model_name,
    train_env=env(train_env_config),
    train_env_name="StockTrading_train",
    framework="torch",
    num_workers=1,
    log_level="WARN",
    run_name='FINRL_TEST',
    local_dir="FINRL_TEST",
    params=sample_ppo_params(),
    num_samples=1,
    num_gpus=1,
    training_iterations=5,
    checkpoint_freq=5,
    scheduler=scheduler_,
    search_alg=search_alg,
)

## TUNING WITH TRANSFORMER ARCHITECTURE (GTrXL)

* To tune with transformers, just pass `use_attention=True`

In [None]:
def sample_ppo_params():
    """Return the PPO search space for an attention (GTrXL) policy with fixed attention settings."""
    return {
        "entropy_coeff": tune.loguniform(0.00000001, 0.1),
        "lr": tune.loguniform(5e-5, 0.001),
        "sgd_minibatch_size": tune.choice([32, 64, 128, 256, 512]),
        "lambda": tune.choice([0.1, 0.3, 0.5, 0.7, 0.9, 1.0]),
        'model': {
            'use_attention': True,
            'attention_num_transformer_units': 1,
            'attention_dim': 64,
            'attention_num_heads': 1,
            'attention_head_dim': 32,
            'attention_memory_inference': 50,
            'attention_memory_training': 50,
            'attention_position_wise_mlp_dim': 32,
            'attention_init_gru_gate_bias': 2.0,
            'attention_use_n_prev_actions': 0,
            'attention_use_n_prev_rewards': 0,
        },
    }


search_alg = OptunaSearch(metric="episode_reward_mean", mode="max")

# metric/mode are deliberately not set on the scheduler: DRLlibv2 forwards its
# own metric and mode (defaults "episode_reward_mean" / "max") through
# TuneConfig, and Tune raises a ValueError when the scheduler was already
# instantiated with its own metric and mode as well.
scheduler_ = ASHAScheduler(
    max_t=5,
    grace_period=1,
    reduction_factor=2,
)

drl_agent = DRLlibv2(
    trainable=model_name,
    train_env=env(train_env_config),
    train_env_name="StockTrading_train",
    framework="torch",
    num_workers=1,
    log_level="WARN",
    run_name='FINRL_TEST',
    local_dir="FINRL_TEST",
    params=sample_ppo_params(),
    num_samples=1,
    num_gpus=1,
    training_iterations=5,
    checkpoint_freq=5,
    scheduler=scheduler_,
    search_alg=search_alg,
)

## TRAIN AND TUNE THE MODEL

In [10]:
# Run the Tune experiment for whichever `drl_agent` was configured last and
# keep the returned results object for the inference cells below.
res = drl_agent.train_tune_model()

0,1
Current time:,2023-02-25 21:18:08
Running for:,00:02:03.10
Memory:,4.7/7.4 GiB

Trial name,status,loc,iter,total time (s),ts,reward,episode_reward_max,episode_reward_min,episode_len_mean
PPO_StockTrading_train_85c44_00000,TERMINATED,172.21.182.6:826,5,112.754,20000,977.338,1144.1,783.01,1907


2023-02-25 21:18:08,638	INFO tune.py:762 -- Total run time: 123.82 seconds (123.09 seconds for the tuning loop).


In [None]:
# Per-trial results as a dataframe plus the best result object; requires that
# the training cell above has been run.
results_df, best_result = drl_agent.infer_results()

## GENERATE TEST ENVIRONMENT

In [14]:
# Build the environment config for the held-out test window.
test_env_config = get_test_config(
    start_date=TEST_START_DATE,
    end_date=TEST_END_DATE,
    ticker_list=ticker_list,
    data_source=data_source,
    time_interval=time_interval,
    technical_indicator_list=technical_indicator_list,
    env=env,
    model_name=model_name,
)

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%********

In [15]:
# Environment instance stepped manually in the evaluation loops below.
test_env_instance = env(test_env_config)

In [12]:

# Register the test environment under the given name and get the agent used
# for inference; requires that the training cell has been run.
test_agent = drl_agent.get_test_agent(test_env_instance,'StockTrading_testenv')

2023-02-25 21:18:30,757	INFO algorithm.py:501 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.


## TESTING FOR MLP BASED AGENT

In [22]:
# Roll the trained (stateless) policy through one episode of the test
# environment, recording the portfolio value after every step.
obs = test_env_instance.reset()

episode_total_assets = [test_env_instance.initial_total_asset]
episode_returns = []  # the cumulative_return / initial_account
done = False
while not done:
    action = test_agent.compute_single_action(observation=obs)
    obs, reward, done, _ = test_env_instance.step(action)

    # Portfolio value = cash + market value of the shares held today.
    holdings_value = (
        test_env_instance.price_ary[test_env_instance.day] * test_env_instance.stocks
    ).sum()
    total_asset = test_env_instance.amount + holdings_value
    episode_total_assets.append(total_asset)

    episode_return = total_asset / test_env_instance.initial_total_asset
    episode_returns.append(episode_return)

## TESTING FOR LSTM BASED AGENT

* THE ONLY THING YOU HAVE TO TAKE CARE OF IS THE SIZE OF THE RECURRENT STATE

```python
    init_state = state = [
    np.zeros([lstm_cell_size], np.float32) for _ in range(2)]
```

The testing agent expects the state to have the shape shown above.

In [None]:
# Roll the trained LSTM policy through one episode of the test environment.
# The recurrent state is passed in explicitly and replaced by the state the
# agent returns on each step.
obs = test_env_instance.reset()

# Cell size chosen for the best trial; the initial state is two zero vectors
# of that size.
lstm_cell_size = res.get_best_result().config['model']['lstm_cell_size']
init_state = state = [
    np.zeros([lstm_cell_size], np.float32),
    np.zeros([lstm_cell_size], np.float32),
]

episode_total_assets = [test_env_instance.initial_total_asset]
episode_returns = []  # the cumulative_return / initial_account
done = False
while not done:
    action, state, _ = test_agent.compute_single_action(observation=obs, state=state)
    obs, reward, done, _ = test_env_instance.step(action)

    # Portfolio value = cash + market value of the shares held today.
    holdings_value = (
        test_env_instance.price_ary[test_env_instance.day] * test_env_instance.stocks
    ).sum()
    total_asset = test_env_instance.amount + holdings_value
    episode_total_assets.append(total_asset)

    episode_return = total_asset / test_env_instance.initial_total_asset
    episode_returns.append(episode_return)

## TESTING FOR TRANSFORMER BASED AGENTS

* HERE TOO, THE STATE PASSED TO THE AGENT HAS A DIFFERENT SHAPE: ONE MEMORY ARRAY PER TRANSFORMER UNIT

```python
    init_state = state = [
     np.zeros([100, 64], np.float32) for _ in range(num_transformers) ]
```

Also, the state output is designed as the following

```python
    state = [
        np.concatenate([state[i], [state_out[i]]], axis=0)[1:]
        for i in range(num_transformers)
    ]
```

In [None]:
# Roll the trained attention (GTrXL) policy through one episode of the test
# environment. The agent receives one memory array per transformer unit; after
# each step the newest output is appended and the oldest row dropped.
obs = test_env_instance.reset()

# Read the memory layout from the trained model's config rather than
# hard-coding it, so the initial state always matches what the model was
# configured with (rows = attention_memory_inference, cols = attention_dim).
model_config = res.get_best_result().config["model"]
num_transformers = model_config["attention_num_transformer_units"]
memory_length = model_config["attention_memory_inference"]
attention_dim = model_config["attention_dim"]

init_state = state = [
    np.zeros([memory_length, attention_dim], np.float32)
    for _ in range(num_transformers)
]
episode_returns = list()  # the cumulative_return / initial_account
episode_total_assets = list()
episode_total_assets.append(test_env_instance.initial_total_asset)
done = False
while not done:
    action, state_out, _ = test_agent.compute_single_action(observation=obs, state=state)
    obs, reward, done, _ = test_env_instance.step(action)

    # Portfolio value = cash + market value of the shares held today.
    total_asset = (
        test_env_instance.amount
        + (test_env_instance.price_ary[test_env_instance.day] * test_env_instance.stocks).sum()
    )
    episode_total_assets.append(total_asset)
    episode_return = total_asset / test_env_instance.initial_total_asset
    episode_returns.append(episode_return)

    # Sliding window: append the new output row, drop the oldest one.
    state = [
        np.concatenate([state[i], [state_out[i]]], axis=0)[1:]
        for i in range(num_transformers)
    ]

## USING WEIGHTS AND BIASES FOR MLOps (TO-DO)

In [8]:
import wandb

# Authenticate this session with Weights & Biases.
wandb.login()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mathe_kunal[0m ([33mrf_project[0m). Use [1m`wandb login --relogin`[0m to force relogin


True

In [20]:
from ray.air.integrations.wandb import WandbLoggerCallback

# Tune callback that logs trials to the given W&B project.
wandb_callback = WandbLoggerCallback(project="Ray Tune Trial Run",log_config=True,save_checkpoints=True)


def sample_ppo_params():
    """Return the PPO search space for an attention (GTrXL) policy with fixed attention settings."""
    return {
        "entropy_coeff": tune.loguniform(0.00000001, 0.1),
        "lr": tune.loguniform(5e-5, 0.001),
        "sgd_minibatch_size": tune.choice([32, 64, 128, 256, 512]),
        "lambda": tune.choice([0.1, 0.3, 0.5, 0.7, 0.9, 1.0]),
        'model': {
            'use_attention': True,
            'attention_num_transformer_units': 1,
            'attention_dim': 64,
            'attention_num_heads': 1,
            'attention_head_dim': 32,
            'attention_memory_inference': 50,
            'attention_memory_training': 50,
            'attention_position_wise_mlp_dim': 32,
            'attention_init_gru_gate_bias': 2.0,
            'attention_use_n_prev_actions': 0,
            'attention_use_n_prev_rewards': 0,
        },
    }


search_alg = OptunaSearch(metric="episode_reward_mean", mode="max")

# metric/mode are deliberately not set on the scheduler: DRLlibv2 forwards its
# own metric and mode (defaults "episode_reward_mean" / "max") through
# TuneConfig, and Tune raises a ValueError when the scheduler was already
# instantiated with its own metric and mode as well.
scheduler_ = ASHAScheduler(
    max_t=5,
    grace_period=1,
    reduction_factor=2,
)

test_env_instance = env(test_env_config)

drl_agent = DRLlibv2(
    trainable=model_name,
    train_env=env(train_env_config),
    train_env_name="StockTrading_train",
    framework="torch",
    num_workers=1,
    log_level="DEBUG",
    run_name='FINRL_TEST',
    local_dir="FINRL_TEST",
    params=sample_ppo_params(),
    num_samples=1,
    num_gpus=1,
    training_iterations=5,
    checkpoint_freq=5,
    scheduler=scheduler_,
    search_alg=search_alg,
    callbacks=[wandb_callback],
)

In [2]:
import wandb
# Fully qualified name of the checkpoint artifact logged by the tuning run.
CHECKPOINT_ARTIFACT = 'rf_project/Ray Tune Trial Run/checkpoint_PPO_StockTrading_train_env_3e5ba38a:v0'

# Start a W&B run, mark the artifact as an input of that run and download it;
# `artifact_dir` holds the value returned by the download call.
run = wandb.init()
artifact = run.use_artifact(CHECKPOINT_ARTIFACT, type='model')
artifact_dir = artifact.download()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mathe_kunal[0m ([33mrf_project[0m). Use [1m`wandb login --relogin`[0m to force relogin


[34m[1mwandb[0m:   6 of 6 files downloaded.  


In [24]:
# from ray.rllib.algorithms import Algorithm

# chkpt = "/home/athekunal/Ray for FinRL/artifacts/checkpoint_PPO_StockTrading_train_env_3e5ba38a:v0/"

# # testing_agent = drl_agent.get_test_agent(test_env=test_env_instance,test_env_name="StockTest",checkpoint=chkpt)

# register_env("StockTest", lambda: test_env_instance)
# test_agent = Algorithm.from_checkpoint(checkpoint=chkpt)