In [None]:
import gym
import time
import os
import types
import gym_super_mario_bros
#import matplotlib.pyplot as plt
import torch
import numpy as np
import torch.nn as nn
import torchvision.transforms as T
import torch.nn.functional as F
from nes_py.wrappers import JoypadSpace
from collections import deque
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from torch.utils.tensorboard import SummaryWriter

# To properly run/create your own model you need gym version 0.23.0
# and need to install Microsoft C++ Build Tools with the Windows 10 SDK & C++ x64/x86 build tools

Custom Reward Function

In [None]:
def custom_reward(info, max_x, t, flag):
    """Compute the shaped reward for a single emulator frame.

    The default environment reward pays for `x_pos - previous x_pos`, which lets the agent shuffle back
    and forth for reward, and it neither penalizes stalling nor rewards the flagpole. This replacement
    only pays for beating the furthest x position reached so far in the episode.

    Args:
        info: dict providing 'x_pos', 'y_pos' and 'flag_get' for the current frame.
        max_x: furthest x position reached so far in this episode.
        t: number of consecutive frames without progress.
        flag: True once the one-off checkpoint bonus has been paid.

    Returns:
        (reward, max_x, t, flag, done) - the frame reward, the updated trackers, and whether the
        episode should end (stalled for more than 500 frames, or flag reached).
    """
    x_pos = info['x_pos']
    y_pos = info['y_pos']
    flag_get = info['flag_get']

    progress_bonus = 0     # pixels gained beyond the previous best x position
    checkpoint_bonus = 0   # one-off bonus for passing x = 1320
    stall_penalty = 0      # negative once the agent has stalled for more than 60 frames
    time_penalty = -0.1    # constant per-frame cost
    done = False

    gained = x_pos - max_x
    if gained > 1:
        # Real progress (more than 1 pixel, so tiny nudges against a pipe do not count).
        progress_bonus = gained
        max_x = x_pos
        t = 0
    else:
        t += 1
        if t > 60:
            # While stalled every action would otherwise earn the same penalty, which makes the
            # Q-values nearly indistinguishable. Scaling the penalty with y_pos softens it for jumps;
            # the clamp keeps it strictly negative (between -2 and -0.1) so stalling is never free.
            height_scaled = -2 + (2.85 * ((y_pos - 79) / 240))
            stall_penalty = min(max(height_scaled, -2), -0.1)
        # More than 500 stalled frames ends the episode.
        done = t > 500

    if not flag and x_pos > 1320:
        checkpoint_bonus = 10
        flag = True

    # The stall penalty is quartered because the wrapper sums this reward over 4 skipped frames.
    reward = progress_bonus + checkpoint_bonus + (stall_penalty / 4) + time_penalty

    if flag_get:
        # Level completed: large bonus and end of episode.
        done = True
        reward += 25
        print("WE DID IT")

    return reward, max_x, t, flag, done

Set Up the Environment

In [None]:
class FrameStackAndSkip(gym.Wrapper):
    """Frame-skip wrapper that stacks the skipped frames and applies the custom reward.

    Each call to `step` repeats the chosen action for up to `skip` emulator frames, resizes every frame
    and returns the most recent `skip` frames concatenated along the channel axis as one network input.
    It also replaces the innermost environment's `_get_done` so that an episode ends as soon as Mario
    starts dying; according to the original author's notes the stock behaviour skipped the death
    frames, so the network never saw what a death looks like.
    """

    def __init__(self, env, skip=4, target_size=(96, 96)):
        """Wrap `env`, patch its done check and build the frame preprocessing pipeline.

        Args:
            env: environment to wrap (may itself be wrapped).
            skip: number of emulator frames per agent step; also the number of stacked frames.
            target_size: (height, width) every frame is resized to.
        """
        super().__init__(env)
        self._skip = skip
        self.frames = deque(maxlen=skip)

        # Peel off wrappers until the innermost environment (the one without an `env` attribute).
        innermost = env
        while hasattr(innermost, 'env'):
            innermost = innermost.env

        # Keep the stock done check reachable so the override can defer to it.
        innermost._orig_get_done = innermost._get_done

        def _get_done_override(self):
            # Dying or already dead ends the episode immediately; otherwise use the stock logic.
            return True if (self._is_dying or self._is_dead) else self._orig_get_done()

        innermost._get_done = types.MethodType(_get_done_override, innermost)

        self.in_death = False
        # Preprocessing: array -> PIL image -> resize -> float tensor.
        self.transform = T.Compose([
            T.ToPILImage(),
            T.Resize(target_size),
            T.ToTensor()
        ])

    def _stack_frame(self, obs):
        """Preprocess one raw observation and push it onto the frame stack."""
        self.frames.append(self.transform(obs))

    def reset(self, **kwargs):
        """Reset the environment and return a stack filled with copies of the first frame."""
        self.in_death = False
        first_frame = self.transform(self.env.reset(**kwargs))
        self.frames.extend([first_frame] * self._skip)
        return torch.cat(list(self.frames), dim=0)  # stack along the channel dimension

    def step(self, action, max_x, t, flag, died_flag):
        """Repeat `action` for up to `skip` frames, accumulating the custom per-frame reward.

        Args:
            action: index of the action to repeat.
            max_x, t, flag: reward-tracking state, threaded through `custom_reward`.
            died_flag: True once the death penalty has already been applied this episode.

        Returns:
            (stacked_frames, total_reward, done, info, max_x, t, flag, died_flag)
        """
        total_reward = 0.0
        done = False

        for _ in range(self._skip):
            obs, _, done, info = self.env.step(action)

            # Values read straight from emulator RAM. Per the original notes: player_state 11 or 6
            # means dying/dead, and y_viewport > 1 means Mario has fallen into a pit.
            ram = self.env.unwrapped.ram
            info['player_state'] = ram[0x000e]
            info['y_viewport'] = ram[0x00b5]
            dying = info['player_state'] in (11, 6) or info['y_viewport'] > 1

            if died_flag or dying:
                if not died_flag:
                    # First frame of this death: apply the penalty exactly once.
                    total_reward = -10
                    died_flag = True
                self._stack_frame(obs)
                break

            # The reward is computed per frame so no temporal information is lost to the frame skip.
            reward, max_x, t, flag, done = custom_reward(info, max_x, t, flag)
            total_reward += reward
            self._stack_frame(obs)

            if done:
                # Episode ended inside the skip window: replace the accumulated reward with the
                # terminal value (win or stall timeout).
                total_reward = 25 if info['flag_get'] else -10
                break

        return torch.cat(list(self.frames), dim=0), total_reward, done, info, max_x, t, flag, died_flag

# Build the game environment and the TensorBoard writer used for metrics.
# Wrapper order matters: observations are first converted to grayscale (colour does not really matter in
# this game), then the SIMPLE_MOVEMENT "controller" is applied (it allows multiple simultaneous button
# presses; the exact actions are listed after the training loop), and finally our custom frame-skip /
# frame-stack / reward wrapper is applied on top.

current_dir = os.getcwd()
writer = SummaryWriter(log_dir=os.path.join(current_dir, "runs"))  # logs are written to ./runs
env = gym_super_mario_bros.make('SuperMarioBros-2-2-v1') 
env = gym.wrappers.GrayScaleObservation(env, keep_dim=True)
env = JoypadSpace(env, SIMPLE_MOVEMENT)
env = FrameStackAndSkip(env)
print("")

Define Parameters/Hyperparameters

In [4]:
# Hyperparameters shared by the training and evaluation cells

alpha = 2e-4 # learning rate; 1e-4 and 2e-4 were used depending on how many episodes were needed
epsilon = 0.99 # probability of taking a random action; starts almost fully random and is lowered as the agent learns
decay = 0.999985 # multiplicative epsilon decay applied per step (not per episode); needed by the custom epsilon reset in training
gamma = 0.95 # discount factor applied to future rewards
replay_buffer2 = 75000 # capacity of the experience replay buffer (number of transitions)
batch_size = 128 # number of transitions sampled from experience replay per update
targ_updater = 125 # target-network sync period; the training loop tests `episode % targ_updater`, i.e. every 125 episodes

Implement Experience Replay

In [None]:
class FastReplayBuffer:
    """Fixed-capacity ring buffer of transitions held in pre-allocated tensors on `device`.

    Experience replay breaks the temporal correlation between consecutive frames: the network is
    trained on randomly drawn past transitions rather than on the frames in the order they occurred.
    `push` stores one transition (overwriting the oldest once full) and `sample` draws a random batch.
    """

    def __init__(self, capacity, state_shape, device):
        """Allocate storage for `capacity` transitions whose states have shape `state_shape`."""
        self.capacity = capacity
        self.device = device

        # Everything is allocated once, up front, directly on the target device.
        per_transition = (capacity,)
        self.states = torch.zeros((capacity, *state_shape), dtype=torch.float32, device=device)
        self.next_states = torch.zeros_like(self.states, device=device)
        self.actions = torch.zeros(per_transition, dtype=torch.int64, device=device)
        self.rewards = torch.zeros(per_transition, dtype=torch.float32, device=device)
        self.dones = torch.zeros(per_transition, dtype=torch.bool, device=device)

        self.pos = 0        # slot the next transition will be written to
        self.full = False   # True once the write position has wrapped around at least once

    def push(self, state, action, reward, next_state, done):
        """Write one transition at the current position and advance it, wrapping at capacity."""
        slot = self.pos
        self.states[slot] = state
        self.next_states[slot] = next_state
        self.actions[slot] = action
        self.rewards[slot] = reward
        self.dones[slot] = done

        self.pos = (slot + 1) % self.capacity
        if self.pos == 0:
            self.full = True

    def sample(self, batch_size):
        """Return (states, actions, rewards, next_states, dones) for `batch_size` uniformly random
        stored transitions, drawn with replacement."""
        filled = len(self)
        picks = torch.randint(0, filled, (batch_size,), device=self.device)  # indices drawn on the device
        columns = (self.states, self.actions, self.rewards, self.next_states, self.dones)
        return tuple(column[picks] for column in columns)

    def __len__(self):
        """Number of transitions currently stored."""
        return self.capacity if self.full else self.pos

Create Main & Target Q-Network

In [None]:
class DuelingDQN(nn.Module):
    """Dueling DQN with a convolutional feature extractor.

    After the shared conv stack the network splits into two heads:
      * the value head estimates how good the current state is (falling into a pit is bad, being
        close to the flagpole with velocity is great);
      * the advantage head estimates how much better or worse each action is in that state (over a
        pit, 'left' should score low while 'right+run+jump' should score high).
    The heads are combined as Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).

    Args:
        output_size: number of discrete actions.
        input_shape: (channels, height, width) of a single stacked observation.
    """

    def __init__(self, output_size, input_shape=(4, 96, 96)):
        super().__init__()
        # Convolutional feature extractor for the stacked-frame image input.
        self.conv1 = nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4)
        self.bn1   = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.bn2   = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.bn3   = nn.BatchNorm2d(64)

        self.flatten_size = self._get_conv_output(input_shape)

        # --- Advantage Head ---
        self.adv_fc1 = nn.Linear(self.flatten_size, 256)
        self.adv_fc2 = nn.Linear(256, output_size)

        # --- Value Head ---
        self.val_fc1 = nn.Linear(self.flatten_size, 256)
        self.val_fc2 = nn.Linear(256, 1)

    def _get_conv_output(self, shape):
        """Return the number of features per sample after the conv stack for an input of `shape`.

        Only the conv layers are applied. BatchNorm does not change tensor shapes, and running a dummy
        all-zero batch through the BatchNorm layers in train mode would update their running
        statistics before the network has seen any real data.
        """
        with torch.no_grad():
            probe = torch.zeros(1, *shape)
            probe = self.conv3(self.conv2(self.conv1(probe)))
        return int(probe.flatten(1).shape[1])

    def forward(self, x):
        """Map a batch of stacked frames [B, C, H, W] to Q-values [B, num_actions]."""
        # conv -> batch norm -> ReLU, three times
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))

        # flatten everything except the batch dimension
        x = x.flatten(start_dim=1)

        # advantage branch
        adv = F.relu(self.adv_fc1(x))
        adv = self.adv_fc2(adv)  # shape [B, num_actions]

        # value branch
        val = F.relu(self.val_fc1(x))
        val = self.val_fc2(val)  # shape [B, 1]

        # Combine into Q-values: Q(s,a) = V(s) + (A(s,a) - mean_a A(s,a)).
        # Subtracting the mean advantage keeps V and A identifiable; the action with the largest
        # advantage still has the largest Q-value.
        return val + (adv - adv.mean(dim=1, keepdim=True))

# Instantiate the online ("main") network, the target network, the optimizer, the loss and the replay buffer.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
main_dqn = DuelingDQN(len(SIMPLE_MOVEMENT)).to(device)
target_dqn = DuelingDQN(len(SIMPLE_MOVEMENT)).to(device)
target_dqn.load_state_dict(main_dqn.state_dict()) # target starts as an exact clone of the main ("online") model

# The target network is only ever used for inference. In eval mode its BatchNorm layers normalise with
# the running statistics copied from the main network instead of the statistics of whichever replay
# batch is passed through, so a sample's TD target does not depend on the rest of its batch.
# load_state_dict() does not change the train/eval mode, so later syncs keep the target in eval mode.
target_dqn.eval()

optimizer = torch.optim.Adam(main_dqn.parameters(), lr=alpha)
criterion = nn.MSELoss()
memory = FastReplayBuffer(
    capacity=replay_buffer2,
    state_shape=(4, 96, 96),
    device=device  # same device as the networks, so sampled batches need no transfer
)

# Uncomment to continue training from, or to test, a previously saved model

# main_dqn.load_state_dict(torch.load(os.path.join(current_dir, 'main_dqn_1-1.pt')))
# target_dqn.load_state_dict(torch.load(os.path.join(current_dir, 'targ_dqn_1-1.pt')))

Develop Training Loop

In [None]:
# Training loop: epsilon-greedy interaction with the environment plus one replay-buffer gradient step per
# environment step. Many, many iterations of this were made; this is what currently works best.

num_episodes = 4000 # number of episodes to train (varies based on level complexity)
replay_active = False # exp. replay doesn't activate until it has enough samples; this keeps the metrics from breaking
global_steps = 0 # total number of environment steps across all episodes

global_max_x = 40 # Sometimes levels are so complex that the agent only learns part of the level before epsilon
                  # reaches its minimum. This tracks the furthest x position reliably reached across ALL episodes.

max_x_dq = deque(maxlen=25) # per-episode best x positions that beat global_max_x (see the end of the episode loop)

counter = 0 # number of episodes since global_max_x was last re-evaluated that beat it

epsilon_floor = 0.1  # epsilon never drops below this during training
epsilon_boost = 0.5  # epsilon is raised to this when the agent enters an area it has not learned yet
boost_decay = 0.998  # aggressive per-step decay used while a boost is active (roughly 800 steps back to the floor)
eps_decay = decay    # per-step decay actually applied; the `decay` hyperparameter itself is never modified

try:
    for episode in range(num_episodes):
        # initialize per-episode variables
        max_x, t, flag, died_flag = 40, 0, False, False # for reward logic
        done, episode_reward, step_count = False, 0, 0
        o_flag = False # True once the exploration boost has been granted in this episode
        x_pos = max_x # current x position; reset every episode so last episode's `info` is never consulted
        obs = env.reset() # reset environment back to the beginning
        state = obs.clone().detach().unsqueeze(0) # add a dimension for batch size (1, 4, 96, 96)
        # prev_stage, prev_world = 1, 1

        while not done: # per agent step (4 emulator frames)
            env.render() # shows the game
            global_steps += 1

            if epsilon_floor >= 0.99 * (decay ** global_steps):
                # The regular schedule (starting from 0.99) has reached the floor. Even when the model has
                # learned the entire level some randomness is good, so epsilon stays at the floor. However,
                # if the agent reaches a part of the level it has not learned yet, acting greedily there is
                # useless because the best action has not been determined. In that case epsilon is raised
                # once per episode and then decays quickly back to the floor.
                #
                # The floor is only re-applied while no boost is active. Previously it was re-applied on
                # every step, which cancelled the boost one step after it was granted.
                if not o_flag:
                    if x_pos > global_max_x: # in a never-before-seen area...
                        epsilon = epsilon_boost # raise epsilon
                        eps_decay = boost_decay # decay fast, the boost should only last for this episode
                        o_flag = True # only boost once per episode
                    else:
                        epsilon = epsilon_floor

            if np.random.rand() < epsilon: # choose a random action (explore)
                action = np.random.randint(0, len(SIMPLE_MOVEMENT))
            else:
                with torch.no_grad(): # choose the action the model rates best (exploit)
                    q_values = main_dqn(state.to(device))
                    action = torch.argmax(q_values).item()

            epsilon = max(epsilon_floor, epsilon * eps_decay) # decay epsilon per step

            # Step the environment: returns the next state, reward, done flag and info; the remaining
            # values are the reward-tracking state that is threaded through every step.
            next_obs, reward, done, info, max_x, t, flag, died_flag = env.step(action, max_x, t, flag, died_flag)
            x_pos = info['x_pos']

            # if (info['stage'] > prev_stage) or (info['world'] > prev_world): # used for multi-level models
            #     max_x = 40

            next_state = next_obs.clone().detach().unsqueeze(0)  # add batch dimension (1, 4, 96, 96)

            memory.push(state, action, reward, next_state, done) # store the transition in the replay buffer

            state = next_state # the next state becomes the current state for the following step
            episode_reward += reward

            # prev_stage = info['stage'] # used for multi-level models
            # prev_world = info['world']

            if len(memory) > batch_size: # train on a replay batch once enough transitions are stored
                replay_active = True
                states, actions, rewards, next_states, dones = memory.sample(batch_size) # grab batch_size (128) samples

                with torch.no_grad(): # Bellman target computed with the target network
                    next_q_values = target_dqn(next_states).max(dim=1)[0]
                    targets = rewards + gamma * next_q_values * (~dones) # no bootstrapping from terminal states

                # Q-value of the action that was actually taken, shape [B]
                q_values = main_dqn(states).gather(1, actions.unsqueeze(1)).squeeze(1)

                loss = criterion(q_values, targets) # MSE loss
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        if episode % targ_updater == 0: # periodically sync the target network (checked once per episode)
            target_dqn.load_state_dict(main_dqn.state_dict())

        if global_max_x < max_x: # Because of the random actions the agent can get far (or even beat the level)
                                 # by luck, so one good episode must not move global_max_x. Instead we collect
                                 # the best x of each episode that beat it, and after 25 such episodes raise
                                 # global_max_x to the smallest of the last 25 collected values.
            counter += 1
            max_x_dq.append(max_x)
            if counter == 25:
                if global_max_x >= min(max_x_dq):
                    counter = 0
                else:
                    global_max_x = min(max_x_dq)
                    counter = 0

        if replay_active: # write to tensorboard logs (loss is the last update's loss of the episode)
            writer.add_scalar("Reward", episode_reward, episode)
            writer.add_scalar("Loss", loss.item(), episode)
            writer.add_scalar("Epsilon", epsilon, episode)
            writer.add_scalar("GMax_X", global_max_x, episode)
            writer.add_scalar("Steps", global_steps, episode)

except KeyboardInterrupt:
    print("\nTraining interrupted.")

writer.close()
# actions are [nothing, right, right+jump, right+run, right+run+jump, jump, left]

Evaluate the Model

In [None]:
# Evaluation: run the trained policy without any learning. Epsilon is lowered to 0.05 so that the
# model makes the vast majority of the decisions. According to the original author's experiments the
# model expects some random actions because that is how it was trained, and lowering epsilon to 0
# breaks it unless it is overfit to this specific level. The loop is slowed down so it is watchable.
# It is otherwise the same as the training loop minus the learning part; experience replay is neither
# used nor needed here. If you just want to test a model, skip training and run this cell.
#
# NOTE(review): main_dqn is never switched to eval mode in the visible cells, so its BatchNorm layers
# normalise each single-frame forward pass with that pass's own statistics, as they do for action
# selection during training. Confirm whether main_dqn.eval() is intended before evaluating.

num_episodes = 200 
epsilon = 0.05 # reduce epsilon to allow model to make most decisions

try:
    for episode in range(num_episodes):
        # reward-tracking state threaded through env.step, reset every episode
        max_x, t, flag, died_flag = 40, 0, False, False
        #prev_stage, prev_world = 1, 1
        stime = time.time()
        obs = env.reset()
        state = obs.clone().detach().unsqueeze(0) # add batch dimension

        done, episode_reward, step_count = False, 0, 0

        while not done:
            env.render()
            step_count += 1
            time.sleep(1/18) # slow down rendering to about normal

            # epsilon-greedy action selection, same as in training but with a fixed epsilon
            if np.random.rand() < epsilon:
                action = np.random.randint(0, len(SIMPLE_MOVEMENT))
            else:
                with torch.no_grad():
                    q_values = main_dqn(state.to(device))  
                    action = torch.argmax(q_values).item()

            next_obs, reward, done, info, max_x, t, flag, died_flag = env.step(action, max_x, t, flag, died_flag)

            # if (info['stage'] > prev_stage) or (info['world'] > prev_world):
            #     max_x = 40

            next_state = next_obs.clone().detach().unsqueeze(0)
            
            state = next_state
            episode_reward += reward
            # prev_stage = info['stage']
            # prev_world = info['world']
            
        etime = time.time()
        # per-episode summary: shaped reward total and wall-clock duration (includes the sleep above)
        print(f"Episode: {episode} Reward: {episode_reward:.2f} Time: {(etime - stime):.2f}")

except KeyboardInterrupt:
    print("\nEval interrupted.")

# actions are [nothing, right, right+jump, right+run, right+run+jump, jump, left]

In [None]:
#torch.save(main_dqn.state_dict(), "main_dqn_-.pt") # save our models to a file
#torch.save(target_dqn.state_dict(), "targ_dqn_-.pt")