In [29]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
import gym

In [None]:
class ReplayBuffer():
    """Fixed-size circular buffer of (state, action, reward, new state, mask) transitions.

    Once ``max_size`` transitions have been stored, the oldest entries are
    overwritten.
    """

    def __init__(self, max_size, input_dims):
        """Pre-allocate storage for ``max_size`` transitions.

        Args:
            max_size: Maximum number of transitions kept in memory.
            input_dims: Iterable giving the shape of a single observation
                (it is unpacked with ``*input_dims``).
        """
        self.mem_size = max_size
        self.mem_cntr = 0  # total number of transitions ever stored

        # *input_dims expects a tuple and unpacks it into the array shape
        self.state_memory = np.zeros((self.mem_size, *input_dims),
                                     dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_dims),
                                         dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)

        # Non-terminal mask derived from the environment's done flag:
        # 1 while the episode continues, 0 for a terminal transition, so that
        # multiplying by it removes any future reward after the game is over.
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.int32)

    def store_transition(self, state, action, reward, state_, done):
        """Store one transition, overwriting the oldest one when the buffer is full.

        Args:
            state: Observation before the action.
            action: Integer action that was taken.
            reward: Reward received.
            state_: Observation after the action.
            done: Truthy if ``state_`` is terminal.
        """
        # Position of the first unoccupied (or oldest) memory slot
        index = self.mem_cntr % self.mem_size

        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        # Stored inverted: 0 marks a terminal transition
        self.terminal_memory[index] = 1 - int(done)

        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Sample ``batch_size`` distinct stored transitions uniformly at random.

        Only slots that have actually been written are sampled, so the
        zero-initialised part of the buffer is never returned.

        Args:
            batch_size: Number of transitions to draw.

        Returns:
            Tuple ``(states, actions, rewards, states_, terminal)`` where
            ``terminal`` is the non-terminal mask (0 for terminal transitions).

        Raises:
            ValueError: Raised by ``np.random.choice`` if ``batch_size`` is
                larger than the number of stored transitions, because
                sampling is done without replacement.
        """
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, batch_size, replace=False)

        states = self.state_memory[batch]
        states_ = self.new_state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        terminal = self.terminal_memory[batch]

        return states, actions, rewards, states_, terminal

In [None]:
def build_dqn(lr, neurons_layer1, neurons_layer2, neurons_layer3, n_actions, input_dims):
    """Build and compile a fully connected Q-network.

    Args:
        lr: Learning rate for the Adam optimizer.
        neurons_layer1: Units in the first hidden layer.
        neurons_layer2: Units in the second hidden layer.
        neurons_layer3: Units in the third hidden layer.
        n_actions: Number of discrete actions (size of the linear output layer).
        input_dims: Shape of a single observation, e.g. ``(8,)``.

    Returns:
        A compiled ``keras.Sequential`` model trained with mean squared error.
    """
    model = keras.Sequential([
            # Declaring the input shape builds the weights immediately, so
            # get_weights()/set_weights() work before the first predict/fit.
            keras.Input(shape=input_dims),
            keras.layers.Dense(neurons_layer1, activation='relu'),
            keras.layers.Dense(neurons_layer2, activation='relu'),
            keras.layers.Dense(neurons_layer3, activation='relu'),
            # Linear output: one Q-value estimate per action
            keras.layers.Dense(n_actions, activation=None)])

    model.compile(optimizer=Adam(learning_rate=lr), loss='mean_squared_error')

    return model

In [36]:
# Agent class: 

# This is where the bulk of the functionality lives.
# The Agent class holds the DQNs and the replay memory, and provides the
# functionality for choosing actions, storing memories and learning.

In [None]:
class Agent():
    """Double DQN agent with an epsilon-greedy policy and experience replay.

    Two networks are kept: ``q_eval`` (the online network, used to choose
    actions and the only one that is trained) and ``q_targ`` (the target
    network, whose weights are periodically copied from ``q_eval``).
    """

    def __init__(self, lr, gamma, n_actions, epsilon, batch_size, 
                  input_dims, epsilon_dec=0.9, epsilon_end=0.01, 
                  mem_size=10000, replace_target=100, fname='rl_control_model.h5'):
        """Create the replay memory and both Q-networks.

        Args:
            lr: Learning rate passed to ``build_dqn``.
            gamma: Discount factor for future rewards.
            n_actions: Number of discrete actions.
            epsilon: Initial exploration probability.
            batch_size: Number of transitions sampled per learning step.
            input_dims: Shape of a single observation (tuple).
            epsilon_dec: Multiplicative decay applied by ``update_epsilon``.
            epsilon_end: Lower bound for epsilon.
            mem_size: Capacity of the replay buffer.
            replace_target: The target network is refreshed whenever the
                number of stored transitions is a multiple of this value.
            fname: File used by ``save_model`` / ``load_model``.
        """
        # Don't train both networks: only the online network (q_eval), which
        # is used to choose actions, is trained. Its weights are copied into
        # the target network every `replace_target` stored transitions.
        self.action_space = [i for i in range(n_actions)]
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_dec = epsilon_dec
        self.epsilon_min = epsilon_end
        self.batch_size = batch_size
        self.replace_target = replace_target
        self.model_file = fname
        self.memory = ReplayBuffer(mem_size, input_dims)
        self.q_eval = build_dqn(lr, 8, 8, 8, n_actions, input_dims)
        self.q_targ = build_dqn(lr, 8, 8, 8, n_actions, input_dims)

    def store_transition(self, state, action, reward, state_, done):
        """Store a (state, action, reward, new state, done) transition in memory."""
        self.memory.store_transition(state, action, reward, state_, done)

    def choose_action(self, observation):
        """Pick an action for ``observation`` using an epsilon-greedy policy.

        With probability ``epsilon`` a random action is returned; otherwise
        the action with the highest Q-value predicted by ``q_eval``.
        """
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.action_space)
        else:
            # Add a batch dimension: the network expects a batch of states
            state = np.array([observation])
            actions = self.q_eval.predict(state, verbose=0)
            action = np.argmax(actions)

        return action

    def learn(self):
        """Run one Double DQN update on a batch sampled from the replay memory.

        Learning only starts once more than ``batch_size`` transitions have
        been stored, so a batch can be drawn without repeating memories.

        Returns:
            The value returned by ``q_eval.fit``, or ``None`` if there are
            not yet enough stored transitions.
        """
        if self.memory.mem_cntr <= self.batch_size:
            return None

        # The last element is the non-terminal mask (0 for terminal states)
        state, actions, rewards, state_, not_terminal = \
            self.memory.sample_buffer(self.batch_size)

        q_next = self.q_targ.predict(state_, verbose=0)
        q_eval_next = self.q_eval.predict(state_, verbose=0)
        q_pred = self.q_eval.predict(state, verbose=0)

        # Double Q-learning: the online network selects the next action,
        # the target network supplies the value of that action.
        max_actions = np.argmax(q_eval_next, axis=1)

        # Start from the current predictions so only the taken action's
        # Q-value contributes to the loss.
        q_target = np.copy(q_pred)
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        q_target[batch_index, actions] = rewards + \
            self.gamma * q_next[batch_index, max_actions] * not_terminal

        losses = self.q_eval.fit(state, q_target, verbose=0)

        if self.memory.mem_cntr % self.replace_target == 0:
            self.update_network()

        return losses

    def update_epsilon(self):
        """Decay epsilon by ``epsilon_dec``, never going below ``epsilon_min``."""
        self.epsilon = max(self.epsilon * self.epsilon_dec, self.epsilon_min)

    def update_network(self):
        """Copy the online network's weights into the target network."""
        self.q_targ.set_weights(self.q_eval.get_weights())

    def save_model(self):
        """Save the online network to ``model_file``."""
        self.q_eval.save(self.model_file)

    def load_model(self):
        """Load the online network from ``model_file``.

        The target network is synchronised with the loaded weights only when
        epsilon is already at (or below) its minimum.
        """
        self.q_eval = load_model(self.model_file)

        # NOTE(review): presumably epsilon at its floor means evaluation mode - confirm
        if self.epsilon <= self.epsilon_min:
            self.update_network()