# DQN on CartPole

In [None]:
!pip install gym



In [None]:
import gym
# Smoke test: run 10 episodes of CartPole-v0 driven by sampled (untrained) actions.
env = gym.make('CartPole-v0')
for i_episode in range(10):
    next_state = env.reset()
    # Cap each episode at 100 steps; leave earlier once the environment reports done.
    for t in range(100):
        #env.render()
        # Draw an action from the environment's action space.
        action = env.action_space.sample()
        next_state, reward, done, info = env.step(action)
        #print(t, next_state, done, info, action)
        if done:
            break
env.close()

## Problem Statement

The objective is to balance a pole vertically on a cart by pushing the cart left or right. An episode ends in failure when the pole tilts more than 12 degrees from the vertical position or when the cart moves more than 2.4 units from the centre; in `CartPole-v1` an episode is also cut off after 500 steps. The agent receives a reward of +1 for every step the pole stays up; therefore the goal is to reach a total reward of 500.

I chose to implement a DQN (Deep Q-Network), etc.

CartPole is based on a Markov model. etc

In [None]:
import random
import gym
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense
from keras.optimizers import Adam, RMSprop
from keras.metrics import Accuracy

## Neural Network
This model uses a neural network that learns from example input/output pairs. It detects patterns in them and builds a predictive model for unseen inputs.

In [None]:
def nnModel(input_shape, action_space):
  """Build and compile the fully connected Q-network.

  Args:
    input_shape: Shape tuple of one observation, passed to ``Input``
      (e.g. ``(4,)`` for a state of size 4).
    action_space: Number of discrete actions; the width of the output layer.

  Returns:
    The compiled Keras ``Model`` (MSE loss, RMSprop optimizer).
  """
  x_input = Input(input_shape)

  # Dense is the basic form of a neural network layer.
  # First hidden layer with 512 nodes; its input shape comes from x_input,
  # so no separate input_shape argument is needed on the layer.
  x = Dense(512, activation="relu", kernel_initializer='he_uniform')(x_input)

  # Hidden Layer with 256 nodes
  x = Dense(256, activation="relu", kernel_initializer='he_uniform')(x)

  # Hidden Layer with 64 nodes
  x = Dense(64, activation="relu", kernel_initializer='he_uniform')(x)

  # Output layer: one linear unit per action (2 for CartPole: left, right),
  # each predicting the Q value of that action.
  x = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(x)

  # The name contains no spaces so it is safe to use as a TensorFlow scope
  # name; it matches the name used by the later definition in this file.
  model = Model(inputs=x_input, outputs=x, name='CartPoleDQNmodel')

  # `learning_rate` replaces the deprecated `lr` keyword of Keras optimizers.
  model.compile(
      loss="mse",
      optimizer=RMSprop(learning_rate=0.00025, rho=0.95, epsilon=0.01),
      metrics=["accuracy"],
  )

  model.summary()
  return model


Our loss function is defined as

$loss = \left(r + \gamma \max_{a'} Q'(s', a') - Q(s, a)\right)^2$

We carry out an action $a$, observe the reward $r$ and obtain a new state $s'$. The highest predicted Q value of the new state is then discounted by $\gamma$ so that the future reward is worth less than the immediate reward. Adding the current reward to the discounted future reward gives us our target value. Subtracting the current prediction from the target gives the error. This is squared so that large errors are penalized more heavily and also so that negative values are treated the same as positive ones.

The target is defined by:



```
 target = reward + gamma * np.max(model.predict(next_state))
```



## Memory Function

In a DQN, the agent collects experiences (state, action, reward, next state, done) as it acts on the network's predictions. These experiences are stored in a list and can then be sampled during training to update the Q values. This is experience replay. The memory function appends each experience to the memory list.

In [None]:
def memory(self, state, action, reward, next_state, done):
  """Append one transition to the replay buffer and decay the exploration rate.

  The transition is stored as ``(state, action, reward, next_state, done)``,
  which is the field order the stand-alone ``replay`` function reads back
  (indices 0..4).

  Args:
    self: Agent object providing ``memory``, ``train_start``, ``epsilon``,
      ``epsilon_min`` and ``epsilon_decay``.
    state: Observation before the action.
    action: Action that was taken.
    reward: Reward received for the action.
    next_state: Observation after the action.
    done: Whether the episode ended with this step.
  """
  # Do not store `self` in the tuple: it would shift every field by one
  # position and keep a reference to the agent in every buffer entry.
  self.memory.append((state, action, reward, next_state, done))

  # Once more than `train_start` transitions are buffered, multiply epsilon
  # by the decay factor on every stored step until it reaches epsilon_min.
  if len(self.memory) > self.train_start and self.epsilon > self.epsilon_min:
    self.epsilon *= self.epsilon_decay

## Replay Function

This function trains the neural network using experiences from the memory. Experiences are sampled in mini-batches: random selections of `batch_size` experiences drawn from the whole memory.

In [None]:
def replay(self):
  """Train the Q-network on one random minibatch from the replay buffer.

  Does nothing until the buffer holds at least ``train_start`` transitions.
  Each buffer entry is expected to be a
  ``(state, action, reward, next_state, done)`` tuple.

  Args:
    self: Agent object providing ``memory``, ``train_start``, ``batch_size``,
      ``state_size``, ``gamma`` and ``model``.
  """
  if len(self.memory) < self.train_start:
    return

  # Randomly sample a minibatch from the memory.
  minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))
  # Size the arrays by the real sample length, which may be below batch_size.
  batch_len = len(minibatch)

  state = np.zeros((batch_len, self.state_size))
  next_state = np.zeros((batch_len, self.state_size))
  action, reward, done = [], [], []

  for i, (s, a, r, s_next, d) in enumerate(minibatch):
    state[i] = s
    action.append(a)
    reward.append(r)
    next_state[i] = s_next
    done.append(d)

  # Do batch prediction to save time.
  target = self.model.predict(state)
  target_next = self.model.predict(next_state)

  # Q-learning target for the action that was taken:
  #   terminal step:     reward
  #   non-terminal step: reward + gamma * max_a' Q(s', a')
  rewards = np.asarray(reward, dtype=float)
  bootstrapped = rewards + self.gamma * np.amax(target_next, axis=1)
  rows = np.arange(batch_len)
  target[rows, np.asarray(action, dtype=int)] = np.where(
      np.asarray(done, dtype=bool), rewards, bootstrapped)

  # Train the network once, after every target in the batch has been set.
  self.model.fit(state, target, batch_size=self.batch_size, verbose=0)



## Result

In [None]:
from keras.models import Model
from keras.layers import Input, Dense
from keras.optimizers import Adam, RMSprop
from collections import deque 
import numpy as np
import random

def nnModel(input_shape, action_space):
  """Build and compile the fully connected Q-network used by the agent.

  Args:
    input_shape: Shape tuple of one observation, passed to ``Input``
      (e.g. ``(4,)`` for a state of size 4).
    action_space: Number of discrete actions; the width of the output layer.

  Returns:
    The compiled Keras ``Model`` (MSE loss, RMSprop optimizer).
  """
  x_input = Input(input_shape)

  # Dense is the basic form of a neural network layer.
  # First hidden layer with 512 nodes; its input shape comes from x_input,
  # so no separate input_shape argument is needed on the layer.
  x = Dense(512, activation="relu", kernel_initializer='he_uniform')(x_input)

  # Hidden Layer with 256 nodes
  x = Dense(256, activation="relu", kernel_initializer='he_uniform')(x)

  # Hidden Layer with 64 nodes
  x = Dense(64, activation="relu", kernel_initializer='he_uniform')(x)

  # Output layer: one linear unit per action (2 for CartPole: left, right),
  # each predicting the Q value of that action.
  x = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(x)

  model = Model(inputs=x_input, outputs=x, name='CartPoleDQNmodel')

  # `learning_rate` replaces the deprecated `lr` keyword of Keras optimizers.
  model.compile(
      loss="mse",
      optimizer=RMSprop(learning_rate=0.00025, rho=0.95, epsilon=0.01),
      metrics=["accuracy"],
  )

  model.summary()
  return model

class DQNAgent():
  """DQN agent for Gym's CartPole-v1 with an experience-replay buffer.

  The agent acts epsilon-greedily, stores every transition in a bounded
  deque and trains its Q-network on random minibatches from that buffer.
  """

  def __init__(self):
    """Create the environment, set the hyperparameters and build the model."""
    self.env = gym.make('CartPole-v1')
    # by default, CartPole-v1 has max episode steps = 500
    self.state_size = self.env.observation_space.shape[0]
    self.action_size = self.env.action_space.n
    self.EPISODES = 1000
    self.memory = deque(maxlen=2000)

    self.gamma = 0.95    # discount rate
    self.epsilon = 1.0  # exploration rate
    self.epsilon_min = 0.001
    self.epsilon_decay = 0.999
    self.batch_size = 64
    self.train_start = 1000

    # create main model
    self.model = nnModel(input_shape=(self.state_size,), action_space=self.action_size)

  def _reset_env(self):
    """Reset the environment and return the observation as a (1, state_size) array."""
    result = self.env.reset()
    # Newer Gym releases return (observation, info); older ones return the
    # bare observation.
    observation = result[0] if isinstance(result, tuple) else result
    return np.reshape(observation, [1, self.state_size])

  def _step_env(self, action):
    """Step the environment; return (next_state, reward, done) with a (1, state_size) state."""
    result = self.env.step(action)
    if len(result) == 5:
      # Newer Gym API: (obs, reward, terminated, truncated, info).
      observation, reward, terminated, truncated, _ = result
      done = terminated or truncated
    else:
      # Older Gym API: (obs, reward, done, info).
      observation, reward, done, _ = result
    return np.reshape(observation, [1, self.state_size]), reward, done

  def remember(self, state, action, reward, next_state, done):
    """Store one transition and decay epsilon once the buffer is warm.

    Args:
      state: Observation before the action.
      action: Action that was taken.
      reward: Reward received for the action.
      next_state: Observation after the action.
      done: Whether the episode ended with this step.
    """
    # Only the transition is stored; keeping `self` in every tuple would add
    # nothing but a redundant reference to the agent.
    self.memory.append((state, action, reward, next_state, done))

    # Once more than `train_start` transitions are buffered, multiply epsilon
    # by the decay factor on every stored step until it reaches epsilon_min.
    if len(self.memory) > self.train_start and self.epsilon > self.epsilon_min:
      self.epsilon *= self.epsilon_decay

  def act(self, state):
    """Pick an action epsilon-greedily.

    Args:
      state: Current observation, shaped for ``model.predict``.

    Returns:
      A random action index with probability ``epsilon``, otherwise the index
      of the highest predicted Q value.
    """
    if np.random.random() <= self.epsilon:
      return random.randrange(self.action_size)
    return np.argmax(self.model.predict(state))

  def replay(self):
    """Train the Q-network on one random minibatch from the replay buffer.

    Does nothing until the buffer holds at least ``train_start`` transitions.
    """
    if len(self.memory) < self.train_start:
      return

    # Randomly sample a minibatch from the memory.
    minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))
    # Size the arrays by the real sample length, which may be below batch_size.
    batch_len = len(minibatch)

    state = np.zeros((batch_len, self.state_size))
    next_state = np.zeros((batch_len, self.state_size))
    action, reward, done = [], [], []

    for i, (s, a, r, s_next, d) in enumerate(minibatch):
      state[i] = s
      action.append(a)
      reward.append(r)
      next_state[i] = s_next
      done.append(d)

    # Do batch prediction to save time.
    target = self.model.predict(state)
    target_next = self.model.predict(next_state)

    # Q-learning target for the action that was taken:
    #   terminal step:     reward
    #   non-terminal step: reward + gamma * max_a' Q(s', a')
    rewards = np.asarray(reward, dtype=float)
    bootstrapped = rewards + self.gamma * np.amax(target_next, axis=1)
    rows = np.arange(batch_len)
    target[rows, np.asarray(action, dtype=int)] = np.where(
        np.asarray(done, dtype=bool), rewards, bootstrapped)

    # Train the network once, after every target in the batch has been set.
    self.model.fit(state, target, batch_size=self.batch_size, verbose=0)

  def load(self, name):
    """Replace the current model with the one stored at ``name``."""
    self.model = load_model(name)

  def save(self, name):
    """Save the current model to ``name``."""
    self.model.save(name)

  def run(self):
    """Train for up to ``EPISODES`` episodes.

    Training stops early, after saving the model to ``cartpole-dqn.h5``, the
    first time an episode lasts exactly 500 steps.
    """
    for e in range(self.EPISODES):
      state = self._reset_env()
      done = False
      i = 0
      while not done:
        action = self.act(state)
        next_state, reward, done = self._step_env(action)
        # NOTE: a terminal penalty (reward = -100 on failure) was sketched
        # here in an earlier revision and is not applied.
        self.remember(state, action, reward, next_state, done)
        state = next_state
        i += 1
        if done:
          print("episode: {}/{}, score: {}, e: {:.2}".format(e, self.EPISODES,i, self.epsilon))
          if i == 500:
            print("Saving trained model as cartpole-dqn.h5")
            self.save("cartpole-dqn.h5")
            return
        self.replay()

  def test(self):
    """Load ``cartpole-dqn.h5`` and play ``EPISODES`` greedy episodes, printing each score."""
    self.load("cartpole-dqn.h5")
    for e in range(self.EPISODES):
      state = self._reset_env()
      done = False
      i = 0
      while not done:
        #self.env.render()
        action = np.argmax(self.model.predict(state))
        state, _, done = self._step_env(action)
        i += 1
        if done:
          print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
          break
          


In [None]:
if __name__ == "__main__":
  # Build the agent (this creates the environment and the Q-network), then train it.
  agent = DQNAgent()
  agent.run()
  # Uncomment to evaluate the model that run() saved as cartpole-dqn.h5.
  #agent.test()

Model: "CartPoleDQNmodel"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         [(None, 4)]               0         
_________________________________________________________________
dense_24 (Dense)             (None, 512)               2560      
_________________________________________________________________
dense_25 (Dense)             (None, 256)               131328    
_________________________________________________________________
dense_26 (Dense)             (None, 64)                16448     
_________________________________________________________________
dense_27 (Dense)             (None, 2)                 130       
Total params: 150,466
Trainable params: 150,466
Non-trainable params: 0
_________________________________________________________________
episode: 0/1000, score: 31, e: 1.0
episode: 1/1000, score: 15, e: 1.0
episode: 2/1000, score: 19, e: 1.0
episo