# **Super Mario Bros. with Stable-Baselines3 PPO**



## First steps

In [1]:
import gym_super_mario_bros
# The "1-1" in the environment id specifies the map to be loaded.
STAGE_NAME = 'SuperMarioBros-1-1-v0' # Standard version
#STAGE_NAME = 'SuperMarioBros-1-1-v3' # Rectangle version
env = gym_super_mario_bros.make(STAGE_NAME) # Create the environment

>The next step is to specify the moves that Mario can make. The environment ships with several predefined movement sets, although we can also define our own, as we will see in the pre-processing section.

In [2]:
from nes_py.wrappers import JoypadSpace

from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from gym_super_mario_bros.actions import COMPLEX_MOVEMENT
from gym_super_mario_bros.actions import RIGHT_ONLY
# Print the three predefined action sets so they can be compared.
print("Simple Movements : ", SIMPLE_MOVEMENT)
print("Complex Movements : ", COMPLEX_MOVEMENT)
print("Right Only Movements : ", RIGHT_ONLY)
# Restrict the environment's action space to the SIMPLE_MOVEMENT set.
env = JoypadSpace(env, SIMPLE_MOVEMENT) #specify the movements

Simple Movements :  [['NOOP'], ['right'], ['right', 'A'], ['right', 'B'], ['right', 'A', 'B'], ['A'], ['left']]
Complex Movements :  [['NOOP'], ['right'], ['right', 'A'], ['right', 'B'], ['right', 'A', 'B'], ['A'], ['left'], ['left', 'A'], ['left', 'B'], ['left', 'A', 'B'], ['down'], ['up']]
Right Only Movements :  [['NOOP'], ['right'], ['right', 'A'], ['right', 'B'], ['right', 'A', 'B']]


>With these steps we can start playing Super Mario Bros.

In [3]:
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
# Build a fresh environment from the generic 'SuperMarioBros-v0' id and apply the simple action set.
env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, SIMPLE_MOVEMENT)

# done = True
# for step in range(5):
#     if done: # Done will be true if Mario dies in the game
#         state = env.reset()
#     state, reward, done, info = env.step(env.action_space.sample())
#     env.render() # If we are running the program in Colab we will need to comment the rendering of the environment. 
# env.close()

## Pre-processing

In [4]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
# Import Frame Stacker Wrapper and GrayScaling Wrapper
from gym.wrappers import GrayScaleObservation
# Import Vectorization Wrappers
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv, SubprocVecEnv, vec_monitor


>This section describes the pre-processing applied to the environment. First, there is the SkipFrame wrapper. By default, the game performs an action (a movement) on every frame and returns the reward for that action. However, training the AI does not require choosing a new move on every frame. The wrapper therefore repeats the chosen action for X consecutive frames and sums their rewards, which reduces the amount of work needed during training.

In [5]:
class SkipFrame(gym.Wrapper):
    """Repeat every action for ``skip`` consecutive frames and sum the rewards.

    The wrapped environment is stepped up to ``skip`` times with the same
    action; the observation, ``done`` flag and ``info`` of the last executed
    frame are returned together with the accumulated reward.
    """

    def __init__(self, env, skip):
        """Wrap ``env``.

        Args:
            env: environment to wrap.
            skip: number of frames each action is repeated for; must be >= 1.

        Raises:
            ValueError: if ``skip`` is smaller than 1.
        """
        if skip < 1:
            # With zero iterations ``step`` would have nothing to return.
            raise ValueError(f"skip must be >= 1, got {skip}")
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Apply ``action`` for up to ``skip`` frames.

        Returns:
            Tuple ``(obs, total_reward, done, info)`` where ``total_reward`` is
            the sum of the rewards of all frames that were actually executed.
        """
        total_reward = 0.0
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                # The episode ended; stop repeating the action.
                break
        return obs, total_reward, done, info

>The second step is re-scaling the observations. By default each frame of the environment is 240×256 pixels. The model does not need that many pixels, so we can rescale the frames to a smaller size to make training cheaper.

In [6]:
# Inspect the shape of a raw observation before any pre-processing is applied.
env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, SIMPLE_MOVEMENT)
state = env.reset()
print(state.shape)

(240, 256, 3)


In [7]:
class ResizeEnv(gym.ObservationWrapper):
    """Observation wrapper that rescales every frame to ``size`` x ``size`` pixels.

    The channel count of the wrapped environment's observation space is kept.
    """

    def __init__(self, env, size):
        """Wrap ``env`` and advertise a ``(size, size, channels)`` uint8 observation space."""
        super().__init__(env)
        # Only the channel count of the original (height, width, channels) shape is reused.
        _, _, channels = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(size, size, channels),
            dtype=np.uint8,
        )

    def observation(self, frame):
        """Return ``frame`` resized to the wrapper's observation shape."""
        target_h, target_w = self.observation_space.shape[:2]
        # cv2.resize takes the target size as (width, height).
        resized = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_AREA)
        # Restore the trailing channel axis when the resize result is 2-D.
        return resized[:, :, None] if resized.ndim == 2 else resized

In [8]:
class CustomRewardAndDoneEnv(gym.Wrapper):
    """Reshape the reward and the episode-termination rule of the wrapped env.

    On every step the wrapper:
      * adds the distance gained beyond the furthest ``x_pos`` reached so far,
      * adds 500 and ends the episode when ``info["flag_get"]`` is set,
      * subtracts 500 and ends the episode when ``info["life"]`` drops below 2,
      * divides the resulting reward by 10.

    It also tracks ``current_score``, ``current_x``, ``max_x`` and
    ``current_x_count`` (number of consecutive steps without a change of
    ``x_pos``) as instance attributes.
    """

    def __init__(self, env=None):
        super().__init__(env)
        self._reset_trackers()

    def _reset_trackers(self):
        """Reset all per-episode bookkeeping to its initial value."""
        self.current_score = 0
        self.current_x = 0
        self.current_x_count = 0
        self.max_x = 0

    def reset(self, **kwargs):
        """Clear the per-episode bookkeeping and reset the wrapped environment."""
        self._reset_trackers()
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped environment and return the reshaped transition.

        Returns:
            Tuple ``(state, reward, done, info)`` with the shaped reward
            (already divided by 10) and the possibly overridden ``done`` flag.
        """
        state, reward, done, info = self.env.step(action)
        # Plain int so the subtraction below cannot wrap around should the
        # emulator report an unsigned NumPy integer (not visible from here).
        x_pos = int(info["x_pos"])

        # Bonus only for progress beyond the furthest position reached so far.
        reward += max(0, x_pos - self.max_x)

        if x_pos == self.current_x:
            self.current_x_count += 1
        else:
            self.current_x_count = 0

        if info["flag_get"]:
            reward += 500
            done = True
            print("GOAL")
        if info["life"] < 2:
            reward -= 500
            done = True

        self.current_score = info["score"]
        # Record the new position BEFORE folding it into max_x. The previous
        # ordering used the position of the step before, so max_x lagged one
        # step behind and the same forward progress was rewarded twice.
        self.current_x = x_pos
        self.max_x = max(self.max_x, x_pos)
        return state, reward / 10., done, info

>By default the environment returns RGB frames. The colour information is unnecessary for training our model, and we will get better results if we convert the frames to grayscale.

In [9]:
# Compare the observation shape before and after the grayscale conversion.
env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, RIGHT_ONLY)
state = env.reset()
print("RGB scale : ",state.shape)
# keep_dim=True keeps a trailing channel axis of size 1 (see the printed shape).
env = GrayScaleObservation(env, keep_dim=True)
state = env.reset()
print("Gray scale:",state.shape)

RGB scale :  (240, 256, 3)
Gray scale: (240, 256, 1)


>Finally, it is important to group frames when training. If the AI only sees a single frame, it cannot tell in which direction Mario or the enemies are moving. This is why a FrameStack of 4 frames is created for training.

In [10]:
# Wrap the single environment in a vectorized env, then stack 4 frames (channels_order='last').
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

>This is the final pre-processing

In [29]:
from stable_baselines3.common.monitor import Monitor
from typing import Callable
from stable_baselines3.common.utils import set_random_seed

# Reduced action set used by make_mario_env: "right", or "right" combined with "A".
MOVEMENT = [["right"], ["right", "A"]]
# Environment id of the stage used for training and evaluation.
STAGE_NAME = 'SuperMarioBros-1-1-v0'
def make_mario_env(env_id: str, rank: int, seed: int = 0) -> Callable:
    """Return a zero-argument factory that builds one pre-processed Mario env.

    Args:
        env_id: environment id handed to ``gym_super_mario_bros.make``.
        rank: index of the environment; added to ``seed`` for the env reset.
        seed: base random seed.

    Returns:
        A callable that creates the environment and applies, in order:
        ``JoypadSpace`` (with ``MOVEMENT``), ``SkipFrame`` (4 frames),
        ``GrayScaleObservation``, ``ResizeEnv`` (84 px) and ``Monitor``.
    """
    env_seed = seed + rank

    def _init() -> gym.Env:
        base_env = gym_super_mario_bros.make(env_id)
        base_env.reset(seed=env_seed)
        limited = JoypadSpace(base_env, MOVEMENT)
        #limited = CustomRewardAndDoneEnv(limited)
        skipped = SkipFrame(limited, skip=4)
        gray = GrayScaleObservation(skipped, keep_dim=True)
        resized = ResizeEnv(gray, size=84)
        return Monitor(resized)

    # Seeding happens once, when the factory is created.
    set_random_seed(seed)
    return _init

def make_parallel_env(env_id, n_envs):
    """Build ``n_envs`` Mario environments in subprocesses with a 4-frame stack.

    Args:
        env_id: environment id forwarded to ``make_mario_env``.
        n_envs: number of environments; each one gets its index as rank.

    Returns:
        A ``VecFrameStack`` wrapping a ``SubprocVecEnv``.
    """
    env_factories = [make_mario_env(env_id, rank) for rank in range(n_envs)]
    return VecFrameStack(SubprocVecEnv(env_factories), 4, channels_order='last')

def make_single_env(env_id):
    """Build one in-process Mario environment with a 4-frame stack.

    Args:
        env_id: environment id forwarded to ``make_mario_env`` (rank 0).

    Returns:
        A ``VecFrameStack`` wrapping a single-environment ``DummyVecEnv``.
    """
    base_env = make_mario_env(env_id, 0)()
    # DummyVecEnv expects factories, so hand it one returning the built env.
    vec_env = DummyVecEnv([lambda: base_env])
    return VecFrameStack(vec_env, 4, channels_order='last')

# Training environment: a SubprocVecEnv with a single worker.
# Swap in the commented line below to use the in-process DummyVecEnv variant instead.
env = make_parallel_env(STAGE_NAME, 1)
#env = make_single_env(STAGE_NAME)

In [12]:
# env.reset()
# state, reward, done, info = env.step([0])
# print('state:', state.shape) #Color scale, height, width, num of stacks

In [13]:
# def display_all_frame():
#     plt.figure(figsize=(16,16))
#     for idx in range(state.shape[3]):
#         plt.subplot(1,4,idx+1)
#         plt.imshow(state[0][:,:,idx])
#     plt.show()

In [14]:
#display_all_frame()

## Training of the model

In [15]:
# Import PPO for algos
from stable_baselines3 import PPO
import torch as th
from torch import nn

# Import Base Callback for saving models
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

In [16]:
# Model Param
CHECK_FREQ_NUMB = 10_000
TOTAL_TIMESTEP_NUMB = 1_000_000
LEARNING_RATE = 0.0001
GAE = 1.0
ENT_COEF = 0.01
N_STEPS = 512
GAMMA = 0.9
BATCH_SIZE = 64
N_EPOCHS = 10

# Test Param
EPISODE_NUMBERS = 20
MAX_TIMESTEP_TEST = 1000

>Once the environment has been pre-processed, it is time to start training our AI model. In this case the Stable-Baselines3 PPO algorithm is used because of its simplicity, but other alternatives such as DQN or DDQN can be explored. Before starting the training, a convolutional neural network (CNN) is defined.

>

In [17]:
class MarioNet(BaseFeaturesExtractor):
    """CNN feature extractor: four stride-2 3x3 convolutions and one linear layer.

    The number of input channels is read from ``observation_space.shape[0]``
    (assumes a channels-first observation space — TODO confirm against the
    policy that instantiates this class).
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim):
        """Build the network.

        Args:
            observation_space: observation space the extractor is applied to.
            features_dim: size of the feature vector produced by ``forward``.
        """
        super().__init__(observation_space, features_dim)

        # Four identical Conv2d + ReLU stages; only the first one differs in
        # its number of input channels.
        conv_stack = []
        in_channels = observation_space.shape[0]
        for _ in range(4):
            conv_stack.append(nn.Conv2d(in_channels, 32, kernel_size=3, stride=2, padding=1))
            conv_stack.append(nn.ReLU())
            in_channels = 32
        self.cnn = nn.Sequential(*conv_stack, nn.Flatten())

        # Run one sampled observation through the CNN to learn the flattened size.
        with th.no_grad():
            sample_batch = th.as_tensor(observation_space.sample()[None]).float()
            n_flatten = self.cnn(sample_batch).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Map a batch of observations to a batch of feature vectors."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)

# Tell the policy to use MarioNet as feature extractor with 512 output features.
policy_kwargs = {
    "features_extractor_class": MarioNet,
    "features_extractor_kwargs": {"features_dim": 512},
}

>The next step consists of the creation of a file where the AI will save the results obtained in each iteration. In this way, later we will be able to visualize graphically the learning of our model.

>In this case, the timestep counter, the average reward and the best reward obtained are saved at each evaluation; the average episode duration is only printed.

In [18]:
import shutil
from pathlib import Path

# Directory that receives the model checkpoints and the reward log.
save_dir = Path('./model')
# exist_ok=True keeps the cell re-runnable: without it a second run (or a
# restarted kernel) fails with FileExistsError because the directory is there.
save_dir.mkdir(parents=True, exist_ok=True)
reward_log_path = (save_dir / 'reward_log.csv')

In [19]:
# Write the CSV header only when the log is missing or empty. The file is
# opened in append mode, so re-running the cell would otherwise insert another
# header line in the middle of the logged rows.
if not reward_log_path.exists() or reward_log_path.stat().st_size == 0:
    with open(reward_log_path, 'a') as f:
        print('timesteps,reward,best_reward', file=f)

>This callback function will be in charge of writing the aforementioned data to the file. This function will be executed automatically each time an iteration has been completed.

In [20]:
class TrainAndLoggingCallback(BaseCallback):
    """Periodically checkpoint the model, evaluate it and log the result.

    Whenever ``num_timesteps`` is a multiple of ``check_freq`` the callback
    saves the model as ``best_model_<n_calls>`` under ``save_path``, plays
    ``EPISODE_NUMBERS`` evaluation episodes on a fresh single environment and
    appends one ``n_calls,average_reward,best_reward`` row to
    ``reward_log_path``. ``n_calls`` is used for both the file suffix and the
    log row so that a log entry can be matched to its checkpoint.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        """
        Args:
            check_freq: evaluate whenever ``num_timesteps`` is a multiple of this.
            save_path: directory for the checkpoints; ``None`` disables saving.
            verbose: verbosity level forwarded to ``BaseCallback``.
        """
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Announce the start of training and make sure ``save_path`` exists."""
        print('Start Training')
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _evaluate(self):
        """Play the evaluation episodes with the model being trained.

        Returns:
            Tuple ``(episode_rewards, episode_lengths)``, one entry per episode.
        """
        episode_rewards = []
        episode_lengths = []
        test_env = make_single_env(STAGE_NAME)
        try:
            for _ in range(EPISODE_NUMBERS):
                state = test_env.reset()  # reset for each new trial
                done = False
                episode_reward = 0
                episode_length = 0
                while not done and episode_length < MAX_TIMESTEP_TEST:
                    # Use the model attached to this callback rather than
                    # whatever a global named `model` currently refers to.
                    action, _states = self.model.predict(state)
                    state, reward, dones, _info = test_env.step(action)
                    done = bool(dones[0])
                    episode_reward += reward[0]
                    episode_length += 1
                episode_rewards.append(episode_reward)
                episode_lengths.append(episode_length)
        finally:
            # A new environment is built for every evaluation; release it so
            # the instances do not pile up over a long training run.
            test_env.close()
        return episode_rewards, episode_lengths

    def _on_step(self):
        """Run a checkpoint + evaluation round when one is due.

        Returns:
            Always ``True`` so that training continues.
        """
        if self.num_timesteps % self.check_freq != 0:
            return True

        print('start test')
        if self.save_path is not None:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        episode_rewards, episode_lengths = self._evaluate()
        if not episode_rewards:
            # No evaluation episode configured: nothing to average or log.
            return True

        average_reward = sum(episode_rewards) / len(episode_rewards)
        average_time = sum(episode_lengths) / len(episode_lengths)
        # max() instead of a running maximum seeded with 0, so the best reward
        # is reported correctly even when every episode scores below zero.
        best_reward = max(episode_rewards)

        print('time steps:', self.num_timesteps, '/', TOTAL_TIMESTEP_NUMB)
        print('average reward:', average_reward,
              'average time:', average_time,
              'best_reward:', best_reward)

        with open(reward_log_path, 'a') as f:
            # sep=',' writes a clean CSV row without spaces around the fields.
            print(self.n_calls, average_reward, best_reward, sep=',', file=f)

        return True

>Finally, all that remains is for our AI to start learning. 

In [26]:
# Callback instance configured with check_freq=CHECK_FREQ_NUMB and save_path=save_dir.
callback = TrainAndLoggingCallback(check_freq=CHECK_FREQ_NUMB, save_path=save_dir)

In [30]:
# PPO agent using the custom feature extractor and the hyper-parameters defined above.
model = PPO(
    'CnnPolicy',
    env,
    verbose=0,
    policy_kwargs=policy_kwargs,
    tensorboard_log=save_dir,
    learning_rate=LEARNING_RATE,
    n_steps=N_STEPS,
    batch_size=BATCH_SIZE,
    n_epochs=N_EPOCHS,
    gamma=GAMMA,
    gae_lambda=GAE,
    ent_coef=ENT_COEF,
)

In [31]:
# Pass the callback: without it no checkpoint is saved and nothing is written
# to the reward log during training.
model.learn(total_timesteps=TOTAL_TIMESTEP_NUMB, callback=callback)

Process ForkServerProcess-13:
Traceback (most recent call last):
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/famoose/PycharmProjects/bsc_thesis/venv/lib/python3.9/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py", line 27, in _worker
    cmd, data = remote.recv()
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 255, in recv
    buf = self._recv_bytes()
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 419, in _rec

KeyboardInterrupt: 

## Results and Conclusion

>This last section analyzes the results and conclusions of this project. As can be seen in the graphs, two different models have been trained, one using the standard set and the other using the rectangle set.

>In the standard game, 1,050,000 iterations have been executed, while in the rectangle game there have been 640,000. Although the rectangle model has been trained for far fewer iterations, its best model achieves results similar to those of the best standard model.

>If we run the function that calculates the win rate we can see that both models have a 20% win rate.

In [None]:
import pandas as pd
# Plot the reward log stored in "reward_log_Standar.csv", indexed by its 'timesteps' column.
reward_log = pd.read_csv("reward_log_Standar.csv", index_col='timesteps')
reward_log.plot()

In [None]:
import pandas as pd
# Plot the reward log stored in "reward_log_Rectangle.csv", indexed by its 'timesteps' column.
reward_log = pd.read_csv("reward_log_Rectangle.csv", index_col='timesteps')
reward_log.plot()

In [None]:
# Find the index value (the 'timesteps' column) of the row with the highest 'reward'.
reward_log = pd.read_csv("reward_log_Standar.csv", index_col='timesteps')
best_epoch = reward_log['reward'].idxmax()
print('best epoch:', best_epoch)

In [None]:
# Load a saved checkpoint from save_dir.
# NOTE(review): the checkpoint number is hard-coded to 50000 rather than taken
# from `best_epoch` computed in the previous cell — confirm this is intended.
best_model_path = os.path.join(save_dir, 'best_model_{}'.format(50000))
model = PPO.load(best_model_path)

In [None]:
# Estimate the win rate: play NUM_EVAL_PLAYS episodes and count how many end
# with the flag reached.
NUM_EVAL_PLAYS = 100
env = make_single_env(STAGE_NAME)
state = env.reset()
done = False
plays = 0
wins = 0
while plays < NUM_EVAL_PLAYS:
    action, _ = model.predict(state)
    state, reward, done, info = env.step(action)
    if done:
        state = env.reset()
        if info[0]["flag_get"]:
            wins += 1
        plays += 1
# Report a real percentage: the raw win count only equals the win rate when
# exactly 100 episodes are played.
print("Model win rate: " + "{:g}".format(100 * wins / NUM_EVAL_PLAYS) + "%")

In [None]:
# Watch the trained agent play 100 episodes.
state = env.reset()
plays = 0

while plays < 100:
    action, _ = model.predict(state)
    state, reward, done, info = env.step(action)
    env.render() #Only in local, not in Colab
    if done:
        # Count the finished episode; the counter was never advanced before,
        # so the loop could not terminate.
        plays += 1
        state = env.reset()

>[Demo](https://youtube.com/shorts/jta7SegNNwM)
<iframe width="560" height="315" src="https://www.youtube.com/embed/jta7SegNNwM"></iframe>