In [6]:
#!rm -rf /root/ray_results/ /root/ray_results/PPO

In [1]:
!pip install /tensortrade -U

Processing /tensortrade
  Preparing metadata (setup.py) ... [?25ldone


Building wheels for collected packages: tensortrade
  Building wheel for tensortrade (setup.py) ... [?25ldone
[?25h  Created wheel for tensortrade: filename=tensortrade-1.0.4.dev1-py3-none-any.whl size=136444 sha256=62c277a18cdb6484337d47b597627e61e392b31bad96299ed07c13aa95e96a94
  Stored in directory: /tmp/pip-ephem-wheel-cache-6e5mlgm9/wheels/5c/80/0b/07e46799e19c54c3244190ea08f82534ab3fbdbd4fad3de846
Successfully built tensortrade
Installing collected packages: tensortrade
  Attempting uninstall: tensortrade
    Found existing installation: tensortrade 1.0.4.dev1
    Uninstalling tensortrade-1.0.4.dev1:
      Successfully uninstalled tensortrade-1.0.4.dev1
Successfully installed tensortrade-1.0.4.dev1
[0m

In [2]:
from tensortrade.data.cdd import CryptoDataDownload

import pandas as pd
import numpy as np

def prepare_data(df):
    df['volume'] = np.int64(df['volume'])
    df['date'] = pd.to_datetime(df['date'])
    df.sort_values(by='date', ascending=True, inplace=True)
    df.reset_index(drop=True, inplace=True)
    df['date'] = df['date'].dt.strftime('%Y-%m-%d %I:%M %p')
    return df

def fetch_data():
    cdd = CryptoDataDownload()
    bitfinex_data = cdd.fetch("Bitfinex", "USD", "BTC", "1h")
    bitfinex_data = bitfinex_data[['date', 'open', 'high', 'low', 'close', 'volume']]
    bitfinex_data = prepare_data(bitfinex_data)
    return bitfinex_data

def load_csv(filename):
    df = pd.read_csv('data/' + filename, skiprows=1)
    df.drop(columns=['symbol', 'volume_btc'], inplace=True)

    # Fix timestamp form "2019-10-17 09-AM" to "2019-10-17 09-00-00 AM"
    df['date'] = df['date'].str[:14] + '00-00 ' + df['date'].str[-2:]

    return prepare_data(df)

2023-02-27 06:23:26.262801: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-27 06:23:28.261624: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-27 06:23:28.261668: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-27 06:23:31.551582: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-

In [3]:
import ta

def rsi(price: 'pd.Series[pd.Float64Dtype]', period: float) -> 'pd.Series[pd.Float64Dtype]':
    r = price.diff()
    upside = np.minimum(r, 0).abs()
    downside = np.maximum(r, 0).abs()
    rs = upside.ewm(alpha=1 / period).mean() / downside.ewm(alpha=1 / period).mean()
    return 100*(1 - (1 + rs) ** -1)

def macd(price: 'pd.Series[pd.Float64Dtype]', fast: float, slow: float, signal: float) -> 'pd.Series[pd.Float64Dtype]':
    fm = price.ewm(span=fast, adjust=False).mean()
    sm = price.ewm(span=slow, adjust=False).mean()
    md = fm - sm
    signal = md - md.ewm(span=signal, adjust=False).mean()
    return signal

def generate_features(data):
    # Naming convention across most technical indicator libraries
    data = data.rename(columns={'date': 'Date', 
                                'open': 'Open', 
                                'high': 'High', 
                                'low': 'Low', 
                                'close': 'Close', 
                                'volume': 'Volume'})
    data = data.set_index('Date')

    # Custom indicators
    features = pd.DataFrame.from_dict({
        'dfast': data['Close'].rolling(window=10).std().abs(),
        'dmedium': data['Close'].rolling(window=50).std().abs(),
        'dslow': data['Close'].rolling(window=100).std().abs(),
        'fast': data['Close'].rolling(window=10).mean(),
        'medium': data['Close'].rolling(window=50).mean(),
        'slow': data['Close'].rolling(window=100).mean(),
        'ema_fast': ta.trend.ema_indicator(data['Close'], window=5, fillna=True),
        'ema_medium': ta.trend.ema_indicator(data['Close'], window=10, fillna=True),
        'ema_slow': ta.trend.ema_indicator(data['Close'], window=64, fillna=True),
        'lr': np.log(data['Close']).diff().fillna(0),
        'rsi_5': rsi(data['Close'], period=5),
        'rsi_10': rsi(data['Close'], period=10),
        'rsi_100': rsi(data['Close'], period=100),
        'rsi_7': rsi(data['Close'], period=7),
        'rsi_14': rsi(data['Close'], period=14),
        'rsi_28': rsi(data['Close'], period=28),
        'macd_normal': macd(data['Close'], fast=12, slow=26, signal=9),
        'macd_short': macd(data['Close'], fast=10, slow=50, signal=5),
        'macd_long': macd(data['Close'], fast=200, slow=100, signal=50),
    })

    # Generate all default indicators from ta library
    ta.add_all_ta_features(data, 
                           'Open', 
                           'High', 
                           'Low', 
                           'Close', 
                           'Volume', 
                           fillna=True)

    # Concatenate both manually and automatically generated features
    data = pd.concat([data, features], axis='columns').fillna(method='pad')

    # Remove potential column duplicates
    data = data.loc[:,~data.columns.duplicated()]

    # Revert naming convention
    data = data.rename(columns={'Date': 'date', 
                                'Open': 'open', 
                                'High': 'high', 
                                'Low': 'low', 
                                'Close': 'close', 
                                'Volume': 'volume'})

    # A lot of indicators generate NaNs at the beginning of DataFrames, so remove them
    data = data.iloc[200:]
    data = data.reset_index(drop=True)

    return data

In [4]:
from sklearn.model_selection import train_test_split

def split_data(data):
    X = data.copy()
    y = X['close'].pct_change()

    X_train_test, X_valid, y_train_test, y_valid = \
        train_test_split(data, data['close'].pct_change(), train_size=0.67, test_size=0.33, shuffle=False)

    X_train, X_test, y_train, y_test = \
        train_test_split(X_train_test, y_train_test, train_size=0.50, test_size=0.50, shuffle=False)

    return X_train, X_test, X_valid, y_train, y_test, y_valid

In [5]:
data = fetch_data()
data

Unnamed: 0,date,open,high,low,close,volume
0,2018-05-15 06:00 AM,8723.8,8793.0,8714.9,8739.0,8988053
1,2018-05-15 07:00 AM,8739.0,8754.8,8719.3,8743.0,2288904
2,2018-05-15 08:00 AM,8743.0,8743.1,8653.2,8723.7,8891773
3,2018-05-15 09:00 AM,8723.7,8737.8,8701.2,8708.1,2054868
4,2018-05-15 10:00 AM,8708.1,8855.7,8695.8,8784.4,17309722
...,...,...,...,...,...,...
41933,2023-02-25 08:00 PM,22992.0,23002.0,22810.0,22935.0,5504830
41934,2023-02-25 09:00 PM,22938.0,22975.0,22872.0,22955.0,328115
41935,2023-02-25 10:00 PM,22955.0,23187.0,22926.0,23141.0,1568743
41936,2023-02-25 11:00 PM,23137.0,23200.0,23123.0,23178.0,728891


In [6]:
data

Unnamed: 0,date,open,high,low,close,volume
0,2018-05-15 06:00 AM,8723.8,8793.0,8714.9,8739.0,8988053
1,2018-05-15 07:00 AM,8739.0,8754.8,8719.3,8743.0,2288904
2,2018-05-15 08:00 AM,8743.0,8743.1,8653.2,8723.7,8891773
3,2018-05-15 09:00 AM,8723.7,8737.8,8701.2,8708.1,2054868
4,2018-05-15 10:00 AM,8708.1,8855.7,8695.8,8784.4,17309722
...,...,...,...,...,...,...
41933,2023-02-25 08:00 PM,22992.0,23002.0,22810.0,22935.0,5504830
41934,2023-02-25 09:00 PM,22938.0,22975.0,22872.0,22955.0,328115
41935,2023-02-25 10:00 PM,22955.0,23187.0,22926.0,23141.0,1568743
41936,2023-02-25 11:00 PM,23137.0,23200.0,23123.0,23178.0,728891


In [7]:
dataset = generate_features(data)
dataset

  dip[idx] = 100 * (self._dip[idx] / value)
  din[idx] = 100 * (self._din[idx] / value)


Unnamed: 0,open,high,low,close,volume,volume_adi,volume_obv,volume_cmf,volume_fi,volume_em,...,lr,rsi_5,rsi_10,rsi_100,rsi_7,rsi_14,rsi_28,macd_normal,macd_short,macd_long
0,7897.3,7898.8,7849.8,7877.4,9341499,-1.219515e+08,-153103304,-0.175983,-1.548039e+08,-1.586737e+04,...,-0.002523,54.694594,62.862413,58.177953,58.867381,65.249906,64.898857,11.190548,10.871904,31.873058
1,7877.4,7889.7,7661.0,7700.0,23679375,-1.375548e+08,-176782679,-0.228723,-7.327921e+08,-9.556783e+04,...,-0.022778,83.970505,78.146122,60.860099,80.595405,76.335665,71.316639,1.333779,-5.426751,34.355233
2,7700.0,7700.1,7548.1,7605.4,42144843,-1.479246e+08,-218927522,-0.216859,-1.197665e+09,-5.454997e+04,...,-0.012362,88.796304,82.430355,62.167022,85.395126,79.999761,73.950511,-10.060459,-21.497215,37.504922
3,7605.4,7623.6,7441.8,7511.1,38711817,-1.571235e+08,-257639339,-0.221424,-1.548073e+09,-4.292364e+04,...,-0.012477,91.852619,85.564709,63.397644,88.657814,82.850341,76.208878,-21.778972,-36.146245,41.269618
4,7511.1,7551.6,7403.0,7489.1,23046091,-1.534634e+08,-280685430,-0.149460,-1.399351e+09,-3.572163e+04,...,-0.002933,92.453006,86.202747,63.676078,89.307939,83.443225,76.697643,-28.422775,-41.976877,44.917996
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
41733,22992.0,23002.0,22810.0,22935.0,5504830,8.977649e+09,-6584666836,-0.008243,-5.595294e+07,-2.319418e+05,...,-0.002221,76.926612,69.413166,55.640594,72.068686,68.114890,64.772798,19.185150,13.302583,159.437835
41734,22938.0,22975.0,22872.0,22955.0,328115,8.977850e+09,-6584338721,-0.007194,-4.702219e+07,5.493501e+05,...,0.000872,66.279162,65.609444,55.493530,65.567070,65.815576,63.951022,18.141415,12.997201,158.667008
41735,22955.0,23187.0,22926.0,23141.0,1568743,8.978866e+09,-6582769978,0.061520,1.379006e+06,2.212791e+06,...,0.008070,25.403834,41.889543,54.149051,33.134369,49.186453,56.979084,29.587493,31.213146,155.979034
41736,23137.0,23200.0,23123.0,23178.0,728891,8.979178e+09,-6582041087,0.088085,5.034714e+06,1.109219e+06,...,0.001598,22.026133,38.789902,53.886722,29.722317,46.660841,55.725811,38.685654,43.510681,152.916297


In [18]:
X_train, X_test, X_valid, y_train, y_test, y_valid = \
    split_data(data)

import os
cwd = os.getcwd()
train_csv = os.path.join(cwd, 'train.csv')
test_csv = os.path.join(cwd, 'test.csv')
valid_csv = os.path.join(cwd, 'valid.csv')
X_train.to_csv(train_csv, index=False)
X_test.to_csv(test_csv, index=False)
X_valid.to_csv(valid_csv, index=False)

In [19]:
# Things to understand here:
# Writing a Renderer

import matplotlib.pyplot as plt

from tensortrade.env.generic import Renderer


class PositionChangeChart(Renderer):
    def __init__(self, color: str = "orange"):
        self.color = "orange"

    def render(self, env, **kwargs):
        history = pd.DataFrame(env.observer.renderer_history)

        actions = list(history.action)
        price = list(history.close)

        buy = {}
        sell = {}

        for i in range(len(actions) - 1):
            a1 = actions[i]
            a2 = actions[i + 1]

            if a1 != a2:
                if a1 == 0 and a2 == 1:
                    buy[i] = price[i]
                else:
                    sell[i] = price[i]

        buy = pd.Series(buy)
        sell = pd.Series(sell)

        fig, axs = plt.subplots(1, 2, figsize=(15, 5))

        fig.suptitle("Performance")

        axs[0].plot(np.arange(len(price)), price, label="price", color=self.color)
        axs[0].scatter(buy.index, buy.values, marker="^", color="green")
        axs[0].scatter(sell.index, sell.values, marker="^", color="red")
        axs[0].set_title("Trading Chart")

        performance_df = pd.DataFrame().from_dict(env.action_scheme.portfolio.performance, orient='index')
        performance_df.plot(ax=axs[1])
        axs[1].set_title("Net Worth")

        plt.show()

In [20]:
# Things to understand here:
# execution_order
# Types of execution logic
# Exchange
# DataFeed
# renderer_feed
# default (env)

import ray
import numpy as np
import pandas as pd

from ray import tune
from ray.tune.registry import register_env

import tensortrade.env.default as default

from tensortrade.env.default.rewards import PBR, RiskAdjustedReturns
from tensortrade.env.default.rewards import SimpleProfit
from tensortrade.env.default.actions import BSH, ManagedRiskOrders
from tensortrade.feed.core import DataFeed, Stream
from tensortrade.feed.core.base import NameSpace
from tensortrade.oms.exchanges import Exchange, ExchangeOptions
from tensortrade.oms.instruments import USD, BTC
from tensortrade.oms.services.execution.simulated import execute_order
from tensortrade.oms.wallets import Wallet, Portfolio

def create_env(config):
    data = pd.read_csv(filepath_or_buffer=config["csv_filename"], 
                       parse_dates=['date']).fillna(
                           method='backfill').fillna(method='ffill')

    # TODO: adjust according to your commission percentage, if present
    commission = 0.01
    price = Stream.source(list(data["close"]), 
                          dtype="float").rename("USD-BTC")
    bitstamp_options = ExchangeOptions(commission=commission)
    bitstamp = Exchange("bitstamp", 
                        service=execute_order, 
                        options=bitstamp_options)(price)

    cash = Wallet(bitstamp, 10000 * USD)
    asset = Wallet(bitstamp, 0 * BTC)

    portfolio = Portfolio(USD, [cash, asset])

    '''
    # Custom indicators
    features = pd.DataFrame.from_dict({
        'dfast': data['close'].rolling(window=10).std().abs(),
        'dmedium': data['close'].rolling(window=50).std().abs(),
        'dslow': data['close'].rolling(window=100).std().abs(),
        'fast': data['close'].rolling(window=10).mean(),
        'medium': data['close'].rolling(window=50).mean(),
        'slow': data['close'].rolling(window=100).mean(),
        'ema_fast': ta.trend.ema_indicator(data['close'], window=5, fillna=True),
        'ema_medium': ta.trend.ema_indicator(data['close'], window=10, fillna=True),
        'ema_slow': ta.trend.ema_indicator(data['close'], window=64, fillna=True),
        'lr': np.log(data['close']).diff().fillna(0),
        'rsi_5': rsi(data['close'], period=5),
        'rsi_10': rsi(data['close'], period=10),
        'rsi_100': rsi(data['close'], period=100),
        'rsi_7': rsi(data['close'], period=7),
        'rsi_14': rsi(data['close'], period=14),
        'rsi_28': rsi(data['close'], period=28),
        'macd_normal': macd(data['close'], fast=12, slow=26, signal=9),
        'macd_short': macd(data['close'], fast=10, slow=50, signal=5),
        'macd_long': macd(data['close'], fast=200, slow=100, signal=50),
    })

    ta.add_all_ta_features(data, 
                           'open', 
                           'high', 
                           'low', 
                           'close', 
                           'volume', 
                           fillna=True)
    '''

    with NameSpace("bitstamp"):
        #data = pd.concat([data, features], axis='columns')
        automatic_features = [
            Stream.source(list(data[c]), 
                          dtype="float").rename(c) for c in data.columns[1:]
        ]

    feed = DataFeed(automatic_features)
    feed.compile()

    reward_scheme = PBR(price=price)

    action_scheme = BSH(
        cash=cash,
        asset=asset
    ).attach(reward_scheme)

    renderer_feed = DataFeed([
        Stream.source(list(data["date"])).rename("date"),
        Stream.source(list(data["open"]), dtype="float").rename("open"),
        Stream.source(list(data["high"]), dtype="float").rename("high"),
        Stream.source(list(data["low"]), dtype="float").rename("low"),
        Stream.source(list(data["close"]), dtype="float").rename("close"), 
        Stream.source(list(data["volume"]), dtype="float").rename("volume"), 
        Stream.sensor(action_scheme, 
                      lambda s: s.action, dtype="float").rename("action")
    ])

    environment = default.create(
        feed=feed,
        portfolio=portfolio,
        action_scheme=action_scheme,
        reward_scheme=reward_scheme,
        renderer_feed=renderer_feed,
        renderer=[
            PositionChangeChart(),
            default.renderers.PlotlyTradingChart(),
        ],
        window_size=config["window_size"],
        max_allowed_loss=0.9
    )
    return environment

ray.init(num_cpus=3,
         include_dashboard=True,
         address=None,  # set `address=None` to train on laptop
         ignore_reinit_error=True)

register_env("TradingEnv", create_env)

In [21]:
from ray.tune.schedulers import ASHAScheduler
from ray.tune.suggest import ConcurrencyLimiter
from ray.tune.suggest.optuna import OptunaSearch

LR = tune.loguniform(1e-5, 1e-2)
GAMMA = tune.uniform(0.8, 0.9999)
LAMBDA = tune.uniform(0.1, 0.8)
VF_LOSS_COEFF = tune.uniform(0.01, 1.0)
ENTROPY_COEFF = tune.uniform(1e-8, 1e-1)

checkpoint_metric = 'episode_reward_mean'

# Specific configuration keys that will be used during training
env_config_training = {
    "window_size": 14,  # The number of past samples we want to look at (in hours)
    "reward_window_size": 7,  # The number of hours we want to look at in the future to calculate the rewards based on the actions taken
    "max_allowed_loss": 0.90,  # If it goes past 90% loss during the iteration, we don't want to waste time on a "loser".
    "csv_filename": train_csv  # The variable that will be used to differentiate training and validation datasets
}
# Specific configuration keys that will be used during evaluation (only the overridden ones)
env_config_evaluation = {
    "max_allowed_loss": 1.00,  # During validation runs we want to see how bad it would go. Even up to 100% loss.
    "csv_filename": test_csv,  # The variable that will be used to differentiate training and validation datasets
}

search_alg = OptunaSearch()
search_alg = ConcurrencyLimiter(search_alg, max_concurrent=4)

scheduler = ASHAScheduler()

import time
start = time.time()
analysis = tune.run(
    "PPO",
    stop={
        "episode_reward_mean": 5000,
        "training_iteration": 35,
    },
    config={
        "env": "TradingEnv",
        "env_config": env_config_training,
        "log_level": "ERROR",
        #"log_level": "INFO",
        #"log_level": "DEBUG",
        "framework": "torch",
        "ignore_worker_failures": True,
        "num_workers": 2,
        "num_gpus": 0,
        "clip_rewards": True,
        "lr": LR,
        "lr_schedule": [
            [0, 1e-1],
            [int(1e2), 1e-2],
            [int(1e3), 1e-3],
            [int(1e4), 1e-4],
            [int(1e5), 1e-5],
            [int(1e6), 1e-6],
            [int(1e7), 1e-7]
        ],
        "model": {
            "_use_default_native_models": True,
            "use_attention": True,
            "max_seq_len": 10,
            "attention_num_transformer_units": 1,
            "attention_dim": 32,
            "attention_memory_inference": 10,
            "attention_memory_training": 10,
            "attention_num_heads": 1,
            "attention_head_dim": 32,
            "attention_position_wise_mlp_dim": 32,
        },
        "gamma": GAMMA,
        "observation_filter": "MeanStdFilter",
        "lambda": LAMBDA,
        "num_envs_per_worker": 20,
        "vf_share_layers": True,
        "vf_loss_coeff": VF_LOSS_COEFF,
        "entropy_coeff": ENTROPY_COEFF,
        "num_sgd_iter": 10,
        "evaluation_interval": 1,  # Run evaluation on every iteration
        "evaluation_config": {
            "env_config": env_config_evaluation,  # The dictionary we built before (only the overriding keys to use in evaluation)
            "explore": False,  # We don't want to explore during evaluation. All actions have to be repeatable.
        },
    },
    metric=checkpoint_metric,
    mode="max",
    search_alg=search_alg,
    scheduler=scheduler,
    num_samples=1,  # Samples per hyperparameter combination. More averages out randomness. Less runs faster
    #resources_per_trial={"cpu": 1},
    keep_checkpoints_num=10,  # Keep the last 10 checkpoints
    checkpoint_freq=1,  # Do a checkpoint on each iteration (slower but you can pick more finely the checkpoint to use later)
    resume="AUTO",
)
taken = time.time() - start
print(f"Time taken: {taken:.2f} seconds.")
print(f"Best config: {analysis.best_config}")

In [22]:
dfs = analysis.trial_dataframes

ax = None  # This plots everything on the same plot
for d in dfs.values():
    ax = d.episode_reward_mean.plot(ax=ax, legend=False)

In [23]:
import ray.rllib.agents.ppo as ppo

# Get checkpoint
checkpoints = analysis.get_trial_checkpoints_paths(
    trial=analysis.get_best_trial(checkpoint_metric, mode='max'),
    metric=checkpoint_metric
)
checkpoint_path = checkpoints[0][0]

env_config_validation = {
    "window_size": 14,  # The number of past samples we want to look at (in hours)
    "reward_window_size": 7,  # The number of hours we want to look at in the future to calculate the rewards based on the actions taken
    "max_allowed_loss": 1.0,  # If it goes past 10% loss during the iteration, we don't want to waste time on a "loser".
    "csv_filename": valid_csv  # The variable that will be used to differentiate training and validation datasets
}

config = analysis.get_best_config(checkpoint_metric, mode='max')
config['env_config'] = env_config_validation

# Restore agent
agent = ppo.PPOTrainer(
    env="TradingEnv",
    config=config,
)

agent.restore(checkpoint_path)

In [24]:
# See how the model is wrapped by LSTM
agent.get_policy().model

In [17]:
# Instantiate the environment
env = create_env(env_config_validation)

# Run until episode ends
done = False
obs = env.reset()
total_reward = 0

# start with all zeros as state
num_transformers = config["model"][
    "attention_num_transformer_units"]
attention_dim = config["model"]["attention_dim"]
memory = config["model"]["attention_memory_inference"]
init_state = state = [
    np.zeros([memory, attention_dim], np.float32)
    for _ in range(num_transformers)
]

# run one iteration until done
print(f"TradingEnv with {agent.config['env_config']}")
while not done:
    action, state_out, _ = agent.compute_single_action(
        obs, state)
    next_obs, reward, done, info = env.step(action)
    print(f"Obs: {obs}, Action: {action}, Reward: {reward}")
    obs = next_obs
    total_reward += reward
    state = [
        np.concatenate([state[i], [state_out[i]]], axis=0)[1:]
        for i in range(num_transformers)
    ]
print(f"Total reward in test episode: {total_reward}")

env.render()