<a href="https://colab.research.google.com/github/Offliners/RobotLab-MLDL-Training-2022/blob/main/tutorial-5/colab/tutorial-5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tutorial 5 - Super Mario

In [None]:
!nvidia-smi

# Install necessary packages

In [None]:
!pip install gym==0.21.0
!pip install gym-super-mario-bros==7.3.0
!pip install stable-baselines3==1.6.0

# Import packages

In [None]:
import os
import copy
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib
import gym_super_mario_bros
import gym
from nes_py.wrappers import JoypadSpace
from gym.wrappers import GrayScaleObservation
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv
import torch
from torch.utils.tensorboard import SummaryWriter
from IPython.display import display, HTML

# Runtime Arguments

In [None]:
class Args:
    """Notebook-wide runtime settings, stored as class attributes.

    Every setting can be read or written either as an attribute
    (``args.seed``) or with subscript syntax (``args['seed']``); the
    subscript form simply delegates to ``getattr`` / ``setattr``.

    The ``#@markdown`` and ``#@param`` comments are Colab form annotations and
    should be kept on the lines they are attached to.

    How the rest of the notebook uses the settings:
        seed: random seed value.
        world, stage, version: combined into the environment id
            ``SuperMarioBros-<world>-<stage>-v<version>``.
        action_type: ``'right'``, ``'simple'`` or ``'complex'``; selects
            ``RIGHT_ONLY``, ``SIMPLE_MOVEMENT`` or ``COMPLEX_MOVEMENT``.
        num_skip_frame: ``skip`` argument of ``SkipFrame``.
        downsample_rate: ``ratio`` argument of ``Downsample``.
        num_stack_frame: number of frames given to ``VecFrameStack``.
        total_timestep: ``total_timesteps`` passed to ``model.learn``.
        max_step: upper bound on steps in one evaluation / test episode.
        step, lr, epoch, batchsize, gamma: passed to ``PPO`` as ``n_steps``,
            ``learning_rate``, ``n_epochs``, ``batch_size`` and ``gamma``.
        episode: number of episodes played per evaluation and in testing.
        check_freq: the training callback checkpoints and evaluates whenever
            its call counter is a multiple of this value.
        save_model_dir, tensorboard, output_video: output directories for
            checkpoints, TensorBoard logs and the rendered video.
    """

    def __getitem__(self, key):
        """Return the setting named ``key`` via ``getattr``."""
        return getattr(self, key)

    def __setitem__(self, key, val):
        """Assign ``val`` to the setting named ``key`` via ``setattr``."""
        setattr(self, key, val)

    #@markdown Random seed
    seed = 3366 #@param

    #@markdown Super Mario Environment Setting
    world = 1 #@param
    stage = 1 #@param
    version = 0 #@param
    action_type = 'simple' #@param
    num_skip_frame = 4 #@param
    downsample_rate = 3 #@param
    num_stack_frame = 4 #@param

    #@markdown Hyperparameter
    total_timestep = 400000 #@param
    max_step = 1000 #@param
    step = 512 #@param
    episode = 20 #@param
    lr = 5e-5 #@param
    epoch = 10 #@param
    batchsize = 128 #@param
    gamma = 0.9 #@param
    check_freq = 10000 #@param

    #@markdown Workspace setting
    save_model_dir = './checkpoints/model' #@param
    tensorboard = './checkpoints/tensorboard' #@param
    output_video = './video' #@param


# Settings instance that the following cells read through `args[...]` / `args.<name>`.
args = Args()

# Build Workspace

In [None]:
# Create the output directories named in the runtime arguments. The checkpoint
# directory is taken from `args.save_model_dir` (it used to be hard-coded), so
# changing the setting is honoured. `os.makedirs` also creates missing parents,
# and `exist_ok=True` makes the cell safe to re-run.
os.makedirs(args.save_model_dir, exist_ok=True)
os.makedirs(args.tensorboard, exist_ok=True)
os.makedirs(args.output_video, exist_ok=True)

# Set Random Seed

In [None]:
def same_seed(seed):
    """Seed the Python, NumPy and PyTorch random generators with `seed`.

    Also switches cuDNN to deterministic mode with benchmarking disabled.
    When CUDA is available, the CUDA generators are seeded as well.
    """
    # cuDNN flags: deterministic mode on, benchmark mode off.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    # CPU-side generators.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # Nothing more to seed without a CUDA device.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

# Create environment

In [None]:
class SkipFrame(gym.Wrapper):
    """Repeat every action for `skip` consecutive frames of the wrapped env.

    One call to `step` steps the wrapped environment up to `skip` times with
    the same action, stopping early if the episode ends. It returns the last
    observation, `done` flag and `info`, together with the SUM of the rewards
    collected over the repeated frames, so no reward is dropped.

    Args:
        env: environment to wrap.
        skip: number of frames each action is repeated for; must be >= 1.

    Raises:
        ValueError: if `skip` is smaller than 1 (the step loop would never run
            and there would be nothing to return).
    """

    def __init__(self, env, skip):
        super().__init__(env)
        if skip < 1:
            raise ValueError(f'skip must be at least 1, got {skip}')
        self._skip = skip

    def step(self, action):
        """Apply `action` for up to `skip` frames and return the aggregated step."""
        total_reward = 0.0
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            # Accumulate instead of overwriting: previously only the reward of
            # the final repeated frame reached the agent.
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, info


class Downsample(gym.ObservationWrapper):
    """Shrink observations by an integer factor along height and width.

    The wrapped environment's observation space must have a three-element
    shape, unpacked here as (height, width, channels). The new observation
    space keeps the channel count and floor-divides height and width by
    `ratio`.
    """

    def __init__(self, env, ratio):
        super().__init__(env)
        src_h, src_w, channels = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(src_h // ratio, src_w // ratio, channels),
            dtype=np.uint8,
        )

    def observation(self, frame):
        """Resize `frame` to the height and width of the new observation space."""
        out_h, out_w, _channels = self.observation_space.shape
        # cv2.resize takes the target size as (width, height).
        resized = cv2.resize(frame, (out_w, out_h), interpolation=cv2.INTER_AREA)
        # A two-dimensional result gets a trailing channel axis so the output is 3-D.
        return resized[:, :, None] if resized.ndim == 2 else resized


class CustomRewardEnv(gym.Wrapper):
    """Replace the wrapped environment's reward with a rescaled one.

    * While the episode is running, the reward is divided by 30.
    * On the step that ends the episode, the reward becomes +1 when
      `info['flag_get']` is truthy and -1 otherwise.
    """

    def __init__(self, env):
        super().__init__(env)

    def step(self, action):
        """Step the wrapped env and return the same tuple with the new reward."""
        obs, raw_reward, done, info = self.env.step(action)

        if not done:
            return obs, raw_reward / 30, done, info

        terminal_reward = 1 if info['flag_get'] else -1
        return obs, terminal_reward, done, info


# Seed the Python / NumPy / PyTorch generators before anything stochastic is
# built. `args.seed` and `same_seed` are defined above but were never applied.
same_seed(args['seed'])

# Environment id, e.g. "SuperMarioBros-1-1-v0". The f-string uses double quotes
# outside so the single-quoted subscripts inside the braces are valid on every
# Python 3 version; the original was also missing the final closing brace.
env_id = f"SuperMarioBros-{args['world']}-{args['stage']}-v{args['version']}"
env = gym_super_mario_bros.make(env_id)

# Pick the joypad action set. An unknown name now fails immediately with a
# clear message instead of printing and then raising NameError on `actions`.
if args['action_type'] == "right":
    actions = RIGHT_ONLY
elif args['action_type'] == "simple":
    actions = SIMPLE_MOVEMENT
elif args['action_type'] == 'complex':
    actions = COMPLEX_MOVEMENT
else:
    raise ValueError(
        f"Unknown action type {args['action_type']!r}; "
        "expected 'right', 'simple' or 'complex'"
    )

# Wrapper stack, innermost first: discrete joypad actions -> action repeat ->
# reward rescaling -> grayscale (channel axis kept) -> spatial downsampling ->
# single-environment vectorisation -> stacking of consecutive frames.
env = JoypadSpace(env, actions)
env = SkipFrame(env, skip=args['num_skip_frame'])
env = CustomRewardEnv(env)
env = GrayScaleObservation(env, keep_dim=True)
env = Downsample(env, args['downsample_rate'])
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, args['num_stack_frame'], channels_order='last')

# Callback

In [None]:
class TrainCallback(BaseCallback):
    """Periodically checkpoint the model and log evaluation rewards.

    Every time the callback's call counter is a multiple of
    `args['check_freq']`, the callback saves the model, plays
    `args['episode']` evaluation episodes (each capped at `args['max_step']`
    steps), prints the average and best episode reward, and writes both to
    TensorBoard.

    Args:
        args: settings object supporting subscript access. Keys read:
            'tensorboard', 'check_freq', 'save_model_dir', 'world', 'stage',
            'episode', 'max_step' and 'total_timestep'.
        env: vectorised environment used for the evaluation episodes
            (rewards are read as `reward[0]`).
        model: optional algorithm instance. It may be omitted:
            stable-baselines3 documents that a callback's `self.model` is set
            to the algorithm being trained when training starts. The parameter
            is kept so existing three-argument calls still work.
        verbose: verbosity level forwarded to `BaseCallback`.

    NOTE(review): the notebook passes the same `env` object to this callback
    and to `PPO`, so evaluation episodes step the environment that training is
    also using. Consider a separate evaluation environment.
    """

    def __init__(self, args, env, model=None, verbose=1):
        super(TrainCallback, self).__init__(verbose)
        self.args = args
        self.env = env
        if model is not None:
            self.model = model
        self.writer = SummaryWriter(args['tensorboard'])

    def _on_step(self):
        """Checkpoint and evaluate every `check_freq` calls; always continue training."""
        if self.n_calls % self.args['check_freq'] != 0:
            return True

        # Fixed: the world/stage values live on `self.args`; the callback
        # object itself is not subscriptable.
        model_path = os.path.join(
            self.args['save_model_dir'],
            'mario_world_{}_{}.pth'.format(self.args['world'], self.args['stage']),
        )
        self.model.save(model_path)

        n_episodes = self.args['episode']
        total_reward = [0] * n_episodes
        for i in range(n_episodes):
            state = self.env.reset()
            done = False
            steps = 0
            while not done and steps < self.args['max_step']:
                action, _ = self.model.predict(state)
                state, reward, done, info = self.env.step(action)
                total_reward[i] += reward[0]
                steps += 1

            # Leave the environment freshly reset after each evaluation episode.
            self.env.reset()

        reward_avg = round(sum(total_reward) / n_episodes, 3)
        # Fixed: take the true maximum. Starting the running best at 0 reported
        # 0 whenever every evaluation episode had a negative reward.
        best_reward = round(max(total_reward), 3)
        # Double quotes outside so the subscripts inside the braces are valid
        # f-string syntax on Python versions before 3.12.
        print(f"[ Train | {self.n_calls}/{self.args['total_timestep']} ] average reward = {reward_avg}, best reward = {best_reward}")

        self.writer.add_scalars('Reward', {'average reward' : reward_avg, 'best reward' : best_reward}, self.n_calls)
        # Push the scalars to disk now so they are not left in the writer's buffer.
        self.writer.flush()

        return True


# `model` is created in a later cell, so it is deliberately not passed here;
# referencing it at this point raised NameError on a fresh "Run All".
callback = TrainCallback(args, env)

# Model

In [None]:
# PPO agent with a CNN policy on the wrapped Mario environment. All
# hyperparameters come from the runtime arguments.
model = PPO(
    'CnnPolicy',
    env,
    learning_rate=args['lr'],
    n_steps=args['step'],
    batch_size=args['batchsize'],
    n_epochs=args['epoch'],
    gamma=args['gamma'],
    tensorboard_log=args['tensorboard'],
    verbose=0,
)

# Tensorboard

In [None]:
%reload_ext tensorboard
%tensorboard --logdir=./checkpoints/tensorboard/

# Start training!

In [None]:
# Train for the configured number of timesteps, handing `callback` to `learn`.
model.learn(total_timesteps=args['total_timestep'], callback=callback)

# Start testing!

In [None]:
# Play `args['episode']` test episodes with the trained model, recording the
# rendered frames of each one and keeping those of the best-scoring episode.
n_episodes = args['episode']
total_reward = [0] * n_episodes
total_action = [0] * n_episodes
# Start below any possible score so the first episode always becomes the
# provisional best. With a start value of 0, a run where no episode scored
# above 0 left `frames_best` empty and broke the display cell.
best_reward = float('-inf')
flags = 0
frames_best = []
for i in range(n_episodes):
    state = env.reset()
    done = False
    frames = []
    flag = 0
    while not done and total_action[i] < args['max_step']:
        action, _ = model.predict(state)
        state, reward, done, info = env.step(action)
        total_reward[i] += reward[0]
        total_action[i] += 1
        # Copy each rendered frame so later renders cannot alter stored ones.
        frames.append(copy.deepcopy(env.render(mode='rgb_array')))

        if info[0]["flag_get"]:
            flag = 1
            flags += 1
            break

    if total_reward[i] > best_reward:
        best_reward = total_reward[i]
        # `frames` is rebound to a new list each episode and every frame was
        # already copied, so a second deep copy of the whole list is unnecessary.
        frames_best = frames

    # Double quotes outside so the subscripts inside the braces are valid
    # f-string syntax on Python versions before 3.12.
    if flag:
        print(f"[ Test | {i + 1}/{n_episodes} ] reward = {round(total_reward[i], 3)}, action step = {total_action[i]}, World {args['world']}-{args['stage']} completed!")
    else:
        print(f'[ Test | {i + 1}/{n_episodes} ] reward = {round(total_reward[i], 3)}, action step = {total_action[i]}')

avg_action_step = round(sum(total_action) / n_episodes, 3)
avg_reward = round(sum(total_reward) / n_episodes, 3)
print(f'average reward = {avg_reward}, average action step = {avg_action_step}, best_reward = {round(best_reward, 3)}')
print(f'Complete rate : [{flags}/{n_episodes}]')

# Gameplay display

In [None]:
# Render the frames of the best test episode as an inline JS/HTML animation.
frames_new = np.array(frames_best)
if len(frames_new) == 0:
    raise RuntimeError('frames_best is empty: no test episode was recorded, so there is nothing to animate.')

matplotlib.rcParams['animation.embed_limit'] = 2**128

# Size the figure from the frames actually being shown (previously it read
# `frames`, i.e. whichever episode happened to run last), at 72 px per inch.
frame_height, frame_width = frames_new[0].shape[:2]
plt.figure(figsize=(frame_width / 72.0, frame_height / 72.0), dpi = 72)
patch = plt.imshow(frames_new[0])
plt.axis('off')
plt.tight_layout(pad=0, h_pad=0, w_pad=0)


def animate(i):
    """Show frame `i` of the best episode in the existing image artist."""
    patch.set_data(frames_new[i])


ani = matplotlib.animation.FuncAnimation(plt.gcf(), animate, frames=len(frames_new), interval=50)
display(HTML(ani.to_jshtml()))
plt.close()

# Download gameplay video

In [None]:
# Encode the animation to MP4 in the configured output directory. The f-string
# uses double quotes outside so the subscripts inside the braces are valid
# syntax on Python versions before 3.12. 20 fps matches the animation's 50 ms
# frame interval.
video_path = os.path.join(args['output_video'], f"video_world_{args['world']}_{args['stage']}.mp4")
FFwriter = animation.FFMpegWriter(fps=20, extra_args=['-vcodec', 'mpeg4'])
ani.save(video_path, writer=FFwriter)

In [None]:
# Colab-only helper, imported here so the earlier cells stay runnable outside Colab.
from google.colab import files

# Same path the video was saved to. Double quotes outside keep the f-string
# valid on Python versions before 3.12.
files.download(os.path.join(args['output_video'], f"video_world_{args['world']}_{args['stage']}.mp4"))