In [1]:
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from gym.wrappers import GrayScaleObservation
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
import matplotlib.pyplot as plt
import os
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.results_analysis import ts2xy
from stable_baselines3.common.results_analysis import load_results
import numpy as np
from stable_baselines3 import A2C

In [None]:
# Create the 'SuperMarioBros-v0' environment and wrap it in JoypadSpace with
# the SIMPLE_MOVEMENT action list, in a single expression.
env = JoypadSpace(
    gym_super_mario_bros.make('SuperMarioBros-v0'),
    SIMPLE_MOVEMENT,
)

In [None]:
# Smoke-test the environment: take 1000 random actions and render each step.
done = True
try:
    for step in range(1000):
        # Reset before the very first step and again whenever an episode ends.
        if done:
            state = env.reset()
        state, reward, done, info = env.step(env.action_space.sample())
        env.render()
finally:
    # Close the environment even if a step raises or the cell is interrupted,
    # so the emulator/render window is not left dangling.
    env.close()

In [None]:
# Build the preprocessed, vectorized environment used for training.
base_env = gym_super_mario_bros.make('SuperMarioBros-v0')
base_env = JoypadSpace(base_env, SIMPLE_MOVEMENT)
# keep_dim=True keeps a trailing channel axis, so that after frame stacking the
# observation is still a 3-D image that 'CnnPolicy' can consume.
base_env = GrayScaleObservation(base_env, keep_dim=True)
# The lambda captures `base_env`, a name that is never rebound, instead of
# `env`, which is reassigned to the vectorized wrappers below.
env = DummyVecEnv([lambda: base_env])
# VecFrameStack accepts only 'first' or 'last' for channels_order ('chw' is
# rejected); 'last' stacks the 4 most recent frames on the trailing axis.
env = VecFrameStack(env, n_stack=4, channels_order='last')

In [None]:
# Reset the vectorized environment and take one random step to obtain a
# sample observation.
state = env.reset()
# A vectorized env expects one action per sub-environment, so the single
# sampled action is wrapped in a list rather than passed as a bare scalar.
state, reward, done, info = env.step([env.action_space.sample()])

In [None]:
# Plot every frame held in the frame stack of the first sub-environment.
# Assumes `state` is laid out as (n_envs, height, width, n_stack), i.e. a
# channels-last frame stack -- TODO confirm against the env wrappers in use.
stacked_frames = state[0]
n_frames = stacked_frames.shape[-1]
# squeeze=False keeps `axes` 2-D even when there is a single frame.
fig, axes = plt.subplots(1, n_frames, figsize=(16, 8), squeeze=False)
for index, ax in enumerate(axes[0]):
    ax.imshow(stacked_frames[:, :, index], cmap='gray')
    ax.set_title('Frame {}'.format(index + 1))
    ax.axis('off')
plt.show()

In [None]:
class trainingCallback(BaseCallback):
    """Save the model whenever the recent mean episode reward improves.

    Every ``check_freq`` calls, the callback loads the results found in
    ``log_dir``, averages the reward of the last 100 episodes and, if that
    average beats the best value seen so far, saves the model to
    ``<log_dir>/best_model``.

    NOTE(review): ``load_results`` reads monitor log files from ``log_dir``, so
    the training environment presumably has to be wrapped in a ``Monitor``
    that writes there -- confirm, otherwise no results will be found.

    Args:
        check_freq: Number of callback calls between two checks.
        log_dir: Directory the results are read from and the best model is
            written to.
        verbose: Values above 0 print progress messages.
    """

    def __init__(self, check_freq: int, log_dir: str, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.log_dir = log_dir
        # os.path.join inserts the path separator when `log_dir` lacks a
        # trailing one (plain concatenation would yield e.g. 'logsbest_model').
        self.save_path = os.path.join(log_dir, 'best_model')
        self.best_mean_reward = -np.inf

    def _init_callback(self) -> None:
        """Announce that the callback has been set up (when verbose)."""
        if self.verbose > 0:
            print("Training callback is initialized")

    def _on_step(self) -> bool:
        """Check the recent mean reward and save the model on improvement.

        Returns:
            Always ``True``; this callback never asks training to stop.
        """
        # Only do the (file-reading) check every `check_freq` calls.
        if self.n_calls % self.check_freq != 0:
            return True

        x, y = ts2xy(load_results(self.log_dir), 'timesteps')
        if len(x) == 0:
            # No finished episode has been logged yet.
            return True

        # Mean reward over (at most) the last 100 logged episodes.
        mean_reward = np.mean(y[-100:])
        if self.verbose > 0:
            print("Num timesteps: {}".format(self.num_timesteps))
            print("Best mean reward: {:.2f} - Last mean reward per episode: {:.2f}".format(self.best_mean_reward, mean_reward))

        if mean_reward > self.best_mean_reward:
            self.best_mean_reward = mean_reward
            if self.verbose > 0:
                print("Saving new best model to {}".format(self.log_dir))
            self.model.save(self.save_path)
        return True

In [None]:
# Directories for saved checkpoints and for training logs; both are currently
# set to the empty string.
checkpoint_dir, logs_dir = '', ''

In [None]:
# trainingCallback is not given a `save` keyword here: its __init__ only takes
# check_freq, log_dir and verbose, so passing `save=` raised a TypeError. The
# best model is written under `log_dir`.
callback = trainingCallback(check_freq=1000, log_dir=logs_dir)

In [None]:
# Set up and train the PPO model.
# Fixes: the missing comma after `learning_rate` (a SyntaxError) is added and
# `full_tensorboard_log`, which is not a Stable-Baselines3 PPO argument, is
# dropped.
model1 = PPO(
    'CnnPolicy',
    env,
    verbose=1,
    tensorboard_log=logs_dir,
    learning_rate=0.03,
    n_steps=256,
    batch_size=256,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    ent_coef=0.0,
    vf_coef=0.5,
    max_grad_norm=0.5,
    use_sde=False,
    sde_sample_freq=-1,
    target_kl=0.01,
    seed=None,
    device='auto',
    _init_setup_model=True,
    policy_kwargs=None,
)
model1.learn(total_timesteps=1000000, callback=callback)

In [None]:
# Call save() to write the trained PPO model to disk; the previous
# `model1.save = (...)` only replaced the method with a string.
model1.save('mario_model1')

In [None]:
#set up the A2C model
model2 = A2C('CnnPolicy', env, verbose=1, tensorboard_log=logs_dir, learning_rate= 0.03 n_steps= 256, batch_size= 256, n_epochs= 10, gamma= 0.99, gae_lambda= 0.95, clip_range= 0.2, ent_coef= 0.0, vf_coef= 0.5, max_grad_norm= 0.5, use_sde= False, sde_sample_freq= -1, target_kl= 0.01, seed= None, device= 'auto', _init_setup_model= True, policy_kwargs= None, full_tensorboard_log= False)
model2.learn(total_timesteps=1000000, callback=callback)

In [None]:
# Call save() to write the trained A2C model to disk; the previous
# `model2.save = (...)` only replaced the method with a string.
model2.save('mario_model2')

In [None]:
# Plot the training performance of both models and compare.
def plot_learning_curve(ax, log_dir, label):
    """Draw episode reward against cumulative timesteps for one log directory.

    The frame returned by load_results has no 'timesteps' column, so ts2xy is
    used to derive the x (timesteps) and y (episode reward) arrays from it.
    """
    timesteps, rewards = ts2xy(load_results(log_dir), 'timesteps')
    ax.plot(timesteps, rewards, label=label)


fig, ax = plt.subplots()
# NOTE: both models currently log to the same `logs_dir`, so the two curves
# coincide; give each run its own directory here to compare them.
for label, run_log_dir in (('PPO', logs_dir), ('A2C', logs_dir)):
    plot_learning_curve(ax, run_log_dir, label)
ax.set_xlabel('Timesteps')
ax.set_ylabel('Rewards')
ax.set_title('Training Performance')
ax.legend()
plt.show()

In [None]:
# Create a table comparing the performance of both models.
# The table previously held the placeholder strings "Mean Reward" and
# "Std Reward"; the statistics are now measured with evaluate_policy.
from stable_baselines3.common.evaluation import evaluate_policy
from tabulate import tabulate

N_EVAL_EPISODES = 5  # episodes played per model; raise for a tighter estimate

table = [["Model", "Mean Reward", "Std Reward"]]
for model_name, model in (("PPO", model1), ("A2C", model2)):
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=N_EVAL_EPISODES)
    table.append([model_name, mean_reward, std_reward])
# headers="firstrow" renders the first row as column headers instead of data.
print(tabulate(table, headers="firstrow", floatfmt=".2f"))
