In [1]:
import copy
import gym
import torch

import numpy as np
import torch.nn.functional as F
import random

from collections import deque, namedtuple
from IPython.display import HTML
from base64 import b64encode

from torch import Tensor, nn
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from torch.optim import AdamW



from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning import LightningModule, Trainer
from lightning.pytorch.loggers import TensorBoardLogger

from gym.wrappers import RecordVideo, RecordEpisodeStatistics, TimeLimit

In [2]:
def display_video(episode=0):
  video_file = open(f'/content/videos/rl-video-episode-{episode}.mp4', "r+b").read()
  video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
  return HTML(f"<video width=600 controls><source src='{video_url}'></video>")

In [None]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print(device)

- Create Deep Q Learning

In [4]:
class DQN (nn.Module):
    def __init__(self, n_actions, hidden_size, obs_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, hidden_size), # Camada linear com 4 entradas e 8 saídas
            nn.ReLU(), # Função de ativação ReLU
            nn.Linear(hidden_size, hidden_size),# Camada linear com 4 entradas e 8 saídas
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
        )

    def forward(self,x):
        return self.net(x.float())

- Create a Policy

In [5]:
## Create a Policy
#  técnica amplamente usada em Reinforcement Learning (RL) para balancear exploração e exploração durante o treinamento de agentes.
def epsilon_greedy(state, env, net, epsilon=0.0):
    if np.random.random() < epsilon:
        action = env.action_space.sample()  # Escolhe uma ação aleatória
    else:
        state = torch.tensor([state]).to(device)  # Converte o estado para tensor e move para o dispositivo (CPU/GPU)
        q_values = net(state)  # Calcula os valores Q(s, a) usando a rede neural
        _, action = torch.max(q_values, dim=1)  # Seleciona a ação com o maior valor Q
        action = int(action.item())  # Converte a ação para um inteiro
    return action  # Retorna a ação escolhida

- Create Buffer replay

In [6]:
#Create Buffer replay
#O principal motivo do uso do Replay Buffer em algoritmos de Reinforcement Learning (RL), especialmente no Deep Q-Learning (DQN), 
#é quebrar a correlação entre as amostras de treinamento, 
#o que leva a um aprendizado mais estável e eficiente. 
#Ele também permite o reaproveitamento de experiências, reduzindo a necessidade de interagir continuamente com o ambiente.

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)
    
    def append(self, experience):
        self.buffer.append(experience)
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

In [7]:
class RLDataset(IterableDataset):
    def __init__(self, buffer, sample_size = 200): 
        self.buffer = buffer
        self.sample_size = sample_size
    

    def __iter__(self):
        for experience in self.buffer.sample(self.sample_size):
            yield experience

In [8]:
# ##Create Enviroment

# def create_enviroment(name):
#     env = gym.make(name,  render_mode="rgb_array")
#     env = RecordVideo(env, video_folder='./videos')
#     return env


# env = create_enviroment("LunarLander-v2")

# # Reset the environment to start
# state = env.reset()
# # Run for 1000 timesteps
# for _ in range(1000):
#       # Render the environment
#     action = env.action_space.sample()  # Take a random action
#     # print("Action taken:", action)

#     # Do this action in the environment and get
#     # next_state, reward, done and info
#     _, observation, reward, done, info = env.step(action)
#     # print('Observation Space: ', observation)
#     # print('Reward: ', reward)

#     # If the episode is done (CartPole has fallen), reset the environment
#     if done:
#         state = env.reset()

# env.close()  # Close the rendering window




In [None]:
import gym

def create_enviroment(name):
    env = gym.make(name,  render_mode="rgb_array")
    env = RecordVideo(env, video_folder='./videos', episode_trigger=lambda x : x % 50 == 0)
    env = RecordEpisodeStatistics(env)
    env = TimeLimit(env, max_episode_steps=400)
    return env

env = create_enviroment("LunarLander-v2")

env.reset()

for _ in range(10000):

   action = env.action_space.sample()

   print(action)

   observation, reward, terminated, truncated, info = env.step(action)

   if terminated or truncated:
      break
      

env.reset()
#observation, info = env.reset()
env.close()


In [10]:
class DeepQLearning(LightningModule):

    def __init__(self, 
                    env_name,                # Nome do ambiente (ex.: "LunarLander-v2"). Define o ambiente com o qual o agente interage.
                    policy=epsilon_greedy,   # Política usada para seleção de ações. O padrão é `epsilon-greedy`.
                    capacity=10_000,         # Capacidade máxima do Replay Buffer. Limita o número de transições armazenadas.
                    batch_size=256,          # Tamanho do batch para treinamento. Define quantas transições são amostradas por iteração de treinamento.
                    lr=1e-3,                 # Taxa de aprendizado (learning rate) usada pelo otimizador.
                    hidden_size=128,         # Tamanho das camadas ocultas na arquitetura da rede Q.
                    gamma=0.99,              # Fator de desconto para o cálculo de recompensa acumulada futura. Valores próximos de 1 dão mais peso a recompensas futuras.
                    loss_fn=F.smooth_l1_loss,# Função de perda usada para treinar a rede Q. O padrão é Smooth L1 Loss (Huber Loss).
                    optim=AdamW,             # Otimizador usado para atualizar os pesos da rede Q. O padrão é AdamW.
                    eps_start=1.0,           # Valor inicial de epsilon na política epsilon-greedy (exploração máxima).
                    eps_end=0.15,            # Valor final de epsilon na política epsilon-greedy (exploração mínima).
                    eps_last_episode=1000,    # Número de episódios até que o valor de epsilon decresça de `eps_start` para `eps_end`.
                    samples_per_epoch=10_000,# Número de amostras a serem coletadas por época para o Replay Buffer.
                    sync_rate=10):           # Taxa de sincronização (em épocas) entre a rede Q e a Target Q Network.
 
        super().__init__()
        self.env = create_enviroment(env_name)

        obs_size = self.env.observation_space.shape[0]
        n_actions = self.env.action_space.n

        self.q_net = DQN(hidden_size, obs_size, n_actions)
        self.target_q_net = copy.deepcopy(self.q_net)

        self.policy = policy
        self.buffer = ReplayBuffer(capacity=capacity)

        self.save_hyperparameters()

        # Fill the experience buffer

        while len(self.buffer) < self.hparams.samples_per_epoch:
            print(f"{len(self.buffer)} Samples in experience buffer. Filling...")
            self.play_episode(epsilon=self.hparams.eps_start)

    @torch.no_grad()
    def play_episode(self, policy=None, epsilon=0.):
        state = self.env.reset()
        done = False  # Initialize done properly

        while not done:
            if policy:
                action = policy(state, self.env, self.q_net, epsilon=epsilon)
            else:
                action = self.env.action_space.sample()

            next_state, reward, terminated, truncated, info = self.env.step(action)

            done = terminated or truncated
        
            exp = (state, action, reward, done, next_state)
            self.buffer.append(exp)
            self.state = next_state



    # Forward
    def forward(self, x):
        return self.q_net(x)

    # Configure optimizers
    def configure_optimizers(self):
        q_net_optimizer = self.hparams.optim(self.q_net.parameters(), lr=self.hparams.lr)
        return [q_net_optimizer]

    # Create dataloader
    def train_dataloader(self):
        dataset = RLDataset(self.buffer,self.hparams.samples_per_epoch)
        dataloader = DataLoader(
            dataset=dataset,
            batch_size=self.hparams.batch_size
        )
        return dataloader

    # Training step

    def training_step(self, batch, batch_idx):
        states, actions, rewards, dones, next_states = batch
        action = actions.unsqueeze(1)  
        rewards = rewards.unsqueeze(1)
        dones = dones.unsqueeze(1)

        state_action_value = self.q_net(states).gather(1,action)

        next_action_values = self.target_q_net(next_states.max(dim=1, keepdim=True))

        next_action_values[dones] = 0.0


        expected_state_action_values = rewards + self.hparams.gamma * next_action_values

        loss = self.hparams.loss_fn(state_action_value, expected_state_action_values)

        self.log('episode/Q-Error', loss)

        return loss

    # Training epoch end

    def on_train_epoch_end(self):
        epsilon = max(

            self.hparams.eps_end,
            self.hparams.eps_start - self.current_epoch / self.eps_last_episode
        )

        self.play_episode(policy=self.policy, epsilon=epsilon)

        self.log('episode/Return' , self.env.return_queue[-1])

        if self.current_epoch % self.hparams.sync_rate  == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())



    

In [None]:
import os
import shutil

# Caminho relativo para a pasta que você quer remover
relative_path = "./videos"

# Verificar se a pasta existe antes de tentar removê-la
if os.path.exists(relative_path):
    shutil.rmtree(relative_path)  # Remove a pasta e todo o conteúdo
    print(f"Pasta '{relative_path}' removida com sucesso!")

In [None]:
algo = DeepQLearning('LunarLander-v2',
                      #capacity=10_000, 
                      #samples_per_epoch=900_000
                      )

trainer = Trainer(
    #gpus = 1,
    max_epochs=100_000,
    callbacks=[EarlyStopping(monitor='episode/Return', mode='max', patience = 500)]
)

trainer.fit(algo)

In [14]:
# logger = TensorBoardLogger("tb_logs", name="my_model")
# trainer = Trainer(logger=logger)