In [1]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

In [2]:

# Environment the policy is trained and evaluated on.
env = gym.envs.make("CartPole-v1")

# Length of one observation vector; this becomes the width of the network's input.
inputs = env.observation_space.shape[0]

# Policy network: a 7-unit ReLU hidden layer followed by a single sigmoid unit,
# so the model emits one value in (0, 1) per observation.
model = Sequential()
model.add(Dense(7, activation="relu", input_shape=[inputs]))
model.add(Dense(1, activation="sigmoid"))


In [3]:
# Show the layer-by-layer architecture and parameter counts of the policy network.
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 7)                 35        
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 8         
Total params: 43
Trainable params: 43
Non-trainable params: 0
_________________________________________________________________


In [4]:
def play_one_step(env, obs, model, loss_fn):
    """Sample one action from the model, step the environment, and return the gradients.

    The model's output for ``obs`` is treated as the probability of choosing
    action 0 (the code calls it the "left" probability); action 1 is chosen
    otherwise. The loss is computed as if the sampled action were the correct
    target, so the returned gradients point towards making that action more
    likely. Nothing is applied to the model here.

    Args:
        env: Environment whose ``step(action)`` returns a 4-tuple
            ``(obs, reward, done, info)``.
        obs: Current observation; a leading batch axis is added before it is
            passed to the model.
        model: Callable model exposing ``trainable_variables``.
        loss_fn: Callable ``loss_fn(y_true, y_pred)``.

    Returns:
        ``(next_obs, reward, done, gradients)`` where ``gradients`` is the result
        of ``tape.gradient`` with respect to ``model.trainable_variables``.
    """
    batched_obs = obs[np.newaxis]

    with tf.GradientTape() as tape:
        p_action_zero = model(batched_obs)

        # True (-> action 1) when the uniform draw exceeds the predicted
        # probability, i.e. with probability 1 - p_action_zero.
        picked_one = tf.random.uniform(shape=[1, 1]) > p_action_zero

        # Target is 1.0 when action 0 was sampled and 0.0 when action 1 was sampled.
        target = tf.constant([[1.0]]) - tf.cast(picked_one, tf.float32)

        step_loss = tf.reduce_mean(loss_fn(target, p_action_zero))

    step_grads = tape.gradient(step_loss, model.trainable_variables)

    chosen_action = int(picked_one[0, 0].numpy())
    next_obs, reward, done, _ = env.step(chosen_action)

    return next_obs, reward, done, step_grads


In [5]:
def play_multiple_episodes(env, max_episodes, max_steps, model, loss_fn):
    """Run several episodes and collect per-step rewards and gradients.

    Each episode starts from ``env.reset()`` and is advanced with
    ``play_one_step`` until the environment reports ``done`` or ``max_steps``
    steps have been taken.

    Args:
        env: Environment passed through to ``play_one_step``.
        max_episodes: Number of episodes to play.
        max_steps: Maximum number of steps per episode.
        model: Policy model passed through to ``play_one_step``.
        loss_fn: Loss function passed through to ``play_one_step``.

    Returns:
        ``(rewards, grads)``: two lists with one entry per episode; each entry is
        a list with one element per step taken in that episode.
    """
    collected_rewards, collected_grads = [], []

    for _ in range(max_episodes):
        episode_rewards, episode_grads = [], []
        obs = env.reset()

        for _ in range(max_steps):
            obs, reward, done, grad = play_one_step(env, obs, model, loss_fn)
            episode_rewards.append(reward)
            episode_grads.append(grad)
            if done:
                break

        collected_rewards.append(episode_rewards)
        collected_grads.append(episode_grads)

    return collected_rewards, collected_grads



In [6]:
def discount_rewards(all_rewards, gamma):
    """Return the discounted sum of future rewards for every step of one episode.

    Element ``t`` of the result equals
    ``all_rewards[t] + gamma * all_rewards[t + 1] + gamma**2 * all_rewards[t + 2] + ...``.

    Args:
        all_rewards: Sequence of per-step rewards for a single episode.
        gamma: Discount factor applied once per step into the future.

    Returns:
        A new NumPy array with the same length as ``all_rewards``. Integer (or
        boolean) input is promoted to float64; floating-point input keeps its dtype.
    """
    discounted = np.array(all_rewards)

    # Integer rewards would yield an integer array, and the in-place updates
    # below would then silently truncate every fractional discounted value
    # (e.g. 1 + 0.5 * 1 stored as 1). Promote to float before accumulating.
    if not np.issubdtype(discounted.dtype, np.inexact):
        discounted = discounted.astype(np.float64)

    # Walk backwards so each step can reuse the already-discounted next step.
    for step in range(len(all_rewards) - 2, -1, -1):
        discounted[step] = discounted[step] + gamma * discounted[step + 1]

    return discounted

In [7]:
def discount_normalize_rewards(all_rewards, gamma):
    """Discount each episode's rewards, then standardize them across all episodes.

    Every episode is discounted independently with ``discount_rewards``. The
    mean and standard deviation are then computed over the steps of all
    episodes pooled together, and each episode's array is shifted and scaled
    with those shared statistics.

    Args:
        all_rewards: Sequence of episodes, each a sequence of per-step rewards.
        gamma: Discount factor forwarded to ``discount_rewards``.

    Returns:
        A list with one NumPy array per episode holding the standardized
        discounted rewards. If the pooled values have no spread (for example a
        single one-step episode), the values are only mean-centred, which
        yields zeros instead of NaN.
    """
    all_discounted_rewards = [discount_rewards(reward, gamma) for reward in all_rewards]

    flat_rewards = np.concatenate(all_discounted_rewards)

    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()

    # Dividing by a zero standard deviation gives 0/0 = NaN for every step, and
    # those NaNs would flow straight into the gradient weighting. Treat a spread
    # that is only floating-point noise relative to the values as zero and skip
    # the scaling in that case.
    magnitude = np.abs(flat_rewards).max() if flat_rewards.size else 0.0
    if reward_std <= 1e-12 * magnitude:
        reward_std = 1.0

    return [(discount_reward - reward_mean) / reward_std for discount_reward in all_discounted_rewards]

In [8]:
# Sanity check by hand: last step -50; then 0 + 0.8 * -50 = -40; then 10 + 0.8 * -40 = -22.
discount_rewards([10, 0, -50], 0.8)

array([-22, -40, -50])

In [9]:
# Two episodes of different lengths: each is discounted on its own, then both are
# standardized with one mean and one standard deviation pooled over all five steps.
discount_normalize_rewards([[10, 0, -50], [10, 20]], 0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [10]:
n_iterations = 50  # number of passes of the training loop (one policy update each)
n_episodes_per_update = 5  # episodes played to collect gradients for each update
max_steps = 200  # upper bound on steps played per episode
gamma = 0.95  # discount factor applied to future rewards

In [11]:
# Optimizer used by the training loop to apply the reward-weighted mean gradients.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [12]:
# Passed down to play_one_step, which evaluates it between the sampled-action
# target and the model's predicted probability.
loss_fn = tf.keras.losses.binary_crossentropy

In [13]:
# model.trainable_variables

In [14]:
for iteration in range(n_iterations):

    # Play a batch of episodes with the current policy, keeping every step's
    # reward and the gradients computed for the action taken at that step.
    all_rewards, all_gradients = play_multiple_episodes(env, n_episodes_per_update, max_steps, model, loss_fn)

    # Turn raw rewards into standardized discounted returns: one weight per step.
    all_final_rewards = discount_normalize_rewards(all_rewards, gamma)

    all_mean_gradients = []

    for variable_index in range(len(model.trainable_variables)):

        # For this variable, average the per-step gradients over every step of
        # every episode, each weighted by that step's standardized return.
        mean_grads = tf.reduce_mean([step_final_reward * all_gradients[episode_index][step][variable_index]
                                     for episode_index, final_episodic_reward in enumerate(all_final_rewards)
                                     for step, step_final_reward in enumerate(final_episodic_reward)], axis=0)

        all_mean_gradients.append(mean_grads)

    # Apply the update exactly once per iteration, after the mean gradient of
    # every variable is available. Calling this inside the loop above would zip
    # a partial gradient list against the variables, updating the first
    # variables several times per iteration and advancing the optimizer's step
    # count once per variable.
    optimizer.apply_gradients(zip(all_mean_gradients, model.trainable_variables))
    



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



In [20]:
# Play 10 more episodes with the current model. Gradients are still computed and
# returned, but nothing in this cell applies them.
rewards, grads = play_multiple_episodes(env, 10, max_steps, model, loss_fn)

In [21]:
rewards  # after training: one list per episode, one entry per step taken in that episode

[[1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0],
 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
 [1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0],
 [1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0],
 [1.0,
  1.0,
  1.0,
  1.0,
  1.0,
