In [67]:
import numpy as np
import matplotlib.pyplot as plt
import random
import tensorflow as tf
from tensorflow import keras
from tqdm import tqdm
from tensorflow.keras import layers
import numpy as np
from collections import deque
plt.style.use("seaborn")

GRID_SIZE = np.zeros((4,12))

ACTIONS = np.array([[-1, 0],   #0 up
                    [1, 0],    #1 down
                    [0, -1],   #2 left
                    [0, 1]])   #3 right

actions = np.arange(4)   #represent ACTIONS

EPOCHS = 100

START = np.array([3,0])

TARGET = np.array([3,11])

class Environment:
    def step(self, State, Action):
        pos = State + Action
        for i in range(len(pos)):
            if pos[i] < 0:
                pos[i] = max(0,pos[i])
            else:
                pos[i] = min(GRID_SIZE.shape[i]-1,pos[i])
        
        done = False
        if (pos == TARGET).all() == True:
            done = True

        if pos[0] == 3 and 1 <= pos[1] <= 10: #fall off the cliff
            pos = START
            return pos, -100, done
        else:
            return pos, -1, done

In [68]:
class MemoryBuffer:
    def __init__(self, maxlen):
        self.buffer = deque(maxlen=maxlen)

    def add(self, experience):
        self.buffer.append(experience)
    
    def sample(self, batch_size):
        indices = np.arange(len(self.buffer))
        sample_indices = random.choices(indices, k = batch_size)
        samples = np.array(self.buffer, dtype = object)[sample_indices]
    
        return map(np.array, zip(*samples))

In [69]:
class DQN_agent:
    def __init__(self, n_states, n_actions):
        self.n_states = n_states
        self.n_actions = n_actions
        self.q_network = self.build_q_network()
        self.t_q_network = self.build_q_network()
        self.buffer = MemoryBuffer(1000)
        self.optimizer = keras.optimizers.Adam(learning_rate = 1e-3, clipnorm=1.0)
        self.batch_size = 16
        self.epsilon = 1.0
        # timestep in an episode
        self.frame_count = 0
        self.gamma = 1.0
    
    def build_q_network(self):
        # Network architecture
        inputs = keras.Input(shape = self.n_states)
        x = layers.Dense(units = 8, activation = 'relu')(inputs)
        q_value = layers.Dense(units = self.n_actions)(x)

        return keras.Model(inputs = inputs, outputs = q_value)
    
    def ε_greedy_policy(self, t, state):
        # exploration and exploitation
        self.epsilon = 1.0
        if  self.epsilon >= np.random.rand(1)[0]:
            action = np.random.choice(self.n_actions)
        else:
            action_values = self.q_network(np.expand_dims(state, axis=(0)))
            action = np.argmax(action_values)

        return action

    def store(self, state, action, next_state, reward, done):
        # store training data
        self.buffer.add((state, action, reward, next_state, done))
    

    def train_q_network(self):
        # sample
        (states, actions, rewards, next_states, dones) = self.buffer.sample(self.batch_size)
        future_rewards = self.t_q_network.predict(next_states)
        # Q value = reward + discount factor * expected future reward
        updated_q_values = rewards + self.gamma * tf.reduce_max(future_rewards, axis=1)
        
        # set last q value to 0
        updated_q_values = updated_q_values*(1 - dones)
        masks = tf.one_hot(actions, self.n_actions)

        with tf.GradientTape() as tape:
          # Train the model on the states and updated Q-values
          q_values = self.q_network(states)
          # only update q-value which is chosen
          q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
          # calculate loss between new Q-value and old Q-value
          loss = tf.reduce_mean(tf.math.square(q_action - updated_q_values))
        
        # Backpropagation
        grads = tape.gradient(loss, self.q_network.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.q_network.trainable_variables))

    def update_target_network(self):
        # update per update_target_network steps
        self.t_q_network.set_weights(self.q_network.get_weights())

In [70]:
def DQN_training(epochs = EPOCHS):
    agent = DQN_agent(2,4)
    for t in tqdm(range(epochs)):
        env = Environment()
        state = START
        while True:
            agent.frame_count += 1
            action = agent.ε_greedy_policy(t+1,state)
            next_state, reward, done = env.step(state, ACTIONS[action])
            
            agent.store(state, action, next_state, reward, done)

            if agent.frame_count%16 == 0 and len(agent.buffer.buffer) >= agent.batch_size:
                agent.train_q_network()
            
            if agent.frame_count%100 == 0:
                agent.update_target_network()

            state = next_state
            if done == True:
                break
            
        agent.update_target_network()

    return agent

In [71]:
def print_optimal_policy(q_network):
    # display the optimal policy
    optimal_policy = []
    for i in range(0, 4):
        optimal_policy.append([])
        for j in range(0, 12):
            if ([i, j] == TARGET).all():
                optimal_policy[-1].append('G')
                continue
            action_values = q_network(np.expand_dims([i,j], axis=(0)))
            bestAction = np.argmax(action_values)
            if bestAction == 0:
                optimal_policy[-1].append('U')
            elif bestAction == 1:
                optimal_policy[-1].append('D')
            elif bestAction == 2:
                optimal_policy[-1].append('L')
            elif bestAction == 3:
                optimal_policy[-1].append('R')
    for row in optimal_policy:
        print(row)

In [72]:
agent = DQN_training()
print('Q-Learning Optimal Policy:')
print_optimal_policy(agent.q_network)

100%|██████████| 100/100 [38:15<00:00, 22.95s/it] 

Q-Learning Optimal Policy:
['R', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D']
['R', 'R', 'R', 'R', 'R', 'D', 'D', 'D', 'D', 'D', 'D', 'D']
['L', 'U', 'U', 'U', 'L', 'L', 'L', 'L', 'U', 'U', 'U', 'U']
['U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'G']



