# Mario with Deep Deterministic Policy Gradients 

In [2]:
import gym
import math
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from tqdm import trange
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import random, datetime, os, copy

# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

from IPython import display as ipythondisplay
from pyvirtualdisplay import Display

gym.__version__

'0.26.0'

In [4]:
# NES emulator wrapper; JoypadSpace limits the controller to a given list of button combinations
from nes_py.wrappers import JoypadSpace
# Super Mario Bros. environment for OpenAI Gym
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT,COMPLEX_MOVEMENT 
# env = gym_super_mario_bros.make('SuperMarioBros-v0')
# Build the environment for the "SuperMarioBros-1-1-v1" id (presumably world 1, stage 1).
# NOTE(review): apply_api_compatibility=True appears to be what makes reset() return
# (obs, info) and step() return a 5-tuple, as the later cells rely on -- confirm.
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v1", render_mode='rgb_array', apply_api_compatibility=True)
# env = JoypadSpace(env, SIMPLE_MOVEMENT)
# Restrict the agent to two button combinations: "right" and "right" + "A".
env = JoypadSpace(env, [["right"], ["right", "A"]])
# Size of the discrete action space after the restriction (the recorded output is 2).
n_actions = env.action_space.n
print("Action :",n_actions)

Action : 2


In [8]:
# Reset the unwrapped-observation environment and inspect the first frame.
obs,info = env.reset()
obs.shape, info  # raw frame shape and info dict; the recorded output is ((240, 256, 3), {})

((240, 256, 3), {})

## Preprocessing Wrappers (Frame Skip, Grayscale, Resize)

In [9]:
class SkipFrame(gym.Wrapper):
    """Repeat each action for up to `skip` inner steps and return the last frame."""

    def __init__(self, env, skip):
        """Wrap `env` so that one outer step advances up to `skip` inner steps.

        Args:
            env: Environment whose `step` returns the 5-tuple
                `(obs, reward, terminated, truncated, info)`.
            skip: Number of times each action is repeated; must be at least 1.

        Raises:
            ValueError: If `skip` is smaller than 1 (no inner step would run,
                so there would be no observation to return).
        """
        super().__init__(env)
        if skip < 1:
            raise ValueError("skip must be >= 1, got {}".format(skip))
        self._skip = skip

    def step(self, action):
        """Repeat `action`, summing the rewards of the inner steps.

        Stops early as soon as the episode ends, whether by termination or by
        truncation, so the wrapped environment is never stepped after it has
        signalled the end of an episode.

        Returns:
            The last observation, the summed reward, and the `terminated`,
            `truncated` and `info` values of the last inner step.
        """
        total_reward = 0.0
        for _ in range(self._skip):
            obs, reward, terminated, truncated, info = self.env.step(action)
            total_reward += reward
            # Either flag means the episode is over; further steps are invalid.
            if terminated or truncated:
                break
        return obs, total_reward, terminated, truncated, info


class GrayScaleObservation(gym.ObservationWrapper):
    """Observation wrapper that turns each colour frame into a grayscale torch tensor."""

    def __init__(self, env):
        """Declare a 2-D observation space built from the first two dims of the wrapped one."""
        super().__init__(env)
        height_width = self.observation_space.shape[:2]
        # NOTE(review): the declared space is uint8 with shape (H, W), while
        # `observation` returns a float torch tensor -- confirm that downstream
        # wrappers do not rely on the declared dtype/shape.
        self.observation_space = Box(
            low=0,
            high=255,
            shape=height_width,
            dtype=np.uint8,
        )

    def permute_orientation(self, observation):
        """Move the channel axis first ([H, W, C] -> [C, H, W]) and return a float tensor."""
        channels_first = np.transpose(observation, (2, 0, 1))
        # Copy so the tensor is built from a contiguous array rather than a view.
        return torch.tensor(channels_first.copy(), dtype=torch.float)

    def observation(self, observation):
        """Return the grayscale version of `observation` as a channel-first tensor."""
        to_gray = T.Grayscale()
        return to_gray(self.permute_orientation(observation))


class ResizeObservation(gym.ObservationWrapper):
    """Observation wrapper that resizes each frame and divides pixel values by 255."""

    def __init__(self, env, shape):
        """Store the target size and declare the resized observation space.

        Args:
            env: Environment to wrap.
            shape: Target size; an int `s` means `(s, s)`, anything else is
                converted with `tuple(shape)`.
        """
        super().__init__(env)
        self.shape = (shape, shape) if isinstance(shape, int) else tuple(shape)

        # Keep any trailing dims (beyond the first two) of the wrapped space.
        resized_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(
            low=0,
            high=255,
            shape=resized_shape,
            dtype=np.uint8,
        )

    def observation(self, observation):
        """Resize `observation`, normalize with mean 0 / std 255, and drop dim 0 if it is 1."""
        pipeline = T.Compose([T.Resize(self.shape), T.Normalize(0, 255)])
        resized = pipeline(observation)
        return resized.squeeze(0)

In [10]:
# Preprocessing chain, innermost first: repeat each action 4 times, convert to
# grayscale, resize to 84x84, then stack the 4 most recent frames.
env = FrameStack(ResizeObservation(GrayScaleObservation(SkipFrame(env, skip=4)), shape=84), num_stack=4)
obs,info = env.reset()
obs.shape, info  # stacked observation shape and info dict; the recorded output is ((4, 84, 84), {})

((4, 84, 84), {})

In [11]:
import torch

# Use the first CUDA device when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
device  # notebook echo of the selected device

device(type='cpu')

## Neural Network

In [12]:
class MarioSolver:
    """Policy-gradient agent: a convolutional policy network plus its optimizer.

    The class reads three module-level names that must exist before use:
    `env` (for the number of actions), `device` (where tensors live) and
    `gamma` (the discount factor used by `backward`).
    """

    def __init__(self, learning_rate):
        """Build the policy network and an Adam optimizer.

        Args:
            learning_rate: Learning rate passed to Adam.
        """
        self.model = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            # 3136 = 64 * 7 * 7, the flattened conv output for an 84x84 input.
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, env.action_space.n),
            # Turn the final layer into a probability distribution over actions.
            nn.Softmax(dim=-1)
        ).to(device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate, eps=1e-4)
        self.reset()

    def forward(self, x):
        """Return the network output (action probabilities) for input `x`."""
        return self.model(x)

    def reset(self):
        """Clear the per-episode buffers of action terms and rewards."""
        # presumably the training loop appends log-probabilities of the chosen
        # actions to this tensor -- confirm against the caller.
        self.episode_actions = torch.tensor([], requires_grad=True).to(device)
        self.episode_rewards = []

    def save_checkpoint(self, directory, episode):
        """Save the model weights to `<directory>/checkpoint_<episode>.pth`.

        The directory is created when it does not exist yet.
        """
        # exist_ok avoids the race between checking for and creating the directory.
        os.makedirs(directory, exist_ok=True)
        filename = os.path.join(directory, 'checkpoint_{}.pth'.format(episode))
        torch.save(self.model.state_dict(), f=filename)
        print('Checkpoint saved to \'{}\''.format(filename))

    def load_checkpoint(self, directory, filename):
        """Load model weights from a checkpoint and return its episode number.

        Args:
            directory: Folder containing the checkpoint.
            filename: File name of the form `checkpoint_<episode>.pth`.

        Returns:
            The episode number parsed from `filename`.
        """
        # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
        state = torch.load(os.path.join(directory, filename), map_location=device)
        self.model.load_state_dict(state)
        print('Resuming training from checkpoint \'{}\'.'.format(filename))
        # Strip the 11-character 'checkpoint_' prefix and the 4-character '.pth' suffix.
        return int(filename[11:-4])

    def backward(self):
        """Run one policy-gradient update from the finished episode, then reset.

        Computes discounted returns from `episode_rewards`, standardizes them
        when there is more than one, and minimizes the negative sum of
        `episode_actions * returns`. An episode without rewards only resets
        the buffers.
        """
        if not self.episode_rewards:
            self.reset()
            return

        # Accumulate discounted returns from the last step back to the first.
        future_reward = 0
        returns = []
        for r in reversed(self.episode_rewards):
            future_reward = r + gamma * future_reward
            returns.append(future_reward)
        returns.reverse()

        rewards = torch.tensor(returns, dtype=torch.float32).to(device)
        # The standard deviation of a single value is NaN, which would turn the
        # loss and then every weight into NaN; use raw returns in that case.
        if rewards.numel() > 1:
            rewards = (rewards - rewards.mean()) / (rewards.std() + np.finfo(np.float32).eps)

        loss = torch.sum(torch.mul(self.episode_actions, rewards).mul(-1))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.reset()

## Model

In [34]:
class ActorNet(nn.Module):
    """Fully connected actor: maps a state vector to one output per action."""

    def __init__(self, bn=False, state_dim=4, action_dim=None):
        """Build the actor layers.

        Args:
            bn: When True, apply batch normalization after each hidden layer.
            state_dim: Width of the input state vector (defaults to 4, the
                previously hard-coded value).
            action_dim: Width of the output. When None, it is read from the
                module-level `env.action_space.n`, as before.
        """
        super().__init__()
        if action_dim is None:
            action_dim = env.action_space.n

        self.input = nn.Linear(state_dim, 256)
        self.fc = nn.Linear(256, 512)
        self.output = nn.Linear(512, action_dim)

        # The batch-norm layers are always created (so the state dict layout
        # does not depend on `bn`) but are only used when `bn` is True.
        self.bn = bn
        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(512)

    def forward(self, x):
        """Return the raw (unclipped) action output for a batch of states `x`."""
        if self.bn:
            x = F.relu(self.bn1(self.input(x)))
            x = F.relu(self.bn2(self.fc(x)))
        else:
            x = F.relu(self.input(x))
            x = F.relu(self.fc(x))

        # No activation or clamping is applied to the final layer.
        return self.output(x)

class CriticNet(nn.Module):
    """Fully connected critic: maps a (state, action) pair to a scalar value."""

    def __init__(self, bn=False, state_dim=4, action_dim=4):
        """Build the critic layers.

        Args:
            bn: When True, apply batch normalization after each hidden layer.
            state_dim: Width of the state vector (defaults to 4, the
                previously hard-coded value).
            action_dim: Width of the action vector (defaults to 4, the
                previously hard-coded value).
        """
        super().__init__()
        # NOTE(review): the default action width of 4 differs from the actor's
        # default output width (`env.action_space.n`); pass `action_dim`
        # explicitly when feeding actor outputs into this network.
        self.input = nn.Linear(state_dim + action_dim, 256)
        self.fc = nn.Linear(256, 512)
        self.output = nn.Linear(512, 1)

        # The batch-norm layers are always created (so the state dict layout
        # does not depend on `bn`) but are only used when `bn` is True.
        self.bn = bn
        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(512)

    def forward(self, x, u):
        """Return the value estimate for states `x` and actions `u`.

        Both inputs are concatenated along dimension 1, so they must share
        the same batch size.
        """
        x = torch.cat([x, u], 1)

        if self.bn:
            x = F.relu(self.bn1(self.input(x)))
            x = F.relu(self.bn2(self.fc(x)))
        else:
            x = F.relu(self.input(x))
            x = F.relu(self.fc(x))

        return self.output(x)