In [None]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

try:
    # %tensorflow_version only exists in Colab.
    %tensorflow_version 2.x
    !apt update && apt install -y libpq-dev libsdl2-dev swig xorg-dev xvfb
    !pip install -q -U tf-agents pyvirtualdisplay gym[atari,box2d]
    IS_COLAB = True
except Exception:
    IS_COLAB = False

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.config.list_physical_devices('GPU'):
    print("No GPU was detected. CNNs can be very slow without a GPU.")
    if IS_COLAB:
        print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# To get smooth animations
import matplotlib.animation as animation
mpl.rc('animation', html='jshtml')

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "rl"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

    

In [None]:
# https://gym.openai.com/docs/
    
import gym

In [None]:
# Listar todos os ambientes virtuais disponiveis no Open AI Gym

gym.envs.registry.all()

In [None]:
# Criar o ambiente CartPole
# https://gym.openai.com/envs/CartPole-v1/

env = gym.make('CartPole-v1')


In [None]:
# O ambiente é inicializado através da chamada ao método reset()
# O método devolve a primeira observação, no formato de um array NumPy com 4 valores reais
# (Posição horizontal, Velocidade, Ângulo de inclinação, Velocidade angular)

env.seed(42)

obs = env.reset()

In [None]:
# (Posição horizontal, Velocidade, Ângulo de inclinação, Velocidade Angular)

obs

In [None]:
# Obter uma imagem do ambiente no formato NumPy Array
# Poderá também surgir um pop-up com a imagem. Se for esse o caso, podem ignorar essa janela

img = env.render(mode="rgb_array")
img.shape

In [None]:

def plot_environment(env, figsize=(5,4)):
    plt.figure(figsize=figsize)
    img = env.render(mode="rgb_array")
    plt.imshow(img)
    plt.axis("off")
    return img

In [None]:
# Visualizar a imagem obtida

plot_environment(env)
plt.show()

In [None]:
# Verificar quantas ações é possível efetuar neste ambiente

env.action_space


In [None]:
# 0 - accelerate left
# 1 - accelerate right

action = 1  


In [None]:
# Executar uma ação
# Devolve nova observação, recompensa, informação sobre o final do episódio e outra informação 

obs, reward, done, info = env.step(action)
obs

In [None]:
print('Reward:', reward)
print('Done:', done)
print('Additional info:', info)

In [None]:
# Continuar a acelerar para a direita
# Verificar se as variações nas observações são consistentes com esta ação
action = 1  # accelerate right
obs, reward, done, info = env.step(action)
obs

In [None]:
plot_environment(env)
plt.show()

In [None]:
# Quando termina a simulação, é necessário reinicializar o ambiente

if done:
    obs = env.reset()

In [None]:

env.seed(42)

# Politica básica
# Acelerar para tentar diminuir a inclinação do poste
# a observação obs[2] contém o ângulo de inclinação. 

def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1



In [None]:

# Simular 500 episódios 
# Recolher as recompensas em cada um destes episódios
# Cada episódio termina quando a variável done fica a true.

totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:
            break
    totals.append(episode_rewards)

In [None]:
# Verificar o desempenho médio e a duração dos episódios com maior e menor duração
# Analise os resultados

np.mean(totals), np.std(totals), np.min(totals), np.max(totals)


In [None]:
# Executar um episódio e recolher imagens em cada um dos passos

env.seed(42)

frames = []

obs = env.reset()
for step in range(200):
    img = env.render(mode="rgb_array")
    frames.append(img)
    action = basic_policy(obs)

    obs, reward, done, info = env.step(action)
    if done:
        break

In [None]:
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=False, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

In [None]:
# Visualizar um episódio

plot_animation(frames)


In [None]:
# Neural Network Policies

keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)



In [None]:
# Criar uma rede neuronal básica que recebe uma observação e devolve a probabilidade de executar a ação 0

n_inputs = 4 

model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]),
    keras.layers.Dense(1, activation="sigmoid"),
])

In [None]:
 # Função que executa um episódio e recolhe as imagens para visualização numa animação

def render_policy_net(model, n_max_steps=200, seed=42):
    frames = []
    env = gym.make("CartPole-v1")
    env.seed(seed)
    np.random.seed(seed)
    obs = env.reset()
    for step in range(n_max_steps):
        frames.append(env.render(mode="rgb_array"))
        left_proba = model.predict(obs.reshape(1, -1))
        action = int(np.random.rand() > left_proba)
        obs, reward, done, info = env.step(action)
        if done:
            print('Performed', step, 'steps')
            break
    env.close()
    return frames

In [None]:
# Avaliar o desempenho da rede
# Ela ainda não foi treinada e terá um desempenho aleatório

frames = render_policy_net(model)
plot_animation(frames)


In [None]:
# Aprender uma estratégia básica -> Problema resolvido como uma situação supervisionada
# Pretende-se que a rede aprenda uma estratégia igual à estratégia básica que implementámos anteriormente
# Neste caso o target será definido em função do ângulo de inclinação do poste (obs[2]).

# Treinar em 50 ambientes diferentes, durante 5000 iterações

n_environments = 50
n_iterations = 5000

envs = [gym.make("CartPole-v1") for _ in range(n_environments)]
for index, env in enumerate(envs):
    env.seed(index)
np.random.seed(42)
observations = [env.reset() for env in envs]
optimizer = keras.optimizers.RMSprop()
loss_fn = keras.losses.binary_crossentropy

for iteration in range(n_iterations):
    # if angle < 0, we want proba(left) = 1., or else proba(left) = 0.
    target_probas = np.array([([1.] if obs[2] < 0 else [0.])
                              for obs in observations])
    with tf.GradientTape() as tape:
        left_probas = model(np.array(observations))
        loss = tf.reduce_mean(loss_fn(target_probas, left_probas))
    print("\rIteration: {}, Loss: {:.3f}".format(iteration, loss.numpy()), end="")
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    actions = (np.random.rand(n_environments, 1) > left_probas.numpy()).astype(np.int32)
    for env_index, env in enumerate(envs):
        obs, reward, done, info = env.step(actions[env_index][0])
        observations[env_index] = obs if not done else env.reset()

for env in envs:
    env.close()

In [None]:
# Avaliar o desempenho do modelo treinado
# Mudar a seed s para testar o comportamento em ambientes diferentes

s = 42

frames = render_policy_net(model, 200, s)
plot_animation(frames)

In [None]:
# Implementar uma política baseada em gradiente, utilizando uma variante do método REINFORCE

In [None]:
# Método auxiliar para executar um passo de um episódio

def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(int(action[0, 0].numpy()))
    return obs, reward, done, grads

In [None]:
# Método que permite executar vários episódios e guardar os gradientes e as recompensas

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

In [None]:
# Métodos auxiliares para processamento de recompensas

def discount_rewards(rewards, discount_rate):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_rate
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_rate):
    all_discounted_rewards = [discount_rewards(rewards, discount_rate)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

In [None]:
# Criar e preparar a rede para o treino com o algoritmo indicado

n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_rate = 0.95

optimizer = keras.optimizers.Adam(lr=0.01)
loss_fn = keras.losses.binary_crossentropy

keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[4]),
    keras.layers.Dense(1, activation="sigmoid"),
])

In [None]:
# Treinar

env = gym.make("CartPole-v1")
env.seed(42);

for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    total_rewards = sum(map(sum, all_rewards))                     
    print("\rIteration: {}, mean rewards: {:.1f}".format(          
        iteration, total_rewards / n_episodes_per_update), end="") 
    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                       discount_rate)
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                 for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

env.close()

In [None]:
# Avaliar o desempenho do modelo treinado com o algoritmo REINFORCE
# Mudar a seed s para testar o comportamento em ambientes diferentes

s = 42

frames = render_policy_net(model, 200, s)
plot_animation(frames)