In [0]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.animation as animation
mpl.rc('animation', html='jshtml')
import gym

In [0]:
def play_one_step(env, obs, model, loss_fn):
    """Play one environment step and return the gradients for the sampled action.

    The model is called on `obs` with a leading axis added, and its output is
    treated as the probability of going left. An action is sampled from that
    probability, the loss is computed as if the sampled action were the correct
    one, and the gradients of that loss with respect to
    `model.trainable_variables` are computed before the environment is stepped.

    Args:
        env: environment whose `step(action)` returns `(obs, reward, done, info)`.
        obs: current observation; indexed with `np.newaxis` before being fed
            to the model.
        model: callable exposing `trainable_variables`.
        loss_fn: callable invoked as `loss_fn(y_target, left_proba)`.

    Returns:
        Tuple `(obs, reward, done, grads)`: the first three items come from
        `env.step`, `grads` from `tape.gradient`.
    """
    with tf.GradientTape() as tape:
        # Feed a single observation; the output is the probability of going left.
        p_left = model(obs[np.newaxis])

        # The comparison is True (action 1) with probability 1 - p_left and
        # False (action 0, i.e. left) with probability p_left.
        sampled_action = tf.random.uniform([1, 1]) > p_left

        # Target probability of going left: 1 when action 0 was sampled, else 0.
        target_left = tf.constant([[1.]]) - tf.cast(sampled_action, tf.float32)

        step_loss = tf.reduce_mean(loss_fn(target_left, p_left))

    step_grads = tape.gradient(step_loss, model.trainable_variables)

    # The environment receives a plain Python int (0 or 1).
    action_id = int(sampled_action[0, 0].numpy())
    next_obs, reward, done, _ = env.step(action_id)
    return next_obs, reward, done, step_grads

In [0]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    """Play `n_episodes` episodes, recording every step's reward and gradients.

    Each episode starts with `env.reset()` and runs until `play_one_step`
    reports `done` or `n_max_steps` steps have been played.

    Returns:
        Tuple `(all_rewards, all_grads)`:
        - `all_rewards`: one list per episode, holding one reward per step.
        - `all_grads`: one list per episode, holding the `grads` value returned
          by `play_one_step` for each step (one gradient per trainable variable).
    """
    all_rewards, all_grads = [], []
    for _ in range(n_episodes):
        episode_rewards, episode_grads = [], []
        obs = env.reset()
        for _ in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            episode_rewards.append(reward)
            episode_grads.append(grads)
            if done:
                # Episode finished before reaching the step cap.
                break
        all_rewards.append(episode_rewards)
        all_grads.append(episode_grads)
    return all_rewards, all_grads

In [0]:
def discount_rewards(rewards, discount_rate):
    """Compute the discounted sum of future rewards for every step.

    Working backwards from the last step, each entry becomes
    `rewards[step] + discount_rate * discounted[step + 1]`.

    Args:
        rewards: sequence of per-step rewards (ints or floats); not modified.
        discount_rate: factor applied to the following step's discounted reward.

    Returns:
        A new float64 NumPy array with the same length as `rewards`.
    """
    # Force a float dtype: np.array() of integer rewards would yield an integer
    # array, and writing the discounted sums back into it silently truncates
    # them (e.g. 1 + 0.95 would be stored as 1).
    discounted = np.array(rewards, dtype=np.float64)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_rate
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_rate):
    """Discount every episode's rewards, then normalize them jointly.

    Each reward list in `all_rewards` is passed through `discount_rewards`.
    The mean and standard deviation are computed over the discounted rewards
    of all episodes concatenated together, and every episode is normalized
    with those shared statistics.

    Returns:
        A list with one normalized array per episode, in the input order.
    """
    per_episode = []
    for episode_rewards in all_rewards:
        per_episode.append(discount_rewards(episode_rewards, discount_rate))

    # Statistics are pooled over every step of every episode.
    pooled = np.concatenate(per_episode)
    pooled_mean = pooled.mean()
    pooled_std = pooled.std()

    normalized = []
    for episode_discounted in per_episode:
        normalized.append((episode_discounted - pooled_mean) / pooled_std)
    return normalized

In [5]:
# Sanity check on a three-step episode: the first entry is
# 10 + 0.8 * 0 + 0.8**2 * (-50) = -22, the second 0 + 0.8 * (-50) = -40.
discount_rewards([10, 0, -50], discount_rate=0.8)

array([-22, -40, -50])

In [6]:
# The discounted rewards of both episodes are pooled to compute one mean and one
# standard deviation, and each episode is normalized with those shared values.
# Positive normalized action advantages are good whereas negative normalized
# action advantages are bad.
discount_and_normalize_rewards([[10,0,-50], [10,20]], discount_rate=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [0]:
# Hyperparameters
n_iterations = 150           # number of training iterations (one gradient update each)
n_episodes_per_update = 10   # episodes played before every gradient update
n_max_steps = 200            # upper bound on the number of steps per episode
discount_rate = 0.95         # factor applied to future rewards when discounting
# Use the `learning_rate` keyword: the `lr` alias is deprecated and is rejected
# by newer Keras releases.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
# The model outputs a single probability, so binary cross-entropy is used.
loss_fn = keras.losses.binary_crossentropy

In [0]:
# Reset Keras' global state and fix the TensorFlow and NumPy seeds before the
# model is created.
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

n_inputs = 4 # == env.observation_space.shape[0]

# Policy network: a 5-unit ELU hidden layer followed by a single sigmoid unit.
# play_one_step treats that single output as the probability of going left.
model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]),
    keras.layers.Dense(1, activation="sigmoid"),
])

In [0]:
# Policy-gradient training loop: on every iteration, play a batch of episodes
# with the current model, weight each step's gradients by that step's
# normalized discounted reward, average them per trainable variable and apply
# the result with the optimizer.
with tf.device('/device:GPU:0'):

    env = gym.make("CartPole-v1")
    env.seed(42)

    # try/finally ensures the environment is closed even when training raises
    # or is interrupted part-way through.
    try:
        for iteration in range(n_iterations):
            # Plays the game n times and returns the rewards and gradients for
            # every episode and step
            all_rewards, all_grads = play_multiple_episodes(
                env, n_episodes_per_update, n_max_steps, model, loss_fn)

            # Mean total reward per episode, printed on a single refreshing line.
            total_rewards = sum(map(sum, all_rewards))
            print("\rIteration: {}, mean rewards: {:.1f}".format(
                iteration, total_rewards / n_episodes_per_update), end="")

            # Computes each action's normalized advantage, provides measure of
            # how good each action was
            all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_rate)

            all_mean_grads = []
            for var_index in range(len(model.trainable_variables)):
                # Average, over every step of every episode, of this variable's
                # gradient scaled by the step's normalized advantage.
                mean_grads = tf.reduce_mean(
                    [final_reward * all_grads[episode_index][step][var_index]
                     for episode_index, final_rewards in enumerate(all_final_rewards)
                     for step, final_reward in enumerate(final_rewards)], axis=0)
                all_mean_grads.append(mean_grads)
            optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
    finally:
        env.close()

Iteration: 10, mean rewards: 31.0