In [1]:
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
from pathlib import Path
from collections import deque
import random, time, datetime, os, copy
import numpy as np

# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace

# Super Mario environment for OpenAI Gym
import gym_super_mario_bros

# imageio is used to save the training progress as a gif
import imageio

# plotly is used to view the training progress
import plotly.express as px

# kaleido is used to export the plotly figure as a png
import kaleido

# pyvirtualdisplay is used to create a virtual display to watch the agent play
from pyvirtualdisplay import Display

# progress bars:
from tqdm.notebook import tqdm

  warn(f"Failed to load image Python extension: {e}")


In [2]:
# Exploration: instantiate the raw environment class directly. The instance is
# kept in `smbe`; it is not referenced again in the visible part of the notebook.
smbe = gym_super_mario_bros.SuperMarioBrosEnv()
# Build the registered 'SuperMarioBros-v0' environment through `make`. As the
# last expression of the cell, its repr (the wrapper chain) is what gets displayed.
gym_super_mario_bros.make('SuperMarioBros-v0')

  logger.warn(


<TimeLimit<OrderEnforcing<PassiveEnvChecker<SuperMarioBrosEnv<SuperMarioBros-v0>>>>>

In [3]:
import pandas as pd

# List the public names exposed by the gym_super_mario_bros package: discard
# every entry that contains a double underscore or begins with an underscore.
s = pd.Series(dir(gym_super_mario_bros))
s[~(s.str.contains("__") | s.str.startswith("_"))]

0                 SuperMarioBrosEnv
1     SuperMarioBrosRandomStagesEnv
14                             make
15                          smb_env
16            smb_random_stages_env
dtype: object

# RL Definitions
- **Environment** The world that an agent interacts with and learns from.
- **Action** $a$ : How the Agent responds to the Environment. The set of all possible Actions is called *action-space*.
- **State** $s$ : The current characteristic of the Environment. The set of all possible States the Environment can be in is called *state-space*.
- **Reward** $r$: Reward is the key feedback from Environment to Agent. It is what drives the Agent to learn and to change its future action. An aggregation of rewards over multiple time steps is called **Return**
- **Optimal Action-Value function** $Q^*(s,a)$ : Gives the expected return if you start in state $s$, take an arbitrary action $a$, and then for each future time step take the action that maximizes returns. $Q$ can be said to stand for the “quality” of the action in a state. We try to approximate this function.
- **Q-Value** $Q(s,a)$ : The agent's current estimate of $Q^*(s,a)$, i.e. of the expected return if you start in state $s$, take action $a$, and then for each future time step take the action that maximizes returns. This estimate is what the agent actually learns; training pushes it towards $Q^*$.

## Environment

### Initialize Environment

In [4]:
import warnings
from gym.utils import passive_env_checker

# Silence known-noisy warnings raised while the environment is created and stepped.
warnings.filterwarnings("ignore", category=UserWarning, module="gym.envs.registration")
warnings.filterwarnings("ignore", category=DeprecationWarning, module=passive_env_checker.__name__)
warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)

# version of super mario bros
smb_version = 'SuperMarioBros-1-1-v3'

# Compare the gym version numerically. Comparing the raw strings is
# lexicographic and mis-orders versions such as "0.9" vs "0.26".
gym_major_minor = tuple(int(part) for part in gym.__version__.split(".")[:2])

# initialize env based on gym version
if gym_major_minor < (0, 26):
    env = gym_super_mario_bros.make(smb_version, new_step_api=True)
else:
    # NOTE(review): with render_mode='human', `env.render()` may not return
    # frame arrays -- confirm before relying on it to record frames.
    env = gym_super_mario_bros.make(
        smb_version,
        render_mode='human',
        apply_api_compatibility=True,
    )

# limit the possible actions to:
#   0. walk right
#   1. jump right
env = JoypadSpace(
    env,
    [
        ["right"],       # 0. walk right
        ["right", "A"],  # 1. jump right
    ],
)

# Advertise both render modes on the wrapped environment's metadata.
env.metadata['render_modes'] = ['human', 'rgb_array']

env.reset()

# Smoke test: take one step with action 0 and inspect what comes back.
next_state, reward, done, trunc, info = env.step(action=0)
print(f"\nnext_state.shape: {next_state.shape},\n reward: {reward},\n done: {done}\n")

for k, v in info.items():
    print(f"{k}: {v}")


next_state.shape: (240, 256, 3),
 reward: 0.0,
 done: False

coins: 0
flag_get: False
life: 2
score: 0
stage: 1
status: small
time: 400
world: 1
x_pos: 40
y_pos: 79


### Preprocess environment

- Mario doesn't need to know *everything* about the environment
- he only needs the information that is relevant to choosing his next action

Preprocessing the environment reduces the state space, which makes it easier for the agent to learn.

#### preprocessing steps:
Wrap the environment in a series of wrappers (three custom classes defined below, plus `FrameStack` from `gym.wrappers`) to preprocess it
1. convert to grayscale: `GrayScaleObservation`
    - reduces the number of channels from 3 to 1
    - new `next_state.shape` is `(1, 240, 256)`
2. downsample/resize to a square image: `ResizeObservation` 
    - reduces the size of the image
    - new `next_state.shape` is `(84, 84)` (the leading channel dimension is squeezed away)
3. consecutive frames don't change much, so we can skip every nth frame with `SkipFrame` 
    - reduces the number of frames
    - `next_state.shape` is unchanged by this step, but the agent only sees every `n`-th frame, so the total number of frames is reduced by a factor of `n`
    - `SkipFrame` inherits from `gym.Wrapper`, which is a wrapper for the environment
4. stack the frames to get a short history of the environment: `FrameStack` 
    - squashes the last 4 frames into a single array, so the new `next_state.shape` is `(4, 84, 84)`
    - we can ID if mario is moving left or right or jumping by looking at the last few frames

#### 1. Convert to Grayscale

In [5]:
class GrayScaleObservation(gym.ObservationWrapper):
    """Observation wrapper that turns each colour frame into a grayscale tensor.

    The declared observation space keeps only the first two dimensions of the
    wrapped environment's observation shape. The value actually returned by
    `observation` is a float tensor in [C, H, W] layout produced by
    `torchvision.transforms.Grayscale`.
    """

    def __init__(self, env):
        """Wrap `env` and declare a 2-D uint8 observation space."""
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
        # Build the transform once instead of re-creating it for every frame.
        self._to_grayscale = T.Grayscale()

    def permute_orientation(self, observation):
        """Convert an [H, W, C] array into a float [C, H, W] tensor."""
        # permute [H, W, C] array to [C, H, W] tensor
        observation = np.transpose(observation, (2, 0, 1))
        # `.copy()` hands torch a fresh buffer rather than the transposed view.
        return torch.tensor(observation.copy(), dtype=torch.float)

    def observation(self, observation):
        """Return the grayscale version of `observation` as a tensor."""
        return self._to_grayscale(self.permute_orientation(observation))

#### 2. Downsample/Resize to a Square Image

In [6]:
class ResizeObservation(gym.ObservationWrapper):
    """Observation wrapper that resizes each frame and rescales it by 1/255.

    Args:
        env: the environment to wrap.
        shape: target size; an int `n` means `(n, n)`, any other value is
            converted with `tuple(shape)`.
    """

    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        # Keep any dimensions beyond the first two from the wrapped space.
        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

        # Build the pipeline once instead of re-creating it for every frame.
        # Normalize(0, 255) subtracts 0 and divides by 255.
        self._transform = T.Compose(
            [T.Resize(self.shape), T.Normalize(0, 255)]
        )

    def observation(self, observation):
        """Resize and rescale `observation`, then drop a size-1 leading dimension."""
        return self._transform(observation).squeeze(0)

#### 3. Skip Every Nth Frame

In [7]:
class SkipFrame(gym.Wrapper):
    """Repeat each action for `skip` consecutive frames and return only the last one.

    The rewards of the repeated steps are summed. Repetition stops early as
    soon as the wrapped environment reports that the episode is over, whether
    through `done` or through truncation.
    """

    def __init__(self, env, skip):
        """Return only every `skip`-th frame

        Raises:
            ValueError: if `skip` is smaller than 1, because `step` would then
                have no observation to return.
        """
        super().__init__(env)
        if skip < 1:
            raise ValueError(f"skip must be >= 1, got {skip}")
        self._skip = skip

    def step(self, action):
        """Repeat action, and sum reward

        Returns:
            The `(obs, total_reward, done, trunk, info)` tuple of the last step
            taken, with `total_reward` summed over all repeated steps.
        """
        total_reward = 0.0
        for _ in range(self._skip):
            # Accumulate reward and repeat the same action
            obs, reward, done, trunk, info = self.env.step(action)
            total_reward += reward
            # Stop on either kind of episode end; a finished episode must not
            # be stepped again before it is reset.
            if done or trunk:
                break
        return obs, total_reward, done, trunk, info

#### 4. Stack the Frames

In [8]:
# Apply Wrappers to environment
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
if gym.__version__ < '0.26':
    env = FrameStack(env, num_stack=4, new_step_api=True)
else:
    env = FrameStack(env, num_stack=4)

## Agent

Create a `Mario` class to represent the agent in the game. `Mario` should be able to:
1. take an action according to the optimal action policy based on the current state of the environment
2. remember experiences, where an experience = (current state, current action, reward, next state), plus a flag telling whether the episode ended
   -  `Mario` caches and later recalls experiences to update the action policy
3. learn a better action policy based on the experiences it has had


In [12]:
# Create a virtual display if on a linux server
if os.name == "posix":
    display = Display(visible=0, size=(1400, 900))
    display.start()


use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()

save_dir = Path("checkpoints") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)

mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

logger = MetricLogger(save_dir)

episodes = 10
# Initialize a list to store the frames for each episode
frames = []

for e in tqdm(range(episodes), desc="Episodes"):
    state = env.reset()
    # Clear the frames list for each episode
    episode_frames = []

    while True:
        action = mario.act(state)
        next_state, reward, done, trunc, info = env.step(action)

        # Render the environment and save the frame
        frame = env.render()
        episode_frames.append(frame)

        mario.cache(state, next_state, action, reward, done)
        q, loss = mario.learn()
        logger.log_step(reward, loss, q)
        state = next_state

        if done or info["flag_get"]:
            break

    print(f"Episode {e} - Logging Episode")
    logger.log_episode()
    if e % 20 == 0:
        print(f"Episode {e} - Saving Model")
        logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)

    # Add the episode frames to the main frames list
    frames.append(episode_frames)

# Save the frames as a video
print("Saving video")
imageio.mimsave('mario_gameplay.mp4', [frame for episode_frames in frames for frame in episode_frames], fps=30)


Using CUDA: False



Episodes:   0%|          | 0/10 [00:00<?, ?it/s]

  logger.warn(


Episode 0 - Step 534 - Epsilon 0.9998665088940238 - Mean Reward 1264.0 - Mean Length 534.0 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 8.075 - Time 2023-04-23T08:21:23
