In [3]:
import tensorflow as tf 
import tensorflow_probability as tfp 
from keras import layers
from keras.models import Model 
from keras.optimizers import Adam
import numpy as np
import gym


This notebook shows how reinforcement learning can be applied to problems with a continuous action space, i.e. problems in which an action has to be finely tuned rather than picked from a small set of choices. One possible use is an automated market maker, where we want to commit a specific volume of capital when placing an order.


In [16]:
class ReplayBuffer():
  """Fixed-capacity store of (state, next_state, action, reward, done) transitions.

  Transitions are kept in the leading ``memory_counter`` rows of five parallel
  NumPy arrays. Sampling only ever looks at those filled rows, so a partially
  filled buffer never returns rows that were not written.

  Args:
    input_shape: Number of features in one state vector (an int).
    memory_size: Maximum number of transitions held at once.
  """

  def __init__(self, input_shape, memory_size=50):
    self.memory_size = memory_size
    # Number of leading slots that currently hold real transitions.
    self.memory_counter = 0
    self.state_shape = input_shape
    # Zero-initialised rather than np.empty so that no slot ever exposes
    # uninitialised memory.
    self.state_memory = np.zeros((self.memory_size, self.state_shape))
    self.next_state_memory = np.zeros((self.memory_size, self.state_shape))
    self.action_memory = np.zeros(self.memory_size)
    self.reward_memory = np.zeros(self.memory_size)
    self.terminal_memory = np.zeros(self.memory_size)

  def _all_memories(self):
    """Return the five storage arrays so they can be shifted together."""
    return (self.state_memory, self.next_state_memory, self.reward_memory,
            self.action_memory, self.terminal_memory)

  def _write(self, index, state, next_state, action, reward, done):
    """Store one transition at ``index`` and extend the fill level if needed."""
    self.state_memory[index] = state
    self.next_state_memory[index] = next_state
    self.reward_memory[index] = reward
    self.action_memory[index] = action
    self.terminal_memory[index] = int(done)
    self.memory_counter = min(max(self.memory_counter, index + 1), self.memory_size)

  def sample_memories(self, sample_size):
    """Draw ``sample_size`` stored transitions uniformly at random, with replacement.

    Returns:
      Tuple ``(state, next_state, rewards, actions, terminal)``; the first two
      have shape ``(sample_size, state_shape)``, the rest ``(sample_size,)``.

    Raises:
      ValueError: If nothing has been stored yet.
    """
    if self.memory_counter == 0:
      raise ValueError("cannot sample from an empty replay buffer")
    # Restrict the draw to filled slots; fancy indexing returns fresh copies.
    picks = np.random.randint(self.memory_counter, size=sample_size)
    return (self.state_memory[picks], self.next_state_memory[picks],
            self.reward_memory[picks], self.action_memory[picks],
            self.terminal_memory[picks])

  def remove_memories(self):
    """Evict one randomly chosen stored transition and close the gap.

    Every later row moves up by one and the last row is zeroed. The newest
    stored transition is never the one picked while more than one is stored,
    which matches the original "uniform but for the last slot" choice. Does
    nothing when the buffer is empty.
    """
    filled = self.memory_counter
    if filled == 0:
      return
    rand_ind = np.random.randint(max(filled - 1, 1))
    for memory in self._all_memories():
      # Copy the tail first so the shift never reads rows it has just written.
      memory[rand_ind:-1] = memory[rand_ind + 1:].copy()
      memory[-1] = 0
    self.memory_counter = filled - 1

  def add_memories(self, state, next_state, action, reward, done):
    """Append a transition in the next free slot.

    After ``remove_memories`` on a full buffer the free slot is the last row.
    If the buffer is still full, the last row is overwritten.
    """
    slot = min(self.memory_counter, self.memory_size - 1)
    self._write(slot, state, next_state, action, reward, done)

  def start_adding_memories(self, state, next_state, action, reward, done, counter):
    """Store a transition at the explicit row ``counter`` (used for the initial fill)."""
    self._write(counter, state, next_state, action, reward, done)


In [5]:
class Actor(tf.keras.Model):
  """Policy network: two ReLU hidden layers followed by one linear output unit.

  Args:
    num_hidden_units: Width of each of the two hidden layers.
  """

  def __init__(self, num_hidden_units = 1024):
    super().__init__()
    hidden_cfg = dict(units=num_hidden_units, activation="relu")
    self.shared_1 = layers.Dense(**hidden_cfg)
    self.shared_2 = layers.Dense(**hidden_cfg)
    # Single unbounded output; any clipping is left to the caller.
    self.actor = layers.Dense(units=1, activation="linear")

  def call(self, state):
    """Map a batch of states to a batch of single-valued actions."""
    hidden = self.shared_2(self.shared_1(state))
    return self.actor(hidden)

In [18]:
class Critic(tf.keras.Model):
  """Value network scoring a (state, action) pair with a single linear output.

  Args:
    num_hidden_units: Width of each of the two hidden layers.
  """

  def __init__(self, num_hidden_units = 1024):
    super().__init__()
    hidden_cfg = dict(units=num_hidden_units, activation="relu")
    self.shared_1 = layers.Dense(**hidden_cfg)
    self.shared_2 = layers.Dense(**hidden_cfg)
    self.critic = layers.Dense(units=1, activation="linear")

  def call(self, state, action):
    """Return the critic output for ``state`` and ``action`` joined along axis 1.

    Both inputs must have the same rank, since they are concatenated on axis 1.
    """
    joined = tf.concat([state, action], axis = 1)
    hidden = self.shared_2(self.shared_1(joined))
    return self.critic(hidden)
  


In [25]:
class Agent():
  """DDPG-style agent: deterministic actor, Q-critic, target copies and a replay buffer.

  Args:
    input_shape: Size of one state vector (an int, or a shape whose product is
      the number of state features).
    memory_size: Capacity of the replay buffer.
    max_action_val: Upper bound applied to actions returned by ``choose_action``.
    min_action_val: Lower bound applied to actions returned by ``choose_action``.
    alpha: Learning rate used for both the actor and the critic optimizer.
    gamma: Discount factor for the bootstrapped critic target.
    rho: Soft-update rate; each target weight moves ``rho`` of the way towards
      the corresponding online weight on every ``update`` call.
  """

  def __init__(self, input_shape, memory_size = 50, max_action_val=1, min_action_val=-1, alpha = 0.001, gamma = 0.99, rho = 0.005):
    self.gamma = gamma
    self.alpha = alpha
    self.aplha = alpha  # misspelled attribute kept so existing readers of it still work
    self.rho = rho
    self.max_action_val = max_action_val
    self.min_action_val = min_action_val
    state_dim = int(np.prod(input_shape))

    self.actor_network = Actor()
    self.target_actor_network = Actor()
    self.critic_network = Critic()
    self.target_critic_network = Critic()

    # Subclassed Keras models create their weights on first call. Run one dummy
    # batch through every network so the weight copy below actually copies
    # something instead of exchanging empty lists.
    dummy_state = tf.zeros((1, state_dim), dtype=tf.float32)
    dummy_action = tf.zeros((1, 1), dtype=tf.float32)
    for policy in (self.actor_network, self.target_actor_network):
      policy(dummy_state)
    for q_function in (self.critic_network, self.target_critic_network):
      q_function(dummy_state, dummy_action)
    self.target_actor_network.set_weights(self.actor_network.get_weights())
    self.target_critic_network.set_weights(self.critic_network.get_weights())

    # One optimizer per network: an optimizer keeps per-variable state and
    # should not be shared across two different variable sets.
    self.optimizer = Adam(learning_rate = alpha)  # drives the critic
    self.critic_optimizer = self.optimizer
    self.actor_optimizer = Adam(learning_rate = alpha)
    self.buff = ReplayBuffer(memory_size = memory_size, input_shape = state_dim)

  def choose_action(self, state):
    """Return the actor's action for ``state`` plus N(0, 1) noise, clipped to the action bounds.

    Returns:
      A NumPy scalar in ``[min_action_val, max_action_val]``.
    """
    tf_obs = tf.convert_to_tensor(np.asarray(state, dtype=np.float32).reshape(1, -1))
    noise = np.random.normal()
    action = self.actor_network(tf_obs)[0][0]
    # clip_by_value takes (value, clip_value_min, clip_value_max), in that order.
    action = tf.clip_by_value(action + noise, self.min_action_val, self.max_action_val)

    return action.numpy()

  def learn(self, done=None, batch_size=10):
    """Run one critic step, one actor step and a soft target update on a sampled batch.

    Args:
      done: Ignored; accepted for backward compatibility. Episode termination
        is taken per-sample from the terminal flags stored in the buffer.
      batch_size: Number of transitions sampled from the replay buffer.
    """
    states, next_states, rewards, actions, terminal = self.buff.sample_memories(batch_size)
    states = tf.convert_to_tensor(states, dtype=tf.float32)
    next_states = tf.convert_to_tensor(next_states, dtype=tf.float32)
    # Column vectors (batch, 1) so they line up with the (batch, 1) network
    # outputs instead of broadcasting to (batch, batch).
    rewards = tf.reshape(tf.convert_to_tensor(rewards, dtype=tf.float32), (-1, 1))
    actions = tf.reshape(tf.convert_to_tensor(actions, dtype=tf.float32), (-1, 1))
    terminal = tf.reshape(tf.convert_to_tensor(terminal, dtype=tf.float32), (-1, 1))

    # Bootstrapped target from the target networks; treated as a constant.
    next_actions = self.target_actor_network(next_states)
    target_val = self.target_critic_network(next_states, next_actions)
    target = tf.stop_gradient(rewards + self.gamma * (1.0 - terminal) * target_val)

    with tf.GradientTape() as tape:
      # The critic is trained on the transitions that were actually taken.
      val = self.critic_network(states, actions)
      total_critic_loss = tf.reduce_mean((target - val) ** 2)
    critic_grads = tape.gradient(total_critic_loss, self.critic_network.trainable_variables)
    self.optimizer.apply_gradients(zip(critic_grads, self.critic_network.trainable_variables))

    with tf.GradientTape() as tape:
      policy_actions = self.actor_network(states)
      policy_vals = self.critic_network(states, policy_actions)
      actor_loss = -tf.reduce_mean(policy_vals)  # minimising -Q is gradient ascent on Q
    actor_grads = tape.gradient(actor_loss, self.actor_network.trainable_variables)
    self.actor_optimizer.apply_gradients(zip(actor_grads, self.actor_network.trainable_variables))

    self.update()

  def _soft_update(self, target_network, source_network):
    """Move each target weight a fraction ``rho`` towards the matching source weight."""
    blended = [
      (1 - self.rho) * target_w + self.rho * source_w
      for target_w, source_w in zip(target_network.get_weights(), source_network.get_weights())
    ]
    target_network.set_weights(blended)

  def update(self):
    """Soft-update both target networks from their online counterparts."""
    self._soft_update(self.target_critic_network, self.critic_network)
    self._soft_update(self.target_actor_network, self.actor_network)


In [20]:
# Environment driven by the training loop in the next code cell. That loop
# converts the agent's continuous action to an integer before calling env.step.
env = gym.make('CartPole-v0')

  logger.warn(
  deprecation(
  deprecation(


Admittedly, I need to find a more appropriate environment to run this on; this one is used only because it is the environment I use throughout the rest of the code. You will see the continuous action being rounded to a discrete one, which is not ideal.

In [None]:

NUM_EPISODES = 2000
LOG_EVERY = 20  # print the episode return every LOG_EVERY episodes

agent = Agent(input_shape = env.observation_space.shape[0])
added = 0  # transitions written during the initial buffer fill
for i in range(NUM_EPISODES):
  # gym's reset returns either the observation or an (observation, info) tuple,
  # depending on the installed version.
  reset_result = env.reset()
  state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
  done = False
  total_reward = 0
  while not done:
    action = agent.choose_action(state)
    # The agent emits a continuous action but env.step is given an integer.
    # Threshold on the sign: casting with astype(int) truncates towards zero,
    # which maps nearly every action to 0 and -1.0 to -1.
    env_action = int(action > 0)
    step_result = env.step(env_action)
    next_state, reward = step_result[0], step_result[1]
    if len(step_result) == 4:
      done = bool(step_result[2])
    else:
      # 5-tuple API: (obs, reward, terminated, truncated, info)
      done = bool(step_result[2] or step_result[3])
    total_reward += reward
    # Fill the buffer first; once it is full, evict one transition, store the
    # new one and update the policy and the Q-network.
    if added >= agent.buff.memory_size:
      agent.buff.remove_memories()
      agent.buff.add_memories(state, next_state, action, reward, done)
      agent.learn(done)
    else:
      agent.buff.start_adding_memories(state, next_state, action, reward, done, added)
      added = added + 1

    state = next_state
  if i % LOG_EVERY == 0:
    print(total_reward)


10.0
