In [1]:
import numpy as np

In [6]:
class ReplayMemory:
    """Fixed-capacity store of experience transitions for replay training.

    Each transition is kept across five parallel NumPy buffers that share the
    same row index: the state before the step, the state after the step, the
    reward, the action and the episode-finished flag. Rows ``[0, size)`` hold
    valid data; the remaining rows are uninitialised (``np.empty``).

    Once the store is full, every new transition overwrites one uniformly
    random existing row.
    """

    def __init__(self,max_size,state_space_shape,num_of_actions):
        """Allocate the buffers.

        Args:
            max_size: maximum number of transitions kept; must be positive.
            state_space_shape: shape of a single state, e.g. ``(4,)``.
            num_of_actions: accepted for interface compatibility; not used.

        Raises:
            ValueError: if ``max_size`` is not positive.
        """
        if max_size <= 0:
            # Fail here rather than on the first eviction inside randint().
            raise ValueError("max_size must be a positive integer")

        total_state_space = [max_size] + list(state_space_shape)
        self.old_states = np.empty(total_state_space)
        self.new_states = np.empty(total_state_space)
        self.rewards = np.empty([max_size])
        self.actions = np.empty([max_size],dtype=int)
        self.is_finished = np.empty([max_size],dtype=bool)
        self.max_size = max_size
        self.size = 0

    def save_sample(self,old_state,new_state,reward,action,done):
        """Store one transition, evicting a random row when the store is full."""
        if self.size < self.max_size:
            # Still filling up: append at the first unused row.
            replacement_index = self.size
            self.size = self.size + 1
        else:
            # Full: overwrite a uniformly random row (scalar index).
            replacement_index = np.random.randint(self.size)

        self.old_states[replacement_index] = old_state
        self.new_states[replacement_index] = new_state
        self.rewards[replacement_index] = reward
        self.actions[replacement_index] = action
        self.is_finished[replacement_index] = done

    def get_sample(self,sample_size):
        """Return a batch of stored transitions.

        Args:
            sample_size: requested number of transitions.

        Returns:
            Tuple ``(old_states, new_states, rewards, actions, is_finished)``
            of arrays with matching first dimension. When ``sample_size`` is
            at least the number of stored transitions, all of them are
            returned in storage order; otherwise ``sample_size`` distinct
            transitions are drawn uniformly at random. The arrays are always
            copies, so later ``save_sample`` calls never alter a batch the
            caller already holds.
        """
        buffers = (self.old_states,self.new_states,self.rewards,self.actions,self.is_finished)

        if sample_size >= self.size:
            # Copy explicitly: a plain slice would be a view into the buffers.
            return tuple(buffer[:self.size].copy() for buffer in buffers)

        # replace=False keeps a transition from appearing twice in one batch.
        sample_indexes = np.random.choice(self.size,size=sample_size,replace=False)
        return tuple(buffer[sample_indexes] for buffer in buffers)

            
        

In [3]:
from keras.models import Sequential
from keras.activations import relu,sigmoid
from keras.layers import Dense,Activation
from keras.optimizers import Adam
from keras.callbacks import TensorBoard

Using TensorFlow backend.


In [77]:
def get_model(input_dim,output_dim):
    """Build and compile the Deep Q Network used to learn Q values.

    The network is a stack of three dense layers: 16 ReLU units fed by
    ``input_dim`` features, 32 ReLU units, and a linear output layer with
    ``output_dim`` units. It is compiled with mean-squared-error loss and
    the Adam optimizer at a learning rate of 0.001.

    Args:
        input_dim: number of input features per state.
        output_dim: number of output units (one Q value per action).

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    layer_stack = [
        Dense(units=16,input_dim=input_dim,activation='relu'),
        Dense(units=32,activation='relu'),
        Dense(units=output_dim,activation='linear'),
    ]
    network = Sequential(layer_stack)
    network.compile(optimizer=Adam(lr=0.001),loss='mse')
    return network

In [8]:
def epsilon_greedy_action(model,epsilon,st,num_actions=None):
    """Pick an action with an epsilon-greedy policy.

    A uniform random number is drawn; if it exceeds ``epsilon`` the action
    with the highest predicted Q value is returned, otherwise a uniformly
    random action is returned.

    Args:
        model: object exposing ``predict(batch)`` that returns Q values for a
            batch of states.
        epsilon: exploration threshold compared against a draw from [0, 1).
        st: a single state; a leading batch axis is added before prediction.
        num_actions: number of available actions. When omitted it is taken
            from the width of the model's output, so exploration covers every
            action rather than a hard-coded two.

    Returns:
        The chosen action index as a plain ``int``.
    """
    exploit = np.random.random_sample() > epsilon

    q_values = None
    if exploit or num_actions is None:
        # Skip the forward pass when exploring with a known action count.
        q_values = model.predict(np.expand_dims(st,axis=0))

    if exploit:
        return int(np.argmax(q_values))

    if num_actions is None:
        num_actions = np.shape(q_values)[-1]
    return int(np.random.randint(num_actions))
        

In [78]:
''' Cart pole Environment'''
import gym
# Create the 'CartPole-v0' environment; the module-level name `env` is what
# the initialisation, training and evaluation cells below operate on.
env = gym.make('CartPole-v0')

In [184]:
# Initialisations for the exploration factor, discount factor, exploration
# decay, model and replay memory.
max_memory_size = 5000   # capacity of the replay memory
epsilon = 1              # initial exploration threshold for epsilon-greedy
gamma = 0.95             # discount factor applied to the bootstrapped Q value
epsilon_decay = 0.99     # epsilon is multiplied by this after every episode
episodes = 300           # number of training episodes
max_steps = 500          # step cap per episode
negative_reward = -100   # reward substituted on the step that ends an episode
epochs = 32              # replay fit iterations run after each environment step
sample_size = 2          # transitions requested from replay memory per fit

# Zero-initialised so entries for episodes that never run (e.g. an interrupted
# training cell) read as 0 instead of uninitialised memory.
total_rewards = np.zeros(episodes)
env._max_episode_steps = max_steps
# Number of state features is the product of the observation shape
# (identical to the previous sum for a 1-D shape such as CartPole's).
model = get_model(int(np.prod(env.observation_space.shape)),env.action_space.n)
memory = ReplayMemory(max_memory_size,env.observation_space.shape,env.action_space.n)

In [187]:
# Train the DQN: play `episodes` episodes with an epsilon-greedy policy and,
# after every environment step, fit the model on batches drawn from replay
# memory. The environment is closed once, after training ends or is
# interrupted, instead of after every episode.
try:
    for i_episode in range(episodes):
        current_state = env.reset()

        for t in range(max_steps):
            action = epsilon_greedy_action(model,epsilon,current_state)
            env.render()
            next_state, reward, done, info = env.step(action)

            # using negative rewards for last step
            # NOTE(review): `done` is also what ends the loop at the step cap,
            # so an episode that survives all steps appears to receive the
            # penalty too -- confirm whether that is intended.
            reward = reward if not done else negative_reward
            memory.save_sample(current_state,next_state,reward,action,done)

            # training DQN from the replay memory
            for _ in range(epochs):
                old_states, new_states, rewards, actions, finished = memory.get_sample(sample_size)

                old_estimates = model.predict(old_states)
                new_estimates = model.predict(new_states)

                # Target for the action actually taken in each sampled row:
                # the reward alone for terminal transitions, otherwise the
                # reward plus the discounted best next-state estimate.
                best_next = np.max(new_estimates,axis=1).astype(float)
                targets = np.where(finished,rewards,rewards + gamma*best_next)
                old_estimates[np.arange(len(actions)),actions] = targets

                # fitting the model on previous states and bootstrapped Q values
                model.fit(old_states,old_estimates,epochs=1,verbose=0)

            if done:
                total_rewards[i_episode] = t
                print("episode: {}/{}, score: {}"
                          .format(i_episode, episodes, t))
                break

            current_state = next_state

        epsilon *= epsilon_decay
finally:
    env.close()

[H[2J

In [191]:
from gym import wrappers

# Play one fully greedy episode with the trained model while the Monitor
# wrapper records it under ./gym-results. The score is kept in
# `evaluation_score` so the per-episode training history in `total_rewards`
# is not overwritten.
env = wrappers.Monitor(env, "./gym-results", force=True)
evaluation_score = None

try:
    current_state = env.reset()

    for t in range(max_steps):
        q_values = model.predict(np.expand_dims(current_state,axis=0))
        optimal_action = np.argmax(q_values)
        env.render()
        next_state, reward, done, info = env.step(optimal_action)

        if done:
            evaluation_score = t
            print("score: {}"
                      .format(t))
            break

        current_state = next_state
finally:
    # Always close so the wrapper is shut down even if the episode errors out.
    env.close()

[H[2J

In [181]:
import io
import base64
from IPython.display import HTML

# Embed the recorded episode in the notebook as a base64-encoded <video> tag.
# The file is opened read-only inside a `with` block so the handle is closed
# as soon as its bytes have been read.
video_path = './gym-results/openaigym.video.%s.video000000.mp4' % env.file_infix
with io.open(video_path, 'rb') as video_file:
    video = video_file.read()
encoded = base64.b64encode(video)
HTML(data='''
    <video width="360" height="auto" alt="test" controls><source src="data:video/mp4;base64,{0}" type="video/mp4" /></video>'''
.format(encoded.decode('ascii')))

In [169]:
# Write the model's weights to the file 'Cart pole.h5' (path is relative to
# the notebook's working directory).
model.save_weights('Cart pole.h5')