In [14]:
#Dueling DQN
#improves upon DQN and double DQN

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import gym


In [15]:
env_name = 'CartPole-v1' 
env = gym.make(env_name)

print(f'env : {env}')
state_shape, action_shape = env.observation_space.shape, env.action_space.shape
print('State shape: {}'.format(state_shape))
print('Action shape: {}'.format(action_shape))
print(f'action space {env.action_space} observation space : {env.observation_space}')
state_dim = env.observation_space.shape[0]
n_actions = env.action_space.n
print(state_dim)
print(n_actions)

env : <TimeLimit<CartPoleEnv<CartPole-v1>>>
State shape: (4,)
Action shape: ()
action space Discrete(2) observation space : Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
4
2


In [16]:
class DuelingQnet(tf.keras.Model):
    def __init__(self, state_dim, n_actions):
        super(DuelingQnet,self).__init__()
        self.fc1 = tf.keras.layers.Dense(128,activation = 'relu')
        self.fc2 = tf.keras.layers.Dense(256, activation = 'relu')
        
        self.fc_val1 = tf.keras.layers.Dense(64, activation = 'relu')
        self.fc_val2 = tf.keras.layers.Dense(1, activation='linear')

        self.fc_adv1 = tf.keras.layers.Dense(64,activation = 'relu')
        self.fc_adv2 = tf.keras.layers.Dense(n_actions, activation = 'linear')

    def call(self, state):
        x = self.fc1(state)
        x = self.fc2(x)

        #value network
        v = self.fc_val1(x)
        v = self.fc_val2(v)

        #advantage network
        adv = self.fc_adv1(x)
        adv = self.fc_adv2(adv)

        avg_adv = tf.reduce_mean(adv, axis=-1)
        avg_adv = tf.reshape(avg_adv,(-1,1))
        #print(f'adv {adv} shape : {adv.shape} avg = {avg_adv} shape {avg_adv.shape} v : {v} shape : {v.shape}')
        #subtrac average advantage from value, 
        qval = tf.math.subtract(v,avg_adv)
        #add the advantage values to the qval calculated, this gives us qvals of each action
        qvals = tf.math.add(qval, adv)
        return qvals 

In [17]:
class ReplayBuffer:
    def __init__(self, size=10000):
        self.size = int(size) #max number of items in buffer
        self.buffer = [] #array to hold buffer
        self.counter = 0
        self.next_id = 0
    
    def __len__(self):
        return len(self.buffer)
    
    def add(self, state, action, reward, next_state, done):
        item = (state, action, reward, next_state, done)
        #print(f'size = {self.size}')
        #print(f'buffer length : {len(self.buffer)}')
        
        #if len(self.buffer) < self.size:
        if self.counter < self.size:
            self.buffer.append(item)
        else:
            self.buffer[self.next_id] = item
        self.next_id = (self.next_id + 1) % self.size
        if self.counter < self.size:
            self.counter += 1
        
    def sample(self, batch_size=32):
        idxs = np.random.choice(self.counter, batch_size)
        samples = [self.buffer[i] for i in idxs]
        states, actions, rewards, next_states, done_flags = list(zip(*samples))
        #print(f' states {states} actions : {actions} rewards : {rewards}:  next_states {next_states} dones flags : {done_flags}')

        return np.array(states,np.float32), np.array(actions,np.float32), np.array(rewards,np.float32), np.array(next_states,np.float32), np.array(done_flags)
    
    def to_tensors(self, state_dim, act_dim=0):
        states, actions, rewards, next_states, done_flags = self.sample()
        #print(type(states))
        states = np.array(states,np.float32)
        states = np.reshape(states, (-1, state_dim))
    
        actions = np.reshape(actions, (-1))
        rewards = np.reshape(rewards,(-1,1))
        rewards = rewards.squeeze()

        next_states = np.array(next_states,np.float32)
        next_states = np.reshape(next_states, (-1, state_dim))
    
        done_flags = np.reshape(done_flags, (-1,1))
        done_flags = np.squeeze(done_flags)

        #print(f' states {states} actions : {actions} rewards : {rewards}: next_states {next_states} dones flags : {done_flags}')

        state_ts = tf.convert_to_tensor(states, dtype= tf.float32)
        action_ts = tf.convert_to_tensor(actions, dtype=tf.int32)
        reward_ts = tf.convert_to_tensor(rewards, dtype=tf.float32)
        next_state_ts = tf.convert_to_tensor(next_states,dtype=tf.float32)
    
        #print(f'Tensor states {state_ts} actions : {action_ts} rewards : {reward_ts}:  next_states {next_state_ts} dones flags : {done_flags}')

        return state_ts, action_ts, reward_ts, next_state_ts, done_flags
    
    def initialize(self,env, initial_steps=500):
        state = env.reset()
        for i in range(initial_steps):
            action = env.action_space.sample()
            next_state, reward, done, _ = env.step(action)
        #   print(f' s: {state} action {action} reward {reward} next state : {next_state} done : {done}')
            self.add(state, action, reward, next_state, done)
            if done:
                state = env.reset()
            state = next_state

In [18]:
class Utils:
    def __init__(self):
        pass
    def epsilon_greedy_policy(self,state, env, agent, eps = 0.5):
        if np.random.rand() < eps:
        #print('rnd')
            return env.action_space.sample()        
        else:
            q_val = agent(state[np.newaxis])
        #print(q_val[0])
            return np.argmax(q_val[0])

    #this can be adjusted as per need
    def epsilon_schedule(self, episode,limit = 500):
        return max(1-episode/400,0.01)
        
    def test_agent(self,env, network, num_test_episodes, max_ep_len, disp=False):
        ep_rets, ep_lens = [], []
        for j in range(num_test_episodes):
            state, done, ep_ret, ep_len = env.reset(), False, 0, 0
            while not(done or (ep_len == max_ep_len)):
                if disp:
                    env.render()
                #print(state)
                qvals = network(state[np.newaxis])
                action = np.argmax(qvals.numpy()[0])
                #print(action)
                #act1 = np.array(act1, np.float32)    
                ##act1 = act1.squeeze(1)    
                state_, reward, done, _ = env.step(action)
                state = state_           
                ep_ret += reward
                ep_len += 1
            ep_rets.append(ep_ret)
            ep_lens.append(ep_len)
        return np.mean(ep_rets), np.mean(ep_lens)

In [21]:
class Agent:
    def __init__(self, env, buffer_size = 10000):
        self.env = env
        self.replay_buffer = ReplayBuffer(buffer_size)
        self.state_dim = env.observation_space.shape[0]
        self.n_actions = env.action_space.n
        
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
        self.network = DuelingQnet(self.state_dim,self.n_actions) 
        self.target_net = DuelingQnet(self.state_dim,self.n_actions)

        self.num_episodes = 500
        self.num_steps = 300
        self.target_update_ctr = 10
        self.rewards = []
        self.loss_per_episode = []        
        self.eps_loss = 0.00

        self.util = Utils()

        self.target_net.set_weights(self.network.get_weights())

        s = env.reset()
        q1 = self.network(s[np.newaxis])
        q2 = self.target_net(s[np.newaxis])

    def compute_loss_ddqn(self, states, actions, rewards, next_states, done_flags, gamma = 0.99):
        
        pred_qs = self.network(states)
        indices = tf.range(len(actions))
        pred_indices = tf.transpose([indices, actions])
        qval_preds = tf.gather_nd(pred_qs,pred_indices)
        
        target_qs = self.network(next_states)
        target_acts = tf.argmax(target_qs, axis=-1, output_type = tf.dtypes.int32)

        target_qvals = self.target_net(next_states)
        #t_indices = tf.range(len(actions))
        tar_indices = tf.transpose([indices,target_acts])
        

        qval_target = tf.gather_nd(target_qvals,tar_indices)

        calc_tar = rewards + gamma*(1-done_flags)*qval_target

        loss = tf.keras.losses.MSE(qval_preds,calc_tar)

        return loss

    def dqn(self, stop_at_reward = 240):
        rets = 0
        for episode in range(self.num_episodes):
            state = self.env.reset()
            self.replay_buffer.initialize(self.env)            
            eps = self.util.epsilon_schedule(episode)
            self.eps_loss = 0.0

            for step in range(self.num_steps):
                action = self.util.epsilon_greedy_policy(state,self.env,self.network,eps)

                next_state, reward, done, _ = self.env.step(action)
                self.replay_buffer.add(state,action,reward,next_state,done)

                if done:
                    break

                s,a,r,s_,d = self.replay_buffer.to_tensors(state_dim)
                state = next_state

                with tf.GradientTape() as tape:
                    loss = self.compute_loss_ddqn(s,a,r,s_,d)
                gradients = tape.gradient(loss, self.network.trainable_variables)
                self.optimizer.apply_gradients(zip(gradients,self.network.trainable_variables))

                if step % self.target_update_ctr == 0:
                    self.target_net.set_weights(self.network.get_weights())

                self.eps_loss += loss
                
            self.rewards.append(step)
            self.loss_per_episode.append(self.eps_loss)
            #print(f'episode : {episode} reward : {step}')
            if episode % 20 == 0:
                rets, len = self.util.test_agent(env,self.network,5,250)
                print(f'average return after {episode} : {rets} length : {len}')
                if rets > stop_at_reward:
                    return

In [20]:
agent = Agent(env)
agent.dqn()



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

average return after 0 : 9.6 length : 9.6
average return after 20 : 9.4 length : 9.4
average return after 40 : 9.2 length : 9.2
average return after 60 : 9.4 length : 9.4
average return after 80 : 9.6 length : 9.6
average return after 100 : 9.6 length : 9.6
average return after 120 : 9.6 length : 9.6
average return after 140 : 26.4 length : 26.4
average return after 160 : 221.0 length : 221.0
av