In [2]:
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
import numpy as np
import matplotlib.pyplot as plt
import random

# Reset method for JoypadSpace
JoypadSpace.reset = lambda self, **kwargs: self.env.reset(**kwargs)

# Create the environment
env = gym_super_mario_bros.make('SuperMarioBros-v0', apply_api_compatibility=True, render_mode="human")
env = JoypadSpace(env, SIMPLE_MOVEMENT)

# Confirm the action space and observation space
print("Action space:", env.action_space)
print("Observation space shape:", env.observation_space.shape)
print("Available actions:", SIMPLE_MOVEMENT)

Action space: Discrete(7)
Observation space shape: (240, 256, 3)
Available actions: [['NOOP'], ['right'], ['right', 'A'], ['right', 'B'], ['right', 'A', 'B'], ['A'], ['left']]


  logger.warn(
  logger.warn(


In [1]:
# Define the Mario FSM class
class MarioFSM:
    def __init__(self):
        self.state = 'RUN'
        self.action_space = SIMPLE_MOVEMENT
        self.actions = {
            'right': 1,
            'A': 5,
            'left': 6,
            'NOOP': 0,
            'right_A': 2,
            'right_B': 3,
            'left_A': 4,
            'right_A_B': 4
        }
        self.current_action = self.actions['right']
    
    def next_action(self, state):
        if isinstance(state, tuple):
            state = state[0]
        
        if self.state == 'RUN':
            if self._should_jump(state):
                self.state = 'JUMP'
            elif self._should_duck(state):
                self.state = 'DUCK'
            elif self._should_run_left(state):
                self.state = 'RUN_LEFT'
            else:
                self.state = 'RUN'
            self.current_action = self._get_action_based_on_state()
        
        elif self.state == 'JUMP':
            if self._should_land(state):
                self.state = 'RUN'
            self.current_action = self.actions['A']
        
        elif self.state == 'DUCK':
            if self._should_stand(state):
                self.state = 'RUN'
            self.current_action = self.actions['left']
        
        elif self.state == 'RUN_LEFT':
            if self._should_stop_running_left(state):
                self.state = 'RUN'
            self.current_action = self.actions['left']
        
        else:
            self.current_action = self.actions['NOOP']
        
        return self.current_action
    
    def _get_action_based_on_state(self):
        if self.state == 'RUN':
            return self.actions['right']
        elif self.state == 'JUMP':
            return self.actions['A']
        elif self.state == 'DUCK':
            return self.actions['left']
        elif self.state == 'RUN_LEFT':
            return self.actions['left']
        else:
            return self.actions['NOOP']
    
    def _should_jump(self, state):
        if isinstance(state, tuple):
            state = state[0]
        jump_region = state[100:120, 150:170, 0]
        jump_threshold = 50
        return np.mean(jump_region) > jump_threshold

    def _should_duck(self, state):
        if isinstance(state, tuple):
            state = state[0]
        duck_region = state[200:220, 100:120, 0]
        duck_threshold = 50
        return np.mean(duck_region) > duck_threshold

    def _should_run_left(self, state):
        if isinstance(state, tuple):
            state = state[0]
        run_left_region = state[180:200, 60:80, 0]
        run_left_threshold = 50
        return np.mean(run_left_region) > run_left_threshold

    def _should_land(self, state):
        if isinstance(state, tuple):
            state = state[0]
        land_region = state[140:160, 150:170, 0]
        land_threshold = 30
        return np.mean(land_region) < land_threshold

    def _should_stand(self, state):
        if isinstance(state, tuple):
            state = state[0]
        stand_region = state[160:180, 100:120, 0]
        stand_threshold = 30
        return np.mean(stand_region) < stand_threshold

    def _should_stop_running_left(self, state):
        if isinstance(state, tuple):
            state = state[0]
        stop_run_left_region = state[100:120, 40:60, 0]
        stop_run_left_threshold = 30
        return np.mean(stop_run_left_region) < stop_run_left_threshold

  logger.warn(
  logger.warn(
  logger.warn(


AttributeError: 'NoneType' object has no attribute 'shape'

In [3]:
# Define the state discretization function
def discretize_state(state, buckets=(10, 10)):
    """
    Discretizes the state into a bucketed grid.
    :param state: The raw state from the environment.
    :param buckets: Number of buckets for each dimension.
    :return: Discretized state.
    """
    state = np.mean(state, axis=2)  # Convert to grayscale by averaging over the color channels
    state_lower_bounds = [0, 0]
    state_upper_bounds = [255, 255]
    state_height, state_width = state.shape
    state_scale_y = (state_upper_bounds[0] - state_lower_bounds[0]) / buckets[0]
    state_scale_x = (state_upper_bounds[1] - state_lower_bounds[1]) / buckets[1]

    # Calculate bucket indices and ensure they are within bounds
    state_y = int(min(buckets[0] - 1, max(0, state_height / state_scale_y)))
    state_x = int(min(buckets[1] - 1, max(0, state_width / state_scale_x)))

    discretized_state = (state_y, state_x)

    return discretized_state

In [4]:
# Define the Q-learning agent
class QLearningAgent:
    def __init__(self, action_space, state_shape, buckets=(10, 10), epsilon=0.1, alpha=0.1, gamma=0.99):
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.buckets = buckets
        self.q_table = np.zeros(buckets + (len(action_space),))
        self.action_space = action_space

    def choose_action(self, state):
        discretized_state = discretize_state(state, self.buckets)
        if np.random.rand() < self.epsilon:
            return random.choice(range(len(self.action_space)))
        else:
            return np.argmax(self.q_table[discretized_state])

    def update_q_table(self, state, action, reward, next_state):
        discretized_state = discretize_state(state, self.buckets)
        discretized_next_state = discretize_state(next_state, self.buckets)
        best_next_action = np.argmax(self.q_table[discretized_next_state])
        td_target = reward + self.gamma * self.q_table[discretized_next_state][best_next_action]
        td_error = td_target - self.q_table[discretized_state][action]
        self.q_table[discretized_state][action] += self.alpha * td_error

In [5]:
# Define the training function
def train_agent(env, agent, num_episodes=500):
    total_rewards = []

    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]
        done = False
        total_reward = 0

        while not done:
            action = agent.choose_action(state)
            step_result = env.step(action)  # Get the result from the step
            
            # Print the output of `env.step(action)` to understand its structure
            print(f"step_result: {step_result}")
            
            if len(step_result) == 5:
                next_state, reward, done, info, _ = step_result
            elif len(step_result) == 4:
                next_state, reward, done, info = step_result
            elif len(step_result) == 3:
                next_state, reward, done = step_result
                info = {}  # Create an empty dictionary if `info` is not returned
            else:
                raise ValueError(f"Unexpected step_result length: {len(step_result)}")
            
            if isinstance(next_state, tuple):
                next_state = next_state[0]
            
            agent.update_q_table(state, action, reward, next_state)
            state = next_state
            total_reward += reward

            # Render the game for visualization
            env.render()

        total_rewards.append(total_reward)
        print(f'Episode {episode + 1}: Total Reward: {total_reward}')

    return total_rewards

# Initialize Q-learning agent with discretized state space
agent = QLearningAgent(SIMPLE_MOVEMENT, env.observation_space.shape[:2])

# Train the agent
total_rewards = train_agent(env, agent, num_episodes=500)

# Close the environment
env.close()

# Plot rewards
plt.plot(total_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Total Reward per Episode')
plt.show()

  if not isinstance(terminated, (bool, np.bool8)):
  logger.warn(


step_result: (array([[[104, 136, 252],
        [104, 136, 252],
        [104, 136, 252],
        ...,
        [104, 136, 252],
        [104, 136, 252],
        [104, 136, 252]],

       [[104, 136, 252],
        [104, 136, 252],
        [104, 136, 252],
        ...,
        [104, 136, 252],
        [104, 136, 252],
        [104, 136, 252]],

       [[104, 136, 252],
        [104, 136, 252],
        [104, 136, 252],
        ...,
        [104, 136, 252],
        [104, 136, 252],
        [104, 136, 252]],

       ...,

       [[240, 208, 176],
        [228,  92,  16],
        [228,  92,  16],
        ...,
        [228,  92,  16],
        [228,  92,  16],
        [  0,   0,   0]],

       [[240, 208, 176],
        [228,  92,  16],
        [228,  92,  16],
        ...,
        [228,  92,  16],
        [  0,   0,   0],
        [  0,   0,   0]],

       [[228,  92,  16],
        [  0,   0,   0],
        [  0,   0,   0],
        ...,
        [  0,   0,   0],
        [  0,   0,   0],
        [2

  return (self.ram[0x86] - self.ram[0x071c]) % 256


step_result: (array([[[104, 136, 252],
        [104, 136, 252],
        [104, 136, 252],
        ...,
        [104, 136, 252],
        [104, 136, 252],
        [104, 136, 252]],

       [[104, 136, 252],
        [104, 136, 252],
        [104, 136, 252],
        ...,
        [104, 136, 252],
        [104, 136, 252],
        [104, 136, 252]],

       [[104, 136, 252],
        [104, 136, 252],
        [104, 136, 252],
        ...,
        [104, 136, 252],
        [104, 136, 252],
        [104, 136, 252]],

       ...,

       [[228,  92,  16],
        [240, 208, 176],
        [240, 208, 176],
        ...,
        [240, 208, 176],
        [228,  92,  16],
        [228,  92,  16]],

       [[228,  92,  16],
        [228,  92,  16],
        [228,  92,  16],
        ...,
        [240, 208, 176],
        [228,  92,  16],
        [228,  92,  16]],

       [[  0,   0,   0],
        [  0,   0,   0],
        [  0,   0,   0],
        ...,
        [228,  92,  16],
        [  0,   0,   0],
        [ 

KeyboardInterrupt: 

In [4]:
class MarioFSM:
    def __init__(self):
        self.state = 'RUN'
        self.action_space = SIMPLE_MOVEMENT
        self.actions = {
            'right': 1,
            'A': 5,
            'left': 6,
            'NOOP': 0,
            'right_A': 2,
            'right_B': 3,
            'left_A': 4,
            'right_A_B': 7
        }
        self.current_action = self.actions['right']
    
    def next_action(self, state):
        if isinstance(state, tuple):
            state = state[0]
        
        # Transition logic for FSM
        if self.state == 'RUN':
            if self._should_jump(state):
                self.state = 'JUMP'
            elif self._should_duck(state):
                self.state = 'DUCK'
            elif self._should_run_left(state):
                self.state = 'RUN_LEFT'
            elif self._should_attack(state):
                self.state = 'ATTACK'
            else:
                self.state = 'RUN'
            self.current_action = self._get_action_based_on_state()
        
        elif self.state == 'JUMP':
            if self._should_land(state):
                self.state = 'RUN'
            self.current_action = self.actions['A']
        
        elif self.state == 'DUCK':
            if self._should_stand(state):
                self.state = 'RUN'
            self.current_action = self.actions['NOOP']
        
        elif self.state == 'RUN_LEFT':
            if self._should_stop_running_left(state):
                self.state = 'RUN'
            self.current_action = self.actions['left']
        
        elif self.state == 'ATTACK':
            if self._should_stop_attacking(state):
                self.state = 'RUN'
            self.current_action = self.actions['right_A_B']
        
        else:
            self.current_action = self.actions['NOOP']
        
        return self.current_action
    
    def _get_action_based_on_state(self):
        if self.state == 'RUN':
            return self.actions['right']
        elif self.state == 'JUMP':
            return self.actions['A']
        elif self.state == 'DUCK':
            return self.actions['NOOP']
        elif self.state == 'RUN_LEFT':
            return self.actions['left']
        elif self.state == 'ATTACK':
            return self.actions['right_A_B']
        else:
            return self.actions['NOOP']
    
    def _should_jump(self, state):
        jump_region = state[100:120, 150:170, 0]
        jump_threshold = 50
        return np.mean(jump_region) > jump_threshold

    def _should_duck(self, state):
        duck_region = state[200:220, 100:120, 0]
        duck_threshold = 50
        return np.mean(duck_region) > duck_threshold

    def _should_run_left(self, state):
        run_left_region = state[180:200, 60:80, 0]
        run_left_threshold = 50
        return np.mean(run_left_region) > run_left_threshold

    def _should_attack(self, state):
        attack_region = state[150:170, 150:170, 0]
        attack_threshold = 50
        return np.mean(attack_region) > attack_threshold

    def _should_land(self, state):
        land_region = state[140:160, 150:170, 0]
        land_threshold = 30
        return np.mean(land_region) < land_threshold

    def _should_stand(self, state):
        stand_region = state[160:180, 100:120, 0]
        stand_threshold = 30
        return np.mean(stand_region) < stand_threshold

    def _should_stop_running_left(self, state):
        stop_run_left_region = state[100:120, 40:60, 0]
        stop_run_left_threshold = 30
        return np.mean(stop_run_left_region) < stop_run_left_threshold

    def _should_stop_attacking(self, state):
        stop_attack_region = state[140:160, 100:120, 0]
        stop_attack_threshold = 30
        return np.mean(stop_attack_region) < stop_attack_threshold

def train_fsm(env, fsm, num_episodes=500):
    total_rewards = []

    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]
        done = False
        total_reward = 0

        while not done:
            action = fsm.next_action(state)
            next_state, reward, done, info = env.step(action)
            if isinstance(next_state, tuple):
                next_state = next_state[0]
            
            state = next_state
            total_reward += reward

            # Render the game for visualization
            env.render()

        total_rewards.append(total_reward)
        print(f'Episode {episode + 1}: Total Reward: {total_reward}')

    return total_rewards

# Initialize FSM
fsm = MarioFSM()

# Train the FSM
total_rewards = train_fsm(env, fsm, num_episodes=500)

# Close the environment
env.close()

# Plot rewards
plt.plot(total_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Total Reward per Episode')
plt.show()


  if not isinstance(terminated, (bool, np.bool8)):


ValueError: too many values to unpack (expected 4)

In [3]:
result = env.step(action)
print(result)


NameError: name 'action' is not defined