In [1]:
%load_ext autoreload
%autoreload 2
# Use black magic

In [2]:
# Use gym to load first Super Mario Bros. level
import gym_super_mario_bros
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v3') 

# Explore env to understand what it's made of
print(f"Dimensions of each frame: {env.observation_space.shape}")
print(f"Number of possible actions: {env.action_space.n}")

# Generic gym stuff:

# env.step() returns four values:
# - observation (object): e.g. pixels data of camera, angle and velocity values,
# board state in chess
# - reward (float): reward achieved by previous action
# - done (boolean): whether it's time to reset (i.e. when episode is over)
# - info (dict): additional info that can be useful for debugging
# (but not for learning)




Dimensions of each frame: (240, 256, 3)
Number of possible actions: 256


In [3]:
# Use gym's Wrapper class to subsample and speed up learning
# To modify specific aspects at a time, use the classes gym.ObservationWrapper, 
# gym.RewardWrapper, gym.ActionWrapper
from wrappers import wrappers
from gym_super_mario_bros.actions import RIGHT_ONLY
# ^ simplify action space to the maximum
from nes_py.wrappers import JoypadSpace

def make_env(env):
    env = wrappers.MaxAndSkipEnv(env)  # repeat action over four frames
    env = wrappers.ProcessFrame84(env)  # size to 84 * 84 and greyscale
    env = wrappers.ImageToPyTorch(env)  # convert to (C, H, W) for PyTorch
    env = wrappers.BufferWrapper(env, 4)  # stack four frames in one 'input'
    env = wrappers.ScaledFloatFrame(env)  # normalise RGB values to [0, 1]
    return JoypadSpace(env, RIGHT_ONLY)

# Based on this:
# - A state consists of 4 contiguous 84*84 pixel frames
# - There are five possible actions (RIGHT_ONLY)

envp = make_env(env)
print(f"Dimensions of each frame: {envp.observation_space.shape}")
print(f"Number of possible actions: {envp.action_space.n}")



Dimensions of each frame: (4, 84, 84)
Number of possible actions: 5


In [4]:
# Build DDQN network for approximating the Q function
from agent.solver import DQNetwork

# Check out network is indeed as we expect it
net = DQNetwork(
    input_shape=envp.observation_space.shape,
    n_actions=envp.action_space.n,
)
print(net)


DQNetwork(
  (conv): Sequential(
    (0): Conv2d(4, 32, kernel_size=(8, 8), stride=(4, 4))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
    (3): ReLU()
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
  )
  (fc): Sequential(
    (0): Linear(in_features=3136, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=5, bias=True)
  )
)


In [5]:
# Check out how to return an action in torch
import random
import torch

envp.reset()
state, reward, terminal, info = envp.step(0)  # action 0

torch.tensor([[random.randrange(envp.action_space.n)]]) # -> e.g. tensor([[4]])
torch.argmax(net(torch.Tensor([state]).to("cpu"))).unsqueeze(0).unsqueeze(0).cpu()

  torch.argmax(net(torch.Tensor([state]).to("cpu"))).unsqueeze(0).unsqueeze(0).cpu()


tensor([[2]])

## Run model

In [46]:
# from tqdm import tqdm
from agent import DDQNAgent

agent = DDQNAgent(
    state_space=envp.observation_space.shape,
    action_space=envp.action_space.n,
    max_memory_size=30000,
    batch_size=32,
    gamma=0.9,
    lr=0.002,
    dropout=0.,
    exploration_max=1.0,
    exploration_min=0.2,
    exploration_decay=0.99,
)

# Code run step
is_training = True
num_episodes = 10
envp.reset()
final_rewards = []

for episode in range(num_episodes):
    state = envp.reset()
    state = torch.Tensor([state])
    reward_episode = 0
    steps_episode = 0
    done = False

    while not done:

        action = agent.act(state)
        steps_episode += 1
        state_next, reward, done, info = envp.step(int(action[0]))
        reward_episode += reward
    
        # Format to pytorch tensors
        state_next = torch.Tensor([state_next])
        reward = torch.tensor([reward]).unsqueeze(0)
        done = torch.tensor([int(done)]).unsqueeze(0)

        if is_training:
            agent.remember(state, action, reward, state_next, done)
            agent.experience_replay()
        
        state = state_next
    
    print(f"Final reward after episode {episode}: {reward_episode:.2f}")
    
    # Record reward achieved in n-th episode
    final_rewards.append(reward_episode)



Final reward after episode 0: 2325.00
Final reward after episode 1: 814.00
Final reward after episode 2: 252.00
Final reward after episode 3: 251.00
Final reward after episode 4: 618.00
Final reward after episode 5: 1432.00
Final reward after episode 6: 599.00
Final reward after episode 7: 619.00
Final reward after episode 8: 247.00
Final reward after episode 9: 608.00
