In [119]:
!pip install ray['rllib']

from gym import spaces
import pandas as pd
import numpy as np
import random
import tqdm
import tensorflow as tf
import ray.rllib
import time
import gym, ray
import datetime

from ray.rllib.models import ModelCatalog
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.preprocessors import get_preprocessor
from ray.rllib.models.tf.recurrent_net import RecurrentNetwork
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.tune.registry import register_env
import matplotlib.pyplot as plt

from tensorflow.keras.models import *
from tensorflow.keras.layers import *

tf.random.set_seed(42)



In [109]:
df = pd.read_csv("dataset.csv")
df = df.iloc[:, 1:]

df['Time'] = pd.to_datetime(df['Time'])
df['Time'] = [(s-datetime.datetime(2015,5,1)).total_seconds() for s in df['Time']]
df['Time'] = df['Time']/1000

len(df)
df_train = df[:87500]
df_val = df[87500:].reset_index().iloc[:, 1:]

# Environment

In [120]:
INIT_BAL = 100_000
#for now we only support trades with 1 asset

class Leightweight(gym.Env):

  def __init__(self, df, tts):
    super(Leightweight, self).__init__()

    self.ticker_name = 'RTS'
    self.margin_needed = 12_500
    
    #self.multiplicator = 738
    self.multiplicator = 1.47
    #see https://www.moex.com/en/contract.aspx?code=RTS-12.21

    self.current_balance = INIT_BAL
    self.used_margin = 0 
    self.current_pos = 0
    self.prev_pos = 0

    self.time_alive = 0 
    self.df = df 
    self.tts = tts
    self.done = False
    self.current_ts = self.tts
    self.current_price = self.df.loc[self.current_ts, self.ticker_name + '_' + 'Close']
    self.fut_in_batch = 1
    self.pos_his = [0 for _ in range(self.tts)]

    self.actions_num = 5
    self.shift = (self.actions_num - 1) // 2
    self.action_space = spaces.Discrete(self.actions_num)

    self.observation_space = spaces.Dict({'rem_margin': spaces.Box(-10e6, 
                                                                    10e6, 
                                                                    shape = (1, ), 
                                                                    dtype = np.float32),
                                          'pos_history': spaces.Box(np.ones((self.tts,)) * np.array([-5]), 
                                                                    np.ones((self.tts,)) * np.array([5]), 
                                                                    shape = (self.tts, ), 
                                                                    dtype = np.int_),
                                          'dataframe': spaces.Box(- 10e3, 
                                                                  10e12, 
                                                                  shape = (self.tts, self.df.shape[1]), 
                                                                  dtype = np.float32)})
    
    

  def step(self, action):
    assert action in list(range(5))
    self.time_alive = self.time_alive + 1
    self.current_ts = self.current_ts + 1
    self.prev_price = self.current_price 
    self.prev_pos = self.current_pos
    self.current_price = self.df.loc[self.current_ts, self.ticker_name + '_' + 'Close']
    self.current_pos = action - self.shift
    self.pos_his = self.pos_his[1:] + [self.current_pos]
    change_in_pos = self.current_pos - self.prev_pos
    delta = self.current_pos * self.fut_in_batch * self.multiplicator * (self.current_price - self.prev_price) - 2 * np.abs(change_in_pos) * self.fut_in_batch
    self.current_balance += delta

    self.used_margin = np.abs(self.current_pos) * self.fut_in_batch * self.margin_needed

    if self.current_ts >= len(self.df) - self.tts -1:
      self.done = True

    penalty = 0 
    st = self.pos_his[-1]
    for i in range(self.tts-1):
      if self.pos_his[-2-i] == st:
        penalty +=1
      else:
        break

    self.reward = delta - 20*(penalty) - np.exp(8 * np.abs(np.mean(np.clip(self.pos_his, -1, 1))))

    next_obs_df = self.df[self.current_ts - self.tts + 1: self.current_ts + 1].to_numpy().astype(dtype = np.float32)

    rem_mar = np.reshape(np.array([self.current_balance - self.used_margin-5000]),
                                      newshape = (1,)).astype(dtype = np.float32)


    obs_cur_pos = np.reshape(np.array(self.pos_his),
                              newshape = (self.tts, )).astype(dtype = np.intc)


    if not self.observation_space['rem_margin'].contains(rem_mar):
        print("rem_margin>>", rem_mar, 'dn/c')
    if not self.observation_space['pos_history'].contains(obs_cur_pos):
        print("pos_history>>", obs_cur_pos, 'dn/c')
    if not self.observation_space['dataframe'].contains(next_obs_df):
        print("dataframe>>", dataframe, 'dn/c')

    self.obs = { "rem_margin"  : rem_mar,
                 "pos_history" : obs_cur_pos, 
                 "dataframe"   : next_obs_df} 

    self.reward *= 0.01

    return [self.obs, 
            self.reward, 
            self.done,
            {}]

  def reset(self):
      self.current_balance = INIT_BAL
      self.used_margin = 0
      self.current_pos = 0
      self.time_alive = 0 

      self.current_ts = random.randint(2 * self.tts, len(self.df) - 2 * self.tts)

      self.current_price = self.df.loc[self.current_ts, self.ticker_name + '_' + 'Close']

      next_obs_df = self.df[self.current_ts - self.tts + 1: self.current_ts + 1].to_numpy().astype(dtype = np.float32)

      rem_mar = np.reshape(np.array([self.current_balance - self.used_margin-5000]),
                            newshape = (1, )).astype(dtype = np.float32)


      obs_cur_pos = np.reshape(np.array(self.pos_his),
                                newshape = (self.tts, )).astype(dtype = np.intc)

      if not self.observation_space['rem_margin'].contains(rem_mar):
          print("rem_margin>>", rem_mar, 'dn/c')
      if not self.observation_space['pos_history'].contains(obs_cur_pos):
          print("pos_history>>", obs_cur_pos, 'dn/c')
      if not self.observation_space['dataframe'].contains(next_obs_df):
          print("dataframe>>", dataframe, 'dn/c')

      self.obs = { "rem_margin": rem_mar,
                   "pos_history" : obs_cur_pos, 
                   "dataframe" : next_obs_df} 

      return self.obs

  def render(self, mode = 'human', close = False):
      print(f'Step: {self.current_ts}')
      print(f'Balance: {self.current_balance}')



In [121]:
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer

class T2V(Layer):
    
    def __init__(self, output_dim=None, **kwargs):
        self.output_dim = output_dim
        super(T2V, self).__init__(**kwargs)
    
    def build(self, input_shape):
        self.W = self.add_weight(name='W', shape=(input_shape[-1], self.output_dim), 
                                 initializer='uniform', trainable=True)
        self.P = self.add_weight(name='P', shape=(input_shape[1], self.output_dim), 
                                 initializer='uniform', trainable=True)
        self.w = self.add_weight(name='w', shape=(input_shape[1], 1), 
                                 initializer='uniform', trainable=True)
        self.p = self.add_weight(name='p', shape=(input_shape[1], 1), 
                                 initializer='uniform', trainable=True)
        super(T2V, self).build(input_shape)
    
    def call(self, x):
        original = self.w * x + self.p
        sin_trans = K.sin(K.dot(x, self.W) + self.P)
        return K.concatenate([sin_trans, original], -1)

In [130]:
def make_model(cell_size = 96):

    inp = Input(shape=(128, 30))
    x = BatchNormalization()(inp)
    time = Lambda(lambda x: x[:, :, :1])(x)
    feat = Lambda(lambda x: x[:, :, 1:])(x)
    x = T2V(4)(time)
    x = Concatenate()([x, feat])

    x = Conv1D(32, kernel_size = 3, padding = "same")(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv1D(32, kernel_size = 3, padding = "same")(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv1D(64, kernel_size = 3, padding = "same")(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    

    x = LSTM(cell_size, return_sequences = False)(x)
    x = BatchNormalization()(x)

    x = Dense(96, activation = 'relu')(x)
    x = BatchNormalization()(x)
    x = Dense(64, activation = 'relu', name = "extr")(x)
    x = BatchNormalization()(x)
    x = Dense(32, activation = 'relu')(x)
    out = Dense(1)(x)
    model = Model(inp, out)
    model.compile(loss='mse', 
                  optimizer=tf.keras.optimizers.Adam(), 
                  metrics = ['mae'])
    return model

In [131]:
tf.keras.backend.clear_session()
model = make_model()

feats = tf.keras.Model(model.inputs, model.get_layer("extr").output)
feats.trainable = False
feats.summary()



Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 128, 30)]    0           []                               
                                                                                                  
 batch_normalization (BatchNorm  (None, 128, 30)     120         ['input_1[0][0]']                
 alization)                                                                                       
                                                                                                  
 lambda (Lambda)                (None, 128, 1)       0           ['batch_normalization[0][0]']    
                                                                                                  
 t2v (T2V)                      (None, 128, 5)       772         ['lambda[0][0]']           

In [114]:
import os
from ray import tune
from ray.rllib.agents.dqn.distributional_q_tf_model import \
    DistributionalQTFModel
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.visionnet import VisionNetwork as MyVisionNetwork
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.models.modelv2 import restore_original_dimensions

tf1, tf, tfv = try_import_tf()



class MyKerasModel(TFModelV2):
    """Custom model for policy gradient algorithms."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        super(MyKerasModel, self).__init__(obs_space, action_space,
                                           num_outputs, model_config, name)
      

        tts = 128
        feat = 30
        
        self.dat = tf.keras.layers.Input(
            shape=[64], name="df")
        self.rem_mar = tf.keras.layers.Input(
            shape = [1], name = "Margin")
        self.cur_pos = tf.keras.layers.Input(
            shape = [tts], name = "Positions")
           

        conc = tf.keras.layers.Concatenate()([self.dat, self.cur_pos])

        conc = tf.keras.layers.LayerNormalization()(conc)

        conc = tf.keras.layers.Dense(66,
                                     activation = tf.nn.relu,
                                     kernel_initializer=normc_initializer(0.01))(conc)
        conc = tf.keras.layers.LayerNormalization()(conc)
        
        conc = tf.keras.layers.Dense(33,
                                     activation = tf.nn.relu,
                                     kernel_initializer=normc_initializer(0.01))(conc)
        
        layer_1 = tf.keras.layers.Dense(
            16,
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(1.0))(conc)

        layer_1 = tf.keras.layers.LayerNormalization()(layer_1)
        
        layer_out = tf.keras.layers.Dense(
            num_outputs,
            name="Output",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(layer_1)


        layer_2 = tf.keras.layers.Dense(
            16,
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(1.0))(conc)

        layer_2 = tf.keras.layers.LayerNormalization()(layer_2)

        value_out = tf.keras.layers.Dense(
            1,
            name="Value",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(layer_2)
        self.base_model = tf.keras.Model([self.dat, self.rem_mar, self.cur_pos], [layer_out, value_out])
        #self.register_variables(self.base_model.variables)
        self.base_model.summary()

    def forward(self, input_dict, state, seq_lens):
        featuress = feats.predict_on_batch(input_dict["obs"]["dataframe"])
        model_out, self._value_out = self.base_model([featuress,
                                                      input_dict["obs"]["rem_margin"],
                                                      input_dict["obs"]["pos_history"]])
        return model_out, state

    def value_function(self):
        return tf.reshape(self._value_out, [-1])

In [115]:
ray.shutdown()
ray.init()

ModelCatalog.register_custom_model(
    "keras_model", MyKerasModel)
train_env = "train_env"
register_env(train_env, lambda config: Leightweight(df_train, 128))

tf.compat.v1.enable_eager_execution()

config = ray.rllib.agents.ppo.appo.DEFAULT_CONFIG.copy()
config['log_level'] = 'WARN'
config['num_gpus'] = 0
config['num_workers'] = 1
config['model']['custom_model'] = 'keras_model'
config['horizon'] = 100
config['soft_horizon'] = False
config['lr'] = 0.0000001

agent = ray.rllib.agents.ppo.APPOTrainer(config, env=train_env)
agent.get_policy().config["explore"] = True



[2m[36m(RolloutWorker pid=11125)[0m 2021-12-19 14:02:31.063884: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
[2m[36m(RolloutWorker pid=11125)[0m Instructions for updating:
[2m[36m(RolloutWorker pid=11125)[0m Colocations handled automatically by placer.


[2m[36m(RolloutWorker pid=11125)[0m Model: "model"
[2m[36m(RolloutWorker pid=11125)[0m __________________________________________________________________________________________________
[2m[36m(RolloutWorker pid=11125)[0m  Layer (type)                   Output Shape         Param #     Connected to                     
[2m[36m(RolloutWorker pid=11125)[0m  df (InputLayer)                [(None, 64)]         0           []                               
[2m[36m(RolloutWorker pid=11125)[0m                                                                                                   
[2m[36m(RolloutWorker pid=11125)[0m  Positions (InputLayer)         [(None, 128)]        0           []                               
[2m[36m(RolloutWorker pid=11125)[0m                                                                                                   
[2m[36m(RolloutWorker pid=11125)[0m  concatenate (Concatenate)      (None, 192)          0           ['df[0][0]',  



[2m[36m(RolloutWorker pid=11125)[0m Model: "model_1"
[2m[36m(RolloutWorker pid=11125)[0m __________________________________________________________________________________________________
[2m[36m(RolloutWorker pid=11125)[0m  Layer (type)                   Output Shape         Param #     Connected to                     
[2m[36m(RolloutWorker pid=11125)[0m  df (InputLayer)                [(None, 64)]         0           []                               
[2m[36m(RolloutWorker pid=11125)[0m                                                                                                   
[2m[36m(RolloutWorker pid=11125)[0m  Positions (InputLayer)         [(None, 128)]        0           []                               
[2m[36m(RolloutWorker pid=11125)[0m                                                                                                   
[2m[36m(RolloutWorker pid=11125)[0m  concatenate_1 (Concatenate)    (None, 192)          0           ['df[0][0]',

[2m[36m(RolloutWorker pid=11125)[0m   updates=self.state_updates,
[2m[36m(RolloutWorker pid=11125)[0m 2021-12-19 14:02:41,047	ERROR worker.py:431 -- Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::RolloutWorker.__init__()[39m (pid=11125, ip=172.28.0.2)
[2m[36m(RolloutWorker pid=11125)[0m   File "/usr/local/lib/python3.7/dist-packages/ray/rllib/evaluation/rollout_worker.py", line 591, in __init__
[2m[36m(RolloutWorker pid=11125)[0m     seed=seed)
[2m[36m(RolloutWorker pid=11125)[0m   File "/usr/local/lib/python3.7/dist-packages/ray/rllib/evaluation/rollout_worker.py", line 1552, in _build_policy_map
[2m[36m(RolloutWorker pid=11125)[0m     conf, merged_conf)
[2m[36m(RolloutWorker pid=11125)[0m   File "/usr/local/lib/python3.7/dist-packages/ray/rllib/policy/policy_map.py", line 134, in create_policy
[2m[36m(RolloutWorker pid=11125)[0m     observation_space, action_space, merged_config)
[2m[36m(RolloutW

RayActorError: ignored