In [2]:
#importing OPEN AI GYM dependancies 
!pip install gym_super_mario_bros==7.3.0 nes_py

!pip install stable-baselines3[extra]

!pip install tensorflow-tensorboard

In [None]:
#importing the environment and agent actions.
import gym_super_mario_bros

from nes_py.wrappers import JoypadSpace

from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

In [3]:
import numpy as np
import gym
from gym.spaces import Box

class GrayScaleObservation(gym.ObservationWrapper):
    """Convert the image observation from RGB to gray scale.

    The wrapped environment must expose a 3-dimensional observation space whose
    last axis has size 3 (the colour channels). The wrapper replaces the
    observation space with a ``uint8`` ``Box`` of the same height and width,
    optionally keeping a trailing singleton channel axis.
    """

    # ITU-R BT.601 luma weights applied to the R, G and B channels.
    _LUMA_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float32)

    def __init__(self, env: gym.Env, keep_dim: bool = False):
        """Wrap ``env`` and declare the gray-scale observation space.

        Args:
            env (Env): The environment whose observations are converted.
            keep_dim (bool): If `True`, a singleton dimension is kept as the
                last axis, so observations have shape ``(H, W, 1)`` instead
                of ``(H, W)``.

        Raises:
            AssertionError: If the observation space of ``env`` is not
                3-dimensional with a last axis of size 3.
        """
        super().__init__(env)
        self.keep_dim = keep_dim

        source_shape = env.observation_space.shape
        assert len(source_shape) == 3 and source_shape[-1] == 3, (
            "GrayScaleObservation expects an (H, W, 3) observation space, "
            "got shape {}".format(source_shape)
        )

        height, width = source_shape[:2]
        gray_shape = (height, width, 1) if self.keep_dim else (height, width)
        self.observation_space = Box(low=0, high=255, shape=gray_shape, dtype=np.uint8)

    def observation(self, observation):
        """Convert a single RGB frame to gray scale.

        ``gym.ObservationWrapper`` leaves this method for subclasses to
        implement; without it the wrapper cannot transform observations.

        Args:
            observation: Array whose last axis holds the R, G, B channels.

        Returns:
            A ``uint8`` array of shape ``(H, W, 1)`` when ``keep_dim`` is set,
            otherwise ``(H, W)``, matching ``self.observation_space``.
        """
        # Weighted sum over the colour axis; rounding and clipping keep the
        # result inside the uint8 range declared by the observation space.
        luma = np.dot(observation[..., :3], self._LUMA_WEIGHTS)
        gray = np.clip(np.rint(luma), 0, 255).astype(np.uint8)
        if self.keep_dim:
            gray = np.expand_dims(gray, -1)
        return gray

In [None]:
# Testing the environment/agent with randomly sampled actions.
# The environment is created here so that this cell runs on a fresh kernel:
# no earlier cell defines `env`, and the vectorised `env` built further down
# expects a list of actions rather than the single action sampled below.
env = JoypadSpace(gym_super_mario_bros.make("SuperMarioBros-v0"), SIMPLE_MOVEMENT)

done = True
try:
    for step in range(100000):
        # Start a new episode whenever the previous one has finished.
        if done:
            env.reset()
        state, reward, done, info = env.step(env.action_space.sample())
        env.render()
finally:
    # Always release the emulator window, even if the loop is interrupted.
    env.close()

In [4]:
#importing needed Wrappers 
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv

from matplotlib import pyplot as plt

In [5]:
# Build the environment and shape its observations for the CNN policy:
# base game -> restricted joypad actions -> gray scale (channel axis kept)
# -> single-environment vector wrapper -> stack of the 4 most recent frames.
env = VecFrameStack(
    DummyVecEnv(
        [
            lambda: GrayScaleObservation(
                JoypadSpace(
                    gym_super_mario_bros.make("SuperMarioBros-v0"),
                    SIMPLE_MOVEMENT,
                ),
                keep_dim=True,
            )
        ]
    ),
    4,
    channels_order="last",
)

In [None]:
# Take one random step and inspect the resulting stacked observation.
state = env.reset()
state, reward, done, info = env.step([env.action_space.sample()])
print(state.shape)

# The environment stacks frames along the last axis (channels_order="last"),
# so each frame is drawn separately; passing the whole stack to imshow would
# interpret the stacked frames as colour channels instead.
n_frames = state.shape[-1]
fig, axes = plt.subplots(1, n_frames, figsize=(4 * n_frames, 4), squeeze=False)
for idx, ax in enumerate(axes[0]):
    ax.imshow(state[0][:, :, idx], cmap="gray")
    ax.set_title("frame {}".format(idx))
    ax.axis("off")
plt.show()

In [7]:
#importing the Proximal Policy Optimization algorithm 
import os 

from stable_baselines3 import PPO

from stable_baselines3.common.callbacks import BaseCallback

In [8]:
# Callback class that periodically saves the model during training.
class TrainAndLoggingCallback(BaseCallback):
    """Save the model being trained every ``check_freq`` callback calls.

    Checkpoints are written to ``save_path`` as ``best_model_<n_calls>``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        """Store the checkpoint settings.

        Args:
            check_freq: Number of callback calls between two saves.
            save_path: Directory receiving the checkpoints, or ``None`` to
                disable saving.
            verbose: Verbosity level forwarded to ``BaseCallback``.
        """
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint when the call counter hits a multiple of ``check_freq``.

        Returns:
            Always ``True``.
        """
        # Mirror the None guard of _init_callback: without a save path there
        # is nowhere to write, and os.path.join would fail on None.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [9]:
# Directories for saved checkpoints and TensorBoard logs.
CHECKPOINT_DIR = "./train/"
LOG_DIR = "./logs/"

# Save a checkpoint into CHECKPOINT_DIR every 1000 callback calls.
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR,
    check_freq=1000,
)

In [None]:
# Create the Proximal Policy Optimization agent with a CNN policy on `env`,
# logging to LOG_DIR for TensorBoard.
model = PPO(
    "CnnPolicy",
    env,
    learning_rate=1e-6,
    n_steps=512,
    tensorboard_log=LOG_DIR,
    verbose=1,
)

In [10]:
# Train the agent for 100,000 timesteps; `callback` handles the periodic saves.
model.learn(
    callback=callback,
    total_timesteps=100_000,
)

In [None]:
# Load a previously saved checkpoint from the training directory.
model = PPO.load("./train/best_model_14000")

In [None]:
# Experiment: let the loaded model play the game.
# The loop runs until the kernel is interrupted; the environment is then
# closed so the emulator window does not stay open.
state = env.reset()

try:
    while True:
        action, _state = model.predict(state)
        state, reward, done, info = env.step(action)
        env.render()
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop the playback.
    pass
finally:
    env.close()