<h2> Importing Libraries

In [1]:
import gym
import random
import numpy as np
import tensorflow as tf
import time
from collections import deque
from gym.envs.registration import register
from IPython.display import clear_output


<h2> Initialising Environment

In [2]:
env_name = "CartPole-v0"
env = gym.make(env_name)
print("Observation space:", env.observation_space)
print("Action space:", env.action_space)


Observation space: Box(-3.4028234663852886e+38, 3.4028234663852886e+38, (4,), float32)
Action space: Discrete(2)


<h2> Nural Network for predicting Q(s,a)

In [3]:
class NNmodel(tf.keras.Model):
    def __init__(self,action_size):
        super(NNmodel,self).__init__(name='')
        self.hidden1=tf.keras.layers.Dense(100,activation = 'relu',kernel_initializer=tf.keras.initializers.he_normal())
        
        self.hidden3=tf.keras.layers.Dense(50,activation = 'relu',kernel_initializer=tf.keras.initializers.he_normal())
        
        self.output_layer=tf.keras.layers.Dense(action_size,activation = 'linear',kernel_initializer=tf.keras.initializers.he_normal())
        self.action_size=action_size
        
    def call(self,state):
        state = np.reshape(state,(-1,4))
        
        x = self.hidden1(state)
        
        x = self.hidden3(x)
        
        return self.output_layer(x)

In [4]:
loss_object = tf.keras.losses.MeanSquaredError()

In [5]:
def loss(model, x, y , action, action_size):
    y_ = model(x)
    action = tf.one_hot(action, depth = action_size)
    action = tf.reshape(action,(action_size,-1))
    y_= tf.reduce_sum(tf.matmul(y_,action),axis=1)
    return loss_object(y_true=y, y_pred=y_)

In [6]:
def grad(model, inputs, targets , action, action_size):
    with tf.GradientTape() as tape:
        loss_value = loss(model, inputs,targets, action, action_size)
    return loss_value, tape.gradient(loss_value, model.trainable_variables)

<h2> Replay Buffer for memorising 

In [7]:
class ReplayBuffer():
    def __init__(self, maxlen):
        self.buffer = deque(maxlen=maxlen)
        
    def add(self, experience):
        self.buffer.append(experience)
        
    def sample(self, batch_size):
        sample_size = min(len(self.buffer), batch_size)
        samples = random.choices(self.buffer, k=sample_size)
        return map(list, zip(*samples))

<h2> Random Agent

In [8]:
class Agent():
    def __init__(self,env):
        if(type(env.action_space) == gym.spaces.discrete.Discrete):
            self.is_discrete = True
        else:
            self.is_discrete = False
        if(self.is_discrete):
            self.action_size = env.action_space.n
            print("action size" ,self.action_size)
        else:
            self.action_low = env.action_space.low
            self.action_high = env.action_space.high
            self.action_shape = env.action_space.shape
            
    
    def get_action(self,state):
        if(self.is_discrete):
            action = random.choice(range(self.action_size))
        else:
            action = random.uniform(self.action_low,self.action_high,self.action_shape)
        return action    

<h2> Deep Q learning Agent for taking action

In [9]:
class DQNagent(Agent):
    def __init__(self,env,discount_rate=0.8,learning_rate=0.001):
        super().__init__(env)
        
        self.replay_buffer = ReplayBuffer(maxlen=10000)
        self.eps = 1.0
        self.discount_rate = discount_rate
        self.learning_rate = learning_rate
        self.lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=self.learning_rate,decay_steps=10000,decay_rate=0.99)
        self.build_agent()
        self.optimizer=tf.keras.optimizers.Adam(learning_rate=self.lr_schedule)
       

    def build_agent(self):
        self.model = NNmodel(self.action_size)
        
        
    def get_action(self,state):
        
        q_values = self.model(state)
        q_values = q_values.numpy()
        
        max_ind = []
        max_val = np.max(q_values[0])
        for i in range(len(q_values[0])):
            if q_values[0][i]==max_val:
                max_ind.append(i)
            
        action = random.choice(max_ind)
        
        rand_action = super().get_action(state)
        if random.random()<self.eps:
            
            return rand_action
        else:
            return action
        
    def train(self,experience):
        state , action , next_state , reward , done ,total_reward = experience
        self.update_weights(experience)
        self.replay_buffer.add((state, action, next_state, reward, done , total_reward))
        states, actions, next_states, rewards, dones , total_rewards = self.replay_buffer.sample(10)
        for i in range(len(states)):
            state=states[i]
            action=actions[i]
            next_state = next_states[i]
            reward=rewards[i]
            done=dones[i]
            total_reward = total_rewards[i]

            self.update_weights((state, action, next_state, reward, done,total_reward))
            
    def update_weights(self,experience):
        state , action , next_state , reward , done ,total_reward= experience
        q_next = self.model(next_state)
        q_next = q_next.numpy()
        
        if total_reward>=199:
            done = False
        q_next[done] = np.zeros([self.action_size])
        
        q_target = reward + self.discount_rate * np.max(q_next)
        loss_value, grads = grad(self.model, state, q_target,action,self.action_size)
        
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        loss_value, grads = grad(self.model, state, q_target,action,self.action_size)
       
        if done: 
            self.eps *=0.99

In [12]:
agent = DQNagent(env)

action size 2


<h2> Training and testing

In [13]:
scores = []
for ep in range(400):
    state = env.reset()
    done = False
    total_reward=0
    while not done:
        
        action = agent.get_action(state)
        next_state, reward, done, info = env.step(action)
        n_reward=reward
        agent.train((state,action,next_state,reward,done,total_reward))
        state = next_state
        total_reward += n_reward
        #env.render()
    
    scores.append(total_reward)
    
    if ep==0:
		    mean = scores[ep]
	  
    if ep>0:
		    mean = np.mean(scores[np.maximum(0,ep-100):ep])#calculating rolling average
    
    if ep%20==0:
        print("at episode {} reward ={} mean ={}".format(ep,total_reward,mean))
    
    if ep>98 and mean > 180:
        print("at episode {} reward ={} mean ={}".format(ep,total_reward,mean))
        break  




To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

at episode 0 reward =30.0 mean =30.0
at episode 20 reward =10.0 mean =13.3
at episode 40 reward =10.0 mean =11.95
at episode 60 reward =37.0 mean =16.683333333333334
at episode 80 reward =61.0 mean =24.8
at episode 100 reward =82.0 mean =32.53
at episode 120 reward =200.0 mean =61.69
at episode 140 reward =200.0 mean =98.05
at episode 160 reward =200.0 mean =128.19
at episode 180 reward =200.0 mean =153.93
at episode 200 reward =200.0 mean =161.71
at episode 220 reward =162.0 mean =167.64
at episode 240 reward =200.0 mean =164.87
at episode 260 reward =199.0 mean =165.75
at episode 280 reward =200.0 mean =161.78
at episode 300 reward =200.0 mean =172.35
at episode 320 reward =200.0 mean =16

<b> average reward of last 100 steps = 171.75