In [1]:
import gym
import cv2
import time
import collections
import numpy as np
import torch
import torch.nn as nn

#### Clases y funciones

In [2]:
# 1. Este wrapper se queda con el máximo de cada uno de los píxeles en los dos últimos frames
# debido al efecto de parpadeo que tienen algunos juegos de Atari.
# 2. Otra acción importante que realiza, es tomar un frame cada N pasos, pues la 
# diferencia entre los fotogramas subsecuentes es mínima, y esto permite acelerar 
# el proceso de entrenamiento debido a que no se tiene que procesar cada frame.
class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env=None, skip=4):
        super(MaxAndSkipEnv, self).__init__(env)
        self._obs_buffer = collections.deque(maxlen=2)
        self._skip = skip
        
    def step(self, action):
        total_reward = 0.0
        done = None
        for _ in range(self._skip):
            obs, reward, done, trunc, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, done, trunc, info
    
    def reset(self):
        self._obs_buffer.clear()
        obs, info = self.env.reset()
        self._obs_buffer.append(obs)
        return (obs, info)
    
# Este wrapper presiona el botón FIRE para iniciar el juego
#! En la nueva versión de gym step nos devuelve 5 valores,
#! por lo que se ha modificado esa parte del código al aplicar env.step
class FireResetEnv(gym.Wrapper):
    def __init__(self, env=None):
        super(FireResetEnv, self).__init__(env)
        assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
        assert len(env.unwrapped.get_action_meanings()) >= 3
    
    def step(self, action):
        return self.env.step(action)
    
    def reset(self):
        self.env.reset()
        obs, _, done, _, info = self.env.step(1) # Presiona el botón FIRE
        if done:
            self.env.reset()
        return (obs, info)
    
    
# Este wrapper escala el frame a 84x84 y convierte el frame a escala de grises
class ProcessFrame84(gym.ObservationWrapper):
    def __init__(self, env=None):
        super(ProcessFrame84, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
            
    def observation(self, obs):
        return ProcessFrame84.process(obs)
        
    @staticmethod
    def process(frame):
        if frame.size == 210 * 160 * 3:
            img = np.reshape(frame, [210, 160, 3]).astype(np.float32)
        elif frame.size == 250 * 160 * 3:
            img = np.reshape(frame, [250, 160, 3]).astype(np.float32)
        else:
            assert False, "Unknown resolution."
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_AREA)
        x_t = resized_screen[18:102, :]
        x_t = np.reshape(x_t, [84, 84, 1])
        return x_t.astype(np.uint8)
        
# Este wrapper cambia la forma de la observación de (H, W, C) a (C, H, W)
class ImageToPyTorch(gym.ObservationWrapper):
    def __init__(self, env):
        super(ImageToPyTorch, self).__init__(env)
        old_shape = self.observation_space.shape
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1], old_shape[0], old_shape[1]), dtype=np.float32)
        
    def observation(self, observation):
        return np.moveaxis(observation, 2, 0)
    
# Este wrapper apila varios fotogramas seguidos (generalmente 4)
class BufferWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_steps, dtype=np.float32):
        super(BufferWrapper, self).__init__(env)
        self.dtype = dtype
        self.observation_space = gym.spaces.Box(env.observation_space.low.repeat(n_steps, axis=0), env.observation_space.high.repeat(n_steps, axis=0), dtype=dtype)
        
    def reset(self):
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
        obs, info = self.env.reset()
        return (self.observation(self.env.reset()[0]), info)
    
    def observation(self, observation):
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        return self.buffer
    
# Este wrapper escala los valores de los píxeles a valores entre 0 y 1 y los convierte a float32
class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, observation):
        return np.array(observation).astype(np.float32) / 255.0

In [3]:
# Crear entorno con los wrappers
def make_env(env_name, mode='rgb_array'):
    env = gym.make(env_name, render_mode=mode)
    env = MaxAndSkipEnv(env)
    env = FireResetEnv(env)
    env = ProcessFrame84(env)
    env = ImageToPyTorch(env)
    env = BufferWrapper(env, 4)
    return ScaledFloatFrame(env)

# Crear red neuronal convolucional
def make_DQN(input_shape, output_shape):
    net = nn.Sequential(
        nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
        nn.ReLU(),
        nn.Conv2d(32, 64, kernel_size=4, stride=2),
        nn.ReLU(),
        nn.Conv2d(64, 64, kernel_size=3, stride=1),
        nn.ReLU(),
        nn.Flatten(),
        nn.Linear(64*7*7, 512),
        nn.ReLU(),
        nn.Linear(512, output_shape)
    )
    return net

#### Visualizar partida con red entrenada

In [4]:
visualize = True

# Iniciar entorno y cargar pesos de la red 
env = make_env("PongNoFrameskip-v4", mode='human')
net = make_DQN(env.observation_space.shape, env.action_space.n)
net.load_state_dict(torch.load('Practica7_Q_Network.dat'))

estado = env.reset()[0]
recompensa_total = 0.0

while True:
    # Visualizar y tiempo
    start_ts = time.time()
    if visualize:
        env.render()
    
    #| TOMA DE DECISIONES
    estado_ = torch.tensor(np.array([estado], copy=False))  # Convertir estado a tensor
    q_vals = net(estado_).data.numpy()[0]                   # Calcular valores Q
                                                            # data: extrae el tensor del resultado el cual se almacena en la GPU
                                                            # La indexación [0] es para extraer el array que a su vez está dentro de otro array (el cual no contiene nada más)
    accion = np.argmax(q_vals)
    estado, recompensa, done, truncado, info = env.step(accion)
    recompensa_total += recompensa
    
    if done:
        break
    
    # Visualización
    if visualize:
        delta = 1/30 - (time.time() - start_ts)
        if delta > 0:
            time.sleep(delta)

print("Recompensa total: %.2f" % recompensa_total)

  if not isinstance(terminated, (bool, np.bool8)):
  logger.warn(


Recompensa total: 20.00


: 