In [162]:
from IPython.display import clear_output
import sys

IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    # for saving videos
    !apt-get install ffmpeg

    !pip install gymnasium   # conferir se precisa

    # clone repository
    !git clone https://github.com/LucaLemos/UFRPE_AprendizagemReforco
    sys.path.append("/content/UFRPE_AprendizagemReforco")

    clear_output()
else:
    from os import path
    sys.path.append( path.dirname( path.dirname( path.abspath("__main__") ) ) )


In [163]:
import gymnasium as gym
import numpy as np
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

from util.qtable_helper import record_video_qtable, evaluate_qtable_policy
from util.notebook import display_videos_from_path


In [164]:
# Configuração do ambiente
ENV_NAME = "FrozenLake-v1"
env = gym.make(ENV_NAME, is_slippery=True, render_mode="rgb_array")

In [165]:
# Hiperparâmetros
GAMMA = 0.95  # Fator de desconto
ITERATIONS = 500  # Número de iterações do Fitted Q-Iteration
DATASET_SIZE = 10_000  # Tamanho do conjunto de dados (replay buffer)
BATCH_SIZE = 64  # Tamanho do batch para treinamento da rede neural
LEARNING_RATE = 1e-3  # Taxa de aprendizado para o otimizador

In [166]:
# Passo 1: Coletar um conjunto fixo de transições (Replay Buffer)
replay_buffer = deque(maxlen=DATASET_SIZE)
for _ in range(DATASET_SIZE):
    state, _ = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()  # Escolhe uma ação aleatória
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        replay_buffer.append((state, action, reward, next_state, done))
        state = next_state

In [167]:
# Passo 2: Definir a Rede Neural para Aproximação da Função Q
class QNetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)  # Camada fully connected 1
        self.fc2 = nn.Linear(64, 64)         # Camada fully connected 2
        self.fc3 = nn.Linear(64, output_dim) # Camada de saída

    def forward(self, x):
        x = torch.relu(self.fc1(x))  # Função de ativação ReLU
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Inicializar a rede neural
input_dim = env.observation_space.n  # Número de estados
output_dim = env.action_space.n      # Número de ações
q_network = QNetwork(input_dim, output_dim)
optimizer = optim.Adam(q_network.parameters(), lr=LEARNING_RATE)
criterion = nn.MSELoss()  # Função de perda: Erro Quadrático Médio

# Função para converter estados em tensores one-hot
def to_one_hot(state, num_states):
    one_hot = np.zeros(num_states)
    one_hot[state] = 1
    return torch.FloatTensor(one_hot)


In [168]:
# Passo 3: Aplicar Fitted Q-Iteration (Offline RL)
for iteration in range(ITERATIONS):
    # Amostrar um batch de transições do replay buffer
    batch = random.sample(replay_buffer, BATCH_SIZE)
    states, actions, rewards, next_states, dones = zip(*batch)

    # Converter estados e próximos estados para one-hot
    states = torch.stack([to_one_hot(s, input_dim) for s in states])
    next_states = torch.stack([to_one_hot(ns, input_dim) for ns in next_states])
    actions = torch.LongTensor(actions)
    rewards = torch.FloatTensor(rewards)
    dones = torch.FloatTensor(dones)

    # Calcular os valores Q atuais
    current_q_values = q_network(states).gather(1, actions.unsqueeze(1)).squeeze()

    # Calcular os valores Q alvo (target)
    with torch.no_grad():
        next_q_values = q_network(next_states).max(1)[0]
        target_q_values = rewards + GAMMA * next_q_values * (1 - dones)

    # Calcular a perda e atualizar a rede
    loss = criterion(current_q_values, target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Exibir progresso
    if iteration % 50 == 0:
        print(f"Iteração {iteration}, Perda: {loss.item():.4f}")

Iteração 0, Perda: 0.0064
Iteração 50, Perda: 0.0008
Iteração 100, Perda: 0.0005
Iteração 150, Perda: 0.0009
Iteração 200, Perda: 0.0023
Iteração 250, Perda: 0.0095
Iteração 300, Perda: 0.0036
Iteração 350, Perda: 0.0033
Iteração 400, Perda: 0.0025
Iteração 450, Perda: 0.0031


In [169]:
# Passo 4: Extrair a Política Ótima
def extract_policy(q_network, num_states):
    policy = []
    for state in range(num_states):
        state_tensor = to_one_hot(state, num_states).unsqueeze(0)
        q_values = q_network(state_tensor)
        policy.append(torch.argmax(q_values).item())
    return policy

optimal_policy = extract_policy(q_network, input_dim)

In [170]:
# Passo 5: Avaliar a Política Aprendida
def evaluate_policy(env, policy, episodes=100):
    total_rewards = []
    for _ in range(episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0
        while not done:
            action = policy[state]  # Segue a política aprendida
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward
            state = next_state
        total_rewards.append(episode_reward)
    return np.mean(total_rewards)

success_rate = evaluate_policy(env, optimal_policy) * 100
print(f"Taxa de sucesso da política aprendida: {success_rate:.2f}%")

Taxa de sucesso da política aprendida: 27.00%


In [171]:
# Passo 6: Implementar Q-Iteration (Programação Dinâmica)
def q_iteration(env, gamma=GAMMA, iterations=1000):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    Q = np.zeros((num_states, num_actions))  # Tabela Q inicializada com zeros

    for _ in range(iterations):
        Q_prev = Q.copy()
        for state in range(num_states):
            for action in range(num_actions):
                # Pega todas as transições possíveis para o par (state, action)
                transitions = env.unwrapped.P[state][action]
                q_value = 0
                for prob, next_state, reward, done in transitions:
                    # Atualiza o valor Q usando a equação de Bellman
                    q_value += prob * (reward + gamma * np.max(Q_prev[next_state]))
                Q[state][action] = q_value

        # Verifica convergência
        if np.max(np.abs(Q - Q_prev)) < 1e-6:
            break

    return Q

In [172]:
# Executar Q-Iteration
Q_table = q_iteration(env)
print("Tabela Q ótima calculada por Q-Iteration:")
print(Q_table)

# Extrair política ótima da tabela Q
optimal_policy_q_iteration = np.argmax(Q_table, axis=1)
print("Política ótima calculada por Q-Iteration:")
print(optimal_policy_q_iteration)

# Avaliar a política ótima do Q-Iteration
success_rate_q_iteration = evaluate_policy(env, optimal_policy_q_iteration) * 100
print(f"Taxa de sucesso da política aprendida por Q-Iteration: {success_rate_q_iteration:.2f}%")

Tabela Q ótima calculada por Q-Iteration:
[[0.18046009 0.17231765 0.17231765 0.16329372]
 [0.10614833 0.10574404 0.0976016  0.15474699]
 [0.15347004 0.14684237 0.14643808 0.13957259]
 [0.09056964 0.09056964 0.08394198 0.13254063]
 [0.20895664 0.15181126 0.14278732 0.1233147 ]
 [0.         0.         0.         0.        ]
 [0.17642697 0.12782831 0.17642697 0.04859865]
 [0.         0.         0.         0.        ]
 [0.15181126 0.20427963 0.18480701 0.27044895]
 [0.24681751 0.37464582 0.28900388 0.21347025]
 [0.4036687  0.34780027 0.28503101 0.17450613]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.27981326 0.39033815 0.50897584 0.34780027]
 [0.51816646 0.72367148 0.69032422 0.62233722]
 [0.         0.         0.         0.        ]]
Política ótima calculada por Q-Iteration:
[0 3 0 3 0 0 0 0 3 1 0 0 0 2 1 0]
Taxa de sucesso da política aprendida por Q-Iteration: 74.00%


In [173]:
# Gravar um vídeo da política treinada
record_video_qtable(ENV_NAME, Q_table, episodes=2, folder='videos/', prefix='q-iteration')

display_videos_from_path('videos/', prefix='q-iteration')