# **Testing**

Describimos los pasos sencillos para crear un ambiente de gym que utilice Pacman de Atari 2600.

## **Instructivo de descargas**

 - pip install gym
 - conda install swig
 - pip install box2d
 - pip3 install box2d box2d-kengz
 - brew install swig 
 - pip install pygame

## **1. Importar bibliotecas**

In [32]:
import gym
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import math
import random
import matplotlib
from collections import namedtuple, deque
from itertools import count
from PIL import Image
import cv2


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

ALE = Arcade Learning Environment

In [33]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [34]:
env = gym.make("ALE/MsPacman-v5")

class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env, width=84, height=84, grayscale=True, dict_space_key=None):
        '''
        Warp frames to 84x84 as done in the Nature paper and later work.
        If the environment uses dictionary observations, `dict_space_key` can be specified which indicates which
        observation should be warped.
        '''
        super().__init__(env)
        self._width = width
        self._height = height
        self._grayscale = grayscale
        self._key = dict_space_key
        if self._grayscale:
            num_colors = 1
        else:
            num_colors = 3

        new_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(self._height, self._width, num_colors),
            dtype=np.uint8,
        )
        if self._key is None:
            original_space = self.observation_space
            self.observation_space = new_space
        else:
            original_space = self.observation_space.spaces[self._key]
            self.observation_space.spaces[self._key] = new_space
        assert original_space.dtype == np.uint8 and len(original_space.shape) == 3

    def observation(self, obs):
        if self._key is None:
            frame = obs
        else:
            frame = obs[self._key]

        if self._grayscale:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        frame = cv2.resize(
            frame, (self._width, self._height), interpolation=cv2.INTER_AREA
        )
        if self._grayscale:
            frame = np.expand_dims(frame, -1)

        if self._key is None:
            obs = frame
        else:
            obs = obs.copy()
            obs[self._key] = frame
        return obs
    
env = WarpFrame(env)

In [35]:
env = gym.wrappers.FrameStack(env, 4)

In [40]:
env.action_space.seed(42) # Seed aleatoria

observacion, info = env.reset(seed=42) # Resetear el environment a uno aleatorio, pero siempre el mismo

A = []

env.reset()
for _ in range(10):  # Por cada paso, que son 100, ejecuta lo siguiente
    observacion, reward, terminated, truncated, info = env.step(env.action_space.sample())
    
    A.append(np.asarray(observacion))

    if terminated or truncated:
        observacion, info = env.reset()

env.close()

In [48]:
class DQN(nn.Module):
    def _init_(self, num_inputs=4, num_actions=9):
        super(DQN, self).__init__()

        self.conv1 = nn.Conv2d(num_inputs, 32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=2, stride=1)
        self.fc3 = nn.Linear(64 * 20 * 20, 512)
        self.fc4 = nn.Linear(512, num_actions)
        
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.fc3(x.view(x.size(0), -1)))
        return self.fc4(x)

In [51]:

class DQN(nn.Module):

    def __init__(self, input_channels, num_actions):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(input_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(64 * 7 * 7, 512)
        self.fc2 = nn.Linear(512, num_actions)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.fc1(x.view(x.size(0), -1)))
        x = self.fc2(x)
        return x


In [52]:
# Create a random input tensor with shape (4, 84, 84, 1)
input_tensor = torch.from_numpy(np.random.rand(4, 84, 84, 1).astype(np.float32))

# Create a DQN with 4 input channels and 9 possible actions
dqn = DQN(4, 9)

# Get the Q-values for each action for the input tensor
q_values = dqn(input_tensor)

# Print the Q-values
print(q_values)

RuntimeError: Given groups=1, weight of size [32, 4, 8, 8], expected input[4, 84, 84, 1] to have 4 channels, but got 84 channels instead

In [20]:
# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()

In [21]:
state.shape

(84, 84, 1)

In [31]:
    
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = 84*84


policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
print(policy_net)

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # the second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []

def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
    
if torch.cuda.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

TypeError: DQN.__init__() takes 1 positional argument but 3 were given

## **2. Crear el environment de Pacman**

Render_mode = 'human' permite que sea visual el juego en otra pestaña.

In [None]:
episodio = env.reset()

## **3. Reset del environment**

Resetear el environment para crear un nuevo episodio.

In [None]:
observation, reward, terminated, truncated, info = env.step(env.action_space.sample())
print(info)

In [None]:
obs = preprocess(observation)
grayscale_matrix = np.dot(obs[..., :3], [0.299, 0.587, 0.114]).astype(np.uint8)

In [None]:
grayscale_matrix.shape

In [None]:
env.reset()
env.close()


## **4. Ejemplo de juego aleatorizado**

Nos va a servir para al final comparar con un video como actua de forma aleatoria, y va cambiando por entrenos

- **Observación:** contiene información sobre el estado del ambiente en un episodio dado. Generalmente, contiene información en la velocidad a la que va el agente, los cambios de velocidad de otros agentes y la posición. 
- **info**
- **Step:** actualiza los outputs
- **Action_space.sample():** elige aleatoriamente una acción del espacio posible
- **Terminated:** cuando termina el episodio (pierde o gana)
- **Truncated:** cuando llega al máximo de steps

In [None]:

env.action_space.seed(42) # Seed aleatoria

observacion, info = env.reset(seed=42) # Resetear el environment a uno aleatorio, pero siempre el mismo

for _ in range(100):  # Por cada paso, que son 100, ejecuta lo siguiente
    observacion, reward, terminated, truncated, info = env.step(env.action_space.sample())
    
    print(reward)

    if terminated or truncated:
        observacion, info = env.reset()

env.close()


### **Reward system**

El sistema toma un modelo pr default de recompensa, que es el siguiente:


## **Nuevo environment**

In [None]:
class PacmanEnv(gym.Env):
    def __init__(self, render_mode="human", seed=None):
        if render_mode == "human":
            self.env = gym.make("ALE/MsPacman-v5", render_mode="human")
        else:
            self.env = gym.make("ALE/MsPacman-v5")
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space
        self.seed = seed
        self.lives = self.env.ale.lives()
        
        # Dejamos lo demás igual, nos ayuda mucho que tengamos las posibles acciones
        
    def step(self, action):
        observation, reward, terminated, truncated, info = self.env.step(action)

        # custom reward
        if reward > 0: # recibió algún tipo de reward
            
            if info['lives'] > 0:  # Si el número de vidas de pacman es positivo
                if reward == 10:  # En el caso por default, 10 de reward es por comer un puntito
                    reward = 25 # Si come un puntito, que el reward sea __
                elif reward == 50: # Si come un power pellet que activa el poder comer fantasmas
                    reward = 60
                elif reward == 200:  # En el caso por default, hay reward de 200 por comer un fantasma
                    reward = 100 # Si come un fantasma, cuánto queremos de reward
            else: # Si a pacman se le acabaron las vidas
                done = True  # Pone en true la bandera de que acabó el juego
                reward = -500  # Si perdemos, cuánto queremos que pierda de reward. Que le duela al pacman
                
        else:
            reward = -1  # Cuando camine pero no consiga nada, que busque la ruta óptima. Importante !
            
            # Castigo por perder vidas
            lives = info['lives']
            if self.env.ale.lives() < lives:
                # Agent lost a life:
                reward -= 1000

        return observation, reward, terminated, truncated, info

    def reset(self):
        return self.env.reset()

    def render(self, mode=None, render=True):
        if self.render_mode == 'human' and render:
            self.env.render(mode=mode)

    def close(self):
        self.env.close()



Iniciar

Iniciar con <code>PacmanEnv(seed = x, render_mode = None)</code> para que no lo haga rendered.

In [None]:
env_new = PacmanEnv(seed = 45, render_mode = None)

In [None]:
episodio_new = env_new.reset()

In [None]:

observacion, info = env_new.reset() # Resetear el environment a uno aleatorio, pero siempre el mismo
frames = [] # almacenar los frames
for _ in range(30):  # Por cada paso, que son 100, ejecuta lo siguiente
    observacion, reward, terminated, truncated, info = env_new.step(env_new.action_space.sample())
    frames.append(observacion)

    if terminated or truncated:
        observacion, info = env_new.reset()

env_new.close()

In [None]:
reward