# **Mario**
### This project aims to create a Mario Agent that will beat the entirety of Super Mario Bros using Deep Reinforcement Learning and Double Deep Q-Networks

**Eve Collier\
AI II - Spring 2025\
Final Project**

![Mario](https://media3.giphy.com/media/v1.Y2lkPTc5MGI3NjExdmFvcTg0dDBnbnQ1b3BjbzlnemJnbGVrdm02a29pbDF4a3ExcGF2eiZlcD12MV9pbnRlcm5hbF9naWZfYnlfaWQmY3Q9Zw/DqqHabAaTHRII/giphy.gif)

First, we need to install the game and import some stuff:

In [1]:
!pip install gym_super_mario_bros==7.3.0 nes_py torch opencv-python
import gym
import gym_super_mario_bros
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import cv2
import random
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from collections import deque
from gym import Wrapper




In [2]:
!pip install gym_super_mario_bros==7.3.0 nes_py
import gym_super_mario_bros
import nes_py
from nes_py.wrappers import JoypadSpace # Wrap the game 
#from gym_super_mario_bros.actions import RIGHT_ONLY
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT # simplify what a Mario agent can do, 256 actions it can do otherwise



Cool.
Now, wrappers:

In [3]:
# Custom Wrappers
class SkipFrame(Wrapper):
    """Repeat every action for ``skip`` consecutive env steps, summing the rewards.

    ``step`` always returns the 4-tuple ``(obs, total_reward, done, info)``,
    whether the wrapped env answers with a 4-tuple or a 5-tuple.
    """

    def __init__(self, env, skip=4):
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Apply ``action`` up to ``skip`` times; stop early once the episode ends."""
        done = False
        reward_sum = 0.0
        for _ in range(self._skip):
            outcome = self.env.step(action)
            if len(outcome) != 4:
                # 5-tuple form: fold terminated/truncated into one flag.
                obs, reward, terminated, truncated, info = outcome
                done = terminated or truncated
            else:
                # 4-tuple form already carries a single done flag.
                obs, reward, done, info = outcome

            reward_sum += reward
            if done:
                break
        # obs/info come from the last step that was actually executed.
        return obs, reward_sum, done, info

    def reset(self):
        """Reset the wrapped env and hand back whatever it returns, untouched."""
        return self.env.reset()

class GrayScaleObservation(Wrapper):
    """Turn RGB frames from the wrapped env into 84x84 grayscale images."""

    def __init__(self, env):
        super().__init__(env)
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(84, 84),
            dtype=np.uint8,
        )

    def observation(self, obs):
        """Convert one frame to grayscale and resize it to 84x84.

        ``obs`` may also be a tuple (as returned by a newer-style ``reset``);
        in that case only its first element is used.
        """
        frame = obs[0] if isinstance(obs, tuple) else obs
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        return cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)

    def step(self, action):
        """Step the wrapped env and return ``(processed_obs, reward, done, info)``."""
        outcome = self.env.step(action)
        if len(outcome) != 4:
            # 5-tuple form: fold terminated/truncated into one flag.
            obs, reward, terminated, truncated, info = outcome
            done = terminated or truncated
        else:
            obs, reward, done, info = outcome
        return self.observation(obs), reward, done, info

    def reset(self):
        """Reset the wrapped env and return the processed first frame."""
        return self.observation(self.env.reset())

class FrameStack(Wrapper):
    """Keep the ``num_stack`` most recent observations and return them stacked."""

    def __init__(self, env, num_stack=4):
        super().__init__(env)
        self.num_stack = num_stack
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(num_stack, 84, 84),
            dtype=np.uint8,
        )
        # Bounded deque: appending a new frame silently drops the oldest one.
        self.frames = deque(maxlen=num_stack)

    def reset(self):
        """Reset the wrapped env and fill the whole stack with its first observation."""
        first = self.env.reset()
        self.frames.extend([first] * self.num_stack)
        return np.stack(self.frames)

    def step(self, action):
        """Step the wrapped env, push the new observation, return the stacked frames."""
        outcome = self.env.step(action)
        if len(outcome) != 4:
            # 5-tuple form: fold terminated/truncated into one flag.
            obs, reward, terminated, truncated, info = outcome
            done = terminated or truncated
        else:
            obs, reward, done, info = outcome
        self.frames.append(obs)
        stacked = np.stack(self.frames)
        return stacked, reward, done, info

Define our Neural Net

In [4]:
# Neural Network 
class MarioNet(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: ``(channels, height, width)`` of a single input state.
        n_actions: number of outputs (one Q-value per action).

    Raises:
        ValueError: if ``height``/``width`` are too small to survive the three
            convolutions.

    The size of the first linear layer is derived from ``input_shape`` rather
    than hard-coded, so frame sizes other than 84x84 work as well. For 84x84
    input the layers (and therefore the ``state_dict`` keys and shapes) are the
    same as with the previous fixed ``7 * 7 * 64`` layout.
    """

    # (kernel_size, stride) of each conv layer, in order.
    _CONV_GEOMETRY = ((8, 4), (4, 2), (3, 1))

    def __init__(self, input_shape, n_actions):
        super().__init__()
        c, h, w = input_shape
        out_h, out_w = self._conv_output_hw(h, w)
        self.net = nn.Sequential(
            nn.Conv2d(c, 32, kernel_size=8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU(),
            nn.Flatten(),
            # 64 feature maps of out_h x out_w each (7 x 7 for 84 x 84 input).
            nn.Linear(out_h * out_w * 64, 512), nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    @classmethod
    def _conv_output_hw(cls, h, w):
        """Return the spatial size left after the conv stack (no padding, no dilation)."""
        for kernel, stride in cls._CONV_GEOMETRY:
            h = (h - kernel) // stride + 1
            w = (w - kernel) // stride + 1
            if h < 1 or w < 1:
                raise ValueError(
                    "input height/width too small for the convolutional stack"
                )
        return h, w

    def forward(self, x):
        """Map a batch of states ``(N, C, H, W)`` to Q-values ``(N, n_actions)``."""
        return self.net(x)

Now our Mario agent:

In [5]:
# Mario Agent with jumping heuristic
class Mario:
    """DQN agent with hand-written jump heuristics layered over the network's choice.

    The agent owns an online network (``net``), a periodically synced copy
    (``target_net``), a replay buffer (``memory``) and an Adam optimizer.

    Args:
        state_shape: ``(channels, height, width)`` of one stacked state.
        num_actions: size of the discrete action space.

    NOTE(review): ``epsilon``, ``epsilon_min`` and ``epsilon_decay`` are stored
    but never read by ``act()`` - action selection is greedy plus heuristics.
    NOTE(review): ``learn()`` uses the plain DQN target (max over the target
    network), not the Double-DQN target.
    """

    def __init__(self, state_shape, num_actions):
        self.state_shape = state_shape
        self.num_actions = num_actions
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.net = MarioNet(state_shape, num_actions).to(self.device)

        # Heuristic bookkeeping used by act().
        self.jump_cooldown = 0      # steps left before another forced jump is allowed
        self.last_status = 'small'  # last value seen under info['status']
        self.step_counter = 0       # number of act() calls so far
        self.last_x_pos = 0         # last value seen under info['x_pos']
        self.stuck_counter = 0      # consecutive act() calls with unchanged x_pos

        # Target network: a copy of the online net, only refreshed in learn().
        self.target_net = MarioNet(state_shape, num_actions).to(self.device)
        self.target_net.load_state_dict(self.net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.net.parameters(), lr=1e-4)
        self.memory = deque(maxlen=100_000)
        self.batch_size = 32
        self.gamma = 0.99

        # ε-greedy parameters (currently unused by act(); see class docstring)
        self.epsilon = 1.0
        self.epsilon_min = 0.1
        self.epsilon_decay = 1e-5
        self.steps_done = 0       # number of completed gradient updates
        self.sync_every = 10_000  # Update target network every this many updates

    def update_status(self, info):
        """Remember ``info['status']`` (if present) as ``last_status``.

        Missing keys leave the previously stored status untouched.
        """
        status = info.get('status')
        if status is not None:
            self.last_status = status

    def detect_pit(self, state):
        """Return True when a fixed patch of the newest frame is very dark.

        The patch is rows 70-79, columns 50-59 of ``state[-1]``; "very dark"
        means a mean pixel value below 20.
        """
        last_frame = state[-1]
        pit_region = last_frame[70:80, 50:60]
        avg_brightness = np.mean(pit_region)
        return avg_brightness < 20

    def detect_enemy(self, state):
        """Return True when a fixed patch of the newest frame looks "busy".

        The patch is rows 40-59, columns 50-64 of ``state[-1]``; it counts as a
        hit when its mean lies strictly between 40 and 150 and its standard
        deviation exceeds 10.
        """
        last_frame = state[-1]
        enemy_region = last_frame[40:60, 50:65]
        mean_val = np.mean(enemy_region)
        std_val = np.std(enemy_region)
        return 40 < mean_val < 150 and std_val > 10

    def act(self, state, info=None):
        """Choose an action for ``state``.

        The greedy action of the online network is computed first; while the
        jump cooldown has expired, any heuristic trigger replaces it with
        action 4.

        Args:
            state: numpy array of stacked frames; pixel values are divided by 255.
            info: optional env info dict. When given, ``status`` and ``x_pos``
                are read to update the status / stuck bookkeeping.

        Returns:
            The chosen action index as an ``int``.
        """
        self.step_counter += 1
        if info is not None:
            self.update_status(info)
            current_x_pos = info.get('x_pos', 0)

            if current_x_pos == self.last_x_pos:
                self.stuck_counter += 1
            else:
                self.stuck_counter = 0
            self.last_x_pos = current_x_pos

        # Get action from model
        state_t = torch.from_numpy(state).float().to(self.device) / 255.0
        state_t = state_t.unsqueeze(0)
        with torch.no_grad():
            action_values = self.net(state_t)
        action = torch.argmax(action_values).item()

        # Heuristic triggers - only evaluated once the cooldown has expired.
        if self.jump_cooldown <= 0:
            should_jump = (
                self.step_counter % 5 == 0
                or self.last_status == 'small'
                or self.stuck_counter > 5
                or bool(self.detect_pit(state))
                or bool(self.detect_enemy(state))
            )
            if should_jump:
                action = 4  # ['right', 'A', 'B'] - strong jump
                self.jump_cooldown = 10

        if self.jump_cooldown > 0:
            self.jump_cooldown -= 1

        return action

    def cache(self, state, next_state, action, reward, done):
        """Append one transition to the replay buffer (states stored as numpy arrays)."""
        state = np.array(state)
        next_state = np.array(next_state)
        self.memory.append((state, next_state, action, reward, done))

    def learn(self):
        """Run one gradient update on a random mini-batch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        Every ``sync_every`` completed updates the target network is overwritten
        with the online network's weights.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, next_states, actions, rewards, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states) / 255.0).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states) / 255.0).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        dones = torch.BoolTensor(dones).unsqueeze(1).to(self.device)

        q_values = self.net(states).gather(1, actions)
        # The target is a constant for this update: keep autograd out of the
        # target network so no gradients are computed or stored for it.
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1, keepdim=True)[0]
            # (~dones) zeroes the bootstrap term for terminal transitions.
            expected_q = rewards + self.gamma * next_q_values * (~dones)

        loss = nn.functional.mse_loss(q_values, expected_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Count the update; without this the modulo below is always 0 and the
        # target net would be re-synced after every single step.
        self.steps_done += 1

        # Periodically update target net
        if self.steps_done % self.sync_every == 0:
            self.target_net.load_state_dict(self.net.state_dict())

    def save(self, path='mario.pth'):
        """Write the online network's ``state_dict`` to ``path``."""
        torch.save(self.net.state_dict(), path)

Putting it all together:

In [6]:
# Initialize Environment with rendering
env = gym_super_mario_bros.make('SuperMarioBros-v0',
                               apply_api_compatibility=True,
                               render_mode='human')
# Wrapper order matters: restrict the action set, repeat actions over 4 frames,
# shrink frames to 84x84 grayscale, then stack the last 4 frames into one state.
env = JoypadSpace(env, SIMPLE_MOVEMENT)
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = FrameStack(env, num_stack=4)

# Initialize Mario
mario = Mario(state_shape=(4, 84, 84), num_actions=env.action_space.n)

# Main game loop
done = True  # forces a reset on the very first iteration
try:
    for step in range(100000):
        if done:
            state, info = env.reset(), {}

        # `info` is not forwarded here, so act() skips its status / x_pos
        # bookkeeping and relies on the frame-based heuristics only.
        action = mario.act(state)
        next_state, reward, done, info = env.step(action)

        mario.cache(state, next_state, action, reward, done)
        mario.learn()

        state = next_state
        # Checkpoint the online network every 1000 steps (including step 0).
        if step % 1000 == 0:
            mario.save()

        env.render()
finally:
    # Release the emulator / render window even when the run is interrupted
    # (e.g. KeyboardInterrupt from stopping the notebook cell).
    env.close()

  logger.warn(
  logger.warn(
  from .autonotebook import tqdm as notebook_tqdm
  logger.warn(
  return (self.ram[0x86] - self.ram[0x071c]) % 256


KeyboardInterrupt: 