# Mario with Double Deep Q-Network

In [1]:
import gym
import math
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from tqdm import trange
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import random, datetime, os, copy

# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

from IPython import display as ipythondisplay
from pyvirtualdisplay import Display

gym.__version__

'0.26.0'

In [2]:
# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace
# Super Mario environment for OpenAI Gym
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT,COMPLEX_MOVEMENT 
# env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v3", render_mode='rgb_array', apply_api_compatibility=True)
env = JoypadSpace(env, SIMPLE_MOVEMENT)
n_actions = env.action_space.n
print("Action :",n_actions)

  logger.warn(


Action : 7


In [3]:
obs,info = env.reset()
obs.shape, info #3 dimension

((240, 256, 3), {})

## Test Gym is worked or not

In [4]:
# env.reset()
# done = True
# for step in range(500):
#     if done:
#         next_state = env.reset()
#     next_state, reward, done,_, info = env.step(env.action_space.sample())
#     env.render()
#     #img=env.render(mode="rgb_array")
    
# env.close()
# print('Done')

## Grey Scale

In [5]:
class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        """Return only every `skip`-th frame"""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat action, and sum reward"""
        total_reward = 0.0
        for i in range(self._skip):
            # Accumulate reward and repeat the same action
            obs, reward, done, trunk, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, trunk, info


class GrayScaleObservation(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def permute_orientation(self, observation):
        # permute [H, W, C] array to [C, H, W] tensor
        observation = np.transpose(observation, (2, 0, 1))
        observation = torch.tensor(observation.copy(), dtype=torch.float)
        return observation

    def observation(self, observation):
        observation = self.permute_orientation(observation)
        transform = T.Grayscale()
        observation = transform(observation)
        return observation


class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def observation(self, observation):
        transforms = T.Compose(
            [T.Resize(self.shape), T.Normalize(0, 255)]
        )
        observation = transforms(observation).squeeze(0)
        return observation

In [13]:
# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace
# Super Mario environment for OpenAI Gym
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT,COMPLEX_MOVEMENT 
# env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v1", render_mode='human', apply_api_compatibility=True)
env = JoypadSpace(env, SIMPLE_MOVEMENT)
n_actions = env.action_space.n
print("Action :",n_actions)

obs,info = env.reset()
obs.shape, info #3 dimension

  logger.warn(


Action : 7


((240, 256, 3), {})

In [14]:
# env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
# env = JoypadSpace(env, [["right"], ["right", "A"]])
env = FrameStack(ResizeObservation(GrayScaleObservation(SkipFrame(env, skip=4)), shape=84), num_stack=4)
# env.seed(42)
# env.action_space.seed(42)
# torch.manual_seed(42)
# torch.random.manual_seed(42)
# np.random.seed(42)

obs,info = env.reset()
obs.shape, info #3 dimension

((4, 84, 84), {})

In [15]:
import torch
device = torch.device("cuda:0" if (torch.cuda.is_available()) else "cpu")
device

device(type='cpu')

## Neural Network

In [16]:
class DDQNSolver(nn.Module):
    def __init__(self, output_dim):
        super().__init__()
        self.online = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        ).to(device) 
        self.target = copy.deepcopy(self.online)
        for p in self.target.parameters():
            p.requires_grad = False

    def forward(self, input, model):
        if model == "online":
            return self.online(input)
        elif model == "target":
            return self.target(input)

In [17]:
class DDQNAgent:
    def __init__(self, action_dim, save_directory):
        self.action_dim = action_dim
        self.save_directory = save_directory
        self.net = DDQNSolver(self.action_dim) #.cuda()
        self.exploration_rate = 1.0
        self.exploration_rate_decay = 0.99
        self.exploration_rate_min = 0.01
        self.current_step = 0
        self.memory = deque(maxlen=100000)
        self.batch_size = 32
        self.gamma = 0.95
        self.sync_period = 1e4
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025, eps=1e-4)
        self.loss = torch.nn.SmoothL1Loss()
        self.episode_rewards = []
        self.moving_average_episode_rewards = []
        self.current_episode_reward = 0.0

    def log_episode(self):
        self.episode_rewards.append(self.current_episode_reward)
        self.current_episode_reward = 0.0

    def log_period(self, episode, epsilon, step):
        self.moving_average_episode_rewards.append(np.round(np.mean(self.episode_rewards[-checkpoint_period:]), 3))
        print(f"Episode {episode} - Step {step} - Epsilon {epsilon} - Mean Reward {self.moving_average_episode_rewards[-1]}")
        plt.plot(self.moving_average_episode_rewards)
        plt.savefig(os.path.join(self.save_directory, f"episode_rewards_plot_{episode}.png"))
        plt.clf()

    def remember(self, state, next_state, action, reward, done):
        self.memory.append((torch.tensor(state.__array__()), torch.tensor(next_state.__array__()),
                            torch.tensor([action]), torch.tensor([reward]), torch.tensor([done])))

    def experience_replay(self, step_reward):
        self.current_episode_reward += step_reward
        if self.current_step % self.sync_period == 0:
            self.net.target.load_state_dict(self.net.online.state_dict())
        if self.batch_size > len(self.memory):
            return
        state, next_state, action, reward, done = self.recall()
        q_estimate = self.net(state.to(device), model="online")[np.arange(0, self.batch_size), action.to(device)]
        with torch.no_grad():
            best_action = torch.argmax(self.net(next_state.to(device), model="online"), dim=1)
            next_q = self.net(next_state.to(device), model="target")[np.arange(0, self.batch_size), best_action]
            q_target = (reward.to(device) + (1 - done.to(device).float()) * self.gamma * next_q).float()
        loss = self.loss(q_estimate, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def recall(self):
        state, next_state, action, reward, done = map(torch.stack, zip(*random.sample(self.memory, self.batch_size)))
        return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()

    def act(self, state):
        if np.random.rand() < self.exploration_rate:
            action = np.random.randint(self.action_dim)
        else:
            action_values = self.net(torch.tensor(state.__array__()).unsqueeze(0).to(device), model="online") #.cuda()
            action = torch.argmax(action_values, dim=1).item()
        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)
        self.current_step += 1
        return action

    def load_checkpoint(self, path):
        checkpoint = torch.load(path)
        self.net.load_state_dict(checkpoint['model'])
        self.exploration_rate = checkpoint['exploration_rate']

    def save_checkpoint(self):
        filename = os.path.join(self.save_directory, 'checkpoint_{}.pth'.format(episode))
        torch.save(dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate), f=filename)
        print('Checkpoint saved to \'{}\''.format(filename))

## Model

In [18]:
checkpoint_period = 10
save_directory = "mario_ql"
load_checkpoint = 'checkpoint_320.pth'
agent = DDQNAgent(action_dim=env.action_space.n, save_directory=save_directory)
if load_checkpoint is not None:
    agent.load_checkpoint(save_directory + "/" + load_checkpoint)

## Train

In [19]:
episode = 320
while True:
    state,info = env.reset()
    while True:
        action = agent.act(state)
        env.render()
        next_state, reward, done, _, info = env.step(action)
        agent.remember(state, next_state, action, reward, done)
        agent.experience_replay(reward)
        state = next_state
        if done:
            print('Clear Stage')
            episode += 1
            agent.log_episode()
            if episode % checkpoint_period == 0:
                agent.log_period(episode=episode, epsilon=agent.exploration_rate, step=agent.current_step)
                agent.save_checkpoint()
            break

Clear Stage
Clear Stage
Clear Stage
Clear Stage
Clear Stage
Clear Stage
Clear Stage
Clear Stage


KeyboardInterrupt: 