In [None]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.optimizers import Adam
import numpy as np

In [1]:
# A dueling deep Q-network does not need to learn the value of every state-action pair individually.
# It separates the state value from the action advantage, so the state value generalises across the state space.
# This helps when the ideal action is to do nothing (the choice of action barely matters), saving effort over evaluating ALL state-action pairs.
class DuelingDeepQNetwork(keras.Model):
    """Dueling Q-network: two shared dense layers feeding a value head and an advantage head.

    Args:
        n_actions: number of outputs of the advantage head (one per action).
        fc1_dims: number of units in the first hidden dense layer.
        fc2_dims: number of units in the second hidden dense layer.
    """

    def __init__(self, n_actions, fc1_dims, fc2_dims):
        super().__init__()
        # Shared trunk: both hidden layers use ReLU.
        self.dense1 = keras.layers.Dense(fc1_dims, activation='relu')
        self.dense2 = keras.layers.Dense(fc2_dims, activation='relu')
        # Linear heads: a single raw state value, and one raw advantage per action.
        self.Value = keras.layers.Dense(1, activation=None)
        self.Advantage = keras.layers.Dense(n_actions, activation=None)

    def _shared_trunk(self, state):
        """Pass ``state`` through the two hidden dense layers and return the result."""
        return self.dense2(self.dense1(state))

    def call(self, state):
        """Combine the value and advantage streams into a Q(s, a) estimate.

        The two streams are not simply summed: the mean advantage (taken over
        axis 1) is subtracted first, i.e. Q = V + (A - mean(A)). Because the
        output is still a Q estimate, the usual Q-learning machinery (e.g.
        experience replay) can be applied to it.

        Args:
            state: input batch; axis 1 of the advantage output is reduced, so
                a leading batch axis is expected.

        Returns:
            The Q estimate, same shape as the advantage head's output.
        """
        hidden = self._shared_trunk(state)
        state_value = self.Value(hidden)
        action_advantages = self.Advantage(hidden)

        # keepdims=True keeps the mean broadcastable against the advantages.
        mean_advantage = tf.math.reduce_mean(action_advantages, axis=1, keepdims=True)
        return (state_value + (action_advantages - mean_advantage))

    def advantage(self, state):
        """Return only the raw advantage-head output for ``state`` (no value stream)."""
        hidden = self._shared_trunk(state)
        return self.Advantage(hidden)

NameError: name 'keras' is not defined

In [2]:
class ReplayBuffer():
    """Fixed-capacity circular store of (state, action, reward, next state, done) transitions.

    Args:
        max_size: number of transitions the buffer can hold before it starts
            overwriting the oldest entries.
        input_shape: iterable giving the shape of a single state; it is
            unpacked after the capacity axis when allocating the state arrays.
    """

    def __init__(self, max_size, input_shape):
        self.mem_size = max_size
        # Total number of transitions ever stored; NOT capped at mem_size.
        self.mem_cntr = 0

        self.state_memory = np.zeros((self.mem_size, *input_shape),
                                     dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_shape),
                                         dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        # np.bool_ instead of the np.bool alias: the alias was deprecated in
        # NumPy 1.20 and removed in 1.24, where it raises AttributeError.
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.bool_)

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the next slot, wrapping around when full.

        Args:
            state: state before the action.
            action: action taken (stored as int32).
            reward: reward received (stored as float32).
            state_: state after the action.
            done: terminal flag (stored as bool).
        """
        # Modulo turns the ever-growing counter into a circular write index.
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.terminal_memory[index] = done

        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` distinct stored transitions uniformly at random.

        Args:
            batch_size: number of transitions to draw.

        Returns:
            Tuple ``(states, actions, rewards, new_states, dones)`` of arrays,
            each with ``batch_size`` rows.

        Raises:
            ValueError: raised by ``np.random.choice`` when ``batch_size``
                exceeds the number of transitions currently stored, because
                sampling is done without replacement.
        """
        # Only the filled part of the arrays is eligible for sampling.
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, batch_size, replace=False)

        states = self.state_memory[batch]
        new_states = self.new_state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        dones = self.terminal_memory[batch]

        return states, actions, rewards, new_states, dones

In [None]:
class Agent():
    """Epsilon-greedy agent holding an online and a target dueling Q-network.

    Args:
        learning_rate: learning rate passed to the Adam optimizer of both networks.
        gamma: stored as ``self.gamma``; not used by the code in this class as written.
        n_actions: number of discrete actions; actions are the integers ``0..n_actions-1``.
        epsilon: probability of picking a random action in ``choose_action``.
        batch_size: stored as ``self.batch_size``; not used by the code in this class as written.
        input_dims: shape of a single observation, forwarded to ``ReplayBuffer``.
        epsilon_dec: stored as ``self.eps_dec``; not used by the code in this class as written.
        eps_end: stored as ``self.eps_end``; not used by the code in this class as written.
        mem_size: capacity of the replay buffer.
        fname: stored as ``self.fname``; not used by the code in this class as written.
        fc1_dims: units in the first hidden layer of each network.
        fc2_dims: units in the second hidden layer of each network.
        replace: stored as ``self.replace``; intended to define when the target
            net params are updated (copied from the online net) for stability.
    """

    def __init__(self, learning_rate, gamma, n_actions, epsilon, batch_size, input_dims,
                 epsilon_dec=1e-3, eps_end=0.01,
                 mem_size=1000000, fname='dueling_dqn.h5', fc1_dims=128,
                 fc2_dims=128, replace=100):
        self.action_space = list(range(n_actions))
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_dec = epsilon_dec
        self.eps_end = eps_end
        self.fname = fname
        self.replace = replace
        # Previously accepted but silently discarded; keep it so it is
        # available on the instance.
        self.batch_size = batch_size

        self.learn_step_counter = 0
        self.memory = ReplayBuffer(mem_size, input_dims)
        self.q_eval = DuelingDeepQNetwork(n_actions, fc1_dims, fc2_dims)  # online net
        self.q_next = DuelingDeepQNetwork(n_actions, fc1_dims, fc2_dims)  # target net

        self.q_eval.compile(optimizer=Adam(learning_rate=learning_rate), loss='mean_squared_error')
        self.q_next.compile(optimizer=Adam(learning_rate=learning_rate), loss='mean_squared_error')

    def store_transition(self, state, action, reward, new_state, done):
        """Forward one transition to the replay buffer."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def choose_action(self, observation):
        """Pick an action epsilon-greedily.

        With probability ``self.epsilon`` a uniformly random action from
        ``self.action_space`` is chosen; otherwise the action with the largest
        output of the online network's advantage stream is chosen.

        Args:
            observation: a single observation (no batch axis).

        Returns:
            The index of the chosen action.
        """
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.action_space)
        else:
            state = np.array([observation])  # add a leading batch axis
            actions = self.q_eval.advantage(state)  # advantage of every action
            # argmax over axis 1 yields a single-element array; take its value.
            action = tf.math.argmax(actions, axis=1).numpy()[0]

        # The original fell off the end here and therefore always returned None.
        return action
    
    