In [1]:
import cbpro 
import ray 
import numpy as np 

from datetime import datetime, timedelta
import time

from ray import tune 
from ray.tune.registry import register_env 

import tensortrade.env.default as default

from tensortrade.feed.core import DataFeed, Stream
from tensortrade.oms.instruments import Instrument
from tensortrade.oms.instruments import USD, BTC, ETH
from tensortrade.oms.exchanges import Exchange
from tensortrade.oms.services.execution.simulated import execute_order
from tensortrade.oms.wallets import Wallet, Portfolio

from tensortrade.feed.core import NameSpace




In [2]:
def get_data_range(start, end, granularity, product):
    delta = timedelta(seconds=granularity)
    cur_time = start
    data = np.array([], dtype=np.float32).reshape(0,6)
    while cur_time < end:
        print(cur_time)
        cur_segment = public_client.get_product_historic_rates(product, start=cur_time, end=(cur_time + (delta * 300)), granularity=granularity)
        #print(len(cur_segment))
        cur_time = cur_time + (delta * len(cur_segment))
        #print(cur_time)
        cur_segment = np.flip(np.array(cur_segment), axis=0)
        #print(cur_segment.shape)
        data = np.concatenate((data, cur_segment), axis=0)
        print(data.shape)
        time.sleep(0.34)
    return data 

public_client = cbpro.PublicClient()

now = datetime.now() 
delta = timedelta(days = 120)
start = now - delta
print(start)

granularity = 900 
ETH_USD = get_data_range(start, now, granularity, 'ETH-USD')
print('done')
BTC_USD = get_data_range(start, now, granularity, 'BTC-USD')
print('done')
ETH_BTC = get_data_range(start, now, granularity, 'ETH-BTC')
print('done')

2021-01-07 22:46:39.257865
2021-01-07 22:46:39.257865
(300, 6)
2021-01-11 01:46:39.257865
(600, 6)
2021-01-14 04:46:39.257865
(900, 6)
2021-01-17 07:46:39.257865
(1200, 6)
2021-01-20 10:46:39.257865
(1500, 6)
2021-01-23 13:46:39.257865
(1800, 6)
2021-01-26 16:46:39.257865
(2100, 6)
2021-01-29 19:46:39.257865
(2400, 6)
2021-02-01 22:46:39.257865
(2700, 6)
2021-02-05 01:46:39.257865
(3000, 6)
2021-02-08 04:46:39.257865
(3300, 6)
2021-02-11 07:46:39.257865
(3600, 6)
2021-02-14 10:46:39.257865
(3900, 6)
2021-02-17 13:46:39.257865
(4200, 6)
2021-02-20 16:46:39.257865
(4500, 6)
2021-02-23 19:46:39.257865
(4800, 6)
2021-02-26 22:46:39.257865
(5100, 6)
2021-03-02 01:46:39.257865
(5400, 6)
2021-03-05 04:46:39.257865
(5700, 6)
2021-03-08 07:46:39.257865
(6000, 6)
2021-03-11 10:46:39.257865
(6300, 6)
2021-03-14 13:46:39.257865
(6600, 6)
2021-03-17 16:46:39.257865
(6900, 6)
2021-03-20 19:46:39.257865
(7200, 6)
2021-03-23 22:46:39.257865
(7500, 6)
2021-03-27 01:46:39.257865
(7800, 6)
2021-03-30 04:

In [3]:
def setup_env(config):
    #ETH_USD = config['ETH_USD']
    #BTC_USD = config['BTC_USD']
    #ETH_BTC = config['ETH_BTC']
    #ETH_USD, BTC_USD, ETH_BTC
    coinbase = Exchange("Coinbase", service=execute_order)(
        Stream.source(ETH_USD[:, 4] , dtype="float").rename("USD-ETH"),
        Stream.source(BTC_USD[:, 4], dtype="float").rename("USD-BTC"),
    )
    with NameSpace("coinbase"):
        coinbase_streams = [
            Stream.source(ETH_USD[:, 0] , dtype="float").rename("ETH:date"),
            Stream.source(ETH_USD[:, 1] , dtype="float").rename("ETH:open"),
            Stream.source(ETH_USD[:, 2] , dtype="float").rename("ETH:high"),
            Stream.source(ETH_USD[:, 3] , dtype="float").rename("ETH:low"),
            Stream.source(ETH_USD[:, 4] , dtype="float").rename("ETH:close"),
            Stream.source(ETH_USD[:, 5] , dtype="float").rename("ETH:volume"),
        
            Stream.source(BTC_USD[:, 0] , dtype="float").rename("BTC:date"),
            Stream.source(BTC_USD[:, 1] , dtype="float").rename("BTC:open"),
            Stream.source(BTC_USD[:, 2] , dtype="float").rename("BTC:high"),
            Stream.source(BTC_USD[:, 3] , dtype="float").rename("BTC:low"),
            Stream.source(BTC_USD[:, 4] , dtype="float").rename("BTC:close"),
            Stream.source(BTC_USD[:, 5] , dtype="float").rename("BTC:volume"),
        ]
        
        
    feed = DataFeed(coinbase_streams)

    portfolio = Portfolio(USD, [
        Wallet(coinbase, 5000 * USD),
        Wallet(coinbase, 0.01 * BTC),
        Wallet(coinbase, 0.3 * ETH),
    ])

    renderer_feed = DataFeed([
        Stream.source(ETH_USD[:, 0] , dtype="float").rename("date"),
        Stream.source(ETH_USD[:, 4] , dtype="float").rename("close"),
        Stream.source(ETH_USD[:, 5] , dtype="float").rename("volume"),
    ])
    
    
    from tensortrade.env.default import stoppers

    stopper = stoppers.MaxLossStopper(
        max_allowed_loss=0.3
    )

    from tensortrade.env.default.rewards import RiskAdjustedReturns 

    reward = RiskAdjustedReturns('sortino', window_size = 100)

    env = default.create(
        portfolio=portfolio,
        #action_scheme="managed-risk",
        action_scheme="simple",
        reward_scheme=reward,
        feed=feed,
        stopper=stopper,
        #renderer_feed=renderer_feed,
        #renderer=default.renderers.PlotlyTradingChart(),
        window_size=20
    )

    return env


register_env("TradingEnv", setup_env)


In [4]:
env = setup_env({
        "ETH_USD": ETH_USD,
        "BTC_USD": BTC_USD, 
        "ETH_BTC": ETH_BTC,
    })

dir(env) 
action = 2
#env.step(action)
dir(env)
env.action_space
env.reset()

_, r, _, _ = env.step(1)
print(r)


0.001137379685689127


In [4]:
from ray.rllib.models.tf.layers import NoisyLayer
from ray.rllib.agents.dqn.distributional_q_tf_model import \
    DistributionalQTFModel
from ray.rllib.models import ModelCatalog
import tensorflow as tf 


        
def pctFcn(x):
    x1 = tf.experimental.numpy.diff(x, axis=1)
    x2 = x1 / x[:, 1:, :]
    return x2
        

class CustomDistributionalQModel(DistributionalQTFModel):
    """Custom model for DQN."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name, **kw):
        super(CustomDistributionalQModel, self).__init__(
            obs_space, action_space, num_outputs, model_config, name, **kw)
        
        l1 = model_config['custom_model_config']['l1']
        l2 = model_config['custom_model_config']['l2']
        prob = model_config['custom_model_config']['prob']
        noisy = model_config['custom_model_config']['noisy']
        self.inputs = tf.keras.layers.Input(obs_space.shape, name="observations")
        x = tf.keras.layers.Lambda(pctFcn)(self.inputs)
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.BatchNormalization()(x)
        
        if noisy: 
            x = NoisyLayer(l1)(x)
        else:
            x = tf.keras.layers.Dense(l1)(x)
        x = tf.keras.layers.Activation('swish')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(prob)(x)
        
        if noisy: 
            x = NoisyLayer(l2)(x)
        else:
            x = tf.keras.layers.Dense(l2)(x)
        x = tf.keras.layers.Activation('swish')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(prob)(x)


        x = tf.keras.layers.Dense(num_outputs, name="my_out")(x)
        
        self.base_model = tf.keras.Model(self.inputs, x)

        
            
        """
        # Define the core model layers which will be used by the other
        # output heads of DistributionalQModel
        self.inputs = tf.keras.layers.Input(
            shape=obs_space.shape, name="observations")
        layer_1 = tf.keras.layers.Dense(
            128,
            name="my_layer1",
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(1.0))(self.inputs)
        layer_out = tf.keras.layers.Dense(
            num_outputs,
            name="my_out",
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(1.0))(layer_1)
        self.base_model = tf.keras.Model(self.inputs, layer_out)
        """
    # Implement the core forward method.
    def forward(self, input_dict, state, seq_lens):
        model_out = self.base_model(input_dict["obs"])
        return model_out, state
    
ModelCatalog.register_custom_model("CustomDistributionalQModel", CustomDistributionalQModel)


In [17]:
from ray.rllib.agents import with_common_config

"""
"num_atoms": 1,
"v_min": -10.0,
"v_max": 10.0,
# Whether to use noisy network
"noisy": True,
# control the initial value of noisy nets
"sigma0": 0.5,
# Whether to use dueling dqn
"dueling": True,
# Dense-layer setup for each the advantage branch and the value branch
# in a dueling architecture.
#"hiddens": [512, 256, 128],
# Whether to use double dqn
"double_q": True,
# N-step Q learning
"n_step": 5,
"""




In [2]:
env_cfg = {
        "ETH_USD": ETH_USD,
        "BTC_USD": BTC_USD, 
        "ETH_BTC": ETH_BTC,
    }

custom_model_cfg = {"noisy" : False, 
                    "l1" : tune.randint(256, 1025),
                    "l2" : tune.randint(256, 1025), 
                    "prob" : tune.uniform(0, 1)
                   }

DEFAULT_CONFIG = ({

    # === Model ===
    # Number of atoms for representing the distribution of return. When
    # this is greater than 1, distributional Q-learning is used.
    # the discrete supports are bounded by v_min and v_max
    "n_step": 3,
    "model": {
        "custom_model" : "CustomDistributionalQModel", 
        "custom_model_config" : custom_model_cfg,
    }, 
    # === Exploration Settings ===
    "exploration_config": {
        # The Exploration class to use.
        "type": "EpsilonGreedy",
        # Config for the Exploration class' constructor:
        "initial_epsilon": 1.0,
        "final_epsilon": tune.loguniform(0.05, 0.25),
        "epsilon_timesteps": 20000,  # Timesteps over which to anneal epsilon.

        # For soft_q, use:
        # "exploration_config" = {
        #   "type": "SoftQ"
        #   "temperature": [float, e.g. 1.0]
        # }
    },

    # Minimum env steps to optimize for per train call. This value does
    # not affect learning, only the length of iterations.
    "timesteps_per_iteration": 10,
    # Update the target network every `target_network_update_freq` steps.
    "target_network_update_freq": 15,
    # === Replay buffer ===
    # Size of the replay buffer. Note that if async_updates is set, then
    # each worker will have a replay buffer of this size.
    "buffer_size": 10000,

    # === Optimization ===
    # Learning rate for adam optimizer
    "lr": tune.loguniform(1e-6, 5e-4), 
    # Learning rate schedule
    
    # Size of a batch sampled from replay buffer for training. Note that
    # if async_updates is set, then each worker returns gradients for a
    # batch of this size.
    "train_batch_size": tune.randint(16, 129),
    "evaluation_num_episodes" : 30,
    
    # === Parallelism ===
    # Number of workers for collecting samples with. This only makes sense
    # to increase if your environment is particularly slow to sample, or if
    # you"re using the Async or Ape-X optimizers.
    "num_workers": 1,
    # Whether to compute priorities on workers.
    "worker_side_prioritization": False,
    # Prevent iterations from going lower than this time span
    "num_gpus": 1,
    "num_cpus_for_driver": 1,
    "num_cpus_per_worker": 1,

    
    "env": "TradingEnv",
    #"env_config": env_cfg,

})

DEFAULT_CONFIG

NameError: name 'ETH_USD' is not defined

In [11]:
import tensorflow
print(tensorflow.config.list_physical_devices('GPU'))
print(tensorflow.__version__)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]
2.4.1


In [33]:
from ray.tune.suggest.hyperopt import HyperOptSearch

search_alg = HyperOptSearch()

stop1 = ray.tune.stopper.TrialPlateauStopper("episode_reward_mean", 0.01, 4, 600)
stop_criteria = ray.tune.stopper.MaximumIterationStopper(10000)
analysis = tune.run(
    "DQN",
    config=DEFAULT_CONFIG,
    stop = [stop_criteria, stop1],
    num_samples = 50,
    metric='episode_reward_mean',
    mode='max',
    search_alg=search_alg,
    local_dir='ray_results',
    verbose=1,
)



2021-05-05 16:42:55,697	INFO tune.py:549 -- Total run time: 17509.88 seconds (17509.30 seconds for the tuning loop).


In [None]:
checkpoints = analysis.get_trial_checkpoints_paths(
    trial=analysis.get_best_trial("episode_reward_mean"),
    metric="episode_reward_mean"
)
checkpoint_path = checkpoints[0][0]
"""
# Restore agent
agent = ppo.PPOTrainer(
    env="TradingEnv",
    config={
        "env_config": {
            "window_size": 25
        },
        "framework": "torch",
        "log_level": "DEBUG",
        "ignore_worker_failures": True,
        "num_workers": 1,
        "num_gpus": 0,
        "clip_rewards": True,
        "lr": 8e-6,
        "lr_schedule": [
            [0, 1e-1],
            [int(1e2), 1e-2],
            [int(1e3), 1e-3],
            [int(1e4), 1e-4],
            [int(1e5), 1e-5],
            [int(1e6), 1e-6],
            [int(1e7), 1e-7]
        ],
        "gamma": 0,
        "observation_filter": "MeanStdFilter",
        "lambda": 0.72,
        "vf_loss_coeff": 0.5,
        "entropy_coeff": 0.01
    }
)
agent.restore(checkpoint_path)



# Instantiate the environment
env = create_env({
    "window_size": 25
})

# Run until episode ends
episode_reward = 0
done = False
obs = env.reset()

while not done:
    action = agent.compute_action(obs)
    obs, reward, done, info = env.step(action)
    episode_reward += reward

env.render()
"""

In [36]:
dir(analysis)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_checkpoints',
 '_configs',
 '_experiment_dir',
 '_experiment_state',
 '_get_trial_paths',
 '_retrieve_rows',
 '_trial_dataframes',
 '_validate_metric',
 '_validate_mode',
 'best_checkpoint',
 'best_config',
 'best_dataframe',
 'best_logdir',
 'best_result',
 'best_result_df',
 'best_trial',
 'dataframe',
 'default_metric',
 'default_mode',
 'fetch_trial_dataframes',
 'get_all_configs',
 'get_best_checkpoint',
 'get_best_config',
 'get_best_logdir',
 'get_best_trial',
 'get_trial_checkpoints_paths',
 'results',
 'results_df',
 'runner_data',
 'stats',
 'trial_dataframes',
 'trials']



In [38]:
dir(analysis)
#analysis.best_config

#analysis.results
analysis.best_result
ana

{'episode_reward_max': 26.45657664656042,
 'episode_reward_min': 26.45657664656042,
 'episode_reward_mean': 26.45657664656042,
 'episode_len_mean': 11535.0,
 'episode_media': {},
 'episodes_this_iter': 0,
 'policy_reward_min': {},
 'policy_reward_max': {},
 'policy_reward_mean': {},
 'custom_metrics': {},
 'hist_stats': {'episode_reward': [26.45657664656042, 26.45657664656042],
  'episode_lengths': [11535, 11535]},
 'sampler_perf': {'mean_raw_obs_processing_ms': 0.10181697646995616,
  'mean_inference_ms': 5.328535632427477,
  'mean_action_processing_ms': 0.04809560211472762,
  'mean_env_wait_ms': 4.400964247493533,
  'mean_env_render_ms': 0.0},
 'off_policy_estimator': {},
 'num_healthy_workers': 1,
 'timesteps_total': 29728,
 'agent_timesteps_total': 29728,
 'timers': {'learn_time_ms': 22.147,
  'learn_throughput': 2799.414,
  'update_time_ms': 9.402},
 'info': {'learner': {'default_policy': {'cur_lr': 7.0480045906151645e-06,
    'mean_q': 0.07622642,
    'min_q': 0.046284016,
    'ma

# PPO

In [None]:
DEFAULT_CONFIG = with_common_config({
    
    "env": "TradingEnv",
    "env_config": env_cfg,

    # Should use a critic as a baseline (otherwise don't use value baseline;
    # required for using GAE).
    "use_critic": True,
    # If true, use the Generalized Advantage Estimator (GAE)
    # with a value function, see https://arxiv.org/pdf/1506.02438.pdf.
    "use_gae": True,
    # The GAE (lambda) parameter.
    "lambda": 1.0,
    # Initial coefficient for KL divergence.
    "kl_coeff": 0.2,
    # Size of batches collected from each worker.
    "rollout_fragment_length": 200,
    # Number of timesteps collected for each SGD round. This defines the size
    # of each SGD epoch.
    "train_batch_size": 4000,
    # Total SGD batch size across all devices for SGD. This defines the
    # minibatch size within each epoch.
    "sgd_minibatch_size": 128,
    # Whether to shuffle sequences in the batch when training (recommended).
    "shuffle_sequences": True,
    # Number of SGD iterations in each outer loop (i.e., number of epochs to
    # execute per train batch).
    "num_sgd_iter": 30,
    # Stepsize of SGD.
    "lr": 5e-5,
    # Learning rate schedule.
    "lr_schedule": None,
    # Coefficient of the value function loss. IMPORTANT: you must tune this if
    # you set vf_share_layers=True inside your model's config.
    "vf_loss_coeff": 1.0,
    "model": {
        # Share layers for value function. If you set this to True, it's
        # important to tune vf_loss_coeff.
        "vf_share_layers": False,
    },
    # Coefficient of the entropy regularizer.
    "entropy_coeff": 0.0,
    # Decay schedule for the entropy regularizer.
    "entropy_coeff_schedule": None,
    # PPO clip parameter.
    "clip_param": 0.3,
    # Clip param for the value function. Note that this is sensitive to the
    # scale of the rewards. If your expected V is large, increase this.
    "vf_clip_param": 100.0,
    # If specified, clip the global norm of gradients by this amount.
    "grad_clip": None,
    # Target value for KL divergence.
    "kl_target": 0.01,
    # Whether to rollout "complete_episodes" or "truncate_episodes".
    "batch_mode": "truncate_episodes",
    # Which observation filter to apply to the observation.
    "observation_filter": "NoFilter",

    # Deprecated keys:
    # Share layers for value function. If you set this to True, it's important
    # to tune vf_loss_coeff.
    # Use config.model.vf_share_layers instead.
    #"vf_share_layers": DEPRECATED_VALUE,
})


In [None]:
stop_criteria = ray.tune.stopper.MaximumIterationStopper(100)
analysis = tune.run(
    "PPO",
    config=DEFAULT_CONFIG, 
    stop = stop_criteria, 
    num_samples = 2, 
)