# DEEPDQN Agent


In [25]:
import numpy as np
import random
import torch
from torch import nn

import matplotlib.pyplot as plt
from matplotlib import animation

## Global variables

In [26]:
import enum

## PyTorch

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Training-loop sizes.
EPOCHS = 100
BATCH_SIZE = 64
# Misspelled original name, kept as an alias because existing code refers to it.
BACTH_SIZE = BATCH_SIZE

# Learning rate given to the optimizer (also reused as the target-network
# blend factor by the agent).
LR = 5e-3

# Discount factor applied to future rewards in the Bellman target.
GAMMA = 0.99

# Replay-memory sizing.
MEM_CAP = 10000  # maximum number of stored transitions
MEM_MIN = 500  # learning starts only once more than this many are stored

@enum.unique
class CActions(enum.Enum):
    """Discrete choices available to the agent, numbered 0 to 2."""

    Sell = 0
    Nothing = 1
    Buy = 2

# Value passed to the environment for each discrete choice.
ACTIONS = {
    CActions.Sell: -1,
    CActions.Nothing: 0,
    CActions.Buy: 1,
}
LEN_ACTIONS = len(ACTIONS)  # number of discrete actions (network output width)
INPUT_DIM = 28  # length of one flattened state vector (network input width)

## Duelling DQN Class

In [27]:
class NN_DuellingDQN(nn.Module):
    """Dueling Q-network: a shared MLP trunk feeding a value and an advantage head.

    The heads are combined per sample as
    ``Q(s, a) = V(s) + A(s, a) - mean_a A(s, a)``.

    Layer attribute names are unchanged, so previously saved ``state_dict``s
    still load.
    """

    def __init__(self):
        super().__init__()
        self.input_dim = INPUT_DIM
        self.output_dim = LEN_ACTIONS

        # Shared feature trunk.
        self.l1 = nn.Linear(self.input_dim, 500)
        self.l2 = nn.Linear(500, 500)
        self.l3 = nn.Linear(500, 300)
        self.l4 = nn.Linear(300, 200)
        self.l5 = nn.Linear(200, 10)

        # Heads: scalar state value V(s) and per-action advantage A(s, a).
        self.ls = nn.Linear(10, 1)
        self.lp = nn.Linear(10, self.output_dim)

        self.relu = nn.ReLU()

    def forward(self, state):
        """Return the Q-values for ``state``.

        Args:
            state: float tensor whose last dimension is the state size,
                typically ``(batch, input_dim)``.

        Returns:
            Tensor with the same leading dimensions and one Q-value per action
            in the last dimension.
        """
        x = state
        for layer in (self.l1, self.l2, self.l3, self.l4, self.l5):
            x = self.relu(layer(x))

        # The heads stay linear: a ReLU here would forbid negative values and
        # negative advantages.
        value = self.ls(x)
        advantage = self.lp(x)

        # Centre the advantage per sample (over the action axis). Averaging
        # over the whole batch would make each sample's Q-values depend on the
        # other samples that happen to be in the batch.
        return value + advantage - advantage.mean(dim=-1, keepdim=True)

## Short Memory Class
This class gives the agent a memory of past transitions, so it can learn what its previous actions led to. That experience informs later choices, so decisions are not purely random.

In [28]:
from collections import deque

class ShortMemory:
    """Bounded buffer of past environment outputs.

    Once ``capacity`` entries are held, appending another drops the oldest one.
    """

    def __init__(self, capacity=MEM_CAP):
        # A deque with ``maxlen`` discards from the opposite end automatically.
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Return how many entries are currently stored."""
        return len(self.memory)

    def store(self, output : tuple):
        """Append one entry: ``(state, new_state, action, reward, done)``."""
        self.memory.append(output)

    def sample(self, n):
        """Return a list of ``n`` distinct stored entries chosen at random."""
        return random.sample(self.memory, n)

# Agent Class

In [29]:
class Agent:
    """Dueling-DQN agent with a replay memory and a softly updated target network.

    ``nnql`` (local network) is trained by the optimizer; ``nnqt`` (target
    network) supplies the bootstrap values and is blended towards ``nnql``
    after every learning step.
    """

    def __init__(self):
        self.nnql = NN_DuellingDQN().to(DEVICE)  # local network (trained)
        self.nnqt = NN_DuellingDQN().to(DEVICE)  # target network
        self.nnqt.load_state_dict(self.nnql.state_dict())
        self.nnqt.eval()

        self.memory = ShortMemory()

        self.criterion = nn.MSELoss()
        self.optim = torch.optim.Adam(self.nnql.parameters(), lr=LR)

        # Exploration probability; decayed each time an action is chosen.
        self.epsilon = 1.0

    def inference_step(self, state):
        """Run ``state`` through the local network without tracking gradients."""
        self.nnql.eval()
        with torch.no_grad():
            actions = self.nnql(state)
        self.nnql.train()
        return actions

    def _choose_action_index(self, state) -> int:
        """Pick an action index in ``[0, LEN_ACTIONS)`` epsilon-greedily.

        Also decays ``self.epsilon`` (factor 0.995, floor 0.05).
        """
        if random.random() > self.epsilon:
            # The forward pass is only needed on the greedy branch.
            state_t = torch.from_numpy(state).float().to(DEVICE).view(1, -1)
            q_values = self.inference_step(state_t)
            index = int(np.argmax(q_values.cpu().numpy()))
        else:
            index = random.randrange(LEN_ACTIONS)
        self.epsilon = max(self.epsilon * 0.995, 0.05)
        return index

    def epsilon_greedy_pol(self, state) -> int:
        """Choose an action epsilon-greedily and return its environment value.

        Args:
            state: numpy array holding one state.

        Returns:
            The ``ACTIONS`` value (-1, 0 or 1) of the chosen action.
        """
        # ACTIONS is keyed by CActions members, so the network index has to be
        # converted to the enum before the lookup.
        return ACTIONS[CActions(self._choose_action_index(state))]

    def calc_bellman(self, states, next_states, actions, rewards, dones) -> tuple:
        """Compute the Bellman targets and the current Q-value estimates.

        Args:
            states, next_states: float tensors of batched states.
            actions: column tensor of action *indices* (``0 .. LEN_ACTIONS - 1``).
            rewards, dones: float column tensors; ``dones`` is 1.0 where the
                episode ended.

        Returns:
            ``(y, state_values)``: the targets (no gradient attached) and the
            local network's Q-values of the taken actions.
        """
        # The target is a constant for the optimizer: do not build a graph
        # through the target network.
        with torch.no_grad():
            next_state_values = self.nnqt(next_states).max(1)[0].unsqueeze(1)
            y = rewards + (1 - dones) * GAMMA * next_state_values
        state_values = self.nnql(states).gather(1, actions.type(torch.int64))
        return y, state_values

    def learn(self):
        """Run one optimisation step on a random batch from the replay memory.

        Does nothing until the memory holds more than ``MEM_MIN`` transitions.
        """
        if len(self.memory) <= MEM_MIN:
            return

        batch = self.memory.sample(BACTH_SIZE)
        states, next_states, actions, rewards, dones = zip(*batch)

        def stack(values, dtype):
            # One row per transition, moved to the training device.
            return torch.from_numpy(np.vstack(values).astype(dtype)).to(DEVICE)

        states = stack(states, np.float32)
        next_states = stack(next_states, np.float32)
        actions = stack(actions, np.int64)
        rewards = stack(rewards, np.float32)
        dones = stack(dones, np.float32)

        y, state_values = self.calc_bellman(states, next_states, actions, rewards, dones)

        loss = self.criterion(state_values, y)
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()

        self.update_param()

    def update_param(self, tau=None):
        """Blend the target network towards the local one (soft update).

        Args:
            tau: weight of the local parameters in the blend; ``None`` keeps
                the historical behaviour of using ``LR``.
        """
        if tau is None:
            tau = LR
        with torch.no_grad():
            for target_param, local_param in zip(self.nnqt.parameters(), self.nnql.parameters()):
                target_param.copy_(tau * local_param + (1.0 - tau) * target_param)

    def train(self, env, epochs : int) -> list:
        """Train for ``epochs`` episodes on ``env`` and return the episode scores.

        ``env`` must provide ``reset()`` returning a state array and
        ``step(action)`` returning ``(next_state, reward, done, info)``.
        """
        scores = []
        for _ in range(epochs):
            score = 0
            state = env.reset().reshape(-1, INPUT_DIM)
            done = False
            while not done:
                action_index = self._choose_action_index(state)
                action = ACTIONS[CActions(action_index)]
                next_state, reward, done, _ = env.step(action)
                next_state = next_state.reshape(-1, INPUT_DIM)

                # Store the action *index*, not the environment value: it is
                # used to select a network output column in calc_bellman, and
                # the environment value -1 is not a valid column index.
                self.memory.store((state, next_state, action_index, reward, done))
                self.learn()

                state = next_state
                score += reward
            scores.append(score)
        return scores

    def test(self, env) -> float:
        """Play one episode (capped at 100000 steps) and return its total reward.

        NOTE(review): actions are still chosen through ``epsilon_greedy_pol``,
        so evaluation keeps exploring and keeps decaying ``epsilon`` — confirm
        whether a purely greedy evaluation is wanted.
        """
        score = 0
        state = env.reset().reshape(-1, INPUT_DIM)
        for _ in range(100000):  # hard cap so a never-ending episode cannot hang
            action = self.epsilon_greedy_pol(state)
            next_state, reward, done, _ = env.step(action)
            state = next_state.reshape(-1, INPUT_DIM)
            score += reward
            if done:
                break
        return score