In [1]:
import gym
import numpy as np
import tensorflow as tf

In [8]:
try: env
except NameError: env = None
if env: env.close()

env = gym.make("CartPole-v1")
obs = env.reset()
list(zip(['Velocity', 'Position', 'Angle', 'Angular velocity'], obs))

[('Velocity', -0.012878723447980411),
 ('Position', 0.01592590876217463),
 ('Angle', -0.034427017752065796),
 ('Angular velocity', 0.01857454853460591)]

## Hardcoded policy

In [None]:
from time import sleep

render = False
policy = lambda obs: int(obs[2] > 0)  # Move to the same direction of the pole 

rewards = []
for episode in range(100):
    obs = env.reset()
    episode_reward = 0
    for step in range(200):
        action = policy(obs)
        obs, reward, done, info = env.step(action)
        episode_reward += reward
        
        if render:
            env.render()
            sleep(0.001)
        if done:
            rewards.append(episode_reward)
            break    
            
print(f'Mean: {np.mean(rewards)}, Std: {np.std(rewards):.2f}, Min: {np.min(rewards)}, Max: {np.max(rewards)}'    )

## NN Policy Gradients `REINFORCE`

1. First, let the neural network policy play the game several times, and at each step, compute the gradients that would make the chosen action even more likely but don’t apply these gradients yet.
2. Once you have run several episodes, compute each action’s advantage (using the method described in the previous section).
3. If an action’s advantage is positive, it means that the action was probably good, and you want to apply the gradients computed earlier to make the action even more likely to be chosen in the future. However, if the action’s advantage is negative, it means the action was probably bad, and you want to apply the opposite gradients to make this action slightly less likely in the future. The solution is simply to multiply each gradient vector by the corresponding action’s advantage.
4. Finally, compute the mean of all the resulting gradient vectors, and use it to perform a Gradient Descent step

### Create a model

In [10]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.losses import binary_crossentropy
from tensorflow.keras.optimizers import Adam

model = Sequential([
    Dense(16, activation='elu', input_shape=[env.observation_space.shape[0]]),
    Dense(1, activation='sigmoid'),  # Binary action, left or right
])

optimizer = Adam(lr=1e-2)
loss_fn = binary_crossentropy

### Break down of the problem in helper function steps

In [119]:
from time import sleep

def run_one_step(env: gym.Env, obs: np.ndarray,
                 model: tf.keras.Model, loss_fn: tf.keras.losses,
                 render: bool):
    """Run one step of the environment. Option to render with 100 fps.
    
    - Compute and record gradients assuming all taken actions are correct
    - Advance one step in the environment
    """
    
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = tf.random.uniform([1, 1]) > left_proba
        
        # Assume always correct. i.e. left action == 0 -> left proba == 1
        target_y = tf.constant([1.]) - tf.cast(action, tf.float32)
        loss = loss_fn(target_y, left_proba)
    
    gradients = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(int(action))
    
    if render:
        env.render()
        sleep(0.01) # 100 fps
    
    return obs, reward, done, gradients


def run_episodes(env: gym.Env, n_episodes: int, n_max_steps: int,
                 model: tf.keras.Model, loss_fn: tf.keras.losses,
                 render: bool = False):
    """Run multiple episodes looping over run_one_step
    
    - Record rewards and gradients
    """
    
    all_gradients = []
    all_rewards = []
    for episode in range(n_episodes):
        obs = env.reset()
        episode_gradients = []
        episode_rewards = []
        for step in range(n_max_steps):
            obs, reward, done, gradients = run_one_step(env, obs, model, loss_fn, render)
            episode_rewards.append(reward)
            episode_gradients.append(gradients)

            if done:
                break
                
        all_rewards.append(episode_rewards)       
        all_gradients.append(episode_gradients)
                
    return all_gradients, all_rewards
  

def discount_all_rewards(all_rewards: '2D arary', epsilon):
    """Discount rewards (list of lists) with factor epsilon"""
    
    all_discounted_rewards = []    
    for episode_rewards in all_rewards:
        discounted_rewards = [episode_rewards[-1]]
        for i in range(len(episode_rewards) - 1):
            discounted_rewards.append(episode_rewards[-i + 1] + epsilon*discounted_rewards[i])
        discounted_rewards.reverse()
        
        all_discounted_rewards.append(discounted_rewards)
   
    return all_discounted_rewards


def normalize_all_rewards(all_rewards: '2D array'):
    """Normalize rewards (list of lists)"""   
    
    all_norm_rewards = []
    for episode_rewards in all_rewards:
        rewards_mean = np.mean(episode_rewards)
        rewards_std = np.std(episode_rewards)
        
        normed_episode_rewards = (np.array(episode_rewards) - rewards_mean ) / rewards_std
        
        all_norm_rewards.append(normed_episode_rewards)
    
    return all_norm_rewards

In [112]:
def report_stats(rewards):
    rewards = [sum(reward) for reward in rewards]
    rewards = np.array(rewards)
    return f'Mean: {np.mean(rewards)}, Std: {np.std(rewards):.2f}, Min: {np.min(rewards)}, Max: {np.max(rewards)}'        

In [114]:
n_episodes = 5
n_max_steps = 200

train_steps = 100
epsilon=0.95

for train_step in range(train_steps):
    gradients, rewards = run_episodes(env=env, n_episodes=n_episodes, n_max_steps=n_max_steps, 
                                      model=model, loss_fn=loss_fn, render=False)
    
    if not train_step%10:
        print(report_stats(rewards))
    
    discounted_rewards = discount_all_rewards(rewards, epsilon)
    normalized_rewards = normalize_all_rewards(discounted_rewards)
    
    all_gradients = []
    for var_index in range(len(model.trainable_variables)):
        var_gradients = []
        for episode_rewards, episode_gradients in zip(normalized_rewards, gradients):
            for step_reward, step_gradients in zip(episode_rewards, episode_gradients):
                var_gradients.append(step_reward*step_gradients[var_index])
                
        var_gradients = np.mean(var_gradients, axis=0)
        all_gradients.append(var_gradients)
                
    optimizer.apply_gradients(zip(all_gradients, model.trainable_variables))

Mean: 24.4, Std: 14.97, Min: 13.0, Max: 54.0
Mean: 43.6, Std: 15.29, Min: 26.0, Max: 72.0
Mean: 59.4, Std: 25.91, Min: 24.0, Max: 91.0
Mean: 134.2, Std: 54.24, Min: 38.0, Max: 200.0
Mean: 129.4, Std: 60.71, Min: 34.0, Max: 200.0
Mean: 193.4, Std: 8.24, Min: 181.0, Max: 200.0
Mean: 173.4, Std: 22.46, Min: 147.0, Max: 200.0
Mean: 147.4, Std: 36.44, Min: 89.0, Max: 200.0
Mean: 158.0, Std: 28.98, Min: 120.0, Max: 200.0
Mean: 198.8, Std: 2.40, Min: 194.0, Max: 200.0


### Visualizing the learned model

In [132]:
_, all_rewards = run_episodes(env, n_episodes=1, n_max_steps=n_max_steps,
                              model=model, loss_fn=loss_fn, render=True)

print(f'{int(sum(all_rewards[0]))}/{n_max_steps}')

200/200
