<a href="https://colab.research.google.com/github/ddavis-2015/Gym-CartPole/blob/master/gym_cartpole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Q Network model for reinforcement learning


In [1]:
# -*- coding: utf-8 -*-
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

# Number of episodes run by both the training loop and the test function.
EPISODES = 1000
# Step index at which an episode is cut off and counted as a success.
MAXSTEPS = 450
# Number of units in each of the two hidden Dense layers of the Q-network.
DENSE_NODES = 6

class DQNAgent:
    """Deep Q-Network agent with a bounded experience-replay buffer.

    The agent keeps a Keras model that maps a state to one Q-value per
    action, chooses actions epsilon-greedily, and is trained from
    randomly sampled past transitions via :meth:`replay`.
    """

    def __init__(self, state_size, action_size):
        """Create the agent and build its Q-network.

        Args:
            state_size: Number of input features of the network.
            action_size: Number of discrete actions (network outputs).
        """
        self.state_size = state_size
        self.action_size = action_size
        # Bounded replay buffer: the oldest transitions fall off once full.
        self.memory = deque(maxlen=1000)
        self.gamma = 0.90    # discount rate
        self.epsilon = 1.0  # exploration rate (greedy epsilon)
        self.epsilon_min = 0.05  # floor that epsilon decays towards
        self.epsilon_decay = 0.980  # multiplicative decay applied per replay
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the Q-network.

        Two ReLU hidden layers of ``DENSE_NODES`` units followed by a linear
        output layer with one unit per action, trained with MSE loss.
        """
        model = Sequential()
        model.add(Dense(DENSE_NODES, input_dim=self.state_size, activation='relu'))
        model.add(Dense(DENSE_NODES, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse',
                      optimizer=Adam(lr=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state, use_epsilon=True):
        """Choose an action for ``state``.

        Args:
            state: Input passed to ``model.predict``; the callers in this
                file reshape it to ``[1, state_size]`` first.
            use_epsilon: When true, a random action is returned with
                probability ``epsilon``; when false the choice is always
                the model's highest-valued action.

        Returns:
            The index of the chosen action.
        """
        if use_epsilon and (np.random.rand() <= self.epsilon):
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # returns action index

    def replay(self, batch_size):
        """Train on a random sample of remembered transitions.

        For each sampled transition the target for the taken action is the
        reward, plus the discounted best predicted value of the next state
        when the episode did not end there. The model is then fitted on
        that single sample. Afterwards epsilon is decayed once.

        Args:
            batch_size: Maximum number of transitions to sample. If the
                buffer holds fewer, all stored transitions are used rather
                than raising ``ValueError``.
        """
        # Never ask for more samples than the buffer currently holds.
        sample_size = min(batch_size, len(self.memory))
        minibatch = random.sample(self.memory, sample_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.model.predict(next_state)[0]))
            # Keep the predictions for the other actions so that only the
            # taken action contributes to the loss.
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=10, verbose=0)
        if self.epsilon > self.epsilon_min:
            # Clamp so the final decay step cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min,
                               self.epsilon * self.epsilon_decay)

    def load(self, name):
        """Load model weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save model weights to the file ``name``."""
        self.model.save_weights(name)


Using TensorFlow backend.


# Training Loop

The task is attempted a certain number of **EPISODES**

Each attempt has up to **MAXSTEPS** actions taken

The model is saved on each successful attempt (reaching **MAXSTEPS** actions)

A difference from other DQN model examples is that here training only occurs on a failed attempt, using the replay buffer.  This generally uses less training time, whereas training after each action increases training time.

Failures are not penalized.  Instead the goal is simply to maximize the reward by keeping the pole upright for as long as possible.



In [2]:
!mkdir -p save

if __name__ == "__main__":
    # Size the agent from the environment's observation and action spaces.
    env = gym.make('CartPole-v1')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)
    done = False
    batch_size = 64

    for e in range(EPISODES):
        state = np.reshape(env.reset(), [1, state_size])
        for time in range(MAXSTEPS + 1):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            if not done and time != MAXSTEPS:
                continue
            # The episode ended, either by failure or by hitting the cap.
            print("episode: {}/{}, score: {}, e: {:.3}"
                  .format(e, EPISODES, time, agent.epsilon))
            break

        if done:
            # Failed attempt: train from the replay buffer once it holds
            # more than one batch of transitions.
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
        else:
            # The step cap was reached without failing: keep these weights.
            print('***saving***')
            agent.save("./save/cartpole-dqn.h5")





episode: 0/1000, score: 23, e: 1.0
episode: 1/1000, score: 17, e: 1.0
episode: 2/1000, score: 16, e: 1.0
episode: 3/1000, score: 12, e: 1.0








episode: 4/1000, score: 8, e: 0.98
episode: 5/1000, score: 15, e: 0.96
episode: 6/1000, score: 20, e: 0.941
episode: 7/1000, score: 23, e: 0.922
episode: 8/1000, score: 15, e: 0.904
episode: 9/1000, score: 16, e: 0.886
episode: 10/1000, score: 55, e: 0.868
episode: 11/1000, score: 14, e: 0.851
episode: 12/1000, score: 43, e: 0.834
episode: 13/1000, score: 26, e: 0.817
episode: 14/1000, score: 22, e: 0.801
episode: 15/1000, score: 25, e: 0.785
episode: 16/1000, score: 58, e: 0.769
episode: 17/1000, score: 48, e: 0.754
episode: 18/1000, score: 12, e: 0.739
episode: 19/1000, score: 33, e: 0.724
episode: 20/1000, score: 17, e: 0.709
episode: 21/1000, score: 13, e: 0.695
episode: 22/1000, score: 25, e: 0.681
episode: 23/1000, score: 13, e: 0.668
episode: 24/1000, score: 10, e: 0.654
episode: 25/1000, score: 53, e: 0.641
episode: 26/1000, sco

# Function to test the saved model

In [0]:
def test():
    """Run the saved model without exploration and report its success rate.

    Loads the weights from ``./save/cartpole-dqn.h5``, plays ``EPISODES``
    episodes always taking the model's preferred action, and counts an
    episode as a success when it reaches step ``MAXSTEPS``. A progress
    line is printed every 20 episodes.
    """
    env = gym.make('CartPole-v1')
    n_features = env.observation_space.shape[0]
    agent = DQNAgent(n_features, env.action_space.n)
    agent.load("./save/cartpole-dqn.h5")
    successes = 0.0

    for episode in range(1, EPISODES + 1):
        state = np.reshape(env.reset(), [1, n_features])
        for step in range(MAXSTEPS + 1):
            action = agent.act(state, use_epsilon=False)
            observation, _, done, _ = env.step(action)
            state = np.reshape(observation, [1, n_features])
            if done or step == MAXSTEPS:
                break

        # The step loop always exits through the break above, so `step`
        # holds the index at which the episode stopped.
        if step == MAXSTEPS:
            successes += 1
        if episode % 20 == 0:
            print("episode: {}/{}, score: {}, success {:.2f}%".format(episode, EPISODES, step, (successes / episode) * 100))

# Report GPU availability and run the test of the model

In [4]:
import tensorflow as tf
# Print the name of the GPU device TensorFlow reports.
print(tf.test.gpu_device_name())
from keras import backend as K
# NOTE(review): _get_available_gpus is a private helper of the Keras
# TensorFlow backend and may not exist in other Keras versions — confirm.
print(K.tensorflow_backend._get_available_gpus())
# Evaluate the weights saved by the training cell.
test()

/device:GPU:0
['/job:localhost/replica:0/task:0/device:GPU:0']
episode: 20/1000, score: 450, success 100.00%
episode: 40/1000, score: 450, success 100.00%
episode: 60/1000, score: 450, success 100.00%
episode: 80/1000, score: 450, success 100.00%
episode: 100/1000, score: 450, success 100.00%
episode: 120/1000, score: 450, success 100.00%
episode: 140/1000, score: 450, success 100.00%
episode: 160/1000, score: 450, success 100.00%
episode: 180/1000, score: 450, success 100.00%
episode: 200/1000, score: 450, success 100.00%
episode: 220/1000, score: 450, success 100.00%
episode: 240/1000, score: 450, success 100.00%
episode: 260/1000, score: 450, success 100.00%
episode: 280/1000, score: 450, success 100.00%
episode: 300/1000, score: 450, success 100.00%
episode: 320/1000, score: 450, success 100.00%
episode: 340/1000, score: 450, success 100.00%
episode: 360/1000, score: 450, success 100.00%
episode: 380/1000, score: 450, success 100.00%
episode: 400/1000, score: 450, success 100.00%
e