In [None]:
%matplotlib inline
import argparse
from collections import namedtuple
from copy import deepcopy
from itertools import count
import math
import random
from time import time

from PIL import Image
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

import gym
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as T


use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor


def prepro(I):
    """Convert a raw frame into a binary float image.

    The frame is cropped to rows 35..194, down-sampled by a factor of two in
    both spatial directions, and reduced to its first colour channel. Pixels
    equal to 144 or 109 (the two background values) become 0, every other
    pixel becomes 1.

    Args:
        I: numpy array indexed as (row, column, channel).

    Returns:
        A new 2-D ``np.float64`` array containing only 0.0 and 1.0. The input
        array is left unmodified.
    """
    # Basic slicing yields views into the caller's array; copy so that the
    # in-place masking below cannot overwrite the original frame.
    I = I[35:195:2, ::2, 0].copy()
    # erase background (background type 1)
    I[I == 144] = 0
    # erase background (background type 2)
    I[I == 109] = 0
    # everything else (paddles, ball) just set to 1
    I[I != 0] = 1

    # np.float was only an alias of the builtin float and has been removed
    # from recent NumPy releases; np.float64 is the same dtype everywhere.
    return I.astype(np.float64)

In [None]:
# Hyper-parameters are exposed as command-line options; every option has a
# default so the cell also runs without any arguments.
parser = argparse.ArgumentParser()
parser.add_argument('--batch_size', default=128, type=int)
parser.add_argument('--gamma', default=0.999, type=float)
parser.add_argument('--eps_start', default=0.9, type=float)
parser.add_argument('--eps_end', default=0.05, type=float)
parser.add_argument('--eps_decay', default=200, type=int)
parser.add_argument('--replay_memory_size', default=10000, type=int)
parser.add_argument('--num_episodes', default=10, type=int)

# parse_known_args() tolerates unrecognised arguments; they are returned as
# the second element and discarded here.
args, _ = parser.parse_known_args()

# Module-level shortcuts for the values used by the training code.
BATCH_SIZE, GAMMA = args.batch_size, args.gamma
EPS_START, EPS_END, EPS_DECAY = args.eps_start, args.eps_end, args.eps_decay

In [None]:
env = gym.make('Pong-v0').unwrapped

# Action meanings:
# 'NOOP', 'FIRE': no-op
# 'RIGHT', 'RIGHTFIRE': go up
# 'LEFT', 'LEFTFIRE': go down
# Only actions 0, 2 and 3 need to be used.
print(env.unwrapped.get_action_meanings())

In [None]:
# Reset the environment, then print the shape of the returned frame and
# display it.
observation = env.reset()
print(observation.shape)
plt.imshow(observation)
plt.show()

In [None]:
# Reset again, pass the frame through prepro(), and display the result in
# grayscale together with its shape.
observation = env.reset()
observation = prepro(observation)
print(observation.shape)

plt.imshow(observation, cmap='gray')
plt.show()

In [None]:
# model
class DQN(nn.Module):
    """Convolutional Q-network: three conv/batch-norm/ReLU stages and a linear head.

    The network takes single-channel images and produces three output values
    per sample. The linear head expects 1568 flattened features.
    """

    def __init__(self):
        super().__init__()
        # Stage 1: 1 -> 16 channels.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(num_features=16)
        # Stage 2: 16 -> 32 channels.
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(num_features=32)
        # Stage 3: 32 -> 32 channels.
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(num_features=32)
        # Output layer: one value per action.
        self.head = nn.Linear(in_features=1568, out_features=3)

    def forward(self, x):
        """Return the head's output for a batch ``x``, shape (batch, 3)."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))

        # Flatten everything except the batch dimension before the head.
        flat = x.view(x.size(0), -1)
        return self.head(flat)
    
    
# One recorded environment step.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory(object):
    """Fixed-capacity store of Transition records with overwrite-on-full.

    Once ``capacity`` records have been stored, each new record replaces the
    one at the current write position, which then advances cyclically.
    """

    def __init__(self, capacity):
        self.position = 0
        self.memory = []
        self.capacity = capacity

    def __len__(self):
        return len(self.memory)

    def push(self, *args):
        """Store ``Transition(*args)`` at the current write position."""
        # Grow the list by one slot while there is still room.
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        cursor = self.position
        self.memory[cursor] = Transition(*args)
        # Wrap around to the start once the end of the buffer is reached.
        self.position = (cursor + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` stored records chosen by ``random.sample``."""
        return random.sample(self.memory, batch_size)

In [None]:
# Counter of action selections; read and incremented by select_action().
steps_done = 0

# Replay buffer sized from the command-line option.
memory = ReplayMemory(args.replay_memory_size)

# Build the network and move it to the GPU when CUDA is available
# (Module.cuda() returns the module itself).
model = DQN().cuda() if use_cuda else DQN()

optimizer = optim.RMSprop(model.parameters())


def select_action(state):
    """Pick an action index for ``state`` with an epsilon-greedy rule.

    The exploration threshold decays exponentially from EPS_START towards
    EPS_END as the global ``steps_done`` counter grows; the counter is
    incremented on every call.

    Args:
        state: tensor passed to ``model`` when the greedy branch is taken.

    Returns:
        A 1x1 tensor holding the chosen action index: a LongTensor with a
        value from ``random.randrange(3)`` when exploring, otherwise the
        argmax of the model's output.
    """
    global steps_done
    draw = random.random()
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    # Explore: uniformly random action index.
    if draw <= eps_threshold:
        return LongTensor([[random.randrange(3)]])

    # Exploit: index of the largest model output for this state.
    q_values = model(Variable(state, volatile=True).type(FloatTensor))
    return q_values.data.max(1)[1].view(1, 1)

In [None]:
def optimize_model():
    """Run one optimisation step on ``model`` from a sampled replay batch.

    Returns immediately while ``memory`` holds fewer than BATCH_SIZE
    transitions. Otherwise a batch is sampled, the Huber loss between the
    model's value for the taken actions and ``reward + GAMMA * max_a Q(s', a)``
    is computed (the next-state term is 0 for transitions whose next_state is
    None), gradients are clamped to [-1, 1], and the optimizer is stepped.

    NOTE(review): this relies on the ``volatile`` flag of ``Variable``, which
    is presumably only honoured by older PyTorch releases -- confirm the
    targeted PyTorch version before upgrading.
    """
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)

    # Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for detailed explanation).
    batch = Transition(*zip(*transitions))

    # Mask of the transitions that have a next state, plus those next states.
    non_final_mask = ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))
    non_final_next = [s for s in batch.next_state if s is not None]

    state_batch = Variable(torch.cat(batch.state))
    action_batch = Variable(torch.cat(batch.action))
    reward_batch = Variable(torch.cat(batch.reward))

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken
    state_action_values = model(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states; entries without a next state
    # keep the initial value 0.
    next_state_values = Variable(torch.zeros(BATCH_SIZE).type(Tensor))
    # torch.cat() rejects an empty list, so skip the forward pass when every
    # sampled transition has next_state None.
    if non_final_next:
        # We don't want to backprop through the expected action values;
        # volatile avoids temporarily switching the parameters'
        # requires_grad to False.
        non_final_next_states = Variable(torch.cat(non_final_next), volatile=True)
        next_state_values[non_final_mask] = model(non_final_next_states).max(1)[0]
    # Clear the volatile flag so it does not propagate into the loss; the
    # result is a Variable with requires_grad=False.
    next_state_values.volatile = False
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # Clamp each gradient element to [-1, 1] before the update.
    for param in model.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

In [None]:
# Training loop: one iteration per episode. The state fed to the network is
# the difference between two consecutive preprocessed frames.
for i_episode in range(args.num_episodes):
    start_time = time()
    # Sum of the rewards collected during this episode.
    g = 0

    # Initial state.
    # Observation from one step earlier.
    last_screen = env.reset()
    last_screen = prepro(last_screen)[np.newaxis, np.newaxis, :, :]
    last_screen = torch.from_numpy(last_screen).type(FloatTensor)

    # Observation at the current step (obtained with one random action).
    observation, reward, done, _ = env.step(random.randint(0, 5))
    current_screen = prepro(observation)[np.newaxis, np.newaxis, :, :]
    current_screen = torch.from_numpy(current_screen).type(FloatTensor)

    # Frame difference.
    state = current_screen - last_screen

    for t in count():
        # To display the game screen, uncomment the line below.
        # env.render()
        # Select and perform an action
        action = select_action(state)
        # Map the network's action index {0, 1, 2} onto the environment
        # actions 0 (NOOP), 2 (RIGHT) and 3 (LEFT). The earlier expression
        # `0 if action[0, 0] else action[0, 0] + 1` was inverted: it sent
        # 1 (FIRE) for index 0 and 0 (NOOP) for every other index, so the
        # paddle never moved.
        _action = int(action[0, 0])
        if _action != 0:
            _action += 1
        observation, reward, done, _ = env.step(_action)
        g += reward
        reward = Tensor([reward])

        # Observe new state
        last_screen = current_screen
        current_screen = prepro(observation)[np.newaxis, np.newaxis, :, :]
        current_screen = torch.from_numpy(current_screen).type(FloatTensor)
        if not done:
            next_state = current_screen - last_screen
        else:
            # A terminal transition is stored with next_state None.
            next_state = None

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization on the model
        optimize_model()
        print('\repisode: {} \t time step: {} \t reward: {}'.format(i_episode, t + 1, reward.cpu().numpy()[0]), end='')
        if done:
            break

    print('Episode {:6d} {:5d} time steps {:5.2f}s Return = {:5.2f}'.format(i_episode + 1, t + 1, time() - start_time, g))

print('Complete')
# NOTE(review): render(close=True) is presumably only accepted by older gym
# releases -- confirm against the installed gym version.
env.render(close=True)
env.close()
plt.ioff()
plt.show()