In [None]:
"""
    algorithm: Noisy Dueling DDQN(Factorized Gaussian noise)
        Random noise is added to the connection weights (w) and biases (b) of the network's linear layers to randomize
the output Q function. A standard DQN trains only the w and b of the network, whereas the Noisy DQN algorithm trains the
mean (mu) and standard deviation (sigma) of those parameters.
        For more details, see the paper: https://arxiv.org/pdf/1706.10295v1.pdf
        key points:
            1. Add noise to the network parameters in order to aid efficient exploration.
        
    environment: CartPole-v0
    state:
        1.Cart Position:[-4.8,4.8],  2.Cart Velocity[-Inf,Inf],  3.Pole Angle[-24 deg, 24 deg]
        4.Pole Velocity [-Inf,Inf]
    action:
        0: left    
        1: right
    reward: 1 for every step    
        
    prerequisites:  tensorflow 2.2(tensorflow >= 2.0)
    notice：
    
    author: Xinchen Han
    date: 2020/7/29
    
    The implementation of this algorithm is faulty; I have not solved this problem so far.
"""

In [None]:
from tensorflow import keras
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from collections import deque
import gym
import random

In [None]:
"""Environment"""
# Build the CartPole environment and read the sizes of its observation
# vector and discrete action set.
env = gym.make('CartPole-v0')
state_dim, action_dim = env.observation_space.shape[0], env.action_space.n

"""Random seed"""
# Seed every source of randomness used here: gym, NumPy, stdlib random, TensorFlow.
env.seed(6)
np.random.seed(6)
random.seed(6)
tf.random.set_seed(6)

"""Set hyperparameters"""
# gamma is the discount factor applied to the bootstrapped target during training.
# alpha is defined alongside it but is not referenced in the visible code.
alpha = gamma = 0.9
max_episodes = 500  # upper bound on the number of training episodes

# When True, the training loop draws the environment on every step.
render = True

In [None]:
def _signed_sqrt(x):
    """Noise-shaping function f(x) = sign(x) * sqrt(|x|) for factorised noise."""
    return tf.multiply(tf.sign(x), tf.sqrt(tf.abs(x)))


class _NoisyDense(keras.layers.Layer):
    """Linear layer whose weights carry trainable factorised Gaussian noise.

    The effective parameters of one forward pass are
        w = mu_w + sigma_w * (f(eps_in) outer f(eps_out))
        b = mu_b + sigma_b * f(eps_out)
    where mu_* and sigma_* are trainable variables and eps_in / eps_out are
    unit-Gaussian vectors that are re-sampled on every call.

    Args:
        units: number of output features.
        sigma_0: numerator of the initial sigma value, sigma = sigma_0 / sqrt(in_dim).
            Defaults to 0.1, the constant the notebook used before this layer existed.
    """

    def __init__(self, units, sigma_0=0.1, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.sigma_0 = sigma_0

    def build(self, input_shape):
        """Create the trainable mean and standard-deviation variables."""
        in_dim = int(input_shape[-1])
        self.in_dim = in_dim
        scale = 1.0 / np.sqrt(in_dim)

        self.mu_w = self.add_weight(
            name="mu_w", shape=(in_dim, self.units), trainable=True,
            initializer=keras.initializers.TruncatedNormal(mean=0.0, stddev=scale))
        self.sigma_w = self.add_weight(
            name="sigma_w", shape=(in_dim, self.units), trainable=True,
            initializer=keras.initializers.Constant(self.sigma_0 * scale))
        self.mu_b = self.add_weight(
            name="mu_b", shape=(self.units,), trainable=True,
            initializer=keras.initializers.TruncatedNormal(mean=0.0, stddev=scale))
        self.sigma_b = self.add_weight(
            name="sigma_b", shape=(self.units,), trainable=True,
            initializer=keras.initializers.Constant(self.sigma_0 * scale))
        super().build(input_shape)

    def call(self, inputs):
        """Apply the layer with a freshly sampled noise realisation."""
        dtype = self.mu_w.dtype
        # Factorised noise: one vector per input unit and one per output unit;
        # broadcasting (in_dim, 1) * (1, units) forms their outer product.
        eps_in = _signed_sqrt(tf.random.normal((self.in_dim, 1), dtype=dtype))
        eps_out = _signed_sqrt(tf.random.normal((1, self.units), dtype=dtype))

        weight = self.mu_w + self.sigma_w * (eps_in * eps_out)
        bias = self.mu_b + self.sigma_b * eps_out[0]
        return tf.matmul(inputs, weight) + bias

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"units": self.units, "sigma_0": self.sigma_0})
        return config


class Noisy_DQN(object):
    """Dueling Double-DQN agent that explores through noisy linear layers.

    Attributes:
        step: number of training updates performed so far.
        batch_size: number of transitions sampled per update.
        update_freq: the target network is re-synced every `update_freq` updates.
        replay_size: capacity of the replay buffer.
        replay_queue: bounded deque of (state, action, next_state, reward, done).
        model: online Q-network.
        target_model: target Q-network used to evaluate the bootstrapped value.
        optimizer: Adam optimizer applied to the online network.
    """

    def __init__(self):
        self.step = 0
        self.batch_size = 64
        self.update_freq = 100
        self.replay_size = 1000
        self.replay_queue = deque(maxlen=self.replay_size)
        self.model = self.create_model()
        self.target_model = self.create_model()
        # Start both networks from identical weights; otherwise the first
        # `update_freq` updates bootstrap from an unrelated random network.
        self.target_model.set_weights(self.model.get_weights())
        self.optimizer = keras.optimizers.Adam(1e-3)

    def sub(self, args):
        """Return args[0] - args[1] (used inside a Lambda layer)."""
        return args[0] - args[1]

    def add(self, args):
        """Return args[0] + args[1] (used inside a Lambda layer)."""
        return args[0] + args[1]

    def create_model(self):
        """Build the dueling Q-network.

        Two shared hidden layers feed a value stream and an advantage stream,
        each made of a noisy linear layer followed by a Dense layer. The output
        is Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).

        Returns:
            A keras Model mapping a batch of states to a (batch, action_dim)
            tensor of Q-values.
        """
        # Pass the shape as a tuple: a bare int is not accepted by every TF 2.x release.
        inputs = keras.layers.Input(shape=(state_dim,))
        hidden1 = keras.layers.Dense(64, activation='relu')(inputs)
        hidden2 = keras.layers.Dense(16, activation='tanh')(hidden1)
        value = keras.layers.Dense(1)(self.Noisy_Dense(1, hidden2))

        adv = keras.layers.Dense(action_dim)(self.Noisy_Dense(action_dim, hidden2))
        mean = keras.layers.Lambda(lambda x: tf.reduce_mean(x, axis=1, keepdims=True))(adv)
        advantage = keras.layers.Lambda(self.sub)([adv, mean])

        output = keras.layers.Lambda(self.add)([value, advantage])
        model = keras.models.Model(inputs=[inputs], outputs=[output])

        return model

    def func(self, x):
        """Noise-shaping function f(x) = sign(x) * sqrt(|x|)."""
        return _signed_sqrt(x)

    def Noisy_Dense(self, units, input_data):
        """Apply a new noisy linear layer with `units` outputs to `input_data`.

        The layer owns trainable mean/sigma variables and re-samples its noise
        on each forward pass, so the noise is part of the model rather than a
        constant baked in at construction time.

        Args:
            units: number of output features.
            input_data: 2-D tensor of shape (batch, features).

        Returns:
            Tensor of shape (batch, units).
        """
        return _NoisyDense(units)(input_data)

    def choose_action(self, state):
        """Return the arg-max action of the (noisy) online network for one state."""
        state_batch = tf.convert_to_tensor([state], dtype=tf.float32)
        # Call the model directly: predict() adds substantial per-call overhead
        # for a single sample.
        q_values = self.model(state_batch)
        return np.argmax(q_values.numpy()[0])

    def choose_max_action(self, state):
        """Return the arg-max action of the online network for one state."""
        return self.choose_action(state)

    def fill_replay(self, state, action, state_, reward, done):
        """Append one transition to the replay buffer (oldest entries are evicted)."""
        self.replay_queue.append((state, action, state_, reward, done))

    def model_train(self):
        """Run one Double-DQN update on a random minibatch from the replay buffer.

        The target is
            y = r + gamma * (1 - done) * Q_target(s', argmax_a Q_online(s', a)),
        i.e. the online network selects the next action and the target network
        evaluates it. Does nothing while the buffer holds fewer than
        `batch_size` transitions.
        """
        if len(self.replay_queue) < self.batch_size:
            return

        self.step += 1
        if self.step % self.update_freq == 0:
            self.target_model.set_weights(self.model.get_weights())

        replay_batch = random.sample(self.replay_queue, self.batch_size)
        states, actions, next_states, rewards, dones = zip(*replay_batch)

        state_batch = tf.convert_to_tensor(np.array(states), dtype=tf.float32)
        next_state_batch = tf.convert_to_tensor(np.array(next_states), dtype=tf.float32)
        action_batch = np.array(actions)
        reward_batch = tf.convert_to_tensor(np.array(rewards), dtype=tf.float32)
        # 1.0 for non-terminal transitions, 0.0 for terminal ones.
        not_done = 1.0 - tf.convert_to_tensor(np.array(dones), dtype=tf.float32)

        # The target is a constant w.r.t. the online weights, so it is built
        # outside the gradient tape.
        max_action = tf.argmax(self.model(next_state_batch), axis=1)
        Q_next = self.target_model(next_state_batch)
        Q_next = tf.reduce_sum(tf.one_hot(max_action, action_dim) * Q_next, axis=1)
        target_value = reward_batch + not_done * gamma * Q_next

        with tf.GradientTape() as tape:
            Q = self.model(state_batch)
            Q = tf.reduce_sum(tf.one_hot(action_batch, action_dim) * Q, axis=1)
            loss = tf.reduce_mean(tf.square(Q - target_value) * 0.5)

        variables = self.model.trainable_variables
        grads = tape.gradient(loss, variables)
        self.optimizer.apply_gradients(zip(grads, variables))


In [None]:
if __name__ == "__main__":
    # Train the agent on CartPole, then plot the per-episode scores.
    agent = Noisy_DQN()
    score_list = []
    try:
        for episode in range(max_episodes):
            state = env.reset()
            score = 0
            done = False
            while not done:
                action = agent.choose_action(state)
                if render:
                    env.render()
                state_, reward, done, _ = env.step(action)
                agent.fill_replay(state, action, state_, reward, done)
                # Updates start only once the replay buffer is completely full.
                if len(agent.replay_queue) >= agent.replay_size:
                    agent.model_train()
                state = state_
                score += reward

            score_list.append(score)
            print('episode:', episode, 'score:', score, 'max_score:', np.max(score_list))
            if len(agent.replay_queue) >= agent.replay_size:
                print("   Training   ....")

            # Stop early once the mean score of the last (up to) 10 episodes exceeds 180.
            if np.mean(score_list[-10:]) > 180:
                break
    finally:
        # Release the environment (and its render window) even when training
        # is interrupted or raises.
        env.close()
    plt.plot(score_list, color='orange')
    plt.show()

