In [1]:
%%capture
#!/usr/bin/env python
import sys
sys.path.insert(0, './gym-snake/')
import gym
import time
from optparse import OptionParser
import gym_snake
from gym_snake.envs.constants import GridType, Action4, Action6
from PyQt5.QtCore import Qt

from gym import wrappers
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

__author__ = "Malthe Faurschou Tøttrup"
__email__ = "malthe@toettrup.dk"

is_done = False


# Load the gym environment
env = gym.make('Snake-8x8-v0')

def resetEnv():
    global is_done

    is_done = False
    env.reset()

resetEnv()

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
#Reward shaping


In [3]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [4]:
resize = T.Compose([T.ToPILImage(),
                    T.Resize(16, interpolation=Image.BOX),
                    T.ToTensor()])
def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))

    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW) #TODO: CHANGE
    return resize(screen).unsqueeze(0)




In [8]:
"""
class DQN_CNN(nn.Module):
    
    def __init__(self, outputs):
        super(DQN_CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 8, kernel_size=3, padding='same')
        self.maxpool1 = nn.MaxPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=5, padding='same')
        self.maxpool2 = nn.MaxPool2d(kernel_size=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.fc1 = nn.Linear(256, outputs)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.conv1(x))
        x = self.maxpool1(x)
        x = F.relu(self.conv2(x))
        x = self.maxpool2(x)
        x = self.bn1(x)
        x = self.fc1(x.view(x.size(0), -1))
        return x
"""
"""
# create a ressidual block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


# create a resnet
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=4):
        self.in_channels = 16
        super(ResNet, self).__init__()
        self.conv = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 64, layers[2], stride=2)
        self.avg_pool = nn.AvgPool2d(8)
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))

        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = x.to(device)
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)

        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)

        return out
"""




'\n# create a ressidual block\nclass ResidualBlock(nn.Module):\n    def __init__(self, in_channels, out_channels, stride=1, downsample=None):\n        super(ResidualBlock, self).__init__()\n        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)\n        self.bn1 = nn.BatchNorm2d(out_channels)\n        self.relu = nn.ReLU(inplace=True)\n        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)\n        self.bn2 = nn.BatchNorm2d(out_channels)\n        self.downsample = downsample\n\n    def forward(self, x):\n        residual = x\n\n        out = self.conv1(x)\n        out = self.bn1(out)\n        out = self.relu(out)\n\n        out = self.conv2(out)\n        out = self.bn2(out)\n\n        if self.downsample:\n            residual = self.downsample(x)\n\n        out += residual\n        out = self.relu(out)\n\n        return out\n\n\n# create a resnet\nclass ResNet(nn.Module):\n    de

In [6]:
#create forwar pass test for resnet
def test():
    net = ResNet(ResidualBlock, env.action_space.n).to(device)
    y = net(torch.randn(1, 3, 16, 16).to(device))
    print(y.size())

    del net

test()


TypeError: 'int' object is not subscriptable

In [None]:
# Prioritized Experience Replay (PER)
class PER:
    def __init__(self, capacity):
        self.tree = SumTree(capacity)

    def store(self, transition):
        max_p = np.max(self.tree.tree[-self.tree.capacity:])
        if max_p == 0:
            max_p = 1e-5
        self.tree.add(max_p, transition)

    def sample(self, n):
        b_idx, b_memory, ISWeights = np.empty((n,), dtype=np.int32), np.empty((n, self.tree.data_pointer), dtype=object), np.empty((n, 1))
        pri_seg = self.tree.total_p / n  # priority segment
        self.min_prob = np.min(self.tree.tree[-self.tree.capacity:]) / self.tree.total_p  # for later calculate ISweight
        for i in range(n):
            a, b = pri_seg * i, pri_seg * (i + 1)
            v = np.random.uniform(a, b)
            idx, p, data = self.tree.get_leaf(v)
            prob = p / self.tree.total_p
            ISWeights[i, 0] = np.power(prob/self.min_prob, -0.5)
            b_idx[i], b_memory[i, :] = idx, data
        return b_idx, b_memory, ISWeights

    def batch_update(self, tree_idx, abs_errors):
        abs_errors += 1e-5  # convert to abs and avoid 0
        clipped_errors = np.minimum(abs_errors, 1)
        ps = np.power(clipped_errors, self.PER_e)
        for ti, p in zip(tree_idx, ps):
            self.tree.update(ti, p)

    def __len__(self):
        return self.tree.data_pointer


In [None]:
""" Hyperparameters """
BATCH_SIZE = 256
GAMMA = 0.95
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 5000
TARGET_UPDATE = 1000
learning_rate = 1e-4  # learning rate (default: 1e-4)
decay_rate = 0.99 # decay rate for RMSprop (default: 0.99)
num_episodes = 200
MEMORY_SIZE = 10000

In [None]:
""" Network initialization """

# Get screen size so that we can initialize layers correctly based on shape
# returned from AI gym. Typical dimensions at this point are close to 3x40x90
# which is the result of a clamped and down-scaled render buffer in get_screen()
# screen_height, screen_width, _ = screen = env.render(mode='rgb_array').shape 

# Get number of actions from gym action space
n_actions = env.action_space.n

""" Initialize policy net and target Resnet """
policy_net = ResNet(ResidualBlock, env.action_space.n).to(device)
target_net = ResNet(ResidualBlock, env.action_space.n).to(device)

# policy_net = DQN_CNN(n_actions).to(device)
# target_net = DQN_CNN(n_actions).to(device)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

""" Optimizer """
#ref: https://ai.stackexchange.com/questions/9298/neural-network-optimizers-in-reinforcement-learning-non-well-behaved-environment
#optimizer = optim.RMSprop(policy_net.parameters(), lr=learning_rate, weight_decay=decay_rate) 
optimizer = optim.RMSprop(policy_net.parameters())
# optimizer = optim.Adam(policy_net.parameters())
#optimizer = optim.SGD(policy_net.parameters())


""" Loss function """
#criterion = nn.CrossEntropyLoss()
#criterion = nn.NLLLoss()
#criterion = nn.SmoothL1Loss()
criterion = nn.MSELoss()

""" Initialize experince memory """
memory = ReplayMemory(MEMORY_SIZE)

steps_done = 0

def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)


episode_durations = []

def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    #if is_ipython:
    #    display.clear_output(wait=True)
    #    display.display(plt.gcf())

In [None]:
""" Model Optimization """

losses = []
mean_loss = []
def optimize_model():
        
    if len(memory) < BATCH_SIZE:
        return

    transitions = memory.sample(BATCH_SIZE)
    
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
                                          
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    
    # Compute Huber loss
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    losses.append(loss.to('cpu').detach().numpy())
    mean_loss.append(np.mean(losses))
    
    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()



In [None]:
""" Training """

num_model = "ResNet"
save_path = 'models\snakeai'+str(num_model)+'.pkl'

episode_reward = 0
rewards = []
moveing_average_rewards = []

for i_episode in range(num_episodes):

    episode_reward = 0

    if i_episode % 100 == 0 and i_episode != 0:
        print("playing episode: ", i_episode,"/",num_episodes, 
        " mean loss => ", round(mean_loss[-1],4),
        "mean reward => ", round(moveing_average_rewards[-1], 4))

    # Initialize the environment and state
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen + last_screen


    for t in count():
        # Select and perform an action
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        episode_reward += reward

        # Observe new state
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen + last_screen
        else:
            next_state = None

        # Store the transition in memory
        #if(len(memory) < 10000): #fixing the experience in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        
        optimize_model()
        
        if done:
            rewards.append(episode_reward.item())
            # Take the mean of the last 100 rewards
            moveing_average_rewards.append(np.mean(rewards[-100:]))
            episode_durations.append(t + 1)
            break

        # Update the target network, copying all weights and biases in DQN
        if t % TARGET_UPDATE == 0:
            #Save the model to save_path
            target_net.load_state_dict(policy_net.state_dict())
            # torch.save(policy_net.state_dict(), save_path)
            



print('Complete')
env.close()
plt.plot(rewards, label='rewards', marker='o', linestyle='None')
plt.plot(moveing_average_rewards)
plt.ioff()
plt.show()
plot_durations()

In [None]:
plt.plot(losses)
plt.plot(mean_loss)

In [None]:
""" Record 10 videos """

training = False

if(not training):
    def select_action(state):
        return target_net(state).max(1)[1].view(1, 1)

    num_episodes = 10

    env = gym.wrappers.RecordVideo(env, 'video/CNN', episode_trigger = lambda x: x % 1 == 0)

    episode_reward = 0
    rewards = []

    for i_episode in range(num_episodes):

        # Initialize the environment and state
        env.reset()
        last_screen = get_screen()
        current_screen = get_screen()
        state = current_screen + last_screen

        for t in count():
            # Select and perform an action
            action = select_action(state)
            _, reward, done, _ = env.step(action.item())
            reward = torch.tensor([reward], device=device)
            episode_reward += reward

            # Observe new state
            last_screen = current_screen
            current_screen = get_screen()
            if not done:
                next_state = current_screen + last_screen
            else:
                next_state = None

            # Move to the next state
            state = next_state

            if done:
                rewards.append(episode_reward.item())
                #plot_durations()
                break