In [61]:
# !pip install gym-super-mario-bros

In [62]:
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from gym.wrappers import FrameStack, GrayScaleObservation
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import BaseCallback
import os
import sys
import time
import numpy as np
import random
import math
from stable_baselines3 import PPO, DQN
import matplotlib.pyplot as plt

In [63]:
SIMPLE_MOVEMENT

[['NOOP'],
 ['right'],
 ['right', 'A'],
 ['right', 'B'],
 ['right', 'A', 'B'],
 ['A'],
 ['left']]

In [64]:
env = gym_super_mario_bros.make('SuperMarioBros2-v1')
env = JoypadSpace(env, SIMPLE_MOVEMENT)
env = GrayScaleObservation(env, keep_dim=True)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [65]:
env.action_space

Discrete(7)

In [66]:
env.observation_space.shape

(240, 256, 4)

In [67]:
stepsize = 10000

Test Env

In [68]:
# env.reset()
# done = True
# stepsize = 10000
# for _ in range(stepsize):
#     if done:
#         env.reset()
#         # break
#     state, reward, done, info = env.step(env.action_space.sample())
#     env.render()
# env.close()

In [69]:
# From https://github.com/nicknochnack/MarioRL/blob/main/Mario%20Tutorial.ipynb
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [70]:
CHECKPOINT_DIR = './train/'
LOG_DIR = './logs/'


if not os.path.exists(CHECKPOINT_DIR):
    os.makedirs(CHECKPOINT_DIR)

if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR)

# Setup model saving callback
callback = TrainAndLoggingCallback(check_freq=1000, save_path=CHECKPOINT_DIR)

In [71]:
# This is the AI model started
model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=LOG_DIR, learning_rate=0.001, 
            n_steps=512) 

Using cpu device
Wrapping the env in a VecTransposeImage.


In [72]:
model.learn(total_timesteps=100000, callback=callback, reset_num_timesteps=False, tb_log_name='PPO-3')

Logging to ./logs/PPO-3_0
----------------------------
| time/              |     |
|    fps             | 52  |
|    iterations      | 1   |
|    time_elapsed    | 9   |
|    total_timesteps | 512 |
----------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 10          |
|    iterations           | 2           |
|    time_elapsed         | 98          |
|    total_timesteps      | 1024        |
| train/                  |             |
|    approx_kl            | 0.085928135 |
|    clip_fraction        | 0.513       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.88       |
|    explained_variance   | -0.000406   |
|    learning_rate        | 0.001       |
|    loss                 | 5.79        |
|    n_updates            | 10          |
|    policy_gradient_loss | 0.0368      |
|    value_loss           | 43.7        |
-----------------------------------------
---------------

KeyboardInterrupt: 

In [73]:
model.save('thisisatestmodel')

For Results

In [74]:
model = PPO.load('./train/best_model_18000')

In [75]:
# Start the game 
state = env.reset()
# Loop through the game
while True: 
    
    action, _ = model.predict(state)
    state, reward, done, info = env.step(action)
    env.render()



KeyboardInterrupt: 

: 

In [None]:
# model = DQN('CnnPolicy', env, verbose=1 ,tensorboard_log=LOG_DIR, learning_rate=0.0001)

In [None]:
# for i in range(10000):
#     model.learn(total_timesteps=stepsize ,reset_num_timesteps=False, tb_log_name='DQN-1')
#     model.save(f"{CHECKPOINT_DIR}\{stepsize*i}")