Adapted from for kaggle notebook.
https://www.youtube.com/watch?v=2eeYqJ0uBKE

In [2]:
!pip install -q gym_super_mario_bros==7.3.0 nes_py

### For rendering and display remotely on kaggle notebook
https://stackoverflow.com/a/45179251

In [3]:
!xvfb-run -s "-screen 0 1400x900x24" jupyter notebook

In [4]:
import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display

In [5]:
def show_state(env, step=0, info=""):
    plt.figure(3)
    plt.clf()
    plt.imshow(env.render(mode='rgb_array'))
    plt.title("Step: %d %s" % (step, info))
    plt.axis('off')

    display.clear_output(wait=True)
    display.display(plt.gcf())

In [6]:
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace

from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

In [7]:
SIMPLE_MOVEMENT

In [8]:
env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, SIMPLE_MOVEMENT)

In [9]:
env.action_space

In [10]:
env.observation_space.shape

In [11]:
done = True
for step in range(100):
    if done:
        env.reset()
    state, reward, done, info = env.step(env.action_space.sample())
    #print(state, reward, done, info)
    show_state(env,step,info)
#env.close()

Mario is up and running. Yay!

## Preprocess environment

In [12]:
#!pip install torch==1.10.1+cu113 torchvision==0.11.2+cu113 torchaudio===0.10.1+cu113
!pip install -q torch torchvision torchaudio

In [13]:
!pip install -q stable-baselines3[extra]

In [14]:
from gym.wrappers import GrayScaleObservation
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv
from matplotlib import pyplot as plt

In [15]:
# env = gym_super_mario_bros.make('SuperMarioBros-v0')
# env = JoypadSpace(env, SIMPLE_MOVEMENT)
env = GrayScaleObservation(env, keep_dim=True)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [16]:
state = env.reset()

In [17]:
state, reward, done, info = env.step([5])

In [18]:
plt.figure(figsize=(20,16))
for idx in range(state.shape[3]):
    plt.subplot(1,4,idx+1)
    plt.imshow(state[0][:,:,idx])
plt.show()

This is the sample game state world we are going to work with.

## Training the RL model

In [19]:
import os
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback

In [20]:
class TrainAndLoggingCallback(BaseCallback):
    
    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path
    
    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)
    
    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)
        return True

In [21]:
CHECKPOINT_DIR = './train/'
LOG_DIR = './logs/'

In [22]:
callback = TrainAndLoggingCallback(check_freq=100, save_path=CHECKPOINT_DIR)

In [23]:
model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=LOG_DIR, learning_rate=0.000001, n_steps=512)

In [24]:
model.learn(total_timesteps=100, callback=callback)

In [25]:
model.save('astatemodel')

In [26]:
##

## TEST IT OUT

In [27]:
model = PPO.load('./train/best_model_100')

In [28]:
state = env.reset()

In [30]:
state = env.reset()

for step in range(10000):
    if done:
        env.reset()
    action, _ = model.predict(state)
    state, reward, done, info = env.step(action)
    #print(state, reward, done, info)
    show_state(env,step,info)
#env.close()