# Dependencies 

In [1]:
## Setup
# import environment
import gym_super_mario_bros
# import joypad
from nes_py.wrappers import JoypadSpace
# import controls
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

## Preprocessing
from gym.wrappers import GrayScaleObservation, ResizeObservation
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

## modeling
import torch
from torch import nn
from torchvision import transforms as T

from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os, copy

## Agent training
import os

## logging
import numpy as np
import time, datetime
import matplotlib.pyplot as plt


In [2]:
CHECKPOINT_PATH = "./checkpoints_pytorch"
LOG_PATH = "./logs"

#### Agent actions
These are the possible actions that the agent can take in the environment.

In [3]:
SIMPLE_MOVEMENT

[['NOOP'],
 ['right'],
 ['right', 'A'],
 ['right', 'B'],
 ['right', 'A', 'B'],
 ['A'],
 ['left']]

# Setup environment
For this project we well be making use if the gym-super-mario-bros enviroment.

In [4]:

env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", render_mode='rgb', apply_api_compatibility=True)
env.action_space

  logger.warn(
  logger.warn(


Discrete(256)

In [5]:
env.reset()
next_state, reward, done, trunc, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")


(240, 256, 3),
 0.0,
 False,
 {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'y_pos': 79}


  if not isinstance(terminated, (bool, np.bool8)):


Simplify the action space to limit the number of possible actions. This will make it easier for the agent to learn the optimal policy as there will be less possible actions to choose from.

In [6]:
# wrap environment with controls
env = JoypadSpace(env, [["right"], ["right", "A"]])
env.action_space

Discrete(2)

The observation space serves as the input to the agent. The agent will use this information to learn the optimal policy.
In our case the observation space is a frame from the game.

In [7]:
env.observation_space.shape

(240, 256, 3)

Test the environment with random actions.

In [8]:
env_demo = env

# done = True # reset the env
# # loop X steps
# for step in range(20):
#     if done:
#         # start the env
#         env_demo.reset()
#     # pass a random action to the env
#     print(env_demo.action_space.sample())
#     state, reward, done, info = env_demo.step(env_demo.action_space.sample()) 
#     # render the env
#     env_demo.render()
# env_demo.close()


Each step that the agent takes in the enviroment will return a new state. This state is the observation space for the next step. In our case an image of the game:

In [9]:
# state.shape
# # plt.imshow(state)

The reward function form the enviroment assumes that the objective is to move as far to the right as possible, as fast as possible and without dying. The value of the reward reflects this. More on how this reward is calculated can be found here, in the gym-super-mario-bros documentation. https://pypi.org/project/gym-super-mario-bros/

In [10]:
# reward

To check wether or not the game is running we use the done variable

In [11]:
# done

Miscellaneous information about the environment is found in the info variable.

In [12]:
# info

# Preprocessing Environment
To make use of the data that the environment returns we need to preprocess it. The two steps we will take are:
1. Convert the image to grayscale - This will reduce the size of the observation space and make it easier for the agent to learn the optimal policy.
2. Frame stacking - This gives the agent a sense of motion and context and helps it understand the dynamics of the game.

### Wrap the environment:

In [13]:
class FrameSkip(gym.Wrapper):
    def __init__(self, env, skip=4):
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        total_reward = 0.0

        for _ in range(self._skip):
            # accumulate reward
            state, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return state, total_reward, done, info

In [14]:
# frame skip
class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        """Return only every `skip`-th frame"""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat action, and sum reward"""
        total_reward = 0.0
        for i in range(self._skip):
            # Accumulate reward and repeat the same action
            obs, reward, done, trunk, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, trunk, info

In [15]:
# Grayscale
print("Input shape before grayscale: ", env.observation_space.shape)
class GrayScaleObservation(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def permute_orientation(self, observation):
        # permute [H, W, C] array to [C, H, W] tensor
        observation = np.transpose(observation, (2, 0, 1))
        observation = torch.tensor(observation.copy(), dtype=torch.float)
        return observation

    def observation(self, observation):
        observation = self.permute_orientation(observation)
        transform = T.Grayscale()
        observation = transform(observation)
        return observation
print("Input shape after grayscale: ", env.observation_space.shape)


Input shape before grayscale:  (240, 256, 3)
Input shape after grayscale:  (240, 256, 3)


In [16]:
# Wrap the environment with the wrapper
# env = DummyVecEnv([lambda: env])


In [17]:
# reize observation
class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def observation(self, observation):
        transforms = T.Compose(
            [T.Resize(self.shape), T.Normalize(0, 255)]
        )
        observation = transforms(observation).squeeze(0)
        return observation

In [18]:
# FrameStack
# env = VecFrameStack(env, 4, channels_order='last')

In [19]:
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
env = FrameStack(env, num_stack=4)

# Build the agent

In [20]:
class MarioNet(nn.Module):
    """
    Small CNN
    """
    def __init__(self, input_dim, output_dim):
        super().__init__()
        c, h, w= input_dim
        
        if h != 84:
            raise ValueError(f"Expecting input height: 84, got: {h}")
        if w != 84:
            raise ValueError(f"Expecting input width: 84, got: {w}")
        
        self.online = nn.Sequential(
            nn.Conv2d(c, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        )

        self.target = copy.deepcopy(self.online)

        for p in self.target.parameters():
            p.requires_grad = False
    def forward(self, input, model):
        if model == "online":
            return self.online(input)
        elif model == "target":
            return self.target(input)

class MarioAgent:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.net = MarioNet(self.state_dim, self.action_dim).float().to(self.device)

        self.exploration_rate = 1
        self.exploration_rate_decay = 0.99999975
        self.exploration_rate_min = 0.1
        self.curr_step = 0
        self.save_every = 10000 
        self.memory = deque(maxlen=100000)
        self.batch_size = 32
        self.gamma = 0.9

        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
        self.loss_fn = torch.nn.SmoothL1Loss()

        self.burnin = 1e4 # min. experiences before training
        self.learn_every = 3 # no. of experiences between updates to Q_online
        self.sync_every = 1e4 # no. of experiences between Q_target & Q_online sync

    
    def act(self, state):
        """ Given a state, choose a epsilon-greedy action """
        if random.random() < self.exploration_rate:
            action_idx = random.randint(0, self.action_dim - 1)
        else:
            state = state.__array__() 
            state = torch.tensor(state).unsqueeze(0).to(self.device)
            action_values = self.net(state, model="online")
            action_idx = torch.argmax(action_values, axis=1).item()

        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)

        # Increment step
        self.curr_step += 1
        return action_idx

    def cache(self, state, next_state, action, reward, done):
        """ Add to memory """
        def is_first_tuple(x):
            return x[0] if isinstance(x, tuple) else x
        
        state = is_first_tuple(state).__array__()
        next_state = is_first_tuple(next_state).__array__()

        state = torch.tensor(state, device=self.device)
        next_state = torch.tensor(next_state, device=self.device)
        action = torch.tensor([action], device=self.device)
        reward = torch.tensor([reward], device=self.device)
        done = torch.tensor([done], device=self.device)

        self.memory.append((state, next_state, action, reward, done))
    
    def recall(self):
        """ Sample from memory """
        batch = random.sample(self.memory, self.batch_size)
        state, next_state, action, reward, done = map(torch.stack, zip(*batch))
        return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()
    

    def td_estimate(self, state, action):
        """ Estimate TD(0) """
        current_Q = self.net(state, model="online")[np.arange(0, self.batch_size), action]
        return current_Q
    
    @torch.no_grad()
    def td_target(self, reward, next_state, done):
        next_state_Q = self.net(next_state, model="online")
        best_action = torch.argmax(next_state_Q, axis=1)
        next_Q = self.net(next_state, model="target")[np.arange(0, self.batch_size), best_action]

        return (reward + (1 - done.float()) * self.gamma * next_Q).float()
    
    def update_Q(self, td_estimate, td_target):
        loss = self.loss_fn(td_estimate, td_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
    
    def sync_Q_target(self):
        self.net.target.load_state_dict(self.net.online.state_dict())

    def save(self):
        save_path = (f"{CHECKPOINT_PATH}/ mario_net_{self.curr_step}.chkpt")
        torch.save(dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate), save_path)
        print(f"MarioNet saved to {save_path} at step {self.curr_step}")

    def learn(self):
        """  """
        if self.curr_step % self.sync_every == 0:
            self.sync_Q_target()
        
        if self.curr_step % self.save_every == 0:
            self.save()

        if self.curr_step < self.burnin:
            return None, None
        
        if self.curr_step % self.learn_every != 0:
            return None, None
        
        # Sample from memory
        state, next_state, action, reward, done = self.recall()

        # Get TD Estimate
        td_est = self.td_estimate(state, action)

        # Get TD Target
        td_tgt = self.td_target(reward, next_state, done)

        # Backpropagate loss through Q_online
        loss = self.update_Q(td_est, td_tgt)

        return (td_est.mean().item(), loss)


In [21]:
class MetricLogger:
    def __init__(self, save_dir):
        self.save_dir = save_dir / "log"
        with open(self.save_dir, "w") as f:
            f.write(
                f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
                f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}\n"
                f"{'timeDelta':>15}{'Time':>20}\n"    
                )
        self.ep_rewards_plot = save_dir / "reward_plot.jpg"
        self.ep_lengths_plot = save_dir / "length_plot.jpg"
        self.ep_avg_losses_plot  = save_dir / "loss_plot.jpg"
        self.ep_avg_qs_plot  = save_dir / "q_plot.jpg"

        # Metrics
        self.ep_rewards = []
        self.ep_lengths = []
        self.ep_avg_losses = []
        self.ep_avg_qs  = []

        # Moving averages
        self.moving_avg_ep_rewards = []
        self.moving_avg_ep_lengths = []
        self.moving_avg_ep_avg_losses = []
        self.moving_avg_ep_avg_qs = []

        # Current episode
        self.init_episode()

        # Timing
        self.record_time = time.time()

    def log_step(self, reward, loss, q_value):
        self.curr_ep_reward += reward
        self.curr_ep_length += 1
        if loss:
            self.curr_ep_loss += loss
            self.curr_ep_q_value += q_value
            self.curr_ep_loss_length += 1
    
    def log_episode(self):
        #end of episode
        self.ep_rewards.append(self.curr_ep_reward)
        self.ep_lengths.append(self.curr_ep_length)
        if self.curr_ep_loss_length == 0:
            ep_avg_loss = 0
            ep_avg_q_value = 0
        else:
            ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
            ep_avg_q_value = np.round(self.curr_ep_q_value / self.curr_ep_loss_length, 5)
        self.ep_avg_losses.append(ep_avg_loss)
        self.ep_avg_qs.append(ep_avg_q_value)

        self.init_episode()

    def init_episode(self):
        self.curr_ep_reward = 0
        self.curr_ep_length = 0
        self.curr_ep_loss = 0
        self.curr_ep_q_value = 0
        self.curr_ep_loss_length = 0
    
    def record(self, episode, epsilon, step):
        mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
        mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
        mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
        mean_ep_q_value = np.round(np.mean(self.ep_avg_qs[-100:]), 3)

        self.moving_avg_ep_rewards.append(mean_ep_reward)
        self.moving_avg_ep_lengths.append(mean_ep_length)
        self.moving_avg_ep_avg_losses.append(mean_ep_loss)
        self.moving_avg_ep_avg_qs.append(mean_ep_q_value)

        last_record_time = self.record_time
        self.record_time = time.time()
        time_delta = np.round(self.record_time - last_record_time, 3)

        print(
            f"Episode {episode} - "
            f"Step {step} - "
            f"Epsilon {epsilon} - "
            f"Mean Reward {mean_ep_reward} - "
            f"Mean Length {mean_ep_length} - "
            f"Mean Loss {mean_ep_loss} - "
            f"Mean Q Value {mean_ep_q_value} - "
            f"Time Delta {time_delta}"
            f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
        )

        with open(self.save_dir, "a") as f:
            f.write(
                f"{episode:8d}{step:8d}{epsilon:10.3f}"
                f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_loss:15.3f}\n"
                f"{time_delta:15.3f}"
                f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
            )
        for metric in ["ep_rewards", "ep_lengths", "ep_avg_losses", "ep_avg_qs"]:
            plt.plot(getattr(self, f"moving_avg_{metric}"))
            plt.savefig(getattr(self, f"{metric}_plot"))
            plt.clf()
            

# Training the agent


In order to save the model we will use a callback function in order to keep good pratices and avoid losing the model in case of a crash or any other problem.

In [22]:
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")

save_dir = Path(CHECKPOINT_PATH) / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)
# (240, 256, 3)
agent = MarioAgent(state_dim=(4, 84, 84), action_dim=env.action_space.n)
logger = MetricLogger(save_dir)

episodes = 50000
for e in range(episodes):
    state = env.reset()
    
    while True:
        action = agent.act(state)
        # print(action)
        
        next_state, reward, done, trunc, info = env.step(action)

        agent.cache(state, next_state, action, reward, done)

        q, loss = agent.learn()

        logger.log_step(reward, loss, q)

        state = next_state

        if done or info["flag_get"]:
            break

    logger.log_episode()

    if e % 20 == 0:
        logger.record(episodes, agent.exploration_rate, agent.curr_step)

Using CUDA: True




Episode 50000 - Step 102 - Epsilon 0.9999745003219311 - Mean Reward 627.0 - Mean Length 102.0 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 0.504Time 2023-05-29T13:46:23




Episode 50000 - Step 3777 - Epsilon 0.9990561955456959 - Mean Reward 537.286 - Mean Length 179.857 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 22.94Time 2023-05-29T13:46:46




Episode 50000 - Step 7876 - Epsilon 0.9980329369629195 - Mean Reward 583.585 - Mean Length 192.098 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 23.093Time 2023-05-29T13:47:09




RuntimeError: Parent directory CHECKPOINT_PATH does not exist.

<Figure size 640x480 with 0 Axes>

# Testing the agent

#