In [0]:
import random
import gym
import numpy as np
from collections import deque
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import os


In [0]:
# Create the Gym MountainCar-v0 environment; later cells read its observation/action
# spaces and step through it during training and evaluation.
env = gym.make('MountainCar-v0')

In [0]:

from tensorflow.keras import backend as K
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import multiply
from tensorflow.keras.layers import InputSpec
from tensorflow.keras import initializers


# Layer Implementation: https://github.com/chucnorrisful/dqn/blob/master/noisyNetLayers.py

class NoisyDense(Dense):
    """Dense layer whose kernel and bias are perturbed by learned Gaussian noise.

    For every forward pass the effective parameters are

        w = kernel + kernel_sigma * eps_w
        b = bias   + bias_sigma   * eps_b

    where ``eps_w`` / ``eps_b`` come from ``K.random_normal`` and the ``*_sigma``
    weights are trainable, initialised to the constant 0.017.

    Args:
        units: Number of output units.
        **kwargs: Forwarded unchanged to ``Dense.__init__`` (activation,
            use_bias, initializers, regularizers, constraints, ...).
    """

    def __init__(self, units, **kwargs):
        # Kept in addition to Dense's own `units` because compute_output_shape reads it.
        self.output_dim = units
        super(NoisyDense, self).__init__(units, **kwargs)

    def build(self, input_shape):
        """Create the mean and sigma weights for the kernel and (optionally) the bias."""
        assert len(input_shape) >= 2
        self.input_dim = input_shape[-1]

        # Mean kernel. Honour the regularizer/constraint given to the constructor
        # instead of silently discarding them (both default to None in Dense).
        self.kernel = self.add_weight(shape=(self.input_dim, self.units),
                                      initializer=self.kernel_initializer,
                                      name='kernel',
                                      regularizer=self.kernel_regularizer,
                                      constraint=self.kernel_constraint)

        # Per-weight noise scale, learned together with the kernel.
        self.kernel_sigma = self.add_weight(shape=(self.input_dim, self.units),
                                            initializer=initializers.Constant(0.017),
                                            name='sigma_kernel',
                                            regularizer=None,
                                            constraint=None)

        if self.use_bias:
            self.bias = self.add_weight(shape=(self.units,),
                                        initializer=self.bias_initializer,
                                        name='bias',
                                        regularizer=self.bias_regularizer,
                                        constraint=self.bias_constraint)

            # Per-unit noise scale for the bias.
            self.bias_sigma = self.add_weight(shape=(self.units,),
                                              initializer=initializers.Constant(0.017),
                                              name='bias_sigma',
                                              regularizer=None,
                                              constraint=None)
        else:
            self.bias = None
            # Define the attribute on both paths so it can always be inspected.
            self.bias_sigma = None

        self.input_spec = InputSpec(min_ndim=2, axes={-1: self.input_dim})
        self.built = True

    def call(self, inputs):
        """Apply the layer using noise-perturbed kernel and bias."""
        # One noise value per kernel entry (independent Gaussian noise).
        self.kernel_epsilon = K.random_normal(shape=(self.input_dim, self.units))

        w = self.kernel + (self.kernel_sigma * self.kernel_epsilon)
        output = K.dot(inputs, w)

        if self.use_bias:
            self.bias_epsilon = K.random_normal(shape=(self.units,))

            b = self.bias + (self.bias_sigma * self.bias_epsilon)
            output = output + b
        if self.activation is not None:
            output = self.activation(output)
        return output

    def compute_output_shape(self, input_shape):
        """Return ``(batch, units)`` for the given input shape."""
        return (input_shape[0], self.output_dim)


In [0]:
# Length of one observation vector, read from the environment's observation space.
state_size= env.observation_space.shape[0]
# Bare expression so the notebook displays the value.
state_size

In [0]:
# Number of discrete actions, read from the environment's action space.
action_size= env.action_space.n
# Bare expression so the notebook displays the value.
action_size

In [0]:
# Number of stored transitions sampled from replay memory for each agent.replay call.
batch_size = 32


In [0]:
# Number of training episodes the main loop iterates over.
n_episodes= 70000

In [0]:
# Output location used when saving model weight files.
output_dir= 'model_output/MountainCar'

In [0]:
# Create the output directory (and any missing parents). exist_ok=True makes the cell
# safe to re-run and avoids the check-then-create race of a separate exists() test.
os.makedirs(output_dir, exist_ok=True)

In [0]:
class DQNAgent:
    """Dueling DQN agent with a noisy hidden layer and a separate target network.

    The online network (``model``) selects actions and is trained from replayed
    transitions; the target network (``target_model``) supplies the bootstrap
    value for non-terminal transitions and is synchronised through
    ``update_target_model``.

    Args:
        state_size: Length of one observation vector.
        action_size: Number of discrete actions.
    """

    def __init__(self, state_size, action_size):
        # NOTE: reads the module-level `env`; it must exist before the agent is built.
        self.env = env
        self.state_size = state_size
        self.action_size = action_size

        # Replay memory of (state, action, reward, next_state, done) tuples.
        self.memory = deque(maxlen=200000)

        # Discount factor applied to the bootstrapped next-state value.
        self.gamma = 0.99

        # Epsilon-greedy settings. `act` in this class is purely greedy and does not
        # read them; they are kept so existing code that inspects them keeps working.
        self.epsilon = 1.0
        self.epsilon_decay = .92
        self.epsilon_min = 0.00001

        self.learning_rate = 0.001251
        self.model = self._build_model()
        self.target_model = self._build_model()

        # Start with both networks holding identical weights.
        self.update_target_model()

    def _build_model(self):
        """Build and compile the dueling Q-network.

        Returns:
            A compiled ``tf.keras.Model`` mapping a batch of states to one Q-value
            per action.
        """
        state_shape = self.env.observation_space.shape
        input_layer = tf.keras.Input(shape=(state_shape))
        fc1 = tf.keras.layers.Dense(24, activation='relu')(input_layer)

        fc2 = NoisyDense(48, activation='relu')(fc1)

        # Advantage stream, centred by subtracting its mean over actions.
        advantage = tf.keras.layers.Dense(self.action_size, activation='linear')(fc2)
        avgadv = tf.keras.layers.Lambda(
            lambda x: tf.keras.backend.mean(x, axis=1, keepdims=True))(advantage)
        subadv = tf.keras.layers.Subtract()([advantage, avgadv])

        # State-value stream; Q = V + (A - mean(A)).
        value = tf.keras.layers.Dense(1, activation='linear')(fc2)
        qvals = tf.keras.layers.Add()([value, subadv])

        model = tf.keras.Model(inputs=input_layer, outputs=qvals)

        # `learning_rate` is the supported keyword; `lr` is a deprecated alias that
        # newer Keras releases reject.
        model.compile(loss='mse',
                      optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))

        return model

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return the index of the highest Q-value predicted for ``state``.

        Args:
            state: Array of shape ``(1, state_size)``.
        """
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        """Train the online network on a random minibatch from replay memory.

        For each sampled transition the target for the taken action is ``reward``
        when ``done`` is true, otherwise
        ``reward + gamma * max_a target_model(next_state)[a]``; the targets for the
        other actions are the online network's current predictions.

        Does nothing when fewer than ``batch_size`` transitions are stored.

        Args:
            batch_size: Number of transitions to sample.
        """
        if batch_size <= 0 or len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)

        # Stored states have shape (1, state_size); take row 0 of each to build
        # (batch, state_size) arrays.
        states = np.array([transition[0][0] for transition in minibatch])
        actions = np.array([transition[1] for transition in minibatch], dtype=np.int64)
        rewards = np.array([transition[2] for transition in minibatch], dtype=np.float64)
        next_states = np.array([transition[3][0] for transition in minibatch])
        dones = np.array([bool(transition[4]) for transition in minibatch])

        # One forward pass per network for the whole minibatch instead of two
        # predict() calls per transition.
        targets = np.array(self.model.predict(states))
        next_q = np.asarray(self.target_model.predict(next_states))

        bootstrapped = rewards + self.gamma * np.amax(next_q, axis=1)
        # Terminal transitions have no future value to bootstrap from.
        td_targets = np.where(dones, rewards, bootstrapped)
        targets[np.arange(len(minibatch)), actions] = td_targets

        self.model.fit(states, targets, epochs=1, verbose=0)

    def load(self, name):
        """Load online-network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save online-network weights to the file ``name``."""
        self.model.save_weights(name)
            
    
        

In [0]:
# Build the agent (online and target networks) for this environment's state/action sizes.
agent= DQNAgent(state_size, action_size)

In [0]:
# Main loop: one training episode, then an evaluation pass, then a target-network sync.
MAX_STEPS_PER_EPISODE = 7000   # upper bound on steps if the env never reports done
N_TEST_EPISODES = 100          # evaluation episodes run after every training episode
CHECKPOINT_EVERY = 50          # save weights every this many training episodes
SOLVED_AVG_SCORE = -110        # average test score at which weights are also saved

done = False
counter = 0  # not used by this loop
scores_memory = deque(maxlen=100)   # step counts of the most recent training episodes
test_score = deque(maxlen=100)      # step counts of the most recent test episodes


def _weights_path(episode_number):
    """Return the weight-file path inside ``output_dir`` for the given episode number."""
    # os.path.join puts the file inside output_dir; plain string concatenation glued
    # the file name onto the directory name ("model_output/MountainCarweights_final...").
    return os.path.join(output_dir, 'weights_final{:04d}.hdf5'.format(episode_number))


for e in range(n_episodes):
    state = env.reset()
    state = np.reshape(state, [1, state_size])

    # ---- training episode ----
    for step in range(MAX_STEPS_PER_EPISODE):
        action = agent.act(state)
        next_state, reward, done, info = env.step(action)
        next_state = np.reshape(next_state, [1, state_size])

        agent.remember(state, action, reward, next_state, done)

        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

        state = next_state

        if done:
            # `step` is a 0-based index, so the episode lasted step + 1 steps. The
            # previous code only corrected this for step == 199 and under-counted
            # every shorter episode by one.
            steps_taken = step + 1
            scores_memory.append(steps_taken)
            # Score is reported as the negated step count.
            scores_avg = np.mean(scores_memory) * -1

            print('Episode: {}/{}, score: {}, state: {}, 100 episode score avg: {}'.format(
                e + 1, n_episodes, steps_taken, state, scores_avg))
            break

    if e % CHECKPOINT_EVERY == 0:
        agent.save(_weights_path(e))

    # ---- evaluation: no learning, transitions are not stored ----
    for ep in range(N_TEST_EPISODES):
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        for test_step in range(MAX_STEPS_PER_EPISODE):
            action = agent.act(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            state = next_state
            test_steps = test_step + 1  # same 0-based index -> step count conversion

            if done:
                test_score.append(test_steps)
                break

    tscores_avg = np.mean(test_score) * -1
    print('Test_Episodes: {}/{}, score: {}, state: {}, 100 episode score avg: {}'.format(
        ep + 1, ep + 1, test_steps, state, tscores_avg))
    if tscores_avg >= SOLVED_AVG_SCORE:
        agent.save(_weights_path(e + 1))
    # Sync the target network once per training episode.
    agent.update_target_model()

In [0]:
# Reference for the DQN and for building it around OpenAI Gym: https://www.youtube.com/watch?v=OYhFoMySoVs
# Double DQN reference: https://jaromiru.com/2016/11/07/lets-make-a-dqn-double-learning-and-prioritized-experience-replay/