## MountainCarContinuous v0

In [177]:
# conda install -c conda-forge gym
import gym
import matplotlib.pyplot as plt
import numpy as np

import tensorflow as tf
from tensorflow import keras

In [178]:
# Reset Keras' global state first, then seed both random sources used by the
# notebook (NumPy and TensorFlow) with the same value.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

### Red Neuronal

- Recibe 2 inputs [car position, car velocity]
- Valor: empujar el coche hacia la izquierda (valor negativo) o hacia la derecha (valor positivo)


In [308]:
# Policy network: given the current observation it produces the value used to
# choose the next step.
n_inputs = 2  # observation is [car position, car velocity]

# One hidden ELU layer followed by a single sigmoid unit, so the output lies in (0, 1).
model = keras.models.Sequential()
model.add(keras.layers.Dense(10, activation="elu", input_shape=[n_inputs]))
model.add(keras.layers.Dense(1, activation="sigmoid"))

### REINFORCE Utility Functions

**tf.GradientTape()**: TensorFlow proporciona la API tf.GradientTape para la diferenciación automática; es decir, calcular el gradiente de un cálculo con respecto a algunas entradas, normalmente tf.Variables. TensorFlow "registra" las operaciones relevantes ejecutadas dentro del contexto de un tf.GradientTape en una "cinta". Luego, TensorFlow usa esa cinta para calcular los gradientes de un cálculo "grabado" mediante la diferenciación en modo inverso.

In [321]:
import math

def calculate_reward(y_actual):
    """Return one tenth of the square of ``y_actual``.

    ``math.pow`` converts its argument to a Python float first, so the result
    is a Python float whatever numeric type is passed in.
    """
    squared = math.pow(y_actual, 2)
    return squared * 0.1

0.016000000000000004


In [322]:
import random

def play_one_step(env, obs, model, loss_fn):
    """Sample one action from the policy, compute its gradients and step the env.

    The model output is treated as the probability of the "left" choice. A
    uniform random draw is compared against it to sample the action, and the
    loss is computed as if the sampled action were the correct one. The
    resulting gradients are returned so the caller can weight them later.

    Args:
        env: Environment whose ``step(action)`` returns
            ``(obs, reward, done, info)``.
        obs: Current observation (indexed with ``np.newaxis`` to add a batch axis).
        model: Callable policy network exposing ``trainable_variables``.
        loss_fn: Loss called as ``loss_fn(y_target, y_pred)``.

    Returns:
        Tuple ``(obs, reward, done, grads)``: the next observation, the reward
        from ``calculate_reward(obs[0])`` (the environment's own reward is
        discarded), the ``done`` flag, and the gradients of the loss with
        respect to ``model.trainable_variables``.
    """
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])  # model output for this single observation
        action = tf.random.uniform([1, 1]) > left_proba  # True when the draw exceeds left_proba
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)  # target "left" probability = 1 - action
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    # Ask for the gradients after leaving the tape context so the tape does not
    # also record the backward computation.
    grads = tape.gradient(loss, model.trainable_variables)

    # Convert the sampled boolean to a plain float before handing it to the
    # environment.
    # NOTE(review): this yields a force of 0.0 or 1.0 only, so the policy never
    # applies a negative (leftward) force -- confirm this is intended.
    force = float(action[0, 0].numpy())
    obs, _env_reward, done, _info = env.step([force])
    reward = calculate_reward(obs[0])  # custom reward based on the first observation component
    return obs, reward, done, grads

In [312]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    """Play several episodes and collect the per-step rewards and gradients.

    Each episode starts with ``env.reset()`` and runs ``play_one_step`` until
    it reports ``done`` or ``n_max_steps`` steps have been played.

    Returns:
        Tuple ``(all_rewards, all_grads)``: two lists with one entry per
        episode, each entry being the list of per-step rewards (respectively
        per-step gradients) of that episode.
    """
    all_rewards, all_grads = [], []
    for _ in range(n_episodes):
        episode_rewards, episode_grads = [], []
        obs = env.reset()
        steps_taken = 0
        while steps_taken < n_max_steps:
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            episode_rewards.append(reward)
            episode_grads.append(grads)
            steps_taken += 1
            if done:
                break
        all_rewards.append(episode_rewards)
        all_grads.append(episode_grads)
    return all_rewards, all_grads

# esto devuelve una lista de recompensas por episodio y una lista de gradientes por episodio

In [313]:
def discount_rewards(rewards, discount_factor):
    """Return the discounted cumulative reward for every step of one episode.

    Working backwards from the last step, each entry becomes its own reward
    plus ``discount_factor`` times the (already discounted) entry after it.

    Args:
        rewards: Sequence of per-step rewards.
        discount_factor: Multiplier (gamma) applied to the following step's value.

    Returns:
        A float NumPy array with the same length as ``rewards``.
    """
    # Force a float dtype: with integer rewards an integer array would silently
    # truncate every discounted value on assignment.
    discounted = np.array(rewards, dtype=float)
    for step in range(len(discounted) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

def discount_and_normalize(all_rewards, discount_factor):
    """Discount every episode's rewards, then standardize them across all episodes.

    Each episode is discounted with ``discount_rewards``; the mean and standard
    deviation are computed over all discounted values of all episodes together,
    and every episode is shifted and scaled by those two statistics.

    Args:
        all_rewards: List of per-episode reward sequences.
        discount_factor: Discount factor forwarded to ``discount_rewards``.

    Returns:
        List of NumPy arrays, one per episode, of normalized discounted rewards.
    """
    all_discounted_rewards = [discount_rewards(rewards, discount_factor)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    if reward_std == 0:
        # All discounted values are identical; dividing by zero would turn every
        # entry into NaN, so only centre them (which yields zeros).
        reward_std = 1.0
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

### Ajuste de hiperparámetros

In [314]:
n_iterations = 100            # number of training iterations (one gradient update each)
n_episodes_per_update = 10    # episodes played before every gradient update
n_max_steps = 200             # upper bound on the steps of a single episode
discount_rate = 0.95          # discount factor applied to future rewards
# `learning_rate` is the supported keyword; the older `lr` alias is deprecated
# and rejected by recent Keras releases.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.mse

### Main Loop de Entrenamiento

In [323]:
# Create the OpenAI Gym environment and seed it.
env = gym.make("MountainCarContinuous-v0")
env.seed(42);

# Main training loop: one gradient update per iteration.
for iteration in range(n_iterations):
    
    # Play several episodes (each made of many steps) with the current policy,
    # collecting per-step rewards and per-step gradients.
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    
    # Sum every reward of every episode (used only for the progress line below).
    total_rewards = sum(map(sum, all_rewards))                    
   
   

    # "\r" together with end="" rewrites the same output line on every iteration.
    print("\rIteration: {}, mean rewards: {:.1f}".format(          
        iteration, total_rewards / n_episodes_per_update), end="")
    
    # Discount the rewards and normalize them across all episodes.
    all_final_rewards = discount_and_normalize(all_rewards,
                                                       discount_rate)
    
    # For each trainable variable, average the per-step gradients weighted by
    # the corresponding normalized reward, over every step of every episode.
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                 for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    # Apply the averaged gradients, pairing each one with its variable.
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

env.close()

Iteration: 99, mean rewards: 2.4

### Play trained model

In [324]:
from matplotlib.animation import FuncAnimation

def update_scene(num, frames, patch):
    """Show frame number ``num`` on ``patch`` and return it in a one-element tuple."""
    current_frame = frames[num]
    patch.set_data(current_frame)
    return (patch,)

def plot_animation(frames, repeat=False, interval=40):
    """Build a matplotlib animation that plays ``frames`` in order.

    Args:
        frames: Sequence of images; the first one initializes the plot.
        repeat: Forwarded to ``FuncAnimation`` as ``repeat``.
        interval: Forwarded to ``FuncAnimation`` as ``interval``.

    Returns:
        The ``FuncAnimation`` object.
    """
    figure = plt.figure()
    image = plt.imshow(frames[0])
    plt.axis('off')
    animation = FuncAnimation(
        figure,
        update_scene,
        frames=len(frames),
        fargs=(frames, image),
        interval=interval,
        repeat=repeat,
    )
    # Close the current figure -- presumably so the static image is not shown
    # alongside the animation.
    plt.close()
    return animation

def render_policy_net(model, n_max_steps=1500, seed=42):
    """Run one episode with ``model`` as the policy and return the rendered frames.

    Args:
        model: Callable policy network; its output is treated as the
            probability of the "left" choice, as in ``play_one_step``.
        n_max_steps: Maximum number of steps to play.
        seed: Seed applied to the environment, NumPy and TensorFlow. Note that
            seeding TensorFlow resets its global random state.

    Returns:
        List of frames as returned by ``env.render(mode="rgb_array")``, one per
        step played.
    """
    frames = []
    env = gym.make("MountainCarContinuous-v0")
    try:
        env.seed(seed)
        np.random.seed(seed)
        # Actions are sampled with tf.random.uniform, so TensorFlow has to be
        # seeded as well for `seed` to make the rollout repeatable.
        tf.random.set_seed(seed)
        obs = env.reset()
        for _ in range(n_max_steps):
            frames.append(env.render(mode="rgb_array"))
            # Call the model directly: for a single observation this avoids the
            # per-call overhead of `predict`.
            left_proba = model(obs.reshape(1, -1))
            action = tf.random.uniform([1, 1]) > left_proba
            # NOTE(review): the force is 0.0 or 1.0 only, so the car is never
            # pushed with a negative (leftward) force -- confirm this is intended.
            obs, _reward, done, _info = env.step([float(action[0, 0].numpy())])
            if done:
                break
    finally:
        # Release the environment even if rendering or stepping fails.
        env.close()
    return frames

In [325]:
# Roll out the trained policy once and keep the rendered frames.
frames = render_policy_net(model)
# Last expression of the cell: the returned animation object is the cell's output.
plot_animation(frames)

<matplotlib.animation.FuncAnimation at 0x2a46d8e2c88>