In [1]:
# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# To get smooth animations
import matplotlib.animation as animation
mpl.rc('animation', html='jshtml')

In [2]:
import gym
env = gym.make('CartPole-v1')
obs = env.reset()
obs

array([ 0.02909088, -0.03275333, -0.00021314,  0.00194261])

Here we've created a CartPole environment. This is a 2D simulation in which a cart can be accelerated left or right in order to balance a pole placed on top of it. You can get the list of all available environments by running *'gym.envs.registry.all()'* After the environment is created, you must initialize it using the '*reset()'* method. This returns the first observation. Observations depend on the type of environment. For the CartPole environment, each observation is a 1D NumPy array containing four floats: these floats represent the cart's horizontal position (0.0 = center), its velocity (positive means right), the angle of the pole ( 0.0= vertical) and its angular velocity(positive means clockwise).

Now let's display this environment by calling its render() method

In [5]:
env.render()

True

In [5]:
'''
If you want render() to return the rendered image as a NumPy array, you can set mode='rgb_array' 
(oddly, this environment will render the environment to screen as well):
'''
img = env.render(mode = 'rgb_array')
print('image shape: ',img.shape) #height, width, channels (3= Red, Green, Blue)

#Save as .png
from PIL import Image
image = Image.fromarray(img)
name = 'CartPole-v1.jpg'
image.save(name)
print(f'image saved as {name}')

image shape:  (400, 600, 3)
image saved as CartPole-v1.jpg


In [11]:
#Let's ask the environment what actions are possible:
env.action_space

Discrete(2)

Discrete means that the possible actions are integers 0 and 1, which represent accelerating left (0), or right(1).
Other environments may have other discrete actions, or other kinds of actions (e.g., continuous) Since the pole is
leaning toward the right (obs[2] > 0), let's accelerate the cart toward the right:

In [12]:
action =1 #accelerate right
obs, reward, done, info  = env.step(action)
obs

array([ 0.00935201,  0.16721267,  0.02682759, -0.30439258])

The step() method executes the given action and returns four values:
**obs**:
    This is the new observation. The cart is now moving toward the right (obs[1] > 0)The pole is still tilted toward the right (obs[2] > 0), but its angular velocity is now negative (obs[3] < 0), so it will likely be tilted toward the left affter the next step
    
**reward**:
    In the environment, you get a reward of 1.0 at every step, no matter what you do, so the goal is to keep the episode running as long as possible
    
**done**:
    This value will be '*True*' when the episode is over. This will happen when the pole tillts too much, or goes off the screen, or after 200 steps. (In this last case, you have won) After that, the environment must be reset before it can be used again.
    
**info**:
    This environment-specific dictionary can provide some extra information that you may find useful for debugging or for training. For example, in some games it may indicate how many lives the agent has.
    
 Once you have finished using an environment, you should call its *'close'* method to free resources.

Let's hardcode a simple policy that accelerates left when the pole is leaning toward the left and accelerates right when the pole is leaning toward the right. We will run this policy to see the average rwards it gets over 500 episodes:

In [6]:
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1
totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        
        if done:
            break
        totals.append(episode_rewards)

In [7]:
import numpy as np
np.mean(totals), np.std(totals), np.min(totals), np.max(totals)

(22.105222437137332, 13.432795392937024, 1.0, 71.0)

In [25]:
#Let's visualize one episode:
env.seed(42)

frames = []
obs = env.reset()
for step in range(200):
    img= env.render(mode ='rgb_array')
    frames.append(img)
    action = basic_policy(obs)
    
    obs,reward,done,info = env.step(action)
    if done:
        break

In [26]:
#Now show the animation:


def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat= False, interval= 40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(
    fig, update_scene, fargs=(frames, patch),
    frames =len(frames), repeat = repeat, interval =interval)
    plt.close()
    return anim

In [10]:
plot_animation(frames)

## Neural Network Policies
Let's create a neural network policy. Just like with the policy we hardcoded earlier, this neural network will take an observation as input, and it will output the action to be executed. More precisely, it will estimate a probability for each action, and then we will select an action randomly, according to the estimated probabilities. In the case of the CartPole environment, there are just two possible actions (left or right), so we only need one output neuron. It will output the probability p of action 0 (left) and of course the probability of action 1 (right) will be 1-p. For example, if it outputs 0.7 then we will pick action 0 with 70% probability or action 1 with 30% probability.

In [11]:
import tensorflow as tf
from tensorflow import keras

In [12]:
n_inputs = 4 # == env.observation_space.shape[0]

model = keras.models.Sequential([
    keras.layers.Dense(5, activation ='relu', input_shape=[n_inputs]),
    keras.layers.Dense(1, activation ='sigmoid'),
])

## Policy Gradients 

Let's use tf.keras to implement REINFORCE algorithm. We will train the neural network policy we built earlier so that it learns to balance the pole on the cart. First, we need a function that will play one step. We will pretend for now that whatever action it takes is the right one so that we can compute the loss and its gradients (these gradients will just be saved for a while, and we will modify them later depending on how good or bad the action turned out to be):

In [13]:
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1,1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs,reward,done,info = env.step(int(action[0,0].numpy()))
    return obs, reward, done, grads

Let's walkthrough this function:

###### left_proba = model(obs[np.newaxis])
* Within the *GradientTape* block, we start by calling the model, giving it a single observation (we reshape the observation so it becomes a batch containing a single instance, as the model expects a batch with np.newaxis). This outputs the probability of going left.

###### action = (tf.random.uniform([1,1]) > left_proba)
* Next, we sample a random float between 0 and 1, and we check whether it is gerater than left_proba. The action will be False with probability left_proba, or True with probability 1 - left_proba. Once we cast this Boolean to a number, the action will be 0 (left) or 1 (right) with the appropriate probabilities.

###### y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
* Then we define the target probability of going left: it is 1 minus the action (cast to a float). If the action is 0 (left), then the target probability of going left will be 1. If the action is 1 (right), then the target probability will be 0.

######           loss = tf.reduce_mean(loss_fn(y_target, left_proba))
###### grads = tape.gradient(loss, model.trainable_variables)
* Then we compute the loss using the given loss function, and we use the tape to compute the gradient of the loss with regard to the model's trainable variables. Again, these gradients will be tweaked later, before we apply them, depending on how good or bad the action turned out to be.

###### obs,reward,done,info = env.step(int(action[0,0].numpy()))
* Finally, we play the selected action, and we return the new observation, the reward, whether the episode is ended or not and of course the gradients that we just computed.

Now let's create another function that will rely on the *' play_one_step() function to play multiple episodes, returning all the rewards and gradients for each episode and each step:


In [14]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs = env.reset()
        for step in range(n_max_steps):
            obs,reward, done, grads = play_one_step(env,obs,model,loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

In [15]:
def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

In [16]:
def discount_and_normalize_rewards(all_rewards,discount_factor):
    all_discounted_rewards = [discount_rewards (rewards, discount_factor)
                             for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
           for discounted_rewards in all_discounted_rewards]

In [17]:
#When you increase the discount_factor the agent consider future rewards more.
discount_rewards([10, 0, -50], discount_factor=0.9)


array([-30, -45, -50])

In [18]:
discount_and_normalize_rewards([[10,0,-50],[10,20]], discount_factor=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [37]:
#Let's define the hyperparameters 
# (Run 150 iterations each iteration play 10 episodes and each episode will last 200 steps.)
n_iterations = 50
n_episodes_per_update = 5
n_max_steps = 200
discount_factor = 0.95

In [38]:
optimizer = keras.optimizers.Adam(lr = 0.01)
loss_fn  = keras.losses.binary_crossentropy

In [39]:
#We are ready to build and run the training loop:
for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
    env,n_episodes_per_update,n_max_steps,model,loss_fn)
    
    total_rewards = sum(map(sum, all_rewards))                     # Not shown in the book
    print("\rIteration: {}, mean rewards: {:.1f}".format(          # Not shown
        iteration, total_rewards / n_episodes_per_update), end="") # Not shown
    
    all_final_rewards =  discount_and_normalize_rewards(all_rewards, discount_factor)
    
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                 for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
         
         

Iteration: 49, mean rewards: 200.0

In [40]:

def render_policy_net(model, n_max_steps=200, seed=42):
    frames = []
    env = gym.make("CartPole-v1")
    env.seed(seed)
    np.random.seed(seed)
    obs = env.reset()
    for step in range(n_max_steps):
        frames.append(env.render(mode="rgb_array"))
        left_proba = model.predict(obs.reshape(1, -1))
        action = int(np.random.rand() > left_proba)
        obs, reward, done, info = env.step(action)
        if done:
            break
    env.close()
    return frames



In [42]:
frames = render_policy_net(model)
plot_animation(frames)

Let's walk through this code:

* At each training iteration, this loop calls the play_multiple_episodes() function, which plays the game 10 times and returns all the rewards and gradients for every episode and step

* Then we call the discount_and_normalize_rewards() to compute each action's normalized advantage (which in the code we call the final_reward). This provides a measure of how good or bad each action actually was, in hindsight.

* Next, we go through each trainable variable, and for each of them we compute the weighted mean of the gradients for that variable over all episodes and all steps, weighted by the final_reward.

* Finally we apply these mean gradients using the optimizer: the model's trainable variables will be tweaked, and hopefully the policy will be a bit better

And we're done! This code will train thre neural network policy, and it will successfully learn to balance the pole on the cart. The mean reward per episode will get very close to 200 (which is the maximum by default with this environment). Success!

## Markov Decision Process

Let's define some transition probabilities, rewards and possible actions. For example, in state s0, if action a0 is chosen then with proba 0.7 we will go to state s0 with reward +10, with probability 0.3 we will go to state s1 with no reward, and with never go to state s2 (so the transition probabilities are [0.7, 0.3, 0.0], and the rewards are [+10, 0, 0]):


In [7]:
import pandas as pd
import numpy as np

In [8]:
transition_probabilities = [ # shape=[s, a, s']
        [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
        [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
        [None, [0.8, 0.1, 0.1], None]]
rewards = [ # shape=[s, a, s']
        [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
        [[0, 0, 0], [0, 0, 0], [0, 0, -50]],
        [[0, 0, 0], [+40, 0, 0], [0, 0, 0]]]
possible_actions = [[0, 1, 2], [0, 2], [1]]

## Q-Value Iteration


In [9]:
Q_values = np.full((3, 3), -np.inf) # -np.inf for impossible actions
for state, actions in enumerate(possible_actions):
    Q_values[state, actions] = 0.0  # for all possible actions

In [10]:
gamma = 0.90  # the discount factor

history1 = [] # Not shown in the book (for the figure below)
for iteration in range(50):
    Q_prev = Q_values.copy()
    history1.append(Q_prev) # Not shown
    for s in range(3):
        for a in possible_actions[s]:
            Q_values[s, a] = np.sum([
                    transition_probabilities[s][a][sp]
                    * (rewards[s][a][sp] + gamma * np.max(Q_prev[sp]))
                for sp in range(3)])

history1 = np.array(history1) # Not shown

In [11]:
Q_values

array([[18.91891892, 17.02702702, 13.62162162],
       [ 0.        ,        -inf, -4.87971488],
       [       -inf, 50.13365013,        -inf]])

In [12]:
np.argmax(Q_values, axis=1)

array([0, 0, 1])

The optimal policy for this MDP, when using a discount factor of 0.90, is to choose action a0 when in state s0, and choose action a0 when in state s1, and finally choose action a1 (the only possible action) when in state s2.

Let's try again with a discount factor of 0.95:

In [13]:
Q_values = np.full((3, 3), -np.inf) # -np.inf for impossible actions
for state, actions in enumerate(possible_actions):
    Q_values[state, actions] = 0.0  # for all possible actions

In [14]:
gamma = 0.95  # the discount factor

for iteration in range(50):
    Q_prev = Q_values.copy()
    for s in range(3):
        for a in possible_actions[s]:
            Q_values[s, a] = np.sum([
                    transition_probabilities[s][a][sp]
                    * (rewards[s][a][sp] + gamma * np.max(Q_prev[sp]))
                for sp in range(3)])

In [15]:
Q_values

array([[21.73304188, 20.63807938, 16.70138772],
       [ 0.95462106,        -inf,  1.01361207],
       [       -inf, 53.70728682,        -inf]])

In [30]:
np.argmax(Q_values, axis= 1)

array([0, 2, 1])

Now the policy has changed! In state s1, we now prefer to go through the fire (choose action a2). This is because the discount factor is larger so the agent values the future more, and it is therefore ready to pay an immediate penalty in order to get more future rewards.

## Q-Learning

Q-Learning works by watching an agent play (e.g., randomly) and gradually improving its estimates of the Q-Values. Once it has accurate Q-Value estimates (or close enough), then the optimal policy consists in choosing the action that has the highest Q-Value (i.e., the greedy policy).

We will need to simulate an agent moving around in the environment, so let's define a function to perform some action and get the new state and a reward:


In [31]:
def step(state, action):
    probas = transition_probabilities[state][action]
    next_state = np.random.choice([0, 1, 2], p=probas)
    reward = rewards[state][action][next_state]
    return next_state, reward


In [32]:
#Now let's implement the agent's exploration policy. 
#Since the state space is pretty small, a simple random policy will be sufficient
def exploration_policy(state):
    return np.random.choice(possible_actions[state])

In [33]:
#Next we initialize the Q-Values just like earlier, we are ready to run the Q-Learning Algorithm  with learning rate decay
alpha0 = 0.05 #initial learning rate
decay = 0.005 #learning rate decay
gamma = 0.90 #discount factor
state = 0 #initial state

for iteration in range(10000):
    action = exploration_policy(state)
    next_state, reward =  step(state,action)
    next_value = np.max(Q_values[next_state])
    alpha = alpha0 / (1 + iteration * decay)
    Q_values[state,action] *= 1 - alpha
    Q_values[state,action] += alpha * (reward + gamma * next_value)
    state = next_state
    

In [34]:
Q_values

array([[19.11894203, 17.99653154, 14.30196979],
       [ 0.61169518,        -inf, -4.04382923],
       [       -inf, 50.34958178,        -inf]])