<a href="https://colab.research.google.com/github/ZahraAlharz/Oxford-AI-Summer-School/blob/main/Copy_of_2SuperMario_PPO_HW_Oxford.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Task

We will train a PPO agent that learns to play the classic Super Mario Bros. game.

You can use the Stable Baselines3 implementation of PPO or write your own version.

For the environment, we will use gym_super_mario_bros. Read more about it [here](https://github.com/Kautenja/gym-super-mario-bros/).

Note that the stable-baselines3 implementations expect a gymnasium environment, not a gym environment. (gymnasium is the maintained successor of gym; gym is deprecated, but many environments are still built on it.)

Fortunately, gymnasium provides a way to convert a gym env into a gymnasium env. We do need to install a compatible version of gym, though.

In [None]:
#!pip install gym_super_mario_bros==7.3.0 nes_py
#!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/rocm5.4.2
#!pip install stable-baselines3[extra]

In [None]:
%pip install swig
%pip install stable-baselines3 gymnasium[all] gym_super_mario_bros nes_py gym==0.10.9  # might need a restart of the session.



In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import BaseCallback

from gymnasium.wrappers import GrayScaleObservation
import gymnasium as gym
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from nes_py.wrappers import JoypadSpace

import os
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

## Making the environment

In addition to creating the gym environment, we will make a vectorized environment (provided by Stable Baselines3).

A vectorized environment lets us train on multiple environments simultaneously, which makes training faster. We will use DummyVecEnv, which doesn't actually use subprocesses; if we were working with a complex environment with a higher compute time per step, we could use SubprocVecEnv instead.
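
To make "doesn't actually use subprocesses" concrete, here is a toy sketch of a vectorized environment that steps its sub-environments one after another in the same process and resets each one as soon as its episode ends. `CountingEnv` and `SequentialVecEnv` are hypothetical names invented for this illustration; this is not the stable-baselines3 implementation.

```python
class CountingEnv:
    """Hypothetical toy environment: the observation is a step counter."""

    def __init__(self, episode_length):
        self.episode_length = episode_length
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t

    def step(self, action):
        self.t += 1
        done = self.t >= self.episode_length
        reward = 1.0
        return self.t, reward, done, {}


class SequentialVecEnv:
    """Steps several environments one after another in the same process."""

    def __init__(self, env_fns):
        # Each entry is a function that builds one environment
        self.envs = [fn() for fn in env_fns]

    def reset(self):
        return [env.reset() for env in self.envs]

    def step(self, actions):
        obs, rewards, dones = [], [], []
        for env, action in zip(self.envs, actions):
            o, r, d, _ = env.step(action)
            if d:
                # Start the next episode right away so the batch stays full
                o = env.reset()
            obs.append(o)
            rewards.append(r)
            dones.append(d)
        return obs, rewards, dones


vec_env = SequentialVecEnv([lambda: CountingEnv(2), lambda: CountingEnv(3)])
first_obs = vec_env.reset()
obs1, rewards1, dones1 = vec_env.step([0, 0])
obs2, rewards2, dones2 = vec_env.step([0, 0])
print(obs1, dones1)
print(obs2, dones2)
```

The loop in `step` is the part a subprocess-based variant would run in parallel; for a cheap emulator step, the sequential version avoids the cost of passing data between processes.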

Think about what wrappers you can use to make the job easier. You can also make the action-space simpler. Read more about it in the env page referenced above.

Use the `'SuperMarioBros-v0'` version of the environment.
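
As a rough guide to what observation wrappers do, the sketch below tracks only the observation shape through a grayscale conversion, a 4-frame stack along the channel axis, and a channels-first transpose. It assumes a raw frame of 240 x 256 RGB pixels; the helper names are hypothetical and no real environment is created.

```python
def grayscale_shape(shape, keep_dim=True):
    """Shape after collapsing the RGB channels into one luminance channel."""
    height, width, _channels = shape
    return (height, width, 1) if keep_dim else (height, width)


def frame_stack_shape(shape, n_stack):
    """Shape after stacking n_stack frames along the last (channel) axis."""
    height, width, channels = shape
    return (height, width, channels * n_stack)


def channels_first_shape(shape):
    """Shape after moving the channel axis to the front: (C, H, W)."""
    height, width, channels = shape
    return (channels, height, width)


raw = (240, 256, 3)  # assumed raw frame: height, width, RGB
gray = grayscale_shape(raw)
stacked = frame_stack_shape(gray, n_stack=4)
print(gray, stacked, channels_first_shape(stacked))
```

Grayscale cuts the input to a third of its size, and stacking consecutive frames gives the policy a way to infer motion from a single observation.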

In [None]:
# Create the base Super Mario Bros environment
env = gym_super_mario_bros.make('SuperMarioBros-v0')

# Apply the JoypadSpace wrapper for simplified controls
env = JoypadSpace(env, SIMPLE_MOVEMENT)

env = gym.make("GymV21Environment-v0", env=env, render_mode="rgb_array")

# Apply grayscale conversion
env = GrayScaleObservation(env, keep_dim=True)

env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order="last")
#env = VecTransposeImage(env)  # Transpose the image from (H, W, C) to (C, H, W)



## Creating and training the model

In [None]:
CHECKPOINT_DIR = './train'

LOG_DIR = './logs'

In [None]:
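def custom_clip_range(a):
    # stable-baselines3 calls a schedule with the remaining training
    # progress (1.0 at the start, 0.0 at the end). The argument is
    # overwritten here, so the clip range is a constant 0.2.
    a = 0.2
    return a

def custom_lr_schedule(lr):
    # The argument is overwritten as well, so this always returns
    # 1e-4 * 0.999 = 9.99e-5: a constant rate, not a decaying one.
    lr = 1e-4
    if lr > 1e-5:
        lr *= 0.999
    return lr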
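
The two functions above ignore their argument, so both hyperparameters stay constant during training. stable-baselines3 passes the remaining training progress (from 1.0 down to 0.0) to such a callable, so a decaying rate has to be computed from that value. A minimal sketch, assuming a linear decay from 1e-4 to 1e-5 is wanted; `linear_schedule` is a hypothetical helper, not something this notebook uses:

```python
def linear_schedule(initial_value, final_value):
    """Return a schedule that moves linearly from initial_value to final_value."""

    def schedule(progress_remaining):
        # progress_remaining goes from 1.0 (start of training) to 0.0 (end)
        return final_value + progress_remaining * (initial_value - final_value)

    return schedule


lr_schedule = linear_schedule(1e-4, 1e-5)
for progress in (1.0, 0.5, 0.0):
    print(progress, lr_schedule(progress))
```

Such a callable could be passed as `learning_rate=lr_schedule` when constructing the model.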

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        # Save the model and track training progress
        if self.num_timesteps % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.num_timesteps))
            self.model.save(model_path)

        return True
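
The save condition `self.num_timesteps % self.check_freq == 0` depends on how the timestep counter advances: each vectorized step adds the number of environments to it. The sketch below simulates only that counter to show which timesteps would trigger a save. `checkpoint_steps` is a hypothetical helper, and the loop is a simplification of the real training loop.

```python
def checkpoint_steps(total_timesteps, check_freq, n_envs):
    """List the timestep counts at which num_timesteps % check_freq == 0."""
    saved_at = []
    num_timesteps = 0
    while num_timesteps < total_timesteps:
        # One vectorized step advances the counter by the number of environments
        num_timesteps += n_envs
        if num_timesteps % check_freq == 0:
            saved_at.append(num_timesteps)
    return saved_at


print(checkpoint_steps(20000, check_freq=5000, n_envs=1))
print(checkpoint_steps(20000, check_freq=5000, n_envs=3))
```

With a single environment, as in this notebook, every multiple of 5000 is hit. With three environments the counter skips most multiples of 5000, so a `check_freq` that is a multiple of the number of environments is the safer choice.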

In [None]:
# Check if a previously trained model exists.
# Note: the callback saves checkpoints as best_model_<num_timesteps>.zip,
# so one of them has to be copied or renamed to best_model.zip to resume from it.
if os.path.exists('./train/best_model.zip'):

    # Load the pre-trained model (once); custom_objects replaces the saved schedules
    model = PPO.load('./train/best_model.zip', env, tensorboard_log=LOG_DIR, custom_objects={'clip_range': custom_clip_range, 'learning_rate': custom_lr_schedule})

    # Get the total number of steps completed during the previous training
    total_steps_completed = model.num_timesteps

    # With reset_num_timesteps=False, learn() adds total_timesteps on top of
    # model.num_timesteps, so pass only the number of additional steps.
    starting_step = total_steps_completed + 1
    total_training_steps = 100000  # Resume training for 100,000 more steps

else:
    # Create a new model if no pre-trained model exists
    model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=LOG_DIR, learning_rate=custom_lr_schedule, n_steps=2000)

    # Set the starting step count and the total number of training steps
    starting_step = 1
    total_training_steps = 100000  # Train for 100,000 steps

# Save a model checkpoint to CHECKPOINT_DIR every 5000 steps (this takes roughly 150 MB of disk space per 5k steps in one run).
callback = TrainAndLoggingCallback(check_freq=5000, save_path=CHECKPOINT_DIR)

We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=2000 and n_envs=1)


Using cuda device
Wrapping the env in a VecTransposeImage.
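
The `batch_size` warning above appears because each rollout holds `n_steps * n_envs = 2000` samples, which PPO's default `batch_size` of 64 does not divide evenly. The arithmetic can be checked with a short sketch; `valid_batch_sizes` is a hypothetical helper written for this illustration.

```python
def valid_batch_sizes(n_steps, n_envs):
    """All batch sizes that split one rollout into equally sized minibatches."""
    rollout_size = n_steps * n_envs
    return [b for b in range(1, rollout_size + 1) if rollout_size % b == 0]


n_steps, n_envs = 2000, 1
default_batch_size = 64  # PPO's default batch_size in stable-baselines3

# Samples left over in the last, smaller minibatch
leftover = (n_steps * n_envs) % default_batch_size
# Divisors of the rollout size in a commonly used range
candidates = [b for b in valid_batch_sizes(n_steps, n_envs) if 32 <= b <= 256]
print(leftover)
print(candidates)
```

Either choosing one of these batch sizes or setting `n_steps=2048` (a multiple of 64) would avoid the warning.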


In [None]:
model.learn(total_timesteps=total_training_steps, callback=callback, reset_num_timesteps=False)

Logging to ./logs/PPO_0




-----------------------------
| time/              |      |
|    fps             | 155  |
|    iterations      | 1    |
|    time_elapsed    | 12   |
|    total_timesteps | 2000 |
-----------------------------
----------------------------------------
| time/                   |            |
|    fps                  | 105        |
|    iterations           | 2          |
|    time_elapsed         | 38         |
|    total_timesteps      | 4000       |
| train/                  |            |
|    approx_kl            | 0.01208765 |
|    clip_fraction        | 0.13       |
|    clip_range           | 0.2        |
|    entropy_loss         | -1.93      |
|    explained_variance   | 0.00194    |
|    learning_rate        | 9.99e-05   |
|    loss                 | 1.02       |
|    n_updates            | 10         |
|    policy_gradient_loss | -0.00428   |
|    value_loss           | 19         |
----------------------------------------

<stable_baselines3.ppo.ppo.PPO at 0x7996c3626b60>

In [None]:
#model = PPO.load('./train/best_model_10000.zip', env, custom_objects={'clip_range': custom_clip_range, 'learning_rate': custom_lr_schedule})

In [None]:
# Evaluate the model
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=16)
print(f'Mean reward: {mean_reward} +/- {std_reward}')



Mean reward: 643.0 +/- 0.0
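
A standard deviation of exactly 0.0 over 16 episodes means every episode returned the same total reward. That is plausible here because `evaluate_policy` runs the policy deterministically by default, and the emulator is likely to replay the same way for the same actions. The sketch below reproduces the summary statistics from hypothetical per-episode returns, assuming the reported value is the population standard deviation.

```python
from statistics import mean, pstdev

# Hypothetical per-episode returns: 16 identical episodes
episode_rewards = [643.0] * 16

toy_mean = mean(episode_rewards)
toy_std = pstdev(episode_rewards)  # population standard deviation
print(f'Mean reward: {toy_mean} +/- {toy_std}')
```

Passing `deterministic=False` to `evaluate_policy` samples actions from the policy instead and would typically give a non-zero spread.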


## Visualizing the results

In [None]:
def frames_to_video(frames, fps=24):
    fig = plt.figure(figsize=(frames[0].shape[1] / 100, frames[0].shape[0] / 100), dpi=100)
    ax = plt.axes()
    ax.set_axis_off()

    # The colormap is chosen once here; later updates only swap the pixel data
    if len(frames[0].shape) == 2:  # Grayscale image
        im = ax.imshow(frames[0], cmap='gray')
    else:  # Color image
        im = ax.imshow(frames[0])

    def init():
        im.set_data(frames[0])
        return im,

    def update(frame):
        # set_data() accepts only the image array, not a cmap argument
        im.set_data(frames[frame])
        return im,

    interval = 1000 / fps
    anim = FuncAnimation(fig, update, frames=len(frames), init_func=init, blit=True, interval=interval)
    plt.close()
    return HTML(anim.to_html5_video())

In [None]:
# Create a new environment for rendering
test_env = gym_super_mario_bros.make('SuperMarioBros-v0')
test_env = JoypadSpace(test_env, SIMPLE_MOVEMENT)

test_env = gym.make("GymV21Environment-v0", env=test_env, render_mode="rgb_array")
test_env = GrayScaleObservation(test_env, keep_dim=True)

test_env = DummyVecEnv([lambda: test_env])
test_env = VecFrameStack(test_env, 4, channels_order="last")

In [None]:
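# Roll out the trained policy for one episode and record the rendered frames
state = test_env.reset()
frames = []

while True:
    action, _ = model.predict(state)
    state, _, done, _ = test_env.step(action)
    frames.append(test_env.render())
    # done is an array with one entry per environment (a single one here)
    if done[0]:
        break
    # Cap the length of the recording
    if len(frames) > 9000:
        break

test_env.close()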



In [None]:
frames_to_video(frames, fps=60)

In [None]:
model.save('mario')