# import libraries

In [None]:
import gym # env
from gym import wrappers

import keras # model creation

import numpy as np # handle matrix calculations

import os
# GPU setup: enumerate devices by PCI bus ID, expose only GPU 0, and let TensorFlow grow GPU memory on demand
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

# define memory

the memory will house a stack of experiences that will be periodically sampled for experience replay learning

we sample non-sequential memories to get a broad generalization of the environment


In [2]:
import numpy as np


class Memory(object):
    """Fixed-capacity ring buffer of transitions used for experience replay.

    Each transition is (state, action, reward, next_state, done). Sampling
    random, non-sequential transitions breaks the correlation between
    consecutive steps of an episode.
    """

    def __init__(self, memory_size, state_size, action_space, one_hot_encoding=False):
        """Allocate zero-filled storage for `memory_size` transitions.

        Args:
            memory_size: maximum number of transitions kept at once.
            state_size: length of one flat state vector.
            action_space: width of one stored action row (number of discrete
                actions when one-hot encoding is enabled).
            one_hot_encoding: when True, `remember` receives an action index
                and stores it as a one-hot row.
        """
        self.memory_size = memory_size
        self.memory_counter = 0  # total transitions ever stored (never wraps)
        self.one_hot_encoding = one_hot_encoding

        # states and next states share the same (capacity, state_size) layout
        state_shape = (self.memory_size, state_size)
        self.state_memory = np.zeros(state_shape)
        self.next_state_memory = np.zeros(state_shape)

        # one-hot rows only ever hold 0/1, raw action vectors are kept as float32
        if self.one_hot_encoding:
            action_dtype = np.int8
        else:
            action_dtype = np.float32
        self.action_memory = np.zeros(
            (self.memory_size, action_space), dtype=action_dtype
        )

        # one scalar reward per transition
        self.reward_memory = np.zeros(self.memory_size)

        # continuation mask: 1 while the episode goes on, 0 at a terminal step
        self.done_memory = np.zeros(self.memory_size, dtype=np.float32)

    def remember(self, state, action, reward, next_state, done):
        """Store one transition, overwriting the oldest once the buffer is full."""
        # the counter keeps growing; the modulo maps it onto a valid slot
        slot = self.memory_counter % self.memory_size

        self.state_memory[slot] = state
        self.next_state_memory[slot] = next_state

        if not self.one_hot_encoding:
            # action is stored as given
            self.action_memory[slot] = action
        else:
            # turn the action index into a row with a single 1
            encoded = np.zeros(self.action_memory.shape[1])
            encoded[action] = 1.0
            self.action_memory[slot] = encoded

        self.reward_memory[slot] = reward
        # stored inverted so it can multiply the bootstrapped term directly
        self.done_memory[slot] = 1 - int(done)

        self.memory_counter += 1

    def sample(self, batch_size):
        """Draw `batch_size` random transitions from the filled part of the buffer.

        Returns:
            Tuple (states, actions, rewards, next_states, done_mask).
        """
        # only slots that have been written are eligible
        filled = min(self.memory_counter, self.memory_size)

        # random indices in [0, filled); indices may repeat within a batch
        picks = np.random.choice(filled, batch_size)

        return (
            self.state_memory[picks],
            self.action_memory[picks],
            self.reward_memory[picks],
            self.next_state_memory[picks],
            self.done_memory[picks],
        )


# define the model

In [3]:
class ANN(object):
    """Fully connected Q-network: flat state vector in, one Q-value per action out.

    Args:
        action_space: number of discrete actions (width of the output layer).
        learning_rate: step size handed to the Adam optimizer.
        state_size: length of the flat state vector fed to the first layer.
    """

    def __init__(self, action_space, learning_rate, state_size):
        self.action_space = action_space
        self.alpha = learning_rate
        self.state_size = state_size  # length of the 1-D state vector
        self.model = keras.Sequential()

        # two hidden layers; relu lets the network model non-linear relationships
        # input_shape=(state_size,) describes a single sample, the batch axis is implicit
        self.model.add(
            keras.layers.Dense(units=256, input_shape=(self.state_size,), activation="relu")
        )
        # later layers infer their input width from the previous layer
        self.model.add(keras.layers.Dense(units=256, activation="relu"))

        # one output per action. The activation must be linear: the network is
        # regressed (mse) onto Bellman targets, which are unbounded and may be
        # negative, whereas softmax would confine outputs to [0, 1] summing to 1.
        self.model.add(keras.layers.Dense(self.action_space, activation="linear"))

        # summary() prints the table itself and returns None
        self.model.summary()

        # mse between predicted Q-values and target Q-values, minimised with Adam
        self.model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=self.alpha), loss="mse"
        )


# deep Q-learning issues

using the same network to both choose the best action and evaluate the quality of that action leads to learning instability

the max operator in the Bellman target calculation inherently biases the target value towards short-term high rewards

this can lead to the model getting stuck in a local minimum

![](https://drive.google.com/uc?id=1P2oEWUdGZRt6SNpvjQif240nlAmPcu8l)

**when we calculate the loss, we want a target that does not move**

- It’s like if you were a cowboy (the Q estimation) and you want to catch the cow (the Q-target) you must get closer (reduce the error).

- At each time step, you’re trying to approach the cow, which also moves at each time step (because you use the same parameters).

![](https://drive.google.com/uc?id=1Jru3e5bvzmf43FW2ayo6q0X0qxzJT0T5)

# Double Q-Learning with Dueling Architecture

take the argmax of the outputs of the online network, but the Q-value for this action is evaluated from the target network (bellman equation)

![](https://drive.google.com/uc?id=1QPApz4O9u6Tuez8k1lUuJMy2wkG2XpIF)



# define agent

In [4]:
class Agent(object):
    """Double DQN agent with epsilon-greedy exploration and experience replay.

    The online network picks actions and is trained every step; the target
    network supplies the Q-values used inside the Bellman target and is
    overwritten with the online weights every `update_target` stored
    transitions.

    Args:
        actions: number of discrete actions in the environment.
        state_size: length of the flat state vector.
    """

    def __init__(self, actions, state_size):
        self.actions = actions  # number of actions
        self.action_space = list(range(actions))  # list of valid action indices
        self.batch_size = 64
        self.epsilon = 1.0  # probability of taking a random action
        self.epsilon_decay = 0.95  # multiplicative decay applied after each training step
        self.epsilon_min = 0.1  # exploration floor
        self.learning_rate = 0.0003
        self.gamma = 0.9  # discount factor
        self.state_size = state_size
        # size the replay buffer from state_size rather than a hard-coded width
        self.memory = Memory(1000000, self.state_size, self.actions, True)
        self.name = "DDQN.h5"

        self.online_network = self.buildNetwork()  # takes actions and builds up experience
        self.target_network = self.buildNetwork()  # provides Q-values for the Bellman target
        # the two networks are initialised independently; start them from equal weights
        self.updateTargetWeights()

        # copy online weights into the target network every x stored transitions
        self.update_target = 100

    def buildNetwork(self):
        """Return a freshly initialised Q-network wrapper."""
        return ANN(self.actions, self.learning_rate, self.state_size)

    def chooseAction(self, state):
        """Pick an action for `state` with the epsilon-greedy policy.

        Args:
            state: 1-D numpy array holding a single observation.

        Returns:
            Index of the chosen action.
        """
        # explore: with probability epsilon take a uniformly random action
        if np.random.random() < self.epsilon:
            return np.random.choice(self.action_space)

        # exploit: add a batch axis, predict the Q-values, take the best action
        q_values = self.online_network.model.predict(state[np.newaxis, :], verbose=0)
        return np.argmax(q_values)

    def updateTargetWeights(self):
        """Overwrite the target network weights with the online network weights."""
        self.target_network.model.set_weights(self.online_network.model.get_weights())

    def save(self):
        """Save the online model to "/models/" + self.name."""
        # NOTE(review): "/models/" is an absolute path at the filesystem root - confirm this is intended
        self.online_network.model.save("/models/" + self.name)

    def loadModel(self, file):
        """Load a saved model from "/models/" + file into the online network.

        Args:
            file: file name of the saved model inside "/models/".
        """
        # keep the ANN wrapper and replace only its Keras model, so the
        # `.model` accesses used throughout this class keep working
        self.online_network.model = keras.models.load_model("/models/" + file)

        # make sure target_network and online_network always start with equal weights
        self.updateTargetWeights()

    def learn(self):
        """Run one Double DQN training step on a random replay batch.

        Does nothing until the replay memory holds more than `batch_size`
        transitions. Afterwards it fits the online network on the Bellman
        targets, decays epsilon and periodically refreshes the target network.
        """
        # pre-train phase: wait for the memory to fill up before training
        if self.memory.memory_counter <= self.batch_size:
            return

        state, action, reward, next_state, done = self.memory.sample(self.batch_size)

        # convert one-hot rows back to action indices
        action_labels = np.argmax(action, axis=1)

        # double deep Q-learning: the online network selects the next action,
        # the target network evaluates it
        Qs_next_online = self.online_network.model.predict(next_state, verbose=0)
        Qs_next_target = self.target_network.model.predict(next_state, verbose=0)

        # current Q-value estimates for the sampled states
        Qs_actual = self.online_network.model.predict(state, verbose=0)

        # best next action according to the online network
        best_next_actions = np.argmax(Qs_next_online, axis=1)

        # start from a copy of the current predictions so that only the entries
        # of the actions actually taken contribute to the loss
        Qs_target = np.array(Qs_actual, copy=True)
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # Bellman target: Q(s, a) = r + gamma * Q_target(s', argmax_a' Q_online(s', a'))
        # `done` is 0 for terminal transitions, which removes the bootstrapped term
        Qs_target[batch_index, action_labels] = (
            reward
            + self.gamma * Qs_next_target[batch_index, best_next_actions] * done
        )

        # fit the online network towards the targets (mse loss); verbose=0 silences logging
        self.online_network.model.fit(state, Qs_target, verbose=0)

        # decay exploration but never drop below the floor
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # refresh the target network every `update_target` stored transitions
        if self.memory.memory_counter % self.update_target == 0:
            self.updateTargetWeights()

# setup main playthrough

In [0]:
env = gym.make("LunarLander-v2")

agent = Agent(actions=4, state_size=8)

total_episodes = 500
# agent.loadModel()
scores = []  # total reward per episode, used to track agent progress
policy = []  # epsilon at the end of each episode, used to track exploration

# record a video of every episode into ./gameplay, overwriting earlier recordings
env = wrappers.Monitor(
    env, "gameplay", video_callable=lambda episode_id: True, force=True
)

# training
try:
    for episode in range(total_episodes):
        done = False
        score = 0
        state = env.reset()  # start episode with initial state observation
        while not done:
            action = agent.chooseAction(state)
            next_state, reward, done, info = env.step(action)  # info = general env data

            score += reward

            # store experience to memory for exp replay
            agent.memory.remember(state, action, reward, next_state, done)
            state = next_state  # transition to next state
            agent.learn()

        policy.append(agent.epsilon)
        scores.append(score)

        # running average over the most recent 100 episodes
        avg_score = np.mean(scores[-100:])
        print(
            "episode: ", episode, "score: %.2f" % score, "avg score: %.2f" % avg_score
        )

        # checkpoint the online network every 10 episodes
        if episode % 10 == 0 and episode > 0:
            agent.save()
finally:
    # closing the wrapped env lets the Monitor finalize its recordings,
    # even when training is interrupted
    env.close()

