# Importing dependancies

In [9]:
import gym

import numpy as np
import random
from collections import namedtuple, deque

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


# Model definition

Using image as input

In [11]:
class DQN(nn.Module):

    def __init__(self, input_shape, output_size):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding='same')
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, padding='same')
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, padding='same')
        self.bn1 = nn.BatchNorm2d(16)
        self.bn2 = nn.BatchNorm2d(32)
        self.bn3 = nn.BatchNorm2d(32)
        self.flatten = nn.Flatten()
        linear_input = input_shape[1] * input_shape[2] * 32
        self.fn1 = nn.Linear(linear_input, 128)
        self.fn2 = nn.Linear(128, output_size)

    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.flatten(x)
        x = F.relu(self.fn1(x))
        x = F.relu(self.fn2(x))
        return x


# Input extraction

Taking the screen and turning it into something the model can understand

In [12]:
def get_screen(env, resize_shape):
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    resize = T.Compose([T.ToPILImage(),
                    T.Resize(resize_shape),
                    T.ToTensor()])
    return resize(screen)

# Memory
The agent should memorize what it learned before so it can learn from it

In [13]:
Experience = namedtuple('Experience', ('curr_screen', 'action', 'next_screen', 'reward', 'still_going'))

class Memory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def remember(self, *args):
        self.memory.append(Experience(*args))

    def recall(self, batch_size):
        experiences = random.sample(self.memory, batch_size)
        batch = Experience(*zip(*experiences))
        return batch

    def __len__(self):
        return len(self.memory)

# Creating the agent

In [14]:
class DQNAgent:
    def __init__(self, env):
        self.img_size = 40

        env.reset()
        sample_img = get_screen(env, self.img_size)

        self.state_shape = sample_img.numpy().shape
        self.action_size = env.action_space.n # output

        self.model = DQN(self.state_shape, self.action_size).to(device)
        
        self.loss_fn = nn.SmoothL1Loss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

        self.exploration_rate = 1 # initial exploration rate, always leave at 1
        self.exploration_rate_decay = 0.9999 # rate at which the exploration decreases
        self.exploration_rate_min = 0.1 # minimun exploration rate
        
        self.gamma = 0.99 # falloff for Q score

        self.batch_size = 64
        self.num_epochs = 500

        self.memory = Memory(10000) # how many of the previous samples are used
    
    def act(self, screen):
        if random.random() < self.exploration_rate:
            action = random.randrange(self.action_size) # act randomly
        else:
            with torch.no_grad():
                action = self.act_ideal(screen) # act ideally
            
        self.exploration_rate *= self.exploration_rate_decay
        self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)
        
        return action
    
    def act_ideal(self, screen):
        return self.model(screen).max(1)[1].item()
    
    def train_step(self):
        if len(self.memory) < self.batch_size:
            return
        batch = self.memory.recall(self.batch_size)
        
        curr_screens = torch.stack(batch.curr_screen).squeeze(1)
        actions = torch.Tensor(batch.action).to(device)
        next_screens = torch.stack(batch.next_screen).squeeze(1)
        rewards = torch.Tensor(batch.reward).to(device)
        still_goings = torch.Tensor(batch.still_going).to(device)
        
        # This is the fundamental logic behind calulating a deep Q value.
        curr_Q = self.model(curr_screens).mul(actions).sum(1)
        next_Q = self.model(next_screens).max(1)[0]
        expected_Q = rewards + still_goings * self.gamma * next_Q

        loss = self.loss_fn(expected_Q, curr_Q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    
    def train(self, env, print_epochs=False):
        for epoch in range(self.num_epochs):
            done = False
            env.reset()
            score = 0
            
            while not done:
                curr_screen = get_screen(env, self.img_size)

                action = self.act(curr_screen.unsqueeze(0))
                _, reward, done, _ = env.step(action)
                next_screen = get_screen(env, self.img_size)
                action_encode = np.eye(2)[action].tolist()
                self.memory.remember(curr_screen, action_encode, next_screen, reward, 1 - done)
                      
                self.train_step()

                score += 1
            
            if print_epochs:
                print("Epoch: " + str(epoch + 1) + ". Score is: " + str(score))


# Training the agent

In [15]:
env = gym.make("CartPole-v0")
agent = DQNAgent(env)
agent.train(env, print_epochs=True)

Epoch: 1. Score is: 19
Epoch: 2. Score is: 15
Epoch: 3. Score is: 13
Epoch: 4. Score is: 42
Epoch: 5. Score is: 35
Epoch: 6. Score is: 13
Epoch: 7. Score is: 17
Epoch: 8. Score is: 23
Epoch: 9. Score is: 14
Epoch: 10. Score is: 40
Epoch: 11. Score is: 9
Epoch: 12. Score is: 31
Epoch: 13. Score is: 24
Epoch: 14. Score is: 14
Epoch: 15. Score is: 30
Epoch: 16. Score is: 55
Epoch: 17. Score is: 34
Epoch: 18. Score is: 30
Epoch: 19. Score is: 16
Epoch: 20. Score is: 29
Epoch: 21. Score is: 33
Epoch: 22. Score is: 14
Epoch: 23. Score is: 22
Epoch: 24. Score is: 33
Epoch: 25. Score is: 14
Epoch: 26. Score is: 15
Epoch: 27. Score is: 17
Epoch: 28. Score is: 59
Epoch: 29. Score is: 16
Epoch: 30. Score is: 22
Epoch: 31. Score is: 32
Epoch: 32. Score is: 25
Epoch: 33. Score is: 12
Epoch: 34. Score is: 20
Epoch: 35. Score is: 14
Epoch: 36. Score is: 32
Epoch: 37. Score is: 14
Epoch: 38. Score is: 36
Epoch: 39. Score is: 17
Epoch: 40. Score is: 21
Epoch: 41. Score is: 16
Epoch: 42. Score is: 66
Ep

# Testing the agent

In [None]:
test_scores = []

for i in range(100):
    done = False
    env.reset()
    count = 0
    
    while not done:
        curr_screen = get_screen(env, agent.img_size)
        action = agent.act_ideal(curr_screen.unsqueeze(0))
        _, _, done, _ = env.step(action)
        count += 1
        
    test_scores.append(count)
    
avg = sum(test_scores) / 100
print(avg)

env.close()

9.37
