# Tests of trained models

I use this notebook to test the models generated during the development of the project: it rebuilds the environment used for training, loads a saved model, and measures the reward the model obtains over a number of games.

In [1]:
# Imports for "gym" and "retro" features
from gym import Env
import gym
from gym.spaces import MultiDiscrete, Box, MultiBinary, Discrete
from gym.wrappers import Monitor as gymMon
from gym.wrappers.monitoring.video_recorder import VideoRecorder

import retro
from retro import RetroEnv

# To help with image preprocessing
import numpy as np
import cv2

# To help with the operating system
import time
import os
from tqdm import tqdm

# Everything concerning SB3
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv, VecFrameStack, VecTransposeImage
from stable_baselines3.common.evaluation import evaluate_policy

In [2]:
class RetroMtpoNesReducedRL(Env):
    """
    Gym environment around the 'Mtpo-Nes' retro game with a reduced observation space.

    Every emulator frame is cropped to a "focus area" (the top eighth of the
    rows and the outer thirds of the columns are dropped, e.g. 196x80 for a
    224x240 frame), converted to a single grayscale channel and resized to
    84x84x1. The smaller observation is meant to speed up training.

    Args:
        state: name of the saved game state the emulator starts from.
        scenario: name of the retro scenario passed to ``retro.make``.
        inttype: retro integration set used to locate the ROM and to build
            the emulator.
        points_as_rewards: when True, ``step`` returns the change of the
            in-game ``info['POINTS']`` value as reward instead of the reward
            computed by the emulator.

    The main inspiration for this class comes from a YouTube tutorial by
    Nicholas Renotte:

    https://www.youtube.com/watch?v=rzbFhu6So5U&t=6248s
    """
    def __init__(self, state='GlassJoe.state',
                 scenario='scenario_king_hippo',
                 inttype=retro.data.Integrations.STABLE,
                 points_as_rewards=True):
        # Zero-argument super(): the previous `super(RetroEnv).__init__()`
        # never reached the Env base class.
        super().__init__()
        # Most of these lines come from the gym-retro library.
        self.img = None
        rom_path = retro.data.get_romfile_path('Mtpo-Nes', inttype)
        self.system = retro.get_romfile_system(rom_path)
        core = retro.get_system_info(self.system)
        self.buttons = core['buttons']
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(9)
        self.state = state
        self.scenario = scenario
        # `inttype` is forwarded so the emulator uses the same integration set
        # that was used above to locate the ROM.
        self.game = retro.make(game='Mtpo-Nes',
                               state=self.state,
                               scenario=self.scenario,
                               inttype=inttype,
                               )
        self.points_as_rewards = points_as_rewards
        # Last preprocessed observation (see `get_image`).
        self.picture = None
        # Initialised here so the getters work before the first `reset`.
        self.score = 0
        self.in_game_reward = 0

    def preprocess(self, observation):
        """
        Reduce a raw emulator frame to the 84x84x1 observation used by the agent.

        The derivation of the reduced observation can be seen in the notebook
        '1_CV_Preprocessing.ipynb', which is part of this 'Notebooks' section.

        Args:
            observation: colour frame indexed as (rows, columns, channels).

        Returns:
            Array of shape (84, 84, 1) with the same dtype as the grayscale
            image produced by OpenCV (no normalisation is applied).
        """
        # Cropping: drop the top eighth of the rows and the outer thirds of
        # the columns. The original slice ended at 1.5 * rows, which NumPy
        # clamps to the last row, so an open-ended slice is equivalent.
        n_rows = observation.shape[0]
        n_cols = observation.shape[1]
        top = int(n_rows * (1 / 8))
        side = int(n_cols / 3)
        focus_zone = observation[top:, side:n_cols - side]
        # Grayscale.
        # NOTE(review): the BGR conversion code is kept for parity with
        # already-trained models; confirm the channel order of retro frames.
        gray = cv2.cvtColor(focus_zone, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)

        # The gym object expects a tensor with three dimensions, so a
        # trailing channel axis is added.
        return np.reshape(resized, (84, 84, 1))

    def reset(self):
        """Reset the emulator and return the first preprocessed frame."""
        obs = self.game.reset()
        processed_obs = self.preprocess(obs)
        self.score = 0
        self.in_game_reward = 0
        self.picture = processed_obs
        return processed_obs

    def step(self, action):
        """
        Advance the emulation one step.

        Returns:
            Tuple ``(observation, reward, done, info)`` where the observation
            went through `preprocess` and the reward is either the emulator
            reward or the change in ``info['POINTS']`` (see
            ``points_as_rewards``).
        """
        obs, reward, done, info = self.game.step(action)
        processed_obs = self.preprocess(obs)
        # Keep the getters in sync with the latest step.
        self.picture = processed_obs
        self.in_game_reward = reward

        if self.points_as_rewards:
            # Reward is the number of points gained since the previous step.
            points = info['POINTS']
            reward = points - self.score
            self.score = points
        return processed_obs, reward, done, info

    # The rest of the methods are not used much, yet might come in
    # handy in some cases.
    def render(self, *args, **kwargs):
        """Render the underlying emulator (arguments are ignored)."""
        self.game.render()

    def close(self):
        """Close the underlying emulator."""
        self.game.close()

    def get_image(self):
        """Return the most recent preprocessed observation (None before `reset`)."""
        return self.picture

    def get_buttons(self):
        """Return the button list reported by retro for this system."""
        return self.buttons

    def get_action_meaning(self, act):
        """Delegate to the emulator's `get_action_meaning`."""
        return self.game.get_action_meaning(act)

    def get_in_game_score(self):
        """Return the last `info['POINTS']` value seen while points are used as rewards."""
        return self.score

    def get_in_game_reward(self):
        """Return the reward reported by the emulator on the last `step` (0 before any step)."""
        return self.in_game_reward

In [3]:
class RetroMtpoNesReduced(Env):
    """
    Gym environment around the 'Mtpo-Nes' retro game with a reduced observation space.

    The observation space is reduced to speed up training. Each frame is
    cropped to a "focus area" (the outer vertical strips of the screen are
    removed, keeping the region where the action takes place), converted to a
    single grayscale channel, resized to 84x84x1 and scaled to [0, 1].
    `step` returns only the difference between the current frame and the
    previous one instead of the complete frame.

    Args:
        state: name of the saved game state the emulator starts from.
        scenario: name of the retro scenario passed to ``retro.make``.
        inttype: retro integration set used to locate the ROM and to build
            the emulator.
        points_as_rewards: when True, ``step`` returns the change of the
            in-game ``info['POINTS']`` value as reward instead of the reward
            computed by the emulator.
    """
    def __init__(self, state='GlassJoe.state', scenario='scenario.json', inttype=retro.data.Integrations.STABLE,
        points_as_rewards=True):
        # Zero-argument super(): the previous `super(RetroEnv).__init__()`
        # never reached the Env base class.
        super().__init__()
        self.img = None
        rom_path = retro.data.get_romfile_path('Mtpo-Nes', inttype)
        self.system = retro.get_romfile_system(rom_path)
        core = retro.get_system_info(self.system)
        self.buttons = core['buttons']
        # Frames are scaled to [0, 1] and frame deltas therefore fall in
        # [-1, 1]; the declared space has to match, otherwise consumers that
        # allocate buffers from it (e.g. vectorised wrappers) would truncate
        # the float observations to uint8.
        self.observation_space = Box(low=-1.0, high=1.0, shape=(84, 84, 1), dtype=np.float32)
        # NOTE(review): the emulator below is created with retro's default
        # action filter while this space is Discrete(24); confirm that callers
        # translate actions before they reach `self.game.step`.
        self.action_space = Discrete(24)
        self.state = state
        self.scenario = scenario
        # `inttype` is forwarded so the emulator uses the same integration set
        # that was used above to locate the ROM.
        self.game = retro.make(game='Mtpo-Nes', state=self.state, scenario=self.scenario,
                               inttype=inttype)
        self.points_as_rewards = points_as_rewards
        # Last frame delta (see `get_image`).
        self.picture = None
        # Initialised here so attribute access is safe before the first `reset`.
        self.score = 0
        self.previous_frame = None

    def preprocess(self, observation):
        """
        Reduce a raw emulator frame to a normalised 84x84x1 observation.

        The derivation of the reduced observation can be seen in the notebook
        "2_aletelecom_CV_Preprocessing.ipynb", which is part of this
        "Notebooks" section.

        Args:
            observation: colour frame indexed as (rows, columns, channels).

        Returns:
            float32 array of shape (84, 84, 1); values are in [0, 1] when the
            input frame is 8-bit.
        """
        # Cropping: drop the top eighth of the rows and the outer thirds of
        # the columns. The original slice ended at 1.5 * rows, which NumPy
        # clamps to the last row, so an open-ended slice is equivalent.
        n_rows = observation.shape[0]
        n_cols = observation.shape[1]
        top = int(n_rows * (1 / 8))
        side = int(n_cols / 3)
        focus_zone = observation[top:, side:n_cols - side]
        # Grayscale.
        # NOTE(review): the BGR conversion code is kept for parity with
        # already-trained models; confirm the channel order of retro frames.
        gray = cv2.cvtColor(focus_zone, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)

        # The gym object expects a tensor with three dimensions. The output is
        # also divided by 255 to normalise the values between 0 and 1.
        return np.reshape(resized / 255, (84, 84, 1)).astype(np.float32)

    def reset(self):
        """
        Reset the emulator and return the first preprocessed frame.

        NOTE(review): the full first frame is returned here while `step`
        returns frame deltas; kept as in the original implementation.
        """
        obs = self.game.reset()
        processed_obs = self.preprocess(obs)
        self.score = 0
        self.previous_frame = processed_obs
        # The delta of the first frame against itself is all zeros.
        self.picture = processed_obs - self.previous_frame
        return processed_obs

    def step(self, action):
        """
        Advance the emulation one step.

        Returns:
            Tuple ``(frame_delta, reward, done, info)`` where ``frame_delta``
            is the preprocessed frame minus the previous preprocessed frame
            and the reward is either the emulator reward or the change in
            ``info['POINTS']`` (see ``points_as_rewards``).
        """
        obs, reward, done, info = self.game.step(action)
        processed_obs = self.preprocess(obs)

        # `frame_delta` is the difference between the current frame and the
        # previous one, so the agent is shown only what changed between frames.
        frame_delta = processed_obs - self.previous_frame
        self.previous_frame = processed_obs
        self.picture = frame_delta
        if self.points_as_rewards:
            # Reward is the number of points gained since the previous step.
            points = info['POINTS']
            reward = points - self.score
            self.score = points
        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        """Render the underlying emulator (arguments are ignored)."""
        self.game.render()

    def close(self):
        """Close the underlying emulator."""
        self.game.close()

    def get_image(self):
        """Return the most recent frame delta (None before `reset`)."""
        return self.picture

In [4]:
class Discretizer(gym.ActionWrapper):
    """
    Expose a MultiBinary environment through a Discrete action space.

    Each discrete action index stands for one button combination; `action`
    turns the index back into the boolean button mask of the wrapped
    environment.

    Args:
        env: environment with a MultiBinary action space whose unwrapped
            environment has a `buttons` list.
        combos: ordered list of lists of valid button combinations
    """

    def __init__(self, env, combos):
        super().__init__(env)
        assert isinstance(env.action_space, gym.spaces.MultiBinary)
        button_names = env.unwrapped.buttons

        def to_mask(combo):
            # One boolean per button, True for every button in the combination.
            mask = np.full(env.action_space.n, False)
            for name in combo:
                mask[button_names.index(name)] = True
            return mask

        self._decode_discrete_action = [to_mask(combo) for combo in combos]
        n_actions = len(self._decode_discrete_action)
        self.action_space = gym.spaces.Discrete(n_actions)

    def action(self, act):
        # Return a fresh array so callers cannot mutate the lookup table.
        return np.array(self._decode_discrete_action[act])


class MtpoDiscretizer(Discretizer):
    """
    Discrete action sets for the 'Mtpo-Nes' game.

    Based on the Sonic discretizer in
    https://github.com/openai/retro-baselines/blob/master/agents/sonic_util.py

    Three predefined combination lists are available as class attributes
    (`USE_STAR`, `DODGE`, `NO_STAR`); `DODGE` is used unless another list is
    passed.

    Args:
        env: environment to wrap (see `Discretizer`).
        combos: optional list of button combinations; defaults to `DODGE`.
    """

    # All moves, including covering and the super power.
    USE_STAR = [
        [],  # No movement
        ['RIGHT'],  # Dodge to the right
        ['LEFT'],  # Dodge to the left
        ['DOWN'],  # Cover up
        ['UP', 'A'],  # Right punch to the face
        ['UP', 'B'],  # Left punch to the face
        ['A'],  # Right punch to the body
        ['B'],  # Left punch to the body
        ['START'],  # Use the super power
    ]

    # Same as USE_STAR without covering up.
    DODGE = [
        [],  # No movement
        ['RIGHT'],  # Dodge to the right
        ['LEFT'],  # Dodge to the left
        ['UP', 'A'],  # Right punch to the face
        ['UP', 'B'],  # Left punch to the face
        ['A'],  # Right punch to the body
        ['B'],  # Left punch to the body
        ['START'],  # Use the super power
    ]

    # Actions that do not use the star (super power) during the fight.
    NO_STAR = [
        [],  # No movement
        ['RIGHT'],  # Dodge to the right
        ['LEFT'],  # Dodge to the left
        ['DOWN'],  # Cover up
        ['UP', 'A'],  # Right punch to the face
        ['UP', 'B'],  # Left punch to the face
        ['A'],  # Right punch to the body
        ['B'],  # Left punch to the body
    ]

    def __init__(self, env, combos=None):
        if combos is None:
            combos = self.DODGE
        super().__init__(env=env, combos=combos)

    def get_action(self):
        """Print the button list of the wrapped environment."""
        # This used to be a function nested inside __init__, so it was never
        # reachable as a method.
        print(self.unwrapped.buttons)

In [5]:
class RetroMtpoNesReducedGail(Env):
    """
    Gym environment around the 'Mtpo-Nes' retro game with a reduced observation space.

    The observation space is reduced to speed up training. Each frame is
    cropped to a "focus area" (the outer vertical strips of the screen are
    removed, keeping the region where the action takes place), converted to a
    single grayscale channel and resized to 84x84x1. `step` returns only the
    difference between the current frame and the previous one instead of the
    complete frame.

    Args:
        state: name of the saved game state the emulator starts from.
        scenario: name of the retro scenario passed to ``retro.make``.
        inttype: retro integration set used to locate the ROM and to build
            the emulator.
        use_restricted_actions: action filter passed to ``retro.make``.
        points_as_rewards: when True, ``step`` returns the change of the
            in-game ``info['POINTS']`` value as reward instead of the reward
            computed by the emulator.
    """
    def __init__(self, state='GlassJoe.state',
                 scenario='scenario.json',
                 inttype=retro.data.Integrations.STABLE,
                 use_restricted_actions=retro.Actions.DISCRETE,
                 points_as_rewards=True):
        # Zero-argument super(): the previous `super(RetroEnv).__init__()`
        # never reached the Env base class.
        super().__init__()
        self.img = None
        rom_path = retro.data.get_romfile_path('Mtpo-Nes', inttype)
        self.system = retro.get_romfile_system(rom_path)
        core = retro.get_system_info(self.system)
        self.buttons = core['buttons']
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = Discrete(24)
        self.state = state
        self.scenario = scenario
        self.use_restricted_actions = use_restricted_actions
        # `inttype` is forwarded so the emulator uses the same integration set
        # that was used above to locate the ROM.
        self.game = retro.make(game='Mtpo-Nes',
                               state=self.state,
                               scenario=self.scenario,
                               inttype=inttype,
                               use_restricted_actions=self.use_restricted_actions)
        self.points_as_rewards = points_as_rewards
        # Last frame delta (see `get_image`).
        self.picture = None
        # Initialised here so the getters work before the first `reset`.
        self.score = 0
        self.previous_frame = None

    def preprocess(self, observation):
        """
        Reduce a raw emulator frame to the 84x84x1 observation used by the agent.

        The derivation of the reduced observation can be seen in the notebook
        "2_aletelecom_CV_Preprocessing.ipynb", which is part of this
        "Notebooks" section.

        Args:
            observation: colour frame indexed as (rows, columns, channels).

        Returns:
            Array of shape (84, 84, 1) with the same dtype as the grayscale
            image produced by OpenCV (no normalisation is applied).
        """
        # Cropping: drop the top eighth of the rows and the outer thirds of
        # the columns. The original slice ended at 1.5 * rows, which NumPy
        # clamps to the last row, so an open-ended slice is equivalent.
        n_rows = observation.shape[0]
        n_cols = observation.shape[1]
        top = int(n_rows * (1 / 8))
        side = int(n_cols / 3)
        focus_zone = observation[top:, side:n_cols - side]
        # Grayscale.
        # NOTE(review): the BGR conversion code is kept for parity with
        # already-trained models; confirm the channel order of retro frames.
        gray = cv2.cvtColor(focus_zone, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)

        # The gym object expects a tensor with three dimensions, so a
        # trailing channel axis is added. Values are not rescaled.
        return np.reshape(resized, (84, 84, 1))

    def reset(self):
        """
        Reset the emulator and return the first preprocessed frame.

        NOTE(review): the full first frame is returned here while `step`
        returns frame deltas; kept as in the original implementation.
        """
        obs = self.game.reset()
        processed_obs = self.preprocess(obs)
        self.score = 0
        self.previous_frame = processed_obs
        # The delta of the first frame against itself is all zeros.
        self.picture = processed_obs - self.previous_frame
        return processed_obs

    def step(self, action):
        """
        Advance the emulation one step.

        Returns:
            Tuple ``(frame_delta, reward, done, info)`` where ``frame_delta``
            is the preprocessed frame minus the previous preprocessed frame
            and the reward is either the emulator reward or the change in
            ``info['POINTS']`` (see ``points_as_rewards``).
        """
        obs, reward, done, info = self.game.step(action)
        processed_obs = self.preprocess(obs)

        # `frame_delta` is the difference between the current frame and the
        # previous one, so the agent is shown only what changed between frames.
        # NOTE(review): if the frames are uint8 (as the declared observation
        # space suggests) this subtraction wraps modulo 256; left unchanged so
        # observations match what existing checkpoints were trained on.
        frame_delta = processed_obs - self.previous_frame
        self.previous_frame = processed_obs
        self.picture = frame_delta
        if self.points_as_rewards:
            # Reward is the number of points gained since the previous step.
            points = info['POINTS']
            reward = points - self.score
            self.score = points
        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        """Render the underlying emulator (arguments are ignored)."""
        self.game.render()

    def close(self):
        """Close the underlying emulator."""
        self.game.close()

    def get_image(self):
        """Return the most recent frame delta (None before `reset`)."""
        return self.picture

    def get_buttons(self):
        """Return the button list reported by retro for this system."""
        return self.buttons

    def get_action_meaning(self, act):
        """Delegate to the emulator's `get_action_meaning`."""
        return self.game.get_action_meaning(act)

    def get_in_game_score(self):
        """Return the last `info['POINTS']` value seen while points are used as rewards."""
        return self.score

In [6]:
# Evaluation settings.
GAME_STATE = 'KingHippo.state'  # alternative: 'GlassJoe.state'
SCENARIO = "scenario_score"  # alternative: 'scenario_king_hippo'
# Environment variant to build; one of 'REDUCED', 'FUCKED' or 'GAIL'.
CLASS = 'GAIL'
STACKED_FRAMES = 12
POINTS_AS_REWARDS = True

VIDEO_RECORD_PATH = os.path.join('..', 'video_gifs')

if CLASS == 'REDUCED':
    base_env = RetroMtpoNesReduced(
        state=GAME_STATE,
        scenario=SCENARIO,
        points_as_rewards=POINTS_AS_REWARDS,
    )
    # The factory captures `base_env`, not the `env` name rebound on this line.
    env = DummyVecEnv([lambda: base_env])
    env = VecFrameStack(env, STACKED_FRAMES, channels_order='last')
elif CLASS == 'FUCKED':
    # NOTE(review): RetroMtpoNes is not defined in the visible part of this
    # notebook and the result is not wrapped in a VecEnv before
    # VecFrameStack; confirm this branch still works before selecting it.
    env = RetroMtpoNes(
        state=GAME_STATE,
        scenario=SCENARIO,
        video_record_path=VIDEO_RECORD_PATH
    )
    env = VecFrameStack(env, STACKED_FRAMES, channels_order='last')
elif CLASS == 'GAIL':
    base_env = Monitor(
        RetroMtpoNesReducedGail(
            state=GAME_STATE,
            scenario=SCENARIO,
            use_restricted_actions=retro.Actions.DISCRETE,
            points_as_rewards=POINTS_AS_REWARDS,
        )
    )
    env = DummyVecEnv([lambda: base_env])
    env = VecFrameStack(env, STACKED_FRAMES, channels_order='last')
    env = VecTransposeImage(env, skip=True)
else:
    # Fail here instead of leaving `env` undefined (or stale from an earlier
    # run of this cell) and failing later in the evaluation loop.
    raise ValueError(f"Unknown CLASS {CLASS!r}; expected 'REDUCED', 'FUCKED' or 'GAIL'")

In [7]:
# env.close()

In [8]:
# custom_objects = {
# #     "lr_schedule": lambda x: .003,#x,
#     "clip_range": lambda x: 0.02#x
# }

In [9]:
# custom_objects = {
#             "learning_rate": lambda _: 0.0,
#             "lr_schedule": lambda _: 0.0,
#             "clip_range": lambda _: 0.0,
#             "gamma": lambda _: 0.0,
#             "ent_coef": lambda _: 0.0
#         }

In [10]:
#MODEL_NAME = 'PPO_VK_checkpoint_model_2000000.zip'
# MODEL_NAME = 'trial_VK_0_best_model.zip'
# #MODEL_PATH = os.path.join('..', 'models', 'train', 'vonkaiser', MODEL_NAME)
# MODEL_PATH = os.path.join('..', 'models', 'opt', 'vonkaiser_2', MODEL_NAME)
# #MODEL_PATH = os.path.join(MODEL_NAME)
# model = PPO.load(MODEL_PATH, custom_objects=custom_objects)

In [11]:
# Checkpoint under evaluation. Checkpoints tried earlier:
#   'PPO_model_750000S_12SF_DISCRETIZED_PUNCHES.zip'
#   'PPO_008592522717275543_model_2000000S_12SF_DISCRETIZED_POINTS.zip'
MODEL_NAME = 'Partial_GAIL-MLP(1024)-Default-PPO_MLP_HWC_model_550_rounds_54TrajExp.zip'

# Location of the checkpoint relative to this notebook. Directories used earlier:
#   os.path.join('..', 'models', 'train', 'glassjoe', MODEL_NAME)
#   os.path.join('..', 'notebooks', MODEL_NAME)
MODEL_PATH = os.path.join(
    os.pardir, 'models', 'train', 'gail', 'kinghippo', MODEL_NAME
)

# Load the saved PPO model; no environment is attached at load time.
# Variant used earlier:
#   PPO.load(MODEL_PATH, print_system_info=True, custom_objects=custom_objects)
model = PPO.load(MODEL_PATH)

In [12]:
# from imitation.policies.serialize import load_policy
# from imitation.util import util

# POLICY_NAME = 'PPO-DAGGER-MLP(1024)-1000-policy_PRETRAINED_KingHippo_POINTS.zip'
# POLICY_PATH = os.path.join('..', 'models', 'train', 'dagger', 'kinghippo', POLICY_NAME)
# local_policy = load_policy("ppo", env, loader_kwargs={"path": POLICY_PATH})

In [13]:
# Para

Observamos el modelo jugar:

# El mejor modelo hasta ahora es el 5 PPO_47 para Glass Joe

# El mejor modelo hasta ahora es el 09 PPO_10 para Von Kaiser

In [14]:
VELOCIDAD = 0.001  # seconds to sleep between frames when rendering
GAMES = 100  # number of episodes to play


def play_games(model, env, games, render=False, frame_delay=VELOCIDAD):
    """
    Let `model` play `games` episodes on `env` and collect the episode rewards.

    Args:
        model: object with a ``predict(obs)`` method returning ``(action, state)``.
        env: environment with ``reset()`` and ``step(action)``; the reward and
            done values may be scalars or one-element arrays (single-env VecEnv).
        games: number of episodes to play.
        render: when True, render every step and sleep `frame_delay` seconds.
        frame_delay: pause between rendered frames, in seconds.

    Returns:
        Array of shape (games,) with the total reward of each episode.
    """
    rewards = np.zeros(shape=(games,))
    for n in tqdm(range(games)):
        episode_rew = 0.0
        obs = env.reset()
        done = False
        while not done:
            action, _ = model.predict(obs)
            obs, reward, step_done, info = env.step(action)
            # Reduce to Python scalars so one-element arrays returned by a
            # vectorised env are never passed to float()/bool() directly.
            episode_rew += float(np.sum(reward))
            done = bool(np.all(step_done))
            if render:
                env.render()
                time.sleep(frame_delay)
        rewards[n] = episode_rew
    return rewards


total_reward = play_games(model, env, GAMES)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [13:48<00:00,  8.29s/it]


In [15]:
# Average episode reward over all evaluated games.
np.mean(total_reward)

0.0

In [16]:
# Per-episode rewards, one entry per evaluated game.
total_reward

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [17]:
# N_EVAL_EPISODES = 100

# mean_reward, std_reward = evaluate_policy(model,
#                 env,
#                 n_eval_episodes=N_EVAL_EPISODES,
#                 return_episode_rewards=True,
#                 deterministic=False,
#                 render=True,
#                # reward_threshold=10.0
#                )

In [18]:
# print(mean_reward)

In [19]:
# mean_reward, std_reward = evaluate_policy(model,
#                 env,
#                 n_eval_episodes=N_EVAL_EPISODES,
#                 return_episode_rewards=True,
#                 deterministic=True,
#                 render=True,
#                # reward_threshold=10.0
#                )

In [20]:
# print(mean_reward)

In [21]:
# Close the environment once the evaluation is finished.
env.close()