In [None]:
import random
import pickle

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam

In [None]:
# Create a class for the environment
class Env():
    """Grid world with fixed wall obstacles, a random start cell and a random goal.

    Positions are integer ``[x, y]`` NumPy arrays whose coordinates lie in
    ``[0, grid_size)``. The agent moves one cell per step using one of the
    actions listed in ``ACTION_DELTAS``.

    Args:
        grid_size: Side length of the square grid.
        max_steps: Number of ``step`` calls after which a successful move ends
            the episode with reward 0.
    """

    # Displacement applied to the position by each action id.
    ACTION_DELTAS = {
        0: (1, 0),   # right
        1: (-1, 0),  # left
        2: (0, -1),  # down
        3: (0, 1),   # up
    }

    def __init__(self, grid_size=21, max_steps=500):
        self.grid_size = grid_size
        self.max_steps = max_steps

        # Three straight walls: x = 3 for y in 0..9, x = 8 for y in 9..20 and
        # x = 14 for y in 0..9.
        self.obstacles = np.array(
            [[3, y] for y in range(10)]
            + [[8, y] for y in range(9, 21)]
            + [[14, y] for y in range(10)]
        )
        self.reset()
        self.reset_goal()

    def check(self, test, array):
        """Return True if the cell ``test`` equals any row of ``array``."""
        return any(np.array_equal(x, test) for x in array)

    def _random_free_cell(self):
        """Draw uniformly random cells until one is not an obstacle."""
        cell = np.random.randint(0, self.grid_size, size=2)
        while self.check(cell, self.obstacles):
            cell = np.random.randint(0, self.grid_size, size=2)
        return cell

    def _is_free(self, cell):
        """Return True if ``cell`` is inside the grid and not an obstacle."""
        inside = bool(np.all((cell >= 0) & (cell < self.grid_size)))
        return inside and not self.check(cell, self.obstacles)

    def reset(self):
        """Start a new episode at a random obstacle-free cell.

        Returns:
            A copy of the new position, so later ``step`` calls cannot modify
            the array handed to the caller.
        """
        self.pos = self._random_free_cell()
        self.steps = 0
        self.done = False
        return self.pos.copy()

    def reset_goal(self):
        """Pick a random goal that is obstacle-free and differs from ``pos``.

        Prints the chosen goal and returns a copy of it.
        """
        self.goal = self._random_free_cell()
        while np.array_equal(self.pos, self.goal):
            self.goal = self._random_free_cell()
        print('Goal:', self.goal)
        return self.goal.copy()

    def step(self, action): # As per the paper
        """Apply ``action`` and return ``(state, reward, done, crashed)``.

        Rewards:
            * -150 when the move would leave the grid or enter an obstacle, or
              when ``action`` is not a known action id. The position is left
              unchanged and ``crashed`` is True.
            * 500 when the goal is reached (``done`` is True).
            * 0 when ``max_steps`` is reached on a successful move (``done`` is True).
            * +10 / -10 when the move brought the agent strictly closer to the
              goal / did not.

        The returned state is a copy of the internal position; the caller may
        keep it across later steps without it changing.
        """
        self.steps += 1

        # Compare with ``==`` rather than a dict lookup so that any action
        # value the integer comparisons accept keeps working.
        delta = next((d for a, d in self.ACTION_DELTAS.items() if action == a), None)
        candidate = None if delta is None else self.pos + np.array(delta)

        if candidate is None or not self._is_free(candidate):
            reward = -150
            self.done = False
            return self.pos.copy(), reward, self.done, True # TODO: The episode is not terminated.

        prev_pos = self.pos
        # Rebind instead of mutating in place: arrays returned earlier stay valid.
        self.pos = candidate

        if np.array_equal(self.pos, self.goal):
            self.done = True
            reward = 500
        elif self.steps >= self.max_steps:
            self.done = True
            reward = 0
        elif self.euclidean_distance_from_goal(self.pos) < self.euclidean_distance_from_goal(prev_pos):
            reward = 10
        else:
            reward = -10
        return self.pos.copy(), reward, self.done, False

    def euclidean_distance_from_goal(self, pos):
        """Return the Euclidean distance between ``pos`` and the current goal."""
        dist = np.sqrt(np.sum((pos - self.goal) ** 2))
        return dist

    def get_orientation(self, prev_state, curr_state):
        """Map the last displacement to a rotation code.

        Returns 1 for a move towards larger x, 3 for smaller x, 2 for smaller y
        and 0 for larger y or no movement. The x displacement is checked first.
        """
        if curr_state[0] > prev_state[0]:
            return 1 # Rotate 90 degrees clockwise
        elif curr_state[0] < prev_state[0]:
            return 3 # Rotate 90 degrees anti-clockwise
        elif curr_state[1] < prev_state[1]:
            return 2 # Rotate 180 degrees
        elif curr_state[1] > prev_state[1]:
            return 0 # No rotation
        else:
            return 0 # No rotation

In [None]:
# Create an agent class
class Agent():
    """Epsilon-greedy DQN agent with a replay memory and a target network.

    The networks take a ``[state_x, state_y, goal_x, goal_y]`` row as input and
    output one Q-value per action.

    Args:
        env: Environment exposing a ``goal`` attribute (used by ``act``).
        model: Online Q-network; must provide Keras-style ``predict``, ``fit``,
            ``get_weights`` and ``set_weights``.
        target_model: Network used for bootstrap targets. Its weights are
            overwritten with those of ``model`` on construction.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Probability of taking a random action in ``act``.
        epsilon_min: Stored for the caller; not read inside this class.
        epsilon_decay: Stored for the caller; not read inside this class.
        batch_size: Number of transitions sampled per ``replay`` call.
        memory_size: Maximum number of transitions kept in the replay memory.
        num_actions: Number of discrete actions for random exploration.
    """

    def __init__(self, env, model, target_model, gamma=0.7, epsilon=1.0,
                 epsilon_min=0.1, epsilon_decay=0.0003, batch_size=64,
                 memory_size=1000, num_actions=4):
        self.env = env
        self.model = model
        self.target_model = target_model
        self.target_model.set_weights(self.model.get_weights())
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.num_actions = num_actions
        self.memory = deque(maxlen=memory_size)

    @staticmethod
    def _encode(states, goals):
        """Stack state(s) and goal(s) into a float32 ``(n, 4)`` network input."""
        states = np.atleast_2d(np.asarray(states, dtype=np.float32))
        goals = np.atleast_2d(np.asarray(goals, dtype=np.float32))
        return np.concatenate((states, goals), axis=1)

    def add_to_memory(self, state, goal, action, reward, next_state, done):
        """Store one transition.

        Array arguments are copied so that later in-place changes by the
        caller or the environment cannot alter what was stored.
        """
        self.memory.append(
            (np.array(state), np.array(goal), action, reward, np.array(next_state), done)
        )

    def act(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one."""
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, self.num_actions)
        return self.predict(self._encode(state, self.env.goal))

    def predict(self, inputt):
        """Return the greedy action for a pre-processed input holding one sample."""
        return np.argmax(self.model.predict(inputt, verbose=0)[0])

    def replay(self):
        """Run one mini-batch Q-learning update from the replay memory.

        Does nothing until the memory holds at least ``batch_size`` transitions
        (sampling from a smaller memory would raise ``ValueError``).

        The whole batch is evaluated with one ``predict`` call per network and
        trained with a single ``fit`` call, i.e. one gradient step per replay.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, goals, actions, rewards, next_states, dones = zip(*batch)

        inputs = self._encode(states, goals)
        next_inputs = self._encode(next_states, goals)
        rewards = np.asarray(rewards, dtype=np.float32)
        not_done = 1.0 - np.asarray(dones, dtype=np.float32)
        actions = np.asarray(actions, dtype=np.intp)

        # Bootstrap from the target network; terminal transitions keep only
        # their immediate reward.
        next_q = self.target_model.predict(next_inputs, verbose=0)
        targets = rewards + self.gamma * np.max(next_q, axis=1) * not_done

        # Only the Q-value of the action actually taken is moved towards its
        # target; the other outputs keep their current predictions.
        q_values = self.model.predict(inputs, verbose=0)
        q_values[np.arange(len(batch)), actions] = targets

        self.model.fit(inputs, q_values, batch_size=len(batch), epochs=1, verbose=0)
            

In [None]:
# Initialize the agent
MODEL_PATH = 'models/ql_fed_dqn.h5'

try:
    model = tf.keras.models.load_model(MODEL_PATH)
except Exception as exc:  # best effort: any load failure falls back to a fresh model
    print("Creating new model")
    print("Reason:", repr(exc))
    # NOTE(review): create_model is not defined in the cells visible here;
    # it has to be defined before this cell runs.
    model = create_model(input_shape=(4,), num_actions=4)
    target_model = create_model(input_shape=(4,), num_actions=4)
else:
    print("Loaded model from disk")
    # The target network must be a separate object. Passing the same model
    # twice would make the target follow every update of the online network.
    # Agent.__init__ copies the weights into the clone.
    target_model = tf.keras.models.clone_model(model)

agent = Agent(Env(), model=model, target_model=target_model)

In [None]:
# Test the agent
# Each test episode draws a random start and goal and follows the greedy
# policy for at most MAX_TEST_STEPS steps. An episode counts as a success if
# the goal is reached, as a crash if a move is rejected by the environment,
# and as "oscillate" if neither happens within the step budget.
MAX_TEST_STEPS = 50

success = 0
crash = 0
no_tests = 1000
for i in range(no_tests):
    # Copy the start state: the environment may hand back its own position
    # array, which would then change underneath us on the first step.
    state = agent.env.reset().copy()
    goal = agent.env.reset_goal()  # reset_goal() already prints the sampled goal
    prev_state = state.copy()
    for step in range(MAX_TEST_STEPS):
        print('State:', state)
        # TODO: this was an unfinished `if ... == 1:` branch with no body
        # (a syntax error). The orientation is computed but not yet used.
        orientation = agent.env.get_orientation(prev_state, state)

        inputt = np.concatenate((state, goal))
        inputt = tf.convert_to_tensor(inputt)
        inputt = tf.expand_dims(inputt, 0)
        action = agent.predict(inputt)
        next_state, reward, done, terminate = agent.env.step(action)
        prev_state, state = state.copy(), next_state.copy()
        if done:
            print('State:', state)
            print('Steps: ', step+1)
            if np.array_equal(agent.env.goal, agent.env.pos):
                print('Reached the goal!')
                success += 1
            break
        elif terminate:
            print('State:', state)
            print('Steps: ', step+1)
            print('Terminated!')
            crash += 1
            break

print('Success rate:', success/no_tests)
print('Crash rate:', crash/no_tests)
print('Oscillate rate:', (no_tests - success - crash)/no_tests)