In [1]:
%pip install -r ../../requirements.txt

Collecting BindsNET@ git+https://github.com/BindsNET/bindsnet.git@ead55217e05ba4c6ef27f45ff5dc7d61b4abaa13
  Cloning https://github.com/BindsNET/bindsnet.git (to revision ead55217e05ba4c6ef27f45ff5dc7d61b4abaa13) to /private/var/folders/j_/ygb_gxx970b1bfpk3v12gc980000gp/T/pip-install-jzhzxux7/bindsnet_1bb991471f55493b908504b71d2a24ec
  Running command git clone -q https://github.com/BindsNET/bindsnet.git /private/var/folders/j_/ygb_gxx970b1bfpk3v12gc980000gp/T/pip-install-jzhzxux7/bindsnet_1bb991471f55493b908504b71d2a24ec
  Running command git rev-parse -q --verify 'sha^ead55217e05ba4c6ef27f45ff5dc7d61b4abaa13'
  Running command git fetch -q https://github.com/BindsNET/bindsnet.git ead55217e05ba4c6ef27f45ff5dc7d61b4abaa13
  Running command git checkout -q ead55217e05ba4c6ef27f45ff5dc7d61b4abaa13
Collecting gym@ git+https://github.com/openai/gym.git@a5a6ae6bc0a5cfc0ff1ce9be723d59593c165022
  Cloning https://github.com/openai/gym.git (to revision a5a6ae6bc0a5cfc0ff1ce9be723d59593c165022)

Building wheels for collected packages: BindsNET, gym
  Building wheel for BindsNET (setup.py) ... [?25ldone
[?25h  Created wheel for BindsNET: filename=BindsNET-0.2.9-py3-none-any.whl size=99006 sha256=71ab8cc792aab74293295be81475041a556d6080a276d9ae9ae5c4fec84e51f9
  Stored in directory: /Users/camiloortiz/Library/Caches/pip/wheels/79/77/1c/7de963265cf055221c577a1a3e307cf596b4a087fa4f7b549c
  Building wheel for gym (setup.py) ... [?25ldone
[?25h  Created wheel for gym: filename=gym-0.18.0-py3-none-any.whl size=1657516 sha256=93dd8cbba97f708289b7d08e30d152013405602a2fa9ca02a49bb08b6aeaf545
  Stored in directory: /Users/camiloortiz/Library/Caches/pip/wheels/03/60/7f/33b8247a99209a2e0c1e977a24fa050c4d7859a6352f16a904
Successfully built BindsNET gym
Installing collected packages: typing-extensions, Pillow, numpy, kiwisolver, decorator, torch, toml, tifffile, threadpoolctl, scipy, PyWavelets, pyglet, py, protobuf, pluggy, pbr, networkx, matplotlib, joblib, iniconfig, imageio, filelock

In [1]:
import argparse

import cv2
import torch
import torch.nn
import numpy as np
import random
import gym
from collections import namedtuple
import matplotlib.pyplot as plt
from typing import List, Tuple
from collections import deque
from statistics import mean

In [2]:
Args = namedtuple('Args', ['gamma', 'env', 'n_episode', 'batch_size', 'hidden_dim', 'capacity', 'max_episode', 'min_eps'])
FLAGS = Args(gamma=0.99, env='BreakoutDeterministic-v4', n_episode=1000, batch_size=32, hidden_dim=12, capacity=50000, max_episode=1000, min_eps=0.01)

In [3]:
Transition = namedtuple("Transition", field_names=["state", "action", "reward", "next_state", "done"])

In [4]:
class ReplayMemory(object):

    def __init__(self, capacity: int) -> None:
        """Replay memory class
        """
        self.capacity = capacity
        self.cursor = 0
        self.memory = []

    def push(self,
             state: np.ndarray,
             action: int,
             reward: int,
             next_state: np.ndarray,
             done: bool) -> None:
        """Creates `Transition` and insert
        """
        if len(self) < self.capacity:
            self.memory.append(None)

        self.memory[self.cursor] = Transition(state,
                                              action, reward, next_state, done)
        self.cursor = (self.cursor + 1) % self.capacity

    def pop(self, batch_size: int) -> List[Transition]:
        """Returns a randomly sampled minibatch
        """
        return random.sample(self.memory, batch_size)

    def __len__(self) -> int:
        """Returns the length """
        return len(self.memory)

In [14]:
class DQN(torch.nn.Module):
    def __init__(self, input_shape: [int], output_dim: int, hidden_dim: int, batch_size: int) -> None:
        """DQN Network
        """
        super(DQN, self).__init__()
        w, h, c = input_shape
        kernel_size = 3
        padding = 1
        stride = 1
        out_channels = 3

        self.layer1 = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels=1, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
            torch.nn.ReLU(),
            torch.nn.Flatten()
        )

        self.layer2 = torch.nn.Sequential(
            torch.nn.Linear(19200, hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.25)
        )

        self.final = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim, output_dim),
            torch.nn.ReLU(),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Returns a Q_value
        """
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.final(x)

        return x

In [1]:
class Agent(object):

    def __init__(self, input_shape: [int], output_dim: int, hidden_dim: int, batch_size: int) -> None:
        """Agent class
        """
        self.dqn = DQN(input_shape, output_dim, hidden_dim, batch_size)
        self.input_dim = input_shape
        self.output_dim = output_dim

        self.loss_fn = torch.nn.MSELoss()
        self.optim = torch.optim.Adam(self.dqn.parameters())

    def _to_variable(self, x: np.ndarray) -> torch.Tensor:
        """torch.Variable syntax helper
        """
        return torch.autograd.Variable(torch.Tensor(x))

    def get_action(self, states: np.ndarray, eps: float) -> int:
        """Returns an action
        """
        if np.random.rand() < eps:
            return np.random.choice(self.output_dim)
        else:
            self.dqn.train(mode=False)
            scores = self.get_Q(np.array([states]))
            _, argmax = torch.max(scores.data, 1)
            return int(argmax.numpy())

    def get_Q(self, states: np.ndarray) -> torch.FloatTensor:
        """Returns `Q-value`
        """
        states = self._to_variable(states)
        self.dqn.train(mode=False)

        return self.dqn(states)

    def train(self, Q_pred: torch.FloatTensor, Q_true: torch.FloatTensor) -> float:
        """Computes `loss` and backpropagation
        """
        self.dqn.train(mode=True)
        self.optim.zero_grad()
        loss = self.loss_fn(Q_pred, Q_true)
        loss.backward()
        self.optim.step()
        return loss

NameError: name 'np' is not defined

In [2]:
def preprocess(states: np.ndarray):
    """Preprocesses gym state
    """
    # Crop
    states = states[34:194, 0:160, :]

    # Convert to grayscale
    states = cv2.cvtColor(states, cv2.COLOR_RGB2GRAY)

    # Subsample to 80x80
    states = cv2.resize(states, (80, 80))
    states = cv2.threshold(states, 0, 1, cv2.THRESH_BINARY)[1]

    states = states.reshape(1, states.shape[0], states.shape[1])
    return states

NameError: name 'np' is not defined

In [3]:
def train_helper(agent: Agent, minibatch: List[Transition], gamma: float) -> float:
    """ Train on minibatch data
    """
    states = np.array([x.state for x in minibatch])
    actions = np.array([x.action for x in minibatch])
    rewards = np.array([x.reward for x in minibatch])
    next_states = np.array([x.next_state for x in minibatch])
    Q_predict = agent.get_Q(states)
    Q_target = Q_predict.clone().data.numpy()
    Q_target[np.arange(len(Q_target)), actions] = rewards + gamma * np.max(agent.get_Q(next_states).data.numpy(),
                                                                           axis=1)
    Q_target = agent._to_variable(Q_target)

    return agent.train(Q_predict, Q_target)

NameError: name 'Agent' is not defined

In [4]:
def play_episode(env: gym.Env,
                 agent: Agent,
                 replay_memory: ReplayMemory,
                 eps: float,
                 batch_size: int) -> int:
    """Play an episode
    """
    s = env.reset()
    s = preprocess(s)
    done = False
    total_reward = 0

    while not done:
        a = agent.get_action(s, eps)
        s2, r, done, info = env.step(a)
        env.render()

        # Preprocessing step
        s2 = preprocess(s2)
        r = clip_reward(r)

        total_reward += r

        if done:
            r = -1
        replay_memory.push(s, a, r, s2, done)

        if len(replay_memory) > batch_size:
            minibatch = replay_memory.pop(batch_size)
            train_helper(agent, minibatch, FLAGS.gamma)

        s = s2

    return total_reward

NameError: name 'gym' is not defined

In [5]:
def get_env_dim(env: gym.Env) -> Tuple[int, int]:
    """Returns input_dim & output_dim
    """
    input_dim = env.observation_space.shape
    output_dim = env.action_space.n

    return input_dim, output_dim

NameError: name 'gym' is not defined

In [6]:
def epsilon_annealing(epsiode: int, max_episode: int, min_eps: float) -> float:
    """Returns 𝜺 for 𝜺-annealing
    1.0---|\
          | \
          |  \
    min_e +---+------->
              |
              max_episode
    """

    slope = (min_eps - 1.0) / max_episode
    return max(slope * epsiode + 1.0, min_eps)

In [7]:
def clip_reward(reward):
    """Clip reward so that it's in [-1, 1]
    """
    if reward < -1:
        reward = -1
    elif reward > 1:
        reward = 1
    return reward

In [None]:
try:
    env = gym.make(FLAGS.env)
    env = gym.wrappers.Monitor(env, directory="monitors", force=True)

    average_rewards = []
    q = deque(maxlen=100)

    input_dim, output_dim = get_env_dim(env)

    agent = Agent((80, 80, 1) , output_dim, FLAGS.hidden_dim, FLAGS.batch_size)
    replay_memory = ReplayMemory(FLAGS.capacity)

    for i in range(FLAGS.n_episode):
        eps = epsilon_annealing(i, FLAGS.max_episode, FLAGS.min_eps)
        r = play_episode(env, agent, replay_memory, eps, FLAGS.batch_size)
        print("[Episode: {:5}] Reward: {:5} 𝜺-greedy: {:5.2f}".format(i + 1, r, eps))

        q.append(r)
        if i % 100 == 0:
            average_rewards.append(mean(q))

    name = "DQN-cnn-{}-{}-{}-reward_clamping".format(FLAGS.env, FLAGS.n_episode, FLAGS.gamma)

    fig, ax = plt.subplots()
    ax.plot(average_rewards)

    ax.set(xlabel='Episode', ylabel='Reward',
           title='DQN (CNN) performance on {}'.format(FLAGS.env))
    plt.show()

finally:
    env.close()

[Episode:     1] Reward:   3.0 𝜺-greedy:  1.00
