In [5]:
!pip install torch torchvision gymnasium[atari] numpy scikit-image torchsummary ale_py





## Escala RGB

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gymnasium as gym
import time
import random
from collections import deque
import math
from skimage.transform import resize
from torch.utils.tensorboard import SummaryWriter

# Print whether a CUDA-capable GPU is visible to PyTorch.
print(torch.cuda.is_available())

# Show the ids of every environment currently registered with Gymnasium
# (displayed as the cell output because it is the last expression).
gym.envs.registration.registry.keys()

True


dict_keys(['CartPole-v0', 'CartPole-v1', 'MountainCar-v0', 'MountainCarContinuous-v0', 'Pendulum-v1', 'Acrobot-v1', 'phys2d/CartPole-v0', 'phys2d/CartPole-v1', 'phys2d/Pendulum-v0', 'LunarLander-v3', 'LunarLanderContinuous-v3', 'BipedalWalker-v3', 'BipedalWalkerHardcore-v3', 'CarRacing-v3', 'Blackjack-v1', 'FrozenLake-v1', 'FrozenLake8x8-v1', 'CliffWalking-v0', 'Taxi-v3', 'tabular/Blackjack-v0', 'tabular/CliffWalking-v0', 'Reacher-v2', 'Reacher-v4', 'Reacher-v5', 'Pusher-v2', 'Pusher-v4', 'Pusher-v5', 'InvertedPendulum-v2', 'InvertedPendulum-v4', 'InvertedPendulum-v5', 'InvertedDoublePendulum-v2', 'InvertedDoublePendulum-v4', 'InvertedDoublePendulum-v5', 'HalfCheetah-v2', 'HalfCheetah-v3', 'HalfCheetah-v4', 'HalfCheetah-v5', 'Hopper-v2', 'Hopper-v3', 'Hopper-v4', 'Hopper-v5', 'Swimmer-v2', 'Swimmer-v3', 'Swimmer-v4', 'Swimmer-v5', 'Walker2d-v2', 'Walker2d-v3', 'Walker2d-v4', 'Walker2d-v5', 'Ant-v2', 'Ant-v3', 'Ant-v4', 'Ant-v5', 'Humanoid-v2', 'Humanoid-v3', 'Humanoid-v4', 'Humanoid-v5

In [7]:
class DQN(nn.Module):
    """Convolutional Q-network: image observation in, one Q-value per action out.

    Three convolution + ReLU stages feed a two-layer fully connected head.

    Args:
        input_shape: Observation shape as ``(channels, height, width)``; the
            channel count sizes the first convolution.
        num_actions: Number of discrete actions (width of the output layer).
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()
        # The attribute names and layer positions below determine the
        # state_dict keys, so they must stay stable for saved checkpoints.
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),  # Input channels adjusted for RGB (3 or 4*3 if frame stacking).
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.fc = nn.Sequential(
            nn.Linear(self.feature_size(input_shape), 512),
            nn.ReLU(),
            nn.Linear(512, num_actions)
        )

    def feature_size(self, input_shape):
        """Return the flattened size of the conv stack's output for ``input_shape``.

        A dummy all-zero batch of one is pushed through ``self.conv``. This is
        pure shape probing, so it runs without autograd bookkeeping.
        """
        with torch.no_grad():
            return self.conv(torch.zeros(1, *input_shape)).view(1, -1).size(1)

    def forward(self, x):
        """Compute Q-values for a batch ``x`` shaped ``(batch, *input_shape)``."""
        x = self.conv(x)
        x = x.view(x.size(0), -1)  # flatten everything except the batch axis
        return self.fc(x)

In [8]:
class ReplayBuffer:
    """Bounded store of ``(state, action, reward, next_state, done)`` transitions.

    Backed by a ``deque`` with ``maxlen=capacity``, so once the buffer is full
    each new transition pushes out the oldest one.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Number of transitions currently held."""
        return len(self.memory)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of tensors ``(states, actions, rewards, next_states,
            dones)``; ``actions`` is a ``LongTensor``, the rest are
            ``FloatTensor`` (so ``dones`` holds 0.0 / 1.0).
        """
        picked = random.sample(self.memory, batch_size)
        # Turn the list of transitions into one array per field.
        columns = [np.array(field) for field in zip(*picked)]
        state_col, action_col, reward_col, next_state_col, done_col = columns
        return (
            torch.FloatTensor(state_col),
            torch.LongTensor(action_col),
            torch.FloatTensor(reward_col),
            torch.FloatTensor(next_state_col),
            torch.FloatTensor(done_col),
        )

In [9]:
class DQNAgent:
    """Epsilon-greedy DQN learner with replay memory and a target network.

    Args:
        state_shape: Observation shape passed to ``DQN`` as ``input_shape``.
        num_actions: Number of discrete actions.
        learning_rate: Adam learning rate for the online network.
        gamma: Discount factor used in the TD target.
        epsilon: Initial exploration probability.
        epsilon_min: Floor for the exploration probability.
        epsilon_decay: Multiplicative decay applied to epsilon after every
            optimisation step (not per episode).
        batch_size: Number of transitions sampled per optimisation step.
        memory_size: Capacity of the replay buffer.
        update_target_freq: Number of optimisation steps between copies of the
            online weights into the target network.
    """

    def __init__(self, state_shape, num_actions, learning_rate=0.00025, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.9995, batch_size=32, memory_size=50000, update_target_freq=100):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = DQN(state_shape, num_actions).to(self.device)
        self.target_model = DQN(state_shape, num_actions).to(self.device)
        self.target_model.load_state_dict(self.model.state_dict())  # Initialize target network with the same weights
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.memory = ReplayBuffer(memory_size)
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.update_target_freq = update_target_freq  # Frequency to update target network
        self.train_step = 0
        self.num_actions = num_actions

    def choose_action(self, state):
        """Pick an action for a single (unbatched) ``state`` epsilon-greedily.

        Returns:
            A Python ``int`` action index.
        """
        if random.random() < self.epsilon:
            return random.randrange(self.num_actions)  # Explore

        # Exploit. Action selection never needs gradients, so skip building
        # an autograd graph for this forward pass.
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            q_values = self.model(state)
            return q_values.argmax().item()

    def learn(self):
        """Run one optimisation step on a replay mini-batch.

        Does nothing (returns ``None``) until the memory holds at least
        ``batch_size`` transitions. Otherwise it minimises the MSE between
        ``Q(s, a)`` and ``r + gamma * max_a' Q_target(s', a') * (1 - done)``,
        syncs the target network every ``update_target_freq`` steps and
        decays epsilon.
        """
        if len(self.memory) < self.batch_size:
            return  # Not enough samples in memory

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        states = states.to(self.device)
        actions = actions.unsqueeze(1).to(self.device)  # Reshape for gather
        rewards = rewards.to(self.device)
        next_states = next_states.to(self.device)
        dones = dones.to(self.device)

        # Q-values of the actions that were actually taken.
        q_values = self.model(states).gather(1, actions)

        # The TD target is a fixed regression label. Computing it without
        # autograd keeps backward() from flowing into the target network
        # (which is not optimised) and from leaving stale .grad buffers there.
        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0].unsqueeze(1)
            target_q_values = rewards.unsqueeze(1) + self.gamma * next_q_values * (1 - dones.unsqueeze(1))

        # Loss calculation
        loss = nn.MSELoss()(q_values, target_q_values)

        # Optimization step
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update train step and target network
        self.train_step += 1
        if self.train_step % self.update_target_freq == 0:
            self.target_model.load_state_dict(self.model.state_dict())

        # Decay epsilon, but never step below the configured floor.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [10]:
class MCTSNode:
    """One node of the search tree.

    Attributes are plain data: the observation stored at the node, a link to
    the parent (``None`` for the root), the action that led here, the children
    keyed by action, and the running visit count / accumulated value.
    """

    def __init__(self, state, parent=None, action_taken=None):
        # Statistics start at zero and are filled in during backpropagation.
        self.visits = 0
        self.value = 0
        # Tree links.
        self.parent = parent
        self.action_taken = action_taken
        self.children = dict()  # action -> child MCTSNode
        # Payload.
        self.state = state

class HybridDQNMCTS:
    """Monte Carlo Tree Search that uses a DQN for expansion order and rollouts.

    Each call to ``get_action`` builds a fresh tree rooted at the current
    observation. Simulations run in a separate copy of the environment that is
    rewound to the real environment's emulator state before every simulation.

    Args:
        dqn_agent: Object exposing ``model``, ``device`` and ``gamma``.
        env: The real environment. Its ``spec.id`` is used to build the
            simulation copy and its ``action_space.n`` sizes the tree. The
            unwrapped env must provide ``clone_state()`` and the copy must
            provide ``restore_state()`` (ALE Atari environments do; see the
            ALE documentation).
        num_simulations: Number of simulations per ``get_action`` call.
        exploration_constant: ``C`` in the UCT formula.
    """

    def __init__(self, dqn_agent, env, num_simulations=50, exploration_constant=1.4):
        self.dqn_agent = dqn_agent
        self.env = env
        self.num_simulations = num_simulations  # Number of MCTS simulations
        self.exploration_constant = exploration_constant  # C in UCT formula

    def get_action(self, state):
        """Search from ``state`` and return the most visited root action.

        Args:
            state: The stacked, channel-first observation the DQN consumes
                (the same array that would be passed to the DQN directly).

        Raises:
            RuntimeError: If the real environment cannot snapshot its state.
        """
        real_env = self.env.unwrapped
        if not hasattr(real_env, "clone_state"):
            raise RuntimeError(
                "HybridDQNMCTS needs an environment whose unwrapped instance "
                "supports clone_state()/restore_state()"
            )
        snapshot = real_env.clone_state()
        num_actions = self.env.action_space.n
        gamma = self.dqn_agent.gamma
        root = MCTSNode(state)

        # One simulation environment per search (not one per simulation).
        # NOTE(review): only spec.id is forwarded, so non-default kwargs used
        # to build the real env are not replicated here — confirm if needed.
        sim_env = gym.make(self.env.spec.id, render_mode=None)
        try:
            sim_env.reset()
            emulator = sim_env.unwrapped
            for _ in range(self.num_simulations):
                # Rewind the copy to the real env's current emulator state.
                emulator.restore_state(snapshot)
                node = root
                sim_state = state
                path_return = 0.0
                discount = 1.0
                done = False

                # Selection: descend through fully expanded nodes via UCT.
                while not done and len(node.children) == num_actions:
                    action = self._select_uct(node)
                    sim_state, reward, done = self._step(sim_env, sim_state, action)
                    path_return += discount * reward
                    discount *= gamma
                    node = node.children[action]

                # Expansion: add one untried child (this also covers the
                # initially childless root).
                if not done:
                    action = self._expand(node, sim_state)
                    if action is not None:
                        sim_state, reward, done = self._step(sim_env, sim_state, action)
                        path_return += discount * reward
                        discount *= gamma
                        child = MCTSNode(sim_state, node, action)
                        node.children[action] = child
                        node = child

                # Simulation (rollout) from the leaf, discounted back to the root.
                value = path_return
                if not done:
                    value += discount * self._simulate(sim_state, sim_env)

                # Backpropagation
                while node is not None:
                    node.visits += 1
                    node.value += value
                    node = node.parent
        finally:
            sim_env.close()

        if not root.children:
            # No simulation ran (num_simulations <= 0): fall back to the DQN.
            return int(np.argmax(self._q_values(state)))

        # Choose the action with the most visits
        return max(root.children.items(), key=lambda item: item[1].visits)[0]

    def _q_values(self, state):
        """Return the online network's Q-values for one state as a NumPy vector."""
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.dqn_agent.device)
            return self.dqn_agent.model(state_tensor)[0].cpu().numpy()

    def _step(self, sim_env, stacked_state, action):
        """Advance ``sim_env`` by one action and roll the frame stack forward.

        The new frame is preprocessed, moved to channel-first layout and
        appended after dropping the oldest frame's channels, mirroring the
        frame stacking done in the training loop.

        Returns:
            ``(next_stacked_state, reward, done)``.
        """
        frame, reward, terminated, truncated, _ = sim_env.step(int(action))
        frame = np.transpose(preprocess_state(frame), (2, 0, 1))  # (H, W, C) -> (C, H, W)
        history = np.asarray(stacked_state)[frame.shape[0]:]
        next_state = np.concatenate((history, frame), axis=0)
        return next_state, reward, terminated or truncated

    def _expand(self, node, state):
        """Return the untried action with the highest Q-value, or ``None`` if all are tried."""
        untried_actions = [a for a in range(self.env.action_space.n) if a not in node.children]
        if not untried_actions:
            return None

        # Prioritize actions based on DQN Q-values
        q_values = self._q_values(state)
        return max(untried_actions, key=lambda a: q_values[a])

    def _select_uct(self, node):
        """Return the child action maximising mean value plus the UCT exploration bonus."""
        log_parent_visits = math.log(max(node.visits, 1))

        def uct_value(child):
            if child.visits == 0:
                return float("inf")  # always try an unvisited child first
            exploitation = child.value / child.visits
            exploration = math.sqrt(log_parent_visits / child.visits)
            return exploitation + self.exploration_constant * exploration

        return max(node.children.items(), key=lambda item: uct_value(item[1]))[0]

    def _simulate(self, state, sim_env, max_steps=100):
        """Greedy DQN rollout from ``state``; returns the discounted reward sum.

        Args:
            state: Stacked observation matching ``sim_env``'s current state.
            sim_env: Simulation environment to step.
            max_steps: Maximum number of rollout steps.
        """
        value = 0.0
        discount = 1.0
        sim_state = state

        for _ in range(max_steps):
            action = int(np.argmax(self._q_values(sim_state)))  # Use DQN for action selection during simulation
            sim_state, reward, done = self._step(sim_env, sim_state, action)

            value += discount * reward
            discount *= self.dqn_agent.gamma

            if done:
                break
        return value

class HybridAgent:
    """Facade that trains with the DQN and can act through MCTS for evaluation.

    Args:
        state_shape: Observation shape forwarded to ``DQNAgent``.
        num_actions: Number of discrete actions.
        env: Environment handed to ``HybridDQNMCTS`` for its simulations.
    """

    def __init__(self, state_shape, num_actions, env):
        self.dqn_agent = DQNAgent(state_shape, num_actions)
        self.mcts = HybridDQNMCTS(self.dqn_agent, env)
        self.training_mode = True  # Toggle between training (DQN) and evaluation (MCTS)

    def choose_action(self, state):
        """Return the DQN's action in training mode, otherwise the MCTS action."""
        if self.training_mode:
            return self.dqn_agent.choose_action(state)  # DQN action selection during training
        return self.mcts.get_action(state)  # MCTS action selection during evaluation

    def learn(self):
        """Delegate one learning step to the DQN agent."""
        return self.dqn_agent.learn()  # DQN learning

    def save(self, filepath):
        """Write the online network's state_dict to ``filepath``."""
        torch.save(self.dqn_agent.model.state_dict(), filepath)

    def load(self, filepath):
        """Load a checkpoint written by ``save`` into both networks.

        The file is read once and mapped onto the agent's device, so a
        checkpoint saved on a GPU machine also loads on a CPU-only one.
        """
        state_dict = torch.load(filepath, map_location=self.dqn_agent.device)
        self.dqn_agent.model.load_state_dict(state_dict)
        # Keep the target network identical to the freshly loaded online one.
        self.dqn_agent.target_model.load_state_dict(self.dqn_agent.model.state_dict())

In [None]:
def train_agent(env, agent, episodes=1000, save_freq=20):
    """Train ``agent`` on ``env`` with 4-frame stacking and periodic checkpoints.

    Every environment frame is preprocessed, moved to channel-first layout and
    combined with the three previous frames along the channel axis before it
    is shown to the agent.

    Args:
        env: Gymnasium-style environment (``reset()`` / 5-tuple ``step()``).
        agent: Object exposing ``choose_action``, ``learn``, ``save`` and
            ``dqn_agent`` (with ``memory.push`` and ``epsilon``).
        episodes: Number of episodes to run.
        save_freq: Save a checkpoint every ``save_freq`` episodes.

    Returns:
        The same ``agent`` after training.

    Raises:
        Exception: Any error raised during training is printed and re-raised.
    """
    try:
        for episode in range(episodes):
            frame, _ = env.reset()
            frame = np.transpose(preprocess_state(frame), (2, 0, 1))  # (H, W, C) -> (C, H, W)

            # Frame stacking: start the history with four copies of the first frame.
            stacked_frames = deque([frame] * 4, maxlen=4)
            state = np.concatenate(stacked_frames, axis=0)  # Concatenate along channel axis

            episode_reward = 0
            episode_steps = 0
            start_time = time.time()
            done = False

            while not done:
                action = agent.choose_action(state)
                frame, reward, terminated, truncated, _ = env.step(action)
                frame = np.transpose(preprocess_state(frame), (2, 0, 1))

                # Frame stacking: appending drops the oldest frame (maxlen=4).
                stacked_frames.append(frame)
                next_state = np.concatenate(stacked_frames, axis=0)

                done = terminated or truncated
                # Store `terminated`, not `done`: a time-limit truncation ends
                # the episode but is not a terminal state, so the TD target
                # should still bootstrap from next_state in that case.
                agent.dqn_agent.memory.push(state, action, reward, next_state, terminated)
                agent.learn()  # Train the DQN

                state = next_state
                episode_reward += reward
                episode_steps += 1

            episode_time = time.time() - start_time
            # Guard against a zero elapsed time on coarse clocks.
            steps_per_second = episode_steps / episode_time if episode_time > 0 else float("inf")

            print(f"Episode {episode + 1}/{episodes}, Reward: {episode_reward}, Steps: {episode_steps}, Time: {episode_time:.2f}s, Steps/s: {steps_per_second:.2f}, Epsilon: {agent.dqn_agent.epsilon:.3f}")

            if (episode + 1) % save_freq == 0:
                agent.save(f'dqn_model_episode_{episode + 1}.pth')
        return agent
    except Exception as e:
        print(f"Error during training: {e}")
        raise

def preprocess_state(state):
    """Return ``state`` resized to 84x84 (anti-aliased) as a float32 array."""
    return resize(state, (84, 84), anti_aliasing=True).astype(np.float32)

## Rodando o treinamento

#### Rodando o treinamento com a escala RGB

In [None]:
from ale_py import ALEInterface
from torchsummary import summary

# Standalone ALE handle; nothing in this cell reads it afterwards.
ale = ALEInterface()

if __name__ == "__main__":
    env = gym.make('ALE/SpaceInvaders-v5', render_mode="rgb_array")
    try:
        state_shape = (12, 84, 84)  # 4 frames * 3 channels (RGB)
        num_actions = env.action_space.n

        hybrid_agent = HybridAgent(state_shape, num_actions, env)
        # Model summary
        summary(hybrid_agent.dqn_agent.model, state_shape, device="cuda" if torch.cuda.is_available() else "cpu")

        train_agent(env, hybrid_agent, episodes=1000)
    finally:
        # Release the emulator even if training is interrupted or fails.
        env.close()

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 20, 20]          24,608
              ReLU-2           [-1, 32, 20, 20]               0
            Conv2d-3             [-1, 64, 9, 9]          32,832
              ReLU-4             [-1, 64, 9, 9]               0
            Conv2d-5             [-1, 64, 7, 7]          36,928
              ReLU-6             [-1, 64, 7, 7]               0
            Linear-7                  [-1, 512]       1,606,144
              ReLU-8                  [-1, 512]               0
            Linear-9                    [-1, 6]           3,078
Total params: 1,703,590
Trainable params: 1,703,590
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.32
Forward/backward pass size (MB): 0.33
Params size (MB): 6.50
Estimated Total Size (MB): 7.15
---------------------------------------

## Rodando o modelo treinado

In [None]:
import random
import time
from collections import deque

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
from skimage.transform import resize


class DQN(nn.Module):
    """Convolutional Q-network used for inference.

    The layer layout (and therefore the state_dict keys) has to match the
    network whose weights are loaded from the checkpoint file.

    Args:
        input_shape: Observation shape as ``(channels, height, width)``.
        num_actions: Number of discrete actions (width of the output layer).
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.fc = nn.Sequential(
            nn.Linear(self.feature_size(input_shape), 512),
            nn.ReLU(),
            nn.Linear(512, num_actions)
        )

    def feature_size(self, input_shape):
        """Return the flattened size of the conv stack's output for ``input_shape``.

        Uses a dummy all-zero batch of one; this is shape probing only, so it
        runs without autograd bookkeeping.
        """
        with torch.no_grad():
            return self.conv(torch.zeros(1, *input_shape)).view(1, -1).size(1)

    def forward(self, x):
        """Compute Q-values for a batch ``x`` shaped ``(batch, *input_shape)``."""
        x = self.conv(x)
        x = x.view(x.size(0), -1)  # flatten everything except the batch axis
        return self.fc(x)

class Agent:  # Simplified agent for inference
    """Inference-only wrapper around a ``DQN`` with epsilon-greedy action choice.

    Args:
        state_shape: Observation shape passed to ``DQN``.
        num_actions: Number of discrete actions.
        epsilon: Probability of taking a uniformly random action.
    """

    def __init__(self, state_shape, num_actions, epsilon=0.05):
        self.num_actions = num_actions
        self.epsilon = epsilon
        use_cuda = torch.cuda.is_available()
        self.device = torch.device("cuda" if use_cuda else "cpu")
        self.model = DQN(state_shape, num_actions).to(self.device)

    def load(self, filepath):
        """Load network weights from ``filepath`` and switch to eval mode."""
        weights = torch.load(filepath, map_location=self.device)
        self.model.load_state_dict(weights)
        self.model.eval()

    def choose_action(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one."""
        explore = random.random() < self.epsilon
        if explore:
            return random.randrange(self.num_actions)

        # Greedy branch: gradients are not needed for inference.
        with torch.no_grad():
            batch = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            best = self.model(batch).argmax()
            return best.item()

def preprocess_state(state):
    """Return the RGB ``state`` resized to 84x84 (anti-aliased) as float32."""
    resized = resize(state, (84, 84), anti_aliasing=True)  # Keep RGB channels
    return resized.astype(np.float32)
# --- Inference Function ---

def run_agent(env, agent, model_path, episodes=10):
    """Play ``episodes`` episodes with a trained model and print reward statistics.

    Observations are preprocessed and stacked (4 frames along the channel
    axis) before being passed to ``agent.choose_action``.

    Args:
        env: Gymnasium-style environment. It is closed before this function
            returns, including when an episode raises.
        agent: Object exposing ``load(path)`` and ``choose_action(state)``.
        model_path: Checkpoint file handed to ``agent.load``.
        episodes: Number of episodes to play.
    """
    agent.load(model_path)  # Load the trained model
    total_rewards = []

    try:
        for episode in range(episodes):
            frame, _ = env.reset()

            # Frame stacking (RGB): start with four copies of the first frame.
            frame = np.transpose(preprocess_state(frame), (2, 0, 1))  # (H, W, C) -> (C, H, W)
            stacked_frames = deque([frame] * 4, maxlen=4)
            state = np.concatenate(stacked_frames, axis=0)

            done = False
            episode_reward = 0
            episode_steps = 0
            start_time = time.time()

            while not done:
                action = agent.choose_action(state)
                frame, reward, terminated, truncated, _ = env.step(action)

                # Update the frame stack; appending drops the oldest frame.
                frame = np.transpose(preprocess_state(frame), (2, 0, 1))  # (H, W, C) -> (C, H, W)
                stacked_frames.append(frame)
                state = np.concatenate(stacked_frames, axis=0)

                done = terminated or truncated
                episode_reward += reward
                episode_steps += 1

                env.render()  # Render the environment (human mode)

            episode_time = time.time() - start_time
            # Guard against a zero elapsed time on coarse clocks.
            steps_per_second = episode_steps / episode_time if episode_time > 0 else float("inf")

            total_rewards.append(episode_reward)
            print(f"Episode {episode + 1}/{episodes}, Reward: {episode_reward}, Steps: {episode_steps}, Time: {episode_time:.2f}s, Steps/s: {steps_per_second:.2f}")
    finally:
        # Always release the environment/window, even if an episode fails.
        env.close()

    # With no episodes there is nothing to average; report NaN without
    # triggering NumPy's empty-slice warning.
    avg_reward = np.mean(total_rewards) if total_rewards else float("nan")
    std_reward = np.std(total_rewards) if total_rewards else float("nan")
    print(f"\nAverage Reward over {episodes} episodes: {avg_reward:.2f}")
    print(f"Standard Deviation of Rewards: {std_reward:.2f}")


In [41]:
# Entry point for watching a trained checkpoint play Space Invaders.
if __name__ == "__main__":
    env = gym.make('ALE/SpaceInvaders-v5', render_mode="human") # "human" render mode opens a game window
    state_shape = (12, 84, 84)  # 4 frames * 3 channels (RGB)
    num_actions = env.action_space.n

    agent = Agent(state_shape, num_actions)
    model_file = "dqn_model_episode_40.pth"  # Replace with your model's filename
    # run_agent loads the checkpoint, plays the episodes and closes env.
    run_agent(env, agent, model_file, episodes=5)

Episode 1/5, Reward: 15.0, Steps: 255, Time: 9.89s, Steps/s: 25.77
Episode 2/5, Reward: 300.0, Steps: 941, Time: 36.19s, Steps/s: 26.00
Episode 3/5, Reward: 430.0, Steps: 972, Time: 37.66s, Steps/s: 25.81
Episode 4/5, Reward: 115.0, Steps: 531, Time: 25.40s, Steps/s: 20.91
Episode 5/5, Reward: 115.0, Steps: 516, Time: 20.37s, Steps/s: 25.34

Average Reward over 5 episodes: 195.00
Standard Deviation of Rewards: 149.30
