In [None]:
!pip install gym-super-mario-bros==7.3.0 nes_py

In [None]:
!pip install stable-baselines3[extra]

In [None]:
import gym
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from gym.wrappers import GrayScaleObservation
import matplotlib.pyplot as plt
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv
from nes_py.wrappers import JoypadSpace
def _joypad_reset(self, **kwargs):
    """Reset the wrapped environment, passing every keyword argument straight through."""
    return self.env.reset(**kwargs)


# Workaround taken from StackOverflow for a failure when calling reset() on the
# wrapped environment: JoypadSpace.reset is replaced by the pass-through above.
JoypadSpace.reset = _joypad_reset

In [None]:
# Display the reduced set of button combinations that JoypadSpace is given
# further down as the agent's action set.
SIMPLE_MOVEMENT

# Random Agent

In [None]:
# Create the game with an on-screen window. step() is unpacked below as the
# 5-tuple (state, reward, terminated, truncated, info).
env = gym.make("SuperMarioBros-v0", apply_api_compatibility=True, render_mode="human")
# Wrap the environment to reduce the action space: 7 actions instead of 256
env = JoypadSpace(env, SIMPLE_MOVEMENT)

try:
    done = True  # forces a reset before the very first step
    for step in range(1000):
        if done:
            env.reset()
        # Random agent: sample any action from the reduced action space
        action = env.action_space.sample()
        state, reward, terminated, truncated, info = env.step(action)
        # The episode is over when it terminated OR was truncated; looking at only
        # one of the two flags would keep stepping an episode that already ended.
        done = terminated or truncated
        env.render()
finally:
    # Always release the environment, even if the loop raised or the cell was interrupted
    env.close()

# Preprocessing the environment

In [None]:
# Stage 1: the raw game (no render_mode is requested in this cell)
base_env = gym.make("SuperMarioBros-v0", apply_api_compatibility=True)
# Stage 2: reduce the action space to SIMPLE_MOVEMENT: 7 actions instead of 256
joypad_env = JoypadSpace(base_env, SIMPLE_MOVEMENT)
# Stage 3: grayscale the observations; keep_dim=True keeps the channel axis
gray_env = GrayScaleObservation(joypad_env, keep_dim=True)
# Optional visual check of one frame:
# plt.imshow(gray_env.reset()[0], cmap="Greys")
# Stage 4: wrap the single environment into a DummyVecEnv
vec_env = DummyVecEnv([lambda: gray_env])
# Stage 5: stack 4 frames (so the agent can predict the movements of enemies)
env = VecFrameStack(vec_env, 4)

# Summarise the resulting environment
print("OBSERVATION SPACE", env.observation_space)
print("ACTION SPACE :", env.action_space)
print("RENDER :", env.render_mode)

# RL Model

In [None]:
import os
from stable_baselines3 import PPO 
from stable_baselines3.common.callbacks import BaseCallback # Saving models

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model every ``check_freq`` callback steps.

    Checkpoints are written to ``save_path`` as ``best_model<n_calls>``.
    Don't save too often because a trained model is still quite big.

    Args:
        check_freq: Number of callback steps between two checkpoints; must be
            a positive number.
        save_path: Directory that receives the checkpoints. ``None`` disables
            saving entirely.
        verbose: Verbosity level forwarded to ``BaseCallback``.

    Raises:
        ValueError: If ``check_freq`` is not positive.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        # Fail early with a clear message instead of a ZeroDivisionError
        # in the middle of training.
        if check_freq <= 0:
            raise ValueError(
                "check_freq must be a positive number, got {!r}".format(check_freq)
            )
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; always return True."""
        # save_path=None is accepted by _init_callback, so it must be accepted
        # here as well instead of crashing inside os.path.join.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model{}'.format(self.n_calls))
            self.model.save(model_path)
        return True

In [None]:
# Directory handed to the training callback as the place to save model checkpoints
CHECKPOINT_DIR = './train/'
# Directory handed to PPO as its tensorboard_log location
LOG_DIR = './logs/'

In [None]:
# Checkpoint the model into CHECKPOINT_DIR once every 100,000 callback steps
callback = TrainAndLoggingCallback(
    check_freq=100_000,
    save_path=CHECKPOINT_DIR,
)

In [None]:
# PPO agent using the "CnnPolicy" on the preprocessed environment;
# training metrics are logged for TensorBoard under LOG_DIR.
model = PPO(
    "CnnPolicy",
    env,
    learning_rate=1e-6,
    n_steps=512,
    tensorboard_log=LOG_DIR,
    verbose=1,
)

In [None]:
# Total number of environment timesteps to train for
TOTAL_TIMESTEPS = 1_000_000
# Train the agent; the callback writes periodic checkpoints along the way
model.learn(total_timesteps=TOTAL_TIMESTEPS, callback=callback)

# Testing trained model

In [None]:
# Load one of the checkpoints written by the training callback. The path is built
# from CHECKPOINT_DIR so that it stays in sync with the directory used for saving.
checkpoint_path = os.path.join(CHECKPOINT_DIR, "best_model300000.zip")
model = PPO.load(checkpoint_path)

In [None]:
# Rebuild the preprocessing pipeline used for training, this time with an on-screen window
env = gym.make("SuperMarioBros-v0", apply_api_compatibility=True, render_mode="human")
env = JoypadSpace(env, SIMPLE_MOVEMENT)
env = GrayScaleObservation(env, keep_dim=True)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4)

# No frames are stored in this cell: nothing here consumes them, and the
# video-export cell at the end of the notebook records its own.
try:
    state = env.reset()
    for steps in range(1000):
        # Let the trained model choose the action for the current stacked observation
        action, _ = model.predict(state)
        state, reward, done, info = env.step(action)
        env.render()
finally:
    # Always release the environment, even if the loop raised or the cell was interrupted
    env.close()

# To improve the model:

- Reduce the learning_rate
- Train for longer: 1 million timesteps should be a minimum


# Optional: Create an MP4 video of the agent

In [None]:
import cv2

# Same preprocessing pipeline as for training, but with render_mode="rgb_array"
# so that render() hands back image data that can be written to a video file.
env = gym.make("SuperMarioBros-v0", apply_api_compatibility=True, render_mode="rgb_array")
env = JoypadSpace(env, SIMPLE_MOVEMENT)
env = GrayScaleObservation(env, keep_dim=True)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4)

frames = []
try:
    state = env.reset()
    for steps in range(1000):
        action, _ = model.predict(state)
        state, reward, done, info = env.step(action)
        frame = env.render()
        # Skip steps for which no image came back so the writer never receives None
        if frame is not None:
            frames.append(frame)
finally:
    # Always release the environment, even if the rollout raised or was interrupted
    env.close()

# Fail with a clear message instead of an IndexError/AttributeError on frames[0]
if not frames:
    raise RuntimeError("No frames were rendered; cannot create the video.")

output_video_file = 'agent_demo.mp4'
# OpenCV expects the frame size as (width, height); array shapes are (height, width, ...)
frame_size = (frames[0].shape[1], frames[0].shape[0])
fps = 60
codec = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_video_file, codec, fps, frame_size)
try:
    # A writer that could not be opened would otherwise drop every frame without any error
    if not video_writer.isOpened():
        raise RuntimeError("Could not open video writer for " + output_video_file)
    for frame in frames:
        # Frames are treated as RGB here, while OpenCV writes BGR
        bgr_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        video_writer.write(bgr_frame)
finally:
    # Finalise the file even if writing failed part-way through
    video_writer.release()