In [32]:
from game import GAME
import torch

import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.vec_env import DummyVecEnv

import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

import numpy as np

In [33]:
env = GAME(draw=True)  
observations = env.reset()

In [34]:
def rewards_function(information):
    colisoes = information['colisoes']
    bolas_caidas = information['bolas_caidas']
    perdeu = information['perdeu']
    ganhou = information['ganhou']
    joga_novamente = information['joga_novamente']
    bolas_jogador = information.get('bolas_jogador', [])
    bolas_adversario = information.get('bolas_adversario', [])
    winner = information.get('winner', None)
    penalizado = information.get('penalizado', False)

    rewards = 0

    if joga_novamente:
        rewards += 1

    if perdeu:
        rewards -= 1.5

    if ganhou:
        rewards += 1.5

    if penalizado:
        rewards -= 1

    return rewards

In [35]:
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        
        # Convolução para os dados de posição (x, y)
        self.conv_position = nn.Conv1d(
            in_channels=1, out_channels=16, kernel_size=2, stride=2
        )
        
        # Convolução para os dados de estado (se é do adversário e se está na mesa)
        self.conv_state = nn.Conv1d(
            in_channels=1, out_channels=16, kernel_size=2, stride=2
        )
        
        # Fully connected layers
        self.fc1 = nn.Linear(482, 128)  # Ajustar com base nas características concatenadas
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)

    def forward(self, x):
        # Garantir que o tensor de entrada seja do tamanho correto
        assert x.numel() == 62, f"Expected input of size 62, got {x.numel()} elements."

        # Separar dados de posição e estado
        position_data = x[0:60].reshape(-1, 4)[:, 0:2].reshape(1, 1, -1)  # Extrai (x, y) pares consecutivos
        state_data = x[0:60].reshape(-1, 4)[:, 2:4].reshape(1, 1, -1)  # Extrai (adversário, na mesa)
        white_ball_data = x[60:62]  # Dados da bola branca (x, y)

        # Aplicar convoluções
        position_features = torch.relu(self.conv_position(position_data))
        state_features = torch.relu(self.conv_state(state_data))

        # Flatten convoluções
        position_features = position_features.view(-1)  # Achatar
        state_features = state_features.view(-1)        # Achatar
        
        
        #print(f'Position features shape: {position_features.shape}')
        #print(f'State features shape: {state_features.shape}')

        # Concatenar características das convoluções e dados da bola branca
        combined_features = torch.cat([position_features, state_features, white_ball_data], dim=0)
        
        #print(f'Combined features shape: {combined_features.shape}')

        # Passar pelas fully connected layers
        x = torch.relu(self.fc1(combined_features))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x


In [36]:
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.99  # Fator de desconto
        self.epsilon = 1.0  # Taxa de exploração
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.batch_size = 64

        self.model = DQN(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            # Exploração: escolher valores aleatórios dentro do intervalo permitido
            angle = np.random.uniform(0, 360)  # Ângulo contínuo entre 0° e 360°
            intensity = np.random.uniform(0, 1)  # Intensidade contínua entre 0 e 1
            #print(f"Explorando: Ângulo={angle:.2f}, Intensidade={intensity:.2f}")
            return np.array([angle, intensity])
            
        else:
            # Exploração: usar a política aprendida
            state = torch.FloatTensor(state)
            q_values = self.model(state)
            action = q_values.detach().numpy()  # Retorna o vetor [ângulo, intensidade]
            #print(f"Explotando: Ângulo={action[0]:.2f}, Intensidade={action[1]:.2f}")
            return action



    def replay(self):
        if len(self.memory) < self.batch_size:
            return

        minibatch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                next_state = torch.FloatTensor(next_state).unsqueeze(0)
                target += self.gamma * torch.max(self.model(next_state).detach())

            state = torch.FloatTensor(state).unsqueeze(0)
            target_f = self.model(state)

            # Calculando a perda diretamente
            loss = self.criterion(target_f, torch.FloatTensor(action))  # Compara saída contínua com ação
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay



In [37]:
model_path = "dqn_agent_model6.pth"

agent = DQNAgent(state_size=62, action_size=2)

checkpoint = torch.load(model_path)
agent.model.load_state_dict(checkpoint['model_state_dict'])
agent.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
agent.epsilon = checkpoint['epsilon']  # Restaurar o epsilon atual

print("Modelo carregado com sucesso!")

Modelo carregado com sucesso!


  checkpoint = torch.load(model_path)


In [38]:
agent.epsilon = 0 # Desativar a exploração

In [None]:
print("Iniciando jogada", env.jogador_atual)

terminations = False 
is_me = False  

obs, _ = env.reset()


obs, information, terminations, rewards = env.step(
            (0, 10), 
            rewards_function=rewards_function
        )


while not terminations:
    
    if is_me:
        print("\n--- Seu Turno ---")
        while not env.iniciou_jogada:
            env.table.draw()

        obs, information, terminations, rewards = env.step(
            (env.iniciou_jogada_angulo, env.inicou_jogada_intensidade), 
            rewards_function=rewards_function
        )
        env.iniciou_jogada = False

        print("Recompensa da jogada:", rewards)
        
    else:
        while not env.iniciou_jogada:
            env.table.draw()
            
        obs_main = obs[0].numpy()  # Primeiro tensor (posição das bolas)
        obs_aux = obs[1].numpy()  # Segundo tensor (outros dados)
        combined_obs = np.concatenate([obs_main.flatten(), obs_aux.flatten()])

        action = agent.act(combined_obs)
        
        angulo, intensidade = action[0], action[1]
        
        while not env.iniciou_jogada:
            env.table.draw()
            
        obs, information, terminations, rewards = env.step((angulo, intensidade),
                                                        rewards_function=rewards_function)
        env.iniciou_jogada = False

        print("Recompensa da jogada:", rewards)
        print(f'Angulo: {angulo:.2f}, Intensidade: {intensidade:.2f}')
        
    #is_me = not is_me

Iniciando jogada 0
Recompensa da jogada: -1
Angulo: 166.19, Intensidade: 0.06
