In [None]:
# conda install -c conda-forge gym
import gym
import matplotlib.pyplot as plt
import numpy as np

import tensorflow as tf
from tensorflow import keras

### Create environment and get basic info from CartPole

In [None]:
# Start from a clean Keras state, then pin the NumPy and TensorFlow
# random generators so repeated runs draw the same random numbers.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

### Neural Network

In [None]:
# Policy network: given one observation it outputs the probability of
# taking the "left" action (action 0).
n_inputs = 4  # length of one observation vector fed to the network

model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]),
    # Exactly ONE sigmoid unit: the policy emits a single probability,
    # P(left). play_one_step compares it against one uniform sample of
    # shape [1, 1] and trains it with binary cross-entropy against a
    # [1, 1] target, so the output must have one value per sample.
    keras.layers.Dense(1, activation="sigmoid"),
])

### REINFORCE Algorithm Utility Functions

In [None]:
def play_one_step(env, obs, model, loss_fn):
    """Play a single environment step using the policy `model`.

    The model output is treated as the probability of going left. An action
    is sampled from it, the loss that would make the sampled action more
    likely is computed, and its gradients w.r.t. the model's trainable
    variables are recorded (they are NOT applied here).

    Args:
        env: environment exposing `step(action)` that returns
            `(obs, reward, done, info)`.
        obs: current observation; a leading batch axis is added before it
            is passed to the model.
        model: callable policy network with `trainable_variables`.
        loss_fn: callable `loss_fn(y_target, y_pred)`.

    Returns:
        Tuple `(obs, reward, done, grads)`: the next observation, the reward
        and done flag from the environment, and the list of gradients.
    """
    batch = obs[np.newaxis]  # single sample -> batch of one
    with tf.GradientTape() as tape:
        left_proba = model(batch)
        # True means "go right" (action 1): the uniform draw exceeded P(left).
        go_right = tf.random.uniform([1, 1]) > left_proba
        # Target probability of going left is 1 - action.
        y_target = tf.constant([[1.]]) - tf.cast(go_right, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)

    chosen_action = int(go_right[0, 0].numpy())
    next_obs, reward, done, _info = env.step(chosen_action)
    return next_obs, reward, done, grads

In [None]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    """Play several episodes with `play_one_step`, collecting rewards and gradients.

    Each episode starts with `env.reset()` and runs until the environment
    reports `done` or `n_max_steps` steps have been played.

    Returns:
        Tuple `(all_rewards, all_grads)`: two lists with one entry per
        episode; each entry is the list of per-step rewards (respectively
        per-step gradient lists) of that episode.
    """
    all_rewards, all_grads = [], []
    for _ in range(n_episodes):
        episode_rewards, episode_grads = [], []
        obs = env.reset()
        for _ in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            episode_rewards.append(reward)
            episode_grads.append(grads)
            if done:
                break
        all_rewards.append(episode_rewards)
        all_grads.append(episode_grads)
    return all_rewards, all_grads

# this returns a list of rewards per episode and a list of gradients per episode

In [None]:
def discount_rewards(rewards, discount_factor):
    """Compute the discounted sum of future rewards for each step of an episode.

    Working backwards from the last step, each entry becomes
    `reward[step] + discount_factor * discounted[step + 1]`.

    Args:
        rewards: sequence of per-step rewards for one episode (may be empty).
        discount_factor: multiplier applied to the following step's
            discounted reward.

    Returns:
        A new NumPy floating-point array with the same length as `rewards`.
    """
    discounted = np.array(rewards)
    # Integer (or boolean) rewards would give an integer array, and the
    # in-place accumulation below would then silently truncate every
    # fractional discounted value. Promote to float; float input keeps
    # its own dtype.
    if not np.issubdtype(discounted.dtype, np.floating):
        discounted = discounted.astype(float)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

def discount_and_normalize(all_rewards, discount_factor):
    """Discount every episode's rewards, then normalize them across all episodes.

    Each episode is passed through `discount_rewards`; the mean and standard
    deviation are computed over all discounted rewards of all episodes
    together, and every episode is rescaled as `(value - mean) / std`.

    Args:
        all_rewards: list of per-episode reward sequences.
        discount_factor: discount factor forwarded to `discount_rewards`.

    Returns:
        A list with one normalized NumPy array per episode. An empty
        `all_rewards` yields an empty list.
    """
    if len(all_rewards) == 0:
        # np.concatenate raises on an empty list; there is nothing to normalize.
        return []
    all_discounted_rewards = [discount_rewards(rewards, discount_factor)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    if reward_std == 0:
        # Every discounted reward is identical, so (value - mean) is already
        # 0 everywhere. Dividing by 1 keeps the zeros instead of producing
        # NaNs from 0 / 0.
        reward_std = 1.0
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

# Quick sanity check of the two helpers on hand-made reward lists.
# For the first call the discounted values are -22, -40 and -50.
print(discount_rewards([10, 0, -50], discount_factor=0.8))
print(discount_and_normalize([[10, 0, -50], [10, 20]], discount_factor=0.8))

### Algorithm Hyper-Parameters

In [None]:
n_iterations = 150           # number of policy updates in the main loop
n_episodes_per_update = 10   # episodes played before each gradient step
n_max_steps = 200            # upper bound on steps per episode
discount_rate = 0.95         # discount factor applied to future rewards
# `learning_rate` is the supported argument name; the old `lr` alias is
# deprecated and rejected by newer Keras releases.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.binary_crossentropy

### Main Loop

In [None]:
env = gym.make("CartPole-v1")
env.seed(42)

for iteration in range(n_iterations):
    # 1. Collect a batch of episodes with the current policy.
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)

    # 2. Report the mean undiscounted return of this batch on a single,
    #    continuously overwritten line.
    total_rewards = sum(map(sum, all_rewards))
    print(
        "\rIteration: {}, mean rewards: {:.1f}".format(
            iteration, total_rewards / n_episodes_per_update),
        end="",
    )

    # 3. Turn raw rewards into normalized, discounted per-step weights.
    all_final_rewards = discount_and_normalize(all_rewards, discount_rate)

    # 4. For each trainable variable, average its per-step gradients over
    #    every step of every episode, each weighted by that step's reward.
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * step_grads[var_index]
             for episode_grads, final_rewards in zip(all_grads, all_final_rewards)
             for step_grads, final_reward in zip(episode_grads, final_rewards)],
            axis=0)
        all_mean_grads.append(mean_grads)

    # 5. One optimizer step with the averaged gradients.
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

env.close()

### Play trained model

In [None]:
def update_scene(num, frames, patch):
    """Show frame number `num` on `patch` and return the patch in a 1-tuple.

    Used as the per-frame callback of the animation built in `plot_animation`.
    """
    current_frame = frames[num]
    patch.set_data(current_frame)
    return (patch,)

def plot_animation(frames, repeat=False, interval=40):
    """Build a matplotlib animation that plays `frames` one after another.

    Args:
        frames: non-empty sequence of images accepted by `plt.imshow`.
        repeat: forwarded to `FuncAnimation`.
        interval: forwarded to `FuncAnimation`.

    Returns:
        The `matplotlib.animation.FuncAnimation` object.
    """
    # The notebook's import cell only brings in `matplotlib.pyplot`; without
    # this import the name `animation` is undefined and the call below
    # raises NameError.
    import matplotlib.animation as animation

    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    # Close the figure created above so it is not also drawn as a static
    # image; name it explicitly instead of relying on "current figure".
    plt.close(fig)
    return anim

def render_policy_net(model, n_max_steps=500, seed=42):
    """Run one episode with `model` as the policy and collect rendered frames.

    A fresh "CartPole-v1" environment is created, seeded with `seed`, and
    stepped until it reports `done` or `n_max_steps` steps have been played.
    The environment is rendered before every step.

    Args:
        model: policy network; `model.predict` on a batch of one observation
            is read as the probability of the "left" action (action 0).
        n_max_steps: maximum number of steps to play.
        seed: seed for the environment and for NumPy's global generator,
            which is the generator used to sample actions.

    Returns:
        List of the frames returned by `env.render(mode="rgb_array")`.
    """
    frames = []
    env = gym.make("CartPole-v1")
    try:
        env.seed(seed)
        np.random.seed(seed)
        obs = env.reset()
        for _ in range(n_max_steps):
            frames.append(env.render(mode="rgb_array"))
            left_proba = model.predict(obs.reshape(1, -1))
            # Sample with NumPy (seeded just above) so that `seed` really
            # determines the action sequence; drawing from TensorFlow's
            # generator would ignore it.
            action = int(np.random.rand() > left_proba[0, 0])
            obs, _reward, done, _info = env.step(action)
            if done:
                break
    finally:
        # Always release the environment (and its render window), even if
        # rendering or prediction fails part-way through.
        env.close()
    return frames

In [None]:
# Roll out the current policy for one episode and keep the rendered frames.
frames = render_policy_net(model)
# Bare last expression: the animation object returned by plot_animation
# becomes this cell's output.
plot_animation(frames)