In [1]:
import warnings
warnings.filterwarnings('ignore')
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT #Importing simple controls

In [2]:
# Create the raw (not yet wrapped) Super Mario Bros environment
env = gym_super_mario_bros.make('SuperMarioBros-v0')

In [3]:
# Action names reported by the environment created above (no wrappers applied yet)
acts = env.get_action_meanings()
print(acts)

# Shape of the raw observation space
shp = env.observation_space.shape
print(shp)


['NOOP']
(240, 256, 3)


### Preparamos los wrappers para el preprocesamiento del entorno

In [4]:
import numpy as np
import gym
import collections 

In [5]:
#Wrappers de gym para preprocesar el entorno
#Taken from: https://console.paperspace.com/ml-showcase/notebook/rcrd0w769nip72j?file=mario_notebook.ipynb

class MaxAndSkipEnv(gym.Wrapper):
    """Repeat each action for several frames and return the element-wise max of the latest raw frames."""

    def __init__(self, env=None, skip=4):
        """Wrap `env` so that only every `skip`-th frame is returned."""
        super().__init__(env)
        self._skip = skip
        # Keeps at most the two most recent raw observations (max pooling across time steps).
        self._obs_buffer = collections.deque(maxlen=2)

    def step(self, action):
        """Apply `action` up to `skip` times, stopping early when the episode ends.

        Returns the element-wise maximum over the buffered frames, the summed
        reward, and the `done` flag and `info` dict from the last inner step.
        """
        done = None
        total_reward = 0.0
        for _frame in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        pooled_frame = np.stack(self._obs_buffer).max(axis=0)
        return pooled_frame, total_reward, done, info

    def reset(self):
        """Empty the frame buffer, reset the wrapped env and seed the buffer with the first observation."""
        self._obs_buffer.clear()
        first_obs = self.env.reset()
        self._obs_buffer.append(first_obs)
        return first_obs


class ImageToPyTorch(gym.ObservationWrapper):
    """Move the channel axis of each observation to the front (axis 2 -> axis 0).

    The advertised observation space is a float32 Box in [0, 1] with the
    reordered shape; this wrapper itself only reorders axes and does not
    rescale or cast the pixel values.
    """

    def __init__(self, env):
        super().__init__(env)
        src_shape = self.observation_space.shape
        # Last axis first, followed by the first two axes.
        channel_first_shape = (src_shape[-1], src_shape[0], src_shape[1])
        self.observation_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=channel_first_shape,
            dtype=np.float32,
        )

    def observation(self, observation):
        """Return `observation` with axis 2 moved to position 0."""
        return np.moveaxis(observation, source=2, destination=0)


class ScaledFloatFrame(gym.ObservationWrapper):
    """Cast observations to float32 and divide by 255 (0-255 pixel values map to 0-1)."""

    def observation(self, obs):
        """Return a new float32 array holding `obs / 255`."""
        as_float = np.array(obs).astype(np.float32)
        return as_float / 255.0


class BufferWrapper(gym.ObservationWrapper):
    """Keep a rolling stack of the most recent observations along axis 0.

    Args:
        env: Environment to wrap.
        n_steps: Number of times the wrapped observation space is repeated
            along axis 0 to form the stacked observation space.
        dtype: dtype of the stacked buffer and of the advertised space.
    """

    def __init__(self, env, n_steps, dtype=np.float32):
        super(BufferWrapper, self).__init__(env)
        self.dtype = dtype
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),
                                                old_space.high.repeat(n_steps, axis=0), dtype=dtype)

    def reset(self):
        """Zero the stack, reset the wrapped env and push its first observation."""
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
        return self.observation(self.env.reset())

    def observation(self, observation):
        """Shift the stack by one slot, store `observation` last and return a snapshot.

        The internal buffer is overwritten in place on every call, so a copy is
        returned: callers that keep the result (e.g. in a replay buffer) must not
        see it change when the next frame arrives.
        """
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        return self.buffer.copy()


### Preprocesamiento del entorno

In [6]:
from gym.wrappers import ResizeObservation, GrayScaleObservation

In [7]:
# Fresh raw environment; the preprocessing wrappers are applied to it via wrap_env in a later cell
env = gym_super_mario_bros.make('SuperMarioBros-v0')

def wrap_env(env):
    """Apply the full preprocessing stack to a raw Mario environment.

    Wrappers are applied innermost-first in this order: frame skipping with
    max pooling, grayscale conversion, resize to 84, channel-first layout,
    4-frame stacking, scaling to [0, 1], and finally the SIMPLE_MOVEMENT
    joypad action set.
    """
    skipped = MaxAndSkipEnv(env)
    gray = GrayScaleObservation(skipped, keep_dim=True)  # keep the trailing channel axis
    resized = ResizeObservation(gray, 84)
    channel_first = ImageToPyTorch(resized)
    stacked = BufferWrapper(channel_first, 4)
    scaled = ScaledFloatFrame(stacked)
    return JoypadSpace(scaled, SIMPLE_MOVEMENT)

In [8]:
env = wrap_env(env)

# Available actions
acts = env.get_action_meanings()
print(acts)

# Shape of the observation space
# NOTE: `shp` is reused by later cells when the networks are built.
shp = env.observation_space.shape
print(shp)

['NOOP', 'right', 'right A', 'right B', 'right A B', 'A', 'left']
(4, 84, 84)


In [9]:
# Number of actions in the wrapped environment's action space
print('Actions availables in the game: ', env.action_space.n)

Actions availables in the game:  7


## Neural Network Architecture

In [10]:
import torch
import torch.nn as nn
from torchsummary import summary

In [11]:
# Neural network architecture definition (DeepMind-style DQN)
def make_DQN(input_shape, n_actions, summary_=True):
    """Build the convolutional Q-network.

    Args:
        input_shape: Observation shape as (channels, height, width).
        n_actions: Number of outputs (one Q-value per action).
        summary_: When True, print a layer summary with torchsummary.

    Returns:
        An ``nn.Sequential`` moved to the GPU when CUDA is available and to
        the CPU otherwise. Layer indices (and therefore state-dict keys) are
        0, 2, 4 for the convolutions and 7, 9 for the linear layers.
    """
    conv_layers = [
        nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
        nn.ReLU(),
        nn.Conv2d(32, 64, kernel_size=4, stride=2),
        nn.ReLU(),
        nn.Conv2d(64, 64, kernel_size=3, stride=1),
        nn.ReLU(),
    ]
    # Derive the flattened feature size from a dummy forward pass instead of
    # hard-coding 3136, which is only correct for 84x84 inputs.
    with torch.no_grad():
        probe = nn.Sequential(*conv_layers)(torch.zeros(1, *input_shape))
    n_flat = probe.flatten(1).shape[1]

    net = nn.Sequential(
        *conv_layers,
        nn.Flatten(),
        nn.Linear(n_flat, 512),
        nn.ReLU(),
        nn.Linear(512, n_actions),
    )
    # Fall back to the CPU instead of crashing when no GPU is present.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net.to(device)
    if summary_:
        # Summarise for the shape that was actually requested, not for a global env.
        summary(net, tuple(input_shape), device=device.type)
    return net

In [12]:
# Build the network for the wrapped observation shape (`shp`) and print its layer summary
net = make_DQN(shp, env.action_space.n)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 20, 20]           8,224
              ReLU-2           [-1, 32, 20, 20]               0
            Conv2d-3             [-1, 64, 9, 9]          32,832
              ReLU-4             [-1, 64, 9, 9]               0
            Conv2d-5             [-1, 64, 7, 7]          36,928
              ReLU-6             [-1, 64, 7, 7]               0
           Flatten-7                 [-1, 3136]               0
            Linear-8                  [-1, 512]       1,606,144
              ReLU-9                  [-1, 512]               0
           Linear-10                    [-1, 7]           3,591
Total params: 1,687,719
Trainable params: 1,687,719
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.11
Forward/backward pass size (MB): 0.35
Params size (MB): 6.44
Estimat

## Experience Replay and Target Network

### Experience replay

In [12]:
import numpy as np
import collections
from collections import deque

In [13]:
#Taken from: https://github.com/jorditorresBCN/aprendizaje-por-refuerzo/blob/main/Capitulo09.ipynb
# One stored transition: (state, action, reward, done, new_state).
Experience = collections.namedtuple(
    'Experience',
    field_names=['state', 'action', 'reward', 'done', 'new_state'],
)


class ExperienceReplay:
    """Fixed-capacity FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A bounded deque drops the oldest transition once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

    def append(self, experience):
        """Store one transition."""
        self.buffer.append(experience)

    def sample(self, BATCH_SIZE):
        """Draw `BATCH_SIZE` distinct transitions uniformly at random.

        Returns five arrays in the order states, actions, rewards (float32),
        dones (uint8), next_states.
        """
        picked = np.random.choice(len(self.buffer), BATCH_SIZE, replace=False)
        columns = tuple(zip(*(self.buffer[i] for i in picked)))
        states, actions, rewards, dones, next_states = columns
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards, dtype=np.float32),
            np.array(dones, dtype=np.uint8),
            np.array(next_states),
        )

Target Network es una copia de la red neuronal principal para calcular el valor Q(s', a') en la ecuación de Bellman.

## Deep Q-Learning algorithm

Hiperparámetros

In [None]:
# Hyperparameters for SuperMarioBros-v0
# NOTE: the following hyperparameter cells rebind these same names, so the last cell executed wins.

MEAN_REWARD_BOUND = 2100 # Mean reward above which training is considered successful
NUMBER_OF_REWARDS_TO_AVERAGE = 10

GAMMA = 0.99
BATCH_SIZE = 32
LEARNING_RATE = 1e-4
EXPERIENCE_REPLAY_SIZE = 10000
SYNC_TARGET_NETWORK = 1000 # The target network is updated every 1000 steps

EPS_START = 1.0
EPS_DECAY = 0.9995
EPS_MIN = 0.02

In [None]:
# Hyperparameters for SuperMarioBros-v1: DeepMind approximation
MEAN_REWARD_BOUND = 2100 # Mean reward above which training is considered successful
NUMBER_OF_REWARDS_TO_AVERAGE = 15

GAMMA = 0.99
BATCH_SIZE = 32
LEARNING_RATE = 0.00025
EXPERIENCE_REPLAY_SIZE = 100000
SYNC_TARGET_NETWORK = 10000 # The target network is updated every 10000 steps

EPS_START = 1.0
# NOTE(review): the training loop multiplies epsilon by EPS_DECAY on every frame, so with 0.1
# epsilon reaches EPS_MIN within a few frames - confirm this is the intended schedule.
EPS_DECAY = 0.1
EPS_MIN = 0.01

In [None]:
# Hyperparameters for SuperMarioBros-v2
MEAN_REWARD_BOUND = 2500 # Mean reward above which training is considered successful
NUMBER_OF_REWARDS_TO_AVERAGE = 15

GAMMA = 0.99
BATCH_SIZE = 32
LEARNING_RATE = 0.00001
EXPERIENCE_REPLAY_SIZE = 100000 
SYNC_TARGET_NETWORK = 10000 # Target network is updated every 10000 steps

EPS_START = 1.0
# NOTE(review): the training loop multiplies epsilon by EPS_DECAY on every frame, so with 0.1
# epsilon reaches EPS_MIN within a few frames - confirm this is the intended schedule.
EPS_DECAY = 0.1
EPS_MIN = 0.01

In [None]:
# Hyperparameters for SuperMarioBros-v3
MEAN_REWARD_BOUND = 3500 # Mean reward above which training is considered successful
NUMBER_OF_REWARDS_TO_AVERAGE = 15

GAMMA = 0.99
BATCH_SIZE = 32
LEARNING_RATE = 0.00001
EXPERIENCE_REPLAY_SIZE = 100000 
SYNC_TARGET_NETWORK = 10000 

EPS_START = 1.0
# NOTE(review): the training loop multiplies epsilon by EPS_DECAY on every frame, so with 0.1
# epsilon reaches EPS_MIN within a few frames - confirm this is the intended schedule.
EPS_DECAY = 0.1
EPS_MIN = 0.01

2 fases principales:
- Cuando se obtienen las muestras de la interacción del agente con el entorno al realizar acciones y estas se almacenan en forma de tuplas en el búfer de memoria D.
- Aquella en la que se selecciona aleatoriamente un pequeño lote de tuplas desde D y se entrena la red neuronal con dicho lote de datos.

No se podrá pasar a la fase de aprendizaje de inmediato, se deberá esperar hasta tener suficientes tuplas en el búfer de memoria D. En el pseudocódigo, no se borra la memoria D después de cada episodio porque nos permite recordar y construir lotes de experiencias entre episodios.


    Agent

In [15]:
class Agent:
    """Steps through an environment with an epsilon-greedy policy and records transitions.

    Args:
        env: Environment exposing `reset()`, `step(action)` and `action_space.sample()`.
        exp_replay_buffer: Container with an `append` method that receives one
            `Experience` per environment step.
    """

    def __init__(self, env, exp_replay_buffer):
        self.env = env
        self.exp_replay_buffer = exp_replay_buffer
        self._reset()

    def _reset(self):
        """Start a new episode and clear the running episode reward."""
        self.current_state = self.env.reset()
        self.total_reward = 0.0

    def step(self, net, epsilon=0.0, device='cuda'):
        """Take one environment step and store the transition.

        Args:
            net: Callable mapping a batched state tensor to per-action Q-values.
            epsilon: Probability of taking a random action instead of the greedy one.
            device: Device the state tensor is moved to before calling `net`.

        Returns:
            The total episode reward if this step ended the episode, otherwise None.
        """
        done_reward = None
        if np.random.random() < epsilon:
            action = self.env.action_space.sample()
        else:
            state_ = np.array([self.current_state])
            state = torch.tensor(state_).to(device)
            # Action selection needs no gradients; skipping autograd avoids
            # building a graph on every environment step.
            with torch.no_grad():
                q_vals = net(state)
            _, act_ = torch.max(q_vals, dim=1)
            action = int(act_.item())
        new_state, reward, is_done, _ = self.env.step(action)
        self.total_reward += reward
        exp = Experience(self.current_state, action, reward, is_done, new_state)
        self.exp_replay_buffer.append(exp)
        self.current_state = new_state

        if is_done:
            done_reward = self.total_reward
            self._reset()
        return done_reward


## Entrenamiento y ejecución

In [None]:
from torch.utils.tensorboard import SummaryWriter
%load_ext tensorboard

In [None]:
env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = wrap_env(env)

# Size both networks from this environment's own observation space rather than
# from a variable left over from an earlier cell.
net = make_DQN(env.observation_space.shape, env.action_space.n)
target_net = make_DQN(env.observation_space.shape, env.action_space.n, False)
# Start the target network as an exact copy of the online network; otherwise the
# first Bellman targets come from unrelated random weights.
target_net.load_state_dict(net.state_dict())

buffer = ExperienceReplay(EXPERIENCE_REPLAY_SIZE)
agent  = Agent(env, buffer)

epsilon = EPS_START
total_rewards = []
frame_number = 0

# Optimizer used to update the weights of the online network
optimizer = torch.optim.Adam(net.parameters(), lr=LEARNING_RATE)

writer = SummaryWriter(comment="-" + 'SuperMarioBros-v0')

Bucle de entrenamiento

In [None]:
import datetime
print(">>>Training starts at ",datetime.datetime.now())

# Train on whichever device the online network already lives on.
train_device = next(net.parameters()).device
# Built once instead of on every iteration.
loss_fn = nn.MSELoss()

while True:
    frame_number += 1
    # Multiplicative decay applied on every frame, floored at EPS_MIN.
    epsilon = max(epsilon*EPS_DECAY, EPS_MIN)

    reward = agent.step(net, epsilon, device=train_device)
    if reward is not None:
        # An episode just finished: log it and check the stopping criterion.
        total_rewards.append(reward)

        mean_reward = np.mean(total_rewards[-NUMBER_OF_REWARDS_TO_AVERAGE:])

        print(f"Frame:{frame_number} | Total games:{len(total_rewards)} | Mean reward: {mean_reward:.3f}  (epsilon used ={epsilon:.2f})")

        writer.add_scalar("epsilon", epsilon, frame_number)
        # The tag says "100" but the mean covers the last NUMBER_OF_REWARDS_TO_AVERAGE
        # episodes; the tag is kept so existing TensorBoard runs stay comparable.
        writer.add_scalar("reward_100", mean_reward, frame_number)
        writer.add_scalar("reward", reward, frame_number)

        if mean_reward > MEAN_REWARD_BOUND:
            print(f"SOLVED in {frame_number} frames and {len(total_rewards)} games")
            break

    # No learning until the replay buffer has been filled completely.
    if len(buffer) < EXPERIENCE_REPLAY_SIZE:
        continue

    states_, actions_, rewards_, dones_, next_states_ = buffer.sample(BATCH_SIZE)

    states = torch.tensor(states_).to(train_device)
    next_states = torch.tensor(next_states_).to(train_device)
    actions = torch.tensor(actions_, dtype=torch.int64).to(train_device)
    rewards = torch.tensor(rewards_).to(train_device)
    # Boolean mask: indexing with uint8 tensors is deprecated in PyTorch.
    dones = torch.tensor(dones_, dtype=torch.bool).to(train_device)

    # Q(s, a) of the actions that were actually taken.
    Q_values = net(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

    # Bellman targets are constants for the optimizer, so no graph is built for them.
    with torch.no_grad():
        next_state_values = target_net(next_states).max(1)[0]
        # Terminal transitions have no future value.
        next_state_values[dones] = 0.0

    expected_Q_values = next_state_values * GAMMA + rewards
    loss = loss_fn(Q_values, expected_Q_values)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if frame_number % SYNC_TARGET_NETWORK == 0:
        target_net.load_state_dict(net.state_dict())

# NOTE: the output file name is fixed here, whichever hyperparameter cell was run.
torch.save(net.state_dict(), "SuperMarioBros-v3.dat")
writer.close()
print(">>>Training ends at ",datetime.datetime.now())

Visualización de los modelos entrenados

In [19]:
#Taken (part) from: https://github.com/jorditorresBCN/aprendizaje-por-refuerzo/blob/main/Capitulo09.ipynb

def visualize_replay(path):
    """Render one episode played greedily by a saved network and print its total reward.

    Args:
        path: Path to a state-dict file for the network built by `make_DQN`.
    """
    env = wrap_env(gym_super_mario_bros.make('SuperMarioBros-v0'))
    try:
        net = make_DQN(env.observation_space.shape, env.action_space.n, False).cpu()
        # map_location lets a checkpoint saved from a CUDA model load on a CPU-only machine.
        net.load_state_dict(torch.load(path, map_location='cpu'))

        state = env.reset()
        total_reward = 0.0
        done = False

        while not done:
            env.render()
            # np.asarray instead of np.array(..., copy=False): the latter raises on
            # NumPy >= 2 because wrapping a list always needs a copy.
            state_ = torch.tensor(np.asarray([state]))
            with torch.no_grad():
                q_vals = net(state_).numpy()[0]
            action = int(np.argmax(q_vals))
            state, reward, done, _ = env.step(action)
            total_reward += reward
    finally:
        # Release the emulator/window even if loading or playback fails.
        env.close()

    print("Total reward: %.2f" % total_reward)



In [21]:
# Replay the checkpoint stored as SuperMarioBros-v0.dat
path = "./models/SuperMarioBros-v0.dat"
visualize_replay(path)

Total reward: 1602.00


In [22]:
# Replay the checkpoint stored as SuperMarioBros-v1.dat
path = "./models/SuperMarioBros-v1.dat"
visualize_replay(path)

Total reward: -92.00


In [20]:
# Replay the checkpoint stored as SuperMarioBros-v2.dat
path = "./models/SuperMarioBros-v2.dat"
visualize_replay(path)

Total reward: 2209.00
