In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import numpy as np
import random
import gymnasium as gym
from gymnasium.spaces import Box
from gymnasium.wrappers import FrameStackObservation, TimeLimit, ResizeObservation, RecordVideo, MaxAndSkipObservation
from collections import deque
import retro
import io
import time

In [None]:
RENDER_ENV = False
RESIZE_ENV = True
LOAD_MODEL = True
Render_Frame_rate=4
new_size = (84,120) #Original Size 320, 224
batch_size = 32
num_episodes = 200
max_episode_steps = 5400
num_stacked_frames = 4
num_frame_skip = 2
version = 2
prev_model = 'DQN-Sonic-V1-E15-S5400.pth'

In [67]:
class ButtonActionWrapper(gym.Wrapper):
    """
    A wrapper that maps discrete actions to a set of button presses for the game.
    This simplifies the action space for the agent.
    """
    def __init__(self, env, buttons):
        super().__init__(env)
        self.buttons = buttons
        # Create a mapping from a single action index to the full button array.
        self._actions = np.identity(len(buttons), dtype=np.int8)
        self.action_space = gym.spaces.Discrete(len(buttons))

    def step(self, action):
        return self.env.step(self._actions[action])

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        return obs, info

In [68]:
class CustomRewardWrapper(gym.RewardWrapper):
    """
    Custom reward shaping to encourage forward movement.
    This wrapper modifies the reward based on the agent's horizontal position.
    """
    def __init__(self, env, mov_rew=0.01, score_rew=0.05, hp_rew=20, ring_rew=1, end_bonus=100):
        super(CustomRewardWrapper, self).__init__(env)
        self.mov_rew = mov_rew
        self.score_rew = score_rew
        self.hp_rew = hp_rew
        self.ring_rew = ring_rew
        self.end_bonus = end_bonus

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        game_variables = self.env.unwrapped.data.lookup_all()

        self.previous_pos_x = game_variables['x']
        self.previous_score = game_variables['score']
        self.previous_lives = game_variables['lives']
        self.previous_rings = game_variables['rings']
        self.previous_end_bonus = game_variables['level_end_bonus']

        return obs, info

    def reward(self, reward):
        #print(f"Reward original: {reward}")
        custom_reward = reward
        game_state = self.env.unwrapped.data

        if game_state:
            game_variables = game_state.lookup_all()
            current_pos_x = game_variables['x']
            current_score = game_variables['score']
            current_lives = game_variables['lives']
            current_rings = game_variables['rings']
            current_end_bonus = game_variables['level_end_bonus']

            # moverse hacia la derecha
            if current_pos_x > self.previous_pos_x:
                custom_reward += self.mov_rew
            else:
                custom_reward -= self.mov_rew

            #Recompensa por puntaje
            if current_score > self.previous_score:
                custom_reward += self.score_rew*(current_score-self.previous_score)

            #Penalizacion por perder vida
            if current_lives > self.previous_lives:
                custom_reward += self.hp_rew*(current_lives-self.previous_lives)

            if current_rings > self.previous_rings:
                custom_reward += self.ring_rew*(current_rings-self.previous_rings)

            if current_rings > self.previous_rings:
                custom_reward += self.end_bonus

            self.previous_pos_x = current_pos_x
            self.previous_score = current_score
            self.previous_lives = current_lives
            self.previous_rings = current_rings
            self.previous_end_bonus = current_end_bonus


        return custom_reward

In [None]:
class ConvDQN(nn.Module):
    def __init__(self, input_shape, num_actions):
        super(ConvDQN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(input_shape[0], 16, kernel_size=8, stride=4),
            nn.LeakyReLU(),
            nn.Conv2d(16, 32, kernel_size=4, stride=2),
            nn.LeakyReLU(),
            nn.Conv2d(32, 64, kernel_size=2, stride=1),
            nn.LeakyReLU()
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(self.calc_conv_output(input_shape), 512),
            nn.LeakyReLU(),
            nn.Linear(512, num_actions)
        )

    def calc_conv_output(self, shape):
        dummy_input = torch.zeros(1, *shape)
        dummy_output = self.conv_layers(dummy_input)
        return int(np.prod(dummy_output.size()))

    def forward(self, x):
        conv_out = self.conv_layers(x).view(x.size()[0], -1)
        return self.fc_layers(conv_out)

In [None]:
def preprocess(state):
    # Convert the numpy array (Stack, Height, Width, Channels) to a PyTorch tensor
    state = torch.tensor(state, dtype=torch.float32)
    # Permute dimensions from (Stack, Height, Width, Channels) to (Stack, Channels, Height, Width)
    state = state.permute(0, 3, 1, 2)
    # Reshape to (Stack * Channels, Height, Width)
    state = state.reshape(-1, state.shape[2], state.shape[3])
    return state

In [None]:
class ConvDQNAgent:
    def __init__(self, input_shape, num_actions, lr, gamma, epsilon, epsilon_decay, buffer_size):
        self.input_shape = input_shape # input_shape is (Stack, Height, Width) or (Stack, Height, Width, Channels)
        self.num_actions = num_actions
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.memory = deque(maxlen=buffer_size)
        # Correct the input shape calculation for ConvDQN to be (Stack * Channels, Height, Width)
        # Assuming input_shape is (Stack, Height, Width) and images are RGB (3 channels)
        num_channels = 3 # Assuming RGB images
        # The input shape from the environment after wrappers is (Stack, Height, Width, Channels)
        # We need to transform it to (Stack * Channels, Height, Width) for the ConvDQN
        conv_input_shape = (input_shape[0] * num_channels, *input_shape[1:])

        # If the original input_shape included channels, we need to adjust
        # Assuming input_shape is (Stack, Height, Width, Channels)
        if len(input_shape) == 4:
             # input_shape is (Stack, Height, Width, Channels)
             # We want (Stack * Channels, Height, Width)
             conv_input_shape = (input_shape[0] * input_shape[3], input_shape[1], input_shape[2])

        self.device = 'cuda' #if torch.cuda.is_available() else 'cpu'
        self.model = ConvDQN(conv_input_shape, num_actions).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.num_actions)
        q_values = self.model(state)
        return torch.argmax(q_values).item()

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                next_state = preprocess(next_state).unsqueeze(0).to(self.device) # Move to the correct device
                target = reward + self.gamma * torch.max(self.model(next_state)).item()
            current_q_values = self.model(state)
            target_f = current_q_values.clone().detach()
            target_f[0][action] = target

            self.optimizer.zero_grad()
            loss = nn.MSELoss()(target_f, current_q_values)
            loss.backward()
            self.optimizer.step()
        if self.epsilon > 0.01:
            self.epsilon *= self.epsilon_decay

In [71]:
try:
    env.close()
except:
    print('no enviroment to close')

In [None]:
env = retro.make(game="SonicTheHedgehog-Genesis", render_mode='rgb_array')
env = ButtonActionWrapper(env, buttons=['LEFT', 'RIGHT', 'A'])
env = CustomRewardWrapper(env)
env = MaxAndSkipObservation(env, skip=num_frame_skip)
if RESIZE_ENV:
  input_shape = (num_stacked_frames, *new_size)
  env = ResizeObservation(env, new_size)
else:
  input_shape = (num_stacked_frames, 320, 224)
env = TimeLimit(env, max_episode_steps=max_episode_steps)
env = FrameStackObservation(env, stack_size=num_stacked_frames)
# env = RecordVideo(
#     env,
#     video_folder='/content/drive/MyDrive/Video_IA',    # Folder to save videos
#     name_prefix="eval",               # Prefix for video filenames
#     episode_trigger=lambda x: True    # Record every episode
# )
action_dim = env.action_space.n
print(action_dim)
agent = ConvDQNAgent(input_shape, action_dim, lr=0.001, gamma=0.99, epsilon=1.0, epsilon_decay=0.9955, buffer_size=10000)
if LOAD_MODEL:
  agent.model.state_dict(torch.load('Saved_Models/DQN/'+prev_model, map_location=agent.device))

for episode in range(num_episodes):
  state, info = env.reset()
  state = preprocess(observation).unsqueeze(0).to(agent.device)
  total_reward = 0
  done = False
  frame_count = 0
  while not done:
    frame_count += 1
    action = agent.act(state = state)
    observation, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    agent.remember(state, action, reward, observation, done)
    state = preprocess(observation).unsqueeze(0).to(agent.device)
    total_reward += reward
    agent.replay(batch_size)
    if frame_count % 5 == 0:
      print(f'step n={frame_count} with reward {reward}')
  print(f"Episode: {episode + 1}, Total Reward: {total_reward}")
env.close()
print(f"Episode finished with total reward: {total_reward}")

3
step n=5 with reward -0.02
step n=10 with reward -0.02
step n=15 with reward -0.02
step n=20 with reward -0.02
step n=25 with reward -0.02
step n=30 with reward -0.02


  agent.model.state_dict(torch.load('Saved_Models/DQN/'+prev_model, map_location=agent.device))


step n=35 with reward -0.02
step n=40 with reward 0.0
step n=45 with reward 0.0
step n=50 with reward 0.0
step n=55 with reward 0.0
step n=60 with reward 0.0
step n=65 with reward -0.02
step n=70 with reward -0.02
step n=75 with reward -0.02
step n=80 with reward -0.02
step n=85 with reward -0.02
step n=90 with reward -0.02
step n=95 with reward -0.02
step n=100 with reward -0.02
step n=105 with reward -0.02
step n=110 with reward -0.02
step n=115 with reward 0.02
step n=120 with reward 0.0
step n=125 with reward 0.0
step n=130 with reward 0.02
step n=135 with reward -0.02
step n=140 with reward -0.02
step n=145 with reward -0.02
step n=150 with reward -0.02
step n=155 with reward -0.02
step n=160 with reward -0.02
step n=165 with reward -0.02
step n=170 with reward -0.02
step n=175 with reward -0.02
step n=180 with reward -0.02
step n=185 with reward -0.02
step n=190 with reward 0.0
step n=195 with reward 0.0
step n=200 with reward 0.0
step n=205 with reward 0.0
step n=210 with reward

KeyboardInterrupt: 

In [None]:
#Guardar Modelo
model_save_path = f'Saved_Models/DQN/DQN-Sonic-V{version}-E{episode}-S{max_episode_steps}.pth'
try:
 torch.save(agent.model.state_dict(), model_save_path)
 print(f'Modelo exitosamente guardado en {model_save_path}')
except Exception as e:
 print(f'Error guardando el modelo error: {e}')

Modelo exitosamente guardado en Saved_Models/DQN/DQN-Sonic-V1-E15-S5400.pth


In [None]:
env = RecordVideo(
    env,
    video_folder='/content/drive/MyDrive/Video_IA',    # Folder to save videos
    name_prefix="eval",               # Prefix for video filenames
    episode_trigger=lambda x: True    # Record every episode
)
episode = 3
for _ in range(episode):
    obs, info = env.reset()
    done = False
    total_reward = 0
    episode+=1
    while not done:
        action, _ = agent.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        #print(f"Reward: {reward}")
        total_reward += reward

    print(f"Episode: {episode} Reward: {total_reward}")

  logger.warn(


AttributeError: 'RetroEnv' object has no attribute 'em'

In [None]:
#Cargar Modelo
