<a href="https://colab.research.google.com/github/brady-at-claradata/ElegantRL/blob/master/rayppo_gpu.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install ray[all] torch tensortrade stockstats yfinance pandas 


Collecting ray[all]
  Downloading ray-1.5.0-cp37-cp37m-manylinux2014_x86_64.whl (51.5 MB)
[K     |████████████████████████████████| 51.5 MB 39 kB/s 
Collecting tensortrade
  Downloading tensortrade-1.0.3.tar.gz (32.6 MB)
[K     |████████████████████████████████| 32.6 MB 23 kB/s 
[?25hCollecting stockstats
  Downloading stockstats-0.3.2-py2.py3-none-any.whl (13 kB)
Collecting yfinance
  Downloading yfinance-0.1.63.tar.gz (26 kB)
Collecting pyyaml>=5.1.2
  Downloading PyYAML-5.4.1-cp37-cp37m-manylinux1_x86_64.whl (636 kB)
[K     |████████████████████████████████| 636 kB 37.5 MB/s 
[?25hCollecting stochastic>=0.6.0
  Downloading stochastic-0.6.0-py3-none-any.whl (49 kB)
[K     |████████████████████████████████| 49 kB 6.1 MB/s 
Collecting ipython>=7.12.0
  Downloading ipython-7.25.0-py3-none-any.whl (786 kB)
[K     |████████████████████████████████| 786 kB 34.9 MB/s 
Collecting plotly>=4.5.0
  Downloading plotly-5.1.0-py2.py3-none-any.whl (20.6 MB)
[K     |█████████████████████████

In [1]:
import ray
import numpy as np

from ray import tune
from ray.tune.registry import register_env

import tensortrade.env.default as default

from tensortrade.feed.core import DataFeed, Stream
from tensortrade.oms.instruments import Instrument, USD
from tensortrade.oms.exchanges import Exchange
from tensortrade.oms.services.execution.simulated import execute_order
from tensortrade.oms.wallets import Wallet, Portfolio

import yfinance as yf
import pandas as pd 
from datetime import datetime, timedelta
from stockstats import StockDataFrame as Sdf  # for Sdf.retype

class FeatureEngineer:
    """Provides methods for preprocessing the stock price data
    from finrl.preprocessing.preprocessors import FeatureEngineer

    Attributes
    ----------
        use_technical_indicator : boolean
            we technical indicator or not
        tech_indicator_list : list
            a list of technical indicator names (modified from config.py)
        use_turbulence : boolean
            use turbulence index or not
        user_defined_feature:boolean
            user user defined features or not

    Methods
    -------
    preprocess_data()
        main method to do the feature engineering

    """

    def __init__(
            self,
            use_technical_indicator=True,
            tech_indicator_list=None,  # config.TECHNICAL_INDICATORS_LIST,
            use_turbulence=False,
            user_defined_feature=False,
    ):
        self.use_technical_indicator = use_technical_indicator
        self.tech_indicator_list = tech_indicator_list
        self.use_turbulence = use_turbulence
        self.user_defined_feature = user_defined_feature

    def preprocess_data(self, df):
        """main method to do the feature engineering
        @:param config: source dataframe
        @:return: a DataMatrices object
        """

        if self.use_technical_indicator:
            # add technical indicators using stockstats
            df = self.add_technical_indicator(df)
            print("Successfully added technical indicators")

        # add turbulence index for multiple stock
        if self.use_turbulence:
            df = self.add_turbulence(df)
            print("Successfully added turbulence index")

        # add user defined feature
        if self.user_defined_feature:
            df = self.add_user_defined_feature(df)
            print("Successfully added user defined features")

        # fill the missing values at the beginning and the end
        df = df.fillna(method="bfill").fillna(method="ffill")
        return df

    def add_technical_indicator(self, data):
        """
        calculate technical indicators
        use stockstats package to add technical inidactors
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        from stockstats import StockDataFrame as Sdf  # for Sdf.retype

        df = data.copy()
        df = df.sort_values(by=['tic', 'date'])
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()

        for indicator in self.tech_indicator_list:
            indicator_df = pd.DataFrame()
            for i in range(len(unique_ticker)):
                try:
                    temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
                    temp_indicator = pd.DataFrame(temp_indicator)
                    temp_indicator['tic'] = unique_ticker[i]
                    temp_indicator['date'] = df[df.tic == unique_ticker[i]]['date'].to_list()
                    indicator_df = indicator_df.append(
                        temp_indicator, ignore_index=True
                    )
                except Exception as e:
                    print(e)
            df = df.merge(indicator_df[['tic', 'date', indicator]], on=['tic', 'date'], how='left')
        df = df.sort_values(by=['date', 'tic'])
        return df

    def add_turbulence(self, data):
        """
        add turbulence index from a precalcualted dataframe
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df)
        df = df.merge(turbulence_index, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    @staticmethod
    def add_user_defined_feature(data):
        """
         add user defined features
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df["daily_return"] = df.close.pct_change(1)
        # df['return_lag_1']=df.close.pct_change(2)
        # df['return_lag_2']=df.close.pct_change(3)
        # df['return_lag_3']=df.close.pct_change(4)
        # df['return_lag_4']=df.close.pct_change(5)
        return df

    @staticmethod
    def calculate_turbulence(data):
        """calculate turbulence index based on dow 30"""
        # can add other market assets
        df = data.copy()
        df_price_pivot = df.pivot(index="date", columns="tic", values="close")
        # use returns to calculate turbulence
        df_price_pivot = df_price_pivot.pct_change()

        unique_date = df.date.unique()
        # start after a year
        start = 252
        turbulence_index = [0] * start
        # turbulence_index = [0]
        count = 0
        for i in range(start, len(unique_date)):
            current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
            # use one year rolling window to calcualte covariance
            hist_price = df_price_pivot[
                (df_price_pivot.index < unique_date[i])
                & (df_price_pivot.index >= unique_date[i - 252])
                ]
            # Drop tickers which has number missing values more than the "oldest" ticker
            filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1)

            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0)
            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            if temp > 0:
                count += 1
                if count > 2:
                    turbulence_temp = temp[0][0]
                else:
                    # avoid large outlier because of the calculation just begins
                    turbulence_temp = 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)

        turbulence_index = pd.DataFrame(
            {"date": df_price_pivot.index, "turbulence": turbulence_index}
        )
        return turbulence_index


In [2]:
tickers = [
'AAPL', 'ADBE', 'ADI', 'ADP', 'ADSK', 'ALGN',  'AMAT', 'AMD', 'AMGN',
'AMZN', 'ASML', 'ATVI', 'BIIB', 'BKNG', 'BMRN', 'CDNS', 'CERN', 'CHKP', 'CMCSA',
'COST', 'CSCO', 'CSX', 'CTAS', 'CTSH', 'CTXS', 'DLTR', 'EA', 'EBAY', 'FAST',
'FISV', 'GILD', 'HAS', 'HSIC', 'IDXX', 'ILMN', 'INCY', 'INTC', 'INTU', 'ISRG',
'JBHT', 'KLAC', 'LRCX', 'MAR', 'MCHP', 'MDLZ', 'MNST', 'MSFT', 'MU', 'MXIM',
'NLOK', 'NTAP', 'NTES', 'NVDA', 'ORLY', 'PAYX', 'PCAR', 'PEP', 'QCOM', 'REGN',
'ROST', 'SBUX', 'SIRI', 'SNPS', 'SWKS', 'TTWO', 'TXN', 'VRSN', 'VRTX', 'WBA',
'WDC', 'WLTW', 'XEL', 'XLNX']  # finrl.config.NAS_74_TICKER

tech_indicator_list = [
'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'cci_30', 'dx_30',
'close_30_sma', 'close_60_sma']  # finrl.config.TECHNICAL_INDICATORS_LIST

# _ = remove_old_data()

start_date = '2008-01-01'
start_eval_date = (datetime.now() - timedelta(days=365*1)).strftime("%F")
end_eval_date = datetime.now().strftime('%F')

def get_data(start_date, end_date, tickers, tech_indicator_list):

    data_df = pd.DataFrame()
    for tic in tickers:
        temp_df = yf.download(tic, start=start_date, end=end_eval_date)
        temp_df["tic"] = tic
        data_df = data_df.append(temp_df)
    # reset the index, we want to use numbers as index instead of dates
    data_df = data_df.reset_index()
    # convert the column names to standardized names
    data_df.columns = [
        "date",
        "open",
        "high",
        "low",
        "close",
        "adjcp",
        "volume",
        "tic",
    ]
    # use adjusted close price instead of close price
    data_df["close"] = data_df["adjcp"]
    # drop the adjusted close price column
    data_df = data_df.drop("adjcp", 1)
    # create day of the week column (monday = 0)
    data_df["day"] = data_df["date"].dt.dayofweek
    # convert date to standard string format, easy to filter
    data_df["date"] = data_df.date.apply(lambda x: x.strftime("%Y-%m-%d"))
    # drop missing data
    data_df = data_df.dropna()
    data_df = data_df.reset_index(drop=True)
    print("Shape of DataFrame: ", data_df.shape)
    # print("Display DataFrame: ", data_df.head())

    data_df = data_df.sort_values(by=['date', 'tic']).reset_index(drop=True)



    fe = FeatureEngineer(use_turbulence=True,
                            user_defined_feature=False,
                            use_technical_indicator=True,
                            tech_indicator_list=tech_indicator_list, )

    processed_df = fe.preprocess_data(data_df)
    return processed_df


In [6]:
def create_env(config): 


  df = get_data(start_date, end_eval_date, tickers, tech_indicator_list)

  USD.precision = 6
  features = []
  for key, group in df.groupby('tic'):
      for column in group.columns:
          if column in {'date','tic'} : 
              continue
          s = Stream.source(group[column].values.tolist(), dtype=float).rename(key+'_'+column)
          features.append(s)

  feed = DataFeed(features)
  feed.compile()

  prices = []
  for key, group in df.groupby('tic'):
      s = Stream.source(group['close'].values.tolist(), dtype=float).rename('USD/' + key)
      prices.append(s)

  exchange = Exchange("alpaca", service=execute_order)(*prices)

  stonks = [Instrument(x, 6, x) for x in tickers]
  cash = 2000.
  starting_cash = Wallet(exchange, cash * USD )
  #initialize no owned stocks
  wallets = [starting_cash] + [Wallet(exchange, 0*x) for x in stonks]

  portfolio = Portfolio(
      base_instrument=USD,
      wallets=wallets
  )
  env = default.create(
    feed=feed,
    portfolio=portfolio,
    action_scheme="managed-risk",
    reward_scheme="risk-adjusted",
    window_size=25,
    max_allowed_loss=0.6
  )
  return env   

register_env("TradingEnv", create_env)


analysis = tune.run(
    "PPO",
    stop={
      "episode_reward_mean": 500
    },
    reuse_actors=True,
    config={
        "env": "TradingEnv",
        "env_config": {
            "window_size": 25
        },
        "log_level": "DEBUG",
        "framework": "torch",
        "ignore_worker_failures": True,
        "num_workers": 1,
        "num_gpus": 1,
        "clip_rewards": True,
        "lr": 8e-6,
        "lr_schedule": [
            [0, 1e-1],
            [int(1e2), 1e-2],
            [int(1e3), 1e-3],
            [int(1e4), 1e-4],
            [int(1e5), 1e-5],
            [int(1e6), 1e-6],
            [int(1e7), 1e-7]
        ],
        "gamma": 0,
        "observation_filter": "MeanStdFilter",
        "lambda": 0.72,
        "vf_loss_coeff": 0.5,
        "entropy_coeff": 0.01
    },
    checkpoint_at_end=True
)


Trial name,status,loc
PPO_TradingEnv_f49fd_00000,PENDING,


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%********

[2m[36m(pid=3257)[0m 2021-07-28 11:25:34,460	DEBUG rollout_worker.py:1303 -- Creating policy for default_policy
[2m[36m(pid=3257)[0m 2021-07-28 11:25:34,461	DEBUG catalog.py:708 -- Created preprocessor <ray.rllib.models.preprocessors.NoPreprocessor object at 0x7f40952c9f90>: Box(-inf, inf, (25, 1095), float32) -> (25, 1095)
[2m[36m(pid=3257)[0m 2021-07-28 11:25:34,819	INFO torch_policy.py:139 -- TorchPolicy (worker=1) running on CPU.
[2m[36m(pid=3257)[0m 2021-07-28 11:25:35,165	DEBUG rollout_worker.py:572 -- Creating policy evaluation worker 1 on CPU (please ignore any CUDA init errors)
[2m[36m(pid=3257)[0m 2021-07-28 11:25:35,166	DEBUG rollout_worker.py:713 -- Created rollout worker with env <ray.rllib.env.base_env._VectorEnvToBaseEnv object at 0x7f40a5602250> (<TradingEnv instance>), policies {'default_policy': <ray.rllib.policy.policy_template.PPOTorchPolicy object at 0x7f40a55fe950>}
[2m[36m(pid=3258)[0m 2021-07-28 11:25:35,176	DEBUG rollout_worker.py:1303 -- Crea

Result for PPO_TradingEnv_f49fd_00000:
  agent_timesteps_total: 4000
  custom_metrics: {}
  date: 2021-07-28_11-27-45
  done: true
  episode_len_mean: 3415.0
  episode_media: {}
  episode_reward_max: 2901253642.015273
  episode_reward_mean: 2901253642.015273
  episode_reward_min: 2901253642.015273
  episodes_this_iter: 1
  episodes_total: 1
  experiment_id: 6f9531363cee41fd82fca399955dcab6
  hostname: 87106c5b19d3
  info:
    learner:
      default_policy:
        custom_metrics: {}
        learner_stats:
          allreduce_latency: 0.0
          cur_kl_coeff: 0.20000000000000004
          cur_lr: 0.10000000000000002
          entropy: 9.479744375745456
          entropy_coeff: 0.01
          kl: 0.0036417181546897306
          policy_loss: -0.13502408539643512
          total_loss: 0.21838097074069082
          vf_explained_var: 0.10256961733102798
          vf_loss: 0.8949483097220461
        model: {}
    num_agent_steps_sampled: 4000
    num_steps_sampled: 4000
    num_steps_train

[2m[36m(pid=3258)[0m 2021-07-28 11:27:45,712	DEBUG trainer.py:661 -- synchronized filters: {'default_policy': MeanStdFilter((25, 1095), True, True, None, (n=4002, mean_mean=984446.9169048974, mean_std=774259.1582398375), (n=0, mean_mean=0.0, mean_std=0.0))}


Trial name,status,loc,iter,total time (s),ts,reward,episode_reward_max,episode_reward_min,episode_len_mean
PPO_TradingEnv_f49fd_00000,RUNNING,172.28.0.2:3258,1,126.854,4000,2901250000.0,2901250000.0,2901250000.0,3415


Trial name,status,loc,iter,total time (s),ts,reward,episode_reward_max,episode_reward_min,episode_len_mean
PPO_TradingEnv_f49fd_00000,TERMINATED,,1,126.854,4000,2901250000.0,2901250000.0,2901250000.0,3415


2021-07-28 11:27:46,563	INFO tune.py:550 -- Total run time: 376.75 seconds (376.56 seconds for the tuning loop).


In [7]:
analysis


<ray.tune.analysis.experiment_analysis.ExperimentAnalysis at 0x7fb0c4240ed0>

In [8]:
analysis.best_result


ValueError: ignored

In [9]:

import ray.rllib.agents.ppo as ppo

analysis.default_mode = 'restore'
checkpoints = analysis.get_trial_checkpoints_paths(
    trial=analysis.get_best_trial("episode_reward_mean"),
    metric="episode_reward_mean"
)
checkpoint_path = checkpoints[0][0]

#checkpoint_path = '/root/ray_results/PPO/PPO_TradingEnv_eba7e_00000_0_2021-07-28_01-29-50/checkpoint_000014/checkpoint-14'

# Restore agent
agent = ppo.PPOTrainer(
    env="TradingEnv",
    config={
        "env_config": {
            "window_size": 25
        },
        "framework": "torch",
        "log_level": "DEBUG",
        "ignore_worker_failures": True,
        "num_workers": 4,
        "num_gpus": 0,
        "clip_rewards": True,
        "lr": 8e-6,
        "lr_schedule": [
            [0, 1e-1],
            [int(1e2), 1e-2],
            [int(1e3), 1e-3],
            [int(1e4), 1e-4],
            [int(1e5), 1e-5],
            [int(1e6), 1e-6],
            [int(1e7), 1e-7]
        ],
        "gamma": 0,
        "observation_filter": "MeanStdFilter",
        "lambda": 0.72,
        "vf_loss_coeff": 0.5,
        "entropy_coeff": 0.01
    }
)
agent.restore(checkpoint_path)



[2m[36m(pid=3866)[0m [*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[********

[2m[36m(pid=3866)[0m 2021-07-28 11:35:04,730	DEBUG rollout_worker.py:1303 -- Creating policy for default_policy
[2m[36m(pid=3866)[0m 2021-07-28 11:35:04,731	DEBUG catalog.py:708 -- Created preprocessor <ray.rllib.models.preprocessors.NoPreprocessor object at 0x7f5d7c3df510>: Box(-inf, inf, (25, 1095), float32) -> (25, 1095)
[2m[36m(pid=3867)[0m 2021-07-28 11:35:05,124	DEBUG rollout_worker.py:1303 -- Creating policy for default_policy
[2m[36m(pid=3867)[0m 2021-07-28 11:35:05,126	DEBUG catalog.py:708 -- Created preprocessor <ray.rllib.models.preprocessors.NoPreprocessor object at 0x7f4430b475d0>: Box(-inf, inf, (25, 1095), float32) -> (25, 1095)
[2m[36m(pid=3866)[0m 2021-07-28 11:35:05,289	INFO torch_policy.py:139 -- TorchPolicy (worker=1) running on CPU.
[2m[36m(pid=3867)[0m 2021-07-28 11:35:05,698	INFO torch_policy.py:139 -- TorchPolicy (worker=2) running on CPU.
[2m[36m(pid=3866)[0m 2021-07-28 11:35:05,851	DEBUG rollout_worker.py:572 -- Creating policy evaluation 

In [28]:
def create_eval_env(config): 


  df = get_data('2020-01-01', '2020-07-27', tickers, tech_indicator_list)

  USD.precision = 6
  features = []
  for key, group in df.groupby('tic'):
      for column in group.columns:
          if column in {'date','tic'} : 
              continue
          s = Stream.source(group[column].values.tolist(), dtype=float).rename(key+'_'+column)
          features.append(s)

  feed = DataFeed(features)
  feed.compile()

  prices = []
  for key, group in df.groupby('tic'):
      s = Stream.source(group['close'].values.tolist(), dtype=float).rename('USD/' + key)
      prices.append(s)

  exchange = Exchange("alpaca", service=execute_order)(*prices)

  stonks = [Instrument(x, 6, x) for x in tickers]
  cash = 2000.
  starting_cash = Wallet(exchange, cash * USD )
  #initialize no owned stocks
  wallets = [starting_cash] + [Wallet(exchange, 0*x) for x in stonks]

  portfolio = Portfolio(
      base_instrument=USD,
      wallets=wallets
  )
  env = default.create(
    feed=feed,
    portfolio=portfolio,
    action_scheme="managed-risk",
    reward_scheme="risk-adjusted",
    renderer='matplot',
    window_size=25,
    max_allowed_loss=0.6
  )
  return env 

# Instantiate the environment
env = create_env({
    "window_size": 25
})

# Run until episode ends
episode_reward = 0
done = False
obs = env.reset()

while not done:
    action = agent.compute_action(obs)
    obs, reward, done, info = env.step(action)
    episode_reward += reward

env.render()

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%********



In [30]:
env.action_scheme.portfolio.net_worth, env.action_scheme.portfolio.initial_net_worth , env.action_scheme.portfolio.net_worth / env.action_scheme.portfolio.initial_net_worth 


(45791.50228293309, 2000.0, 22.895751141466548)

In [25]:
from google.colab import drive
drive.mount('/content/drive')



In [31]:
checkpoint_path = checkpoints[0][0]


In [32]:
checkpoint_path

'/root/ray_results/PPO/PPO_TradingEnv_f49fd_00000_0_2021-07-28_11-21-29/checkpoint_000001/checkpoint-1'

In [None]:
agent.s