## Installations & Imports

In [1]:
#Install the Super Mario game environment
!pip install gym-super-mario-bros==7.3.0 opencv-python

import os
import copy
import torch
from torch import nn
from pathlib import Path
from collections import deque
import random, datetime, numpy as np, cv2

import numpy as np
import time, datetime
import matplotlib.pyplot as plt

#Import Gym: an OpenAI tool for Reinforcement Learning
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack, GrayScaleObservation, TransformObservation
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

#NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace

#Super Mario environment for OpenAI Gym
import gym_super_mario_bros






# Intialize The Environment 

In [2]:
#Make the Super Mario environment
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')

#Limit the possible actions to move right and move jump right
env = JoypadSpace(env,[['right'],['right', 'A']])

#Define next_state, reward, done, and info variables
env.reset()
next_state, reward, done, info = env.step(action=0)
print(f'{next_state.shape},\n {reward},\n {done},\n {info}')

  logger.warn(


(240, 256, 3),
 0.0,
 False,
 {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'x_pos_screen': 40, 'y_pos': 79}


# Preprocess The Environment

In [3]:
#Create class to downsize the observations 
class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    #Resize the obersvation
    def observation(self, observation):
        observation = cv2.resize(observation, self.shape, interpolation=cv2.INTER_AREA)
        return observation

#Create class to skip intermediate frames since frames do not differ drastically. The rewards accumulated over each skipped frame are aggregated every skip-th frame
class SkipFrame(gym.Wrapper):
    #Returns only every skip-th frame
    def __init__(self, env, skip):
        super().__init__(env)
        self._skip = skip

    #Sums the rewards over the skipped frames
    def step(self, action):
        total_reward = 0.0
        done = False
        for i in range(self._skip):
            #Aggregate the reward and repeat the same action
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, info

#Apply wrappers to the enviornment
#Make it so that only every 4th frame is observered
env = SkipFrame(env, skip=4)
#Gray scale the observation to reduce size of observation
env = GrayScaleObservation(env, keep_dim=False)
#Resize the obsersavtion to be 84 x 84 instead of 240 x 256
env = ResizeObservation(env, shape=84)
env = TransformObservation(env, f=lambda x: x / 255.)
#Make it so that 4 consecutive frames are transformed into a single stack that is fed to the model as one observation.
env = FrameStack(env, num_stack=4)

# Create The Agent

In [4]:
#Intialize class to represent agent and the agent's functions
class Mario:
    def __init__():
        pass

    def act(self, state):
        pass

    def cache(self, experience):
        pass

    def recall(self):
        pass

    def learn(self):
        pass

    def save(self):
        pass

    def load(self, load_path):
        pass

# Act Function

In [5]:
class Mario: 
    def __init__(self, state_dim, action_dim, save_dir):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.save_dir = save_dir

        self.use_cuda = torch.cuda.is_available()
    
        #Intialize the neural network to predict the most optimal action
        self.net = MarioNet(self.state_dim, self.action_dim).float()
        if self.use_cuda:
            self.net = self.net.to(device='cuda')

        #Define the intial exploration rate, its rate of decay, and its minimum possible value
        self.exploration_rate = 1
        self.exploration_rate_decay = 0.99999975
        self.exploration_rate_min = 0.1
        self.curr_step = 0

        #Define the number of experiences that must ellapse since the last time the model was saved for it to be saved again
        self.save_every = 5e5

    #Function to choose an epsilon-greedy action given a state
    def act(self, state):
        #If the model chooses to explore
        if np.random.rand() < self.exploration_rate:
            #Take a random action
            action_idx = np.random.randint(self.action_dim)

        #Otherwise if it chooses to exploit
        else:
            #Use the neural network to determine the optimal action
            state = torch.FloatTensor(state).cuda() if self.use_cuda else torch.FloatTensor(state)
            state = state.unsqueeze(0)
            action_values = self.net(state, model='online')
            action_idx = torch.argmax(action_values, axis=1).item()

        #Decay the exploration rate
        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)

        #Increment the step counter
        self.curr_step += 1
        return action_idx

# Cache & Recall Functions

In [6]:
class Mario(Mario): # subclassing for continuity
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.memory = deque(maxlen=100000)
        self.batch_size = 32
    
    #Function to cache an experience represented by the state, next_state, action, reward, done into in the model's memory
    def cache(self, state, next_state, action, reward, done):
        state = torch.FloatTensor(state).cuda() if self.use_cuda else torch.FloatTensor(state)
        next_state = torch.FloatTensor(next_state).cuda() if self.use_cuda else torch.FloatTensor(next_state)
        action = torch.LongTensor([action]).cuda() if self.use_cuda else torch.LongTensor([action])
        reward = torch.DoubleTensor([reward]).cuda() if self.use_cuda else torch.DoubleTensor([reward])
        done = torch.BoolTensor([done]).cuda() if self.use_cuda else torch.BoolTensor([done])
        self.memory.append( (state, next_state, action, reward, done,) )

    #Function to get a batch of randomly selected experiences from the memory
    def recall(self):
        batch = random.sample(self.memory, self.batch_size)
        state, next_state, action, reward, done = map(torch.stack, zip(*batch))
        return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()

# Neural Network

In [7]:
class MarioNet(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        c, h, w = input_dim

        #If the observation was of incorrect size, raise an error
        if h != 84:
            raise ValueError(f"Expecting input height: 84, got: {h}")
        if w != 84:
            raise ValueError(f"Expecting input width: 84, got: {w}")

        #Intialize Q_online
        self.online = nn.Sequential(
            nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim)
        )

        #Intialize Q_target to be a copy of Q_online
        self.target = copy.deepcopy(self.online)

        #Freeze the Q_target parameters
        for p in self.target.parameters():
            p.requires_grad = False

    #Function to return the value of the input variable of the model from either Q_target or Q_online
    def forward(self, input, model):
        if model == 'online':
            return self.online(input)
        elif model == 'target':
            return self.target(input)

## TD Estimate & Target

In [8]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.gamma = 0.9

    #Calculate an estimate for TD using Q_online
    def td_estimate(self, state, action):
        current_Q = self.net(state, model='online')[np.arange(0, self.batch_size), action]
        return current_Q

    #no_grad() used since no need to backpropagate on td_target
    @torch.no_grad()
    #Calculate the aggregate of the estimated TD in the next state and the current reward
    def td_target(self, reward, next_state, done):
        next_state_Q = self.net(next_state, model='online')
        best_action = torch.argmax(next_state_Q, axis=1)
        next_Q = self.net(next_state, model='target')[np.arange(0, self.batch_size), best_action]
        return (reward + (1 - done.float()) * self.gamma * next_Q).float()

# Update & Sync Model

In [9]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
        self.loss_fn = torch.nn.SmoothL1Loss()

    #Function to update Q_online based on estimate for TD and aggregate of TD estiamte and current reward
    def update_Q_online(self, td_estimate, td_target) :
        loss = self.loss_fn(td_estimate, td_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    #Function to sync Q_target with Q_online by having Q_target's parameters become a copy of Q_online's parameters
    def sync_Q_target(self):
        self.net.target.load_state_dict(self.net.online.state_dict())

# Save Function

In [10]:
class Mario(Mario):
    #Function to save the model at a certain checkpoint
    def save(self):
        #Define the path the model will be saved too
        save_path = self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.ckpt"
        #Save the model
        torch.save(
            dict(
                model=self.net.state_dict(),
                exploration_rate=self.exploration_rate
            ),
            save_path
        )
        #Output where the model was saved to
        print(f"MarioNet saved to {save_path} at step {self.curr_step}")

# Load Model

In [11]:
class Mario(Mario):
    #Function to load a previously trained model at a give path denoted by load_path
    def load(self, load_path):
        #If load_path doesn't exist, raise an error
        if not load_path.exists():
            raise ValueError(f"{load_path} does not exist")

        #Load the model
        chkp = torch.load(load_path, map_location=('cuda' if self.use_cuda else 'cpu'))
        exploration_rate = chkp.get('exploration_rate')
        state_dict = chkp.get('model')

        #Print that the model was loaded, where it was loaded from, and its current exploration rate
        print(f"Loading model at {load_path} with exploration rate {exploration_rate}")
        self.net.load_state_dict(state_dict)
        self.exploration_rate = exploration_rate

# Learn Function

In [12]:
class Mario(Mario):
    def __init__(self, state_dim, action_dim, save_dir):
        super().__init__(state_dim, action_dim, save_dir)
        #Define the minimum number of experiences before the model starts training
        self.burnin = 1e5
        #Define the number of experiences between updates of Q_online
        self.learn_every = 3
        #Define the number of experiences that must ellapse since the last time the Q_target and Q_online were synced for them to be synced again
        self.sync_every = 1e4
        
    #Function to have the model learn
    def learn(self):
        #If the right amount of experiences that have ellapsed since the last time the Q_target and Q_online were synced is a multiple of sync_every
        if self.curr_step % self.sync_every == 0:
            #Sync Q_target & Q_online
            self.sync_Q_target()

        #If the number experiences that have ellapsed since the last time the model was saved is a multiple of save_every
        if self.curr_step % self.save_every == 0:
            #Save the model
            self.save()

        #If the number of experiences is below the minimum number needed to begin learning return zero
        if self.curr_step < self.burnin:
            #Do not learn
            return None, None

        #If the number experiences that have ellapsed since the last time the model was saved isn't a multiple of learn_every
        if self.curr_step % self.learn_every != 0:
            #Do not learn
            return None, None

        #Have the model recall experiences from its memory
        state, next_state, action, reward, done = self.recall()

        #Evaluate the TD estiamte
        td_est = self.td_estimate(state, action)

        #Evaluate the TD target
        td_tgt = self.td_target(reward, next_state, done)

        #Calculate loss by backpropagating through Q_online 
        loss = self.update_Q_online(td_est, td_tgt)

        return (td_est.mean().item(), loss)


# Logger Class 

In [13]:
#Class to record metrics as the model learns and evolves
class MetricLogger():
    def __init__(self, save_dir):
        #Define and open the paths that the metric information will be saved to
        self.save_log = save_dir / "log"
        with open(self.save_log, "w") as f:
            f.write(
                f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
                f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
                f"{'TimeDelta':>15}{'Time':>20}\n"
            )
        #Create the directories where the plots of the metrics will be saved to
        self.ep_rewards_plot = save_dir / "reward_plot.jpg"
        self.ep_lengths_plot = save_dir / "length_plot.jpg"
        self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
        self.ep_avg_qs_plot = save_dir / "q_plot.jpg"

        #Define arrays of metrics where each index represents an episode
        self.ep_rewards = []
        self.ep_lengths = []
        self.ep_avg_losses = []
        self.ep_avg_qs = []

        #Define arrays of metric averages where each index represents an episode
        self.moving_avg_ep_rewards = []
        self.moving_avg_ep_lengths = []
        self.moving_avg_ep_avg_losses = []
        self.moving_avg_ep_avg_qs = []

        #Intialize current episode metrics
        self.init_episode()

        #Intialize time metric
        self.record_time = time.time()

    #Function to log information about the current step
    def log_step(self, reward, loss, q):
        self.curr_ep_reward += reward
        self.curr_ep_length += 1
        if loss:
            self.curr_ep_loss += loss
            self.curr_ep_q += q
            self.curr_ep_loss_length += 1
            
    #Function to log information about a episode once it has been completed
    def log_episode(self):
        #Append the metrics of the current episode to their corresponding arrarys
        self.ep_rewards.append(self.curr_ep_reward)
        self.ep_lengths.append(self.curr_ep_length)
        #If the model isn't learning set the averages to be zero
        if self.curr_ep_loss_length == 0:
            ep_avg_loss = 0
            ep_avg_q = 0
        #If the model is learning calculate the averages of the metrics
        else:
            ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
            ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
        #Append the averages of the metrics to their corresponding arrarys
        self.ep_avg_losses.append(ep_avg_loss)
        self.ep_avg_qs.append(ep_avg_q)
        #Reset the episode metrics
        self.init_episode()

    #Function to intialize a new episode
    def init_episode(self):
        self.curr_ep_reward = 0.0
        self.curr_ep_length = 0
        self.curr_ep_loss = 0.0
        self.curr_ep_q = 0.0
        self.curr_ep_loss_length = 0

    #Function to record information about all the ellapsed episodes
    def record(self, episode, epsilon, step):
        #Calculate the means of metrics
        mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
        mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
        mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
        mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)

        #Append the means of the metrics to their corresponding arrarys
        self.moving_avg_ep_rewards.append(mean_ep_reward)
        self.moving_avg_ep_lengths.append(mean_ep_length)
        self.moving_avg_ep_avg_losses.append(mean_ep_loss)
        self.moving_avg_ep_avg_qs.append(mean_ep_q)

        #Record the current time
        last_record_time = self.record_time
        self.record_time = time.time()
        time_since_last_record = np.round(self.record_time - last_record_time, 3)

        #Output the metrics as well as the episode, step, and epsilon value when they were calculated
        print(
            f"Episode {episode} - "
            f"Step {step} - "
            f"Epsilon {epsilon} - "
            f"Mean Reward {mean_ep_reward} - "
            f"Mean Length {mean_ep_length} - "
            f"Mean Loss {mean_ep_loss} - "
            f"Mean Q Value {mean_ep_q} - "
            f"Time Delta {time_since_last_record} - "
            f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
        )

        with open(self.save_log, "a") as f:
            f.write(
                f"{episode:8d}{step:8d}{epsilon:10.3f}"
                f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
                f"{time_since_last_record:15.3f}"
                f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
            )
        #Update the plots of the metrics
        for metric in ["ep_rewards", "ep_lengths", "ep_avg_losses", "ep_avg_qs"]:
            plt.plot(getattr(self, f"moving_avg_{metric}"))
            plt.savefig(getattr(self, f"{metric}_plot"))
            plt.clf()

# Training & Testing The Model

In [14]:
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()

#Define the path to the directory where the model will be saved
save_dir = Path('checkpoints') / datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')
#Create the directory if it does not exist
save_dir.mkdir(parents=True)
#Intialize the Agent
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)

#To Test Trained AI Model provide the path to it, and load the model at that path
checkpoint = Path('trained_model.ckpt')
mario.load(checkpoint)

#Intialize the logger to log metrics about the model as it trains
logger = MetricLogger(save_dir)

#Define the number of episodes that you want to train the model for
episodes = 20

#Define the number of episodes that must ellapse since the last time the metrics relating to all the episodes had been logged for them to be logged again
log_every = 20

#Run the model for the defined number of episodes
for e in range(episodes):

    #Reset the enviornment
    state = env.reset()

    #Play the game
    while True:
        #Render the enviornment
        #Do not do this while training, only when you want to observe a model
        env.render()
        
        #Define an action to be taken based on the current state of the enviornment
        action = mario.act(state)

        #Have the agent perform that action
        next_state, reward, done, info = env.step(action)

        #Cache information about that action
        mario.cache(state, next_state, action, reward, done)

        #Have the model learn from the action
        q, loss = mario.learn()

        #Log the metrics relating to the current step
        logger.log_step(reward, loss, q)

        #Update the state
        state = next_state

        #Stop playing if the game is done
        if done or info['flag_get']:
            break

    #Log the metrics relating to the episode that just finished
    logger.log_episode()

    #If log_every episodes have ellapsed since the last time the metrics relating to every episode that has ellapsed has been logged, log them again
    if e % log_every == 0:
        logger.record(
            episode=e,
            epsilon=mario.exploration_rate,
            step=mario.curr_step
        )

Using CUDA: True

Loading model at trained_model.ckpt with exploration rate 0.1


  state = torch.FloatTensor(state).cuda() if self.use_cuda else torch.FloatTensor(state)
  return (self.ram[0x86] - self.ram[0x071c]) % 256


Episode 0 - Step 417 - Epsilon 0.1 - Mean Reward 2330.0 - Mean Length 417.0 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 9.462 - Time 2023-11-05T21:16:51


  return (self.ram[0x86] - self.ram[0x071c]) % 256


KeyboardInterrupt: 

<Figure size 640x480 with 0 Axes>

In [15]:
#Close the enviornment
env.close()

In [None]:
#Save the model
mario.save()