In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.animation as animation

import time
import random
import copy

## Environments (Planet)

In [2]:
# Action index -> (dy, dx) grid offset, in the order Up, Down, Left, Right.
ACTIONS = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}


class Planet(object):
    """Square grid world shared by a group of robots.

    Cell values in ``self.planet``: 0 = free, 1 = trap, 2 = occupied by a robot.
    Positions are ``(y, x)`` tuples (row, column).
    """

    def __init__(self, planet_width, num_robots):
        """Build an empty ``planet_width`` x ``planet_width`` grid and place the robots.

        Args:
            planet_width: side length of the square grid.
            num_robots: number of robots to place (see ``agent_setup``).
        """
        # start with defining your planet
        self.planet_width = planet_width
        self.planet_edge = planet_width - 1  # largest valid row/column index
        self.planet = np.zeros((planet_width, planet_width)).astype(int)

        # Traps (currently disabled)
        #self.planet[3, 4] = 1
        #self.planet[2, 1] = 1
        #self.planet[7, 2] = 1
        #self.planet[4, 8] = 1

        # Robots
        self.num_robots = num_robots
        self.robot_positions = []
        self.agent_setup()

        self.steps = 0  # number of rounds taken; incremented by the caller
        # Built once here from the initial layout; update_planet does not refresh it.
        self.allowed_states = None
        self.construct_allowed_states()

    def agent_setup(self):
        """Place each robot on the first free cell at or after row-major index ``r``.

        Robot ``r`` starts scanning at flat index ``r`` and advances until a cell
        with value 0 is found; that cell is marked 2 and recorded in
        ``robot_positions``. Scanning past the last cell raises ``IndexError``.
        """
        for r in range(self.num_robots):
            flat_index = r
            while True:
                temp_y, temp_x = divmod(flat_index, self.planet_width)
                if self.planet[temp_y, temp_x] == 0:
                    break
                flat_index += 1

            self.robot_positions.append((temp_y, temp_x))
            self.planet[temp_y, temp_x] = 2

    def is_allowed_move(self, state, action):
        """Return True if taking ``action`` from ``state`` lands on a free, in-bounds cell."""
        y, x = state
        y += ACTIONS[action][0]
        x += ACTIONS[action][1]
        # moving off the board
        if y < 0 or x < 0 or y > self.planet_edge or x > self.planet_edge:
            return False
        # moving into a cell that is not free (trap or robot)
        return bool(self.planet[y, x] == 0)

    def construct_allowed_states(self):
        """Fill ``allowed_states``: every non-trap cell -> list of currently legal actions."""
        allowed_states = {}
        for y, row in enumerate(self.planet):
            for x, _ in enumerate(row):
                # iterate through all valid (non-trap) spaces
                if self.planet[(y, x)] != 1:
                    allowed_states[(y, x)] = [
                        action for action in ACTIONS
                        if self.is_allowed_move((y, x), action)
                    ]
        self.allowed_states = allowed_states

    def update_planet(self, action, robot_num):
        """Move robot ``robot_num`` one cell along ``action`` and update the grid.

        No legality check is performed here; the caller is expected to pass an
        action that stays on the board.
        """
        old_y, old_x = self.robot_positions[robot_num]
        y = old_y + ACTIONS[action][0]
        x = old_x + ACTIONS[action][1]
        self.planet[y, x] = 2
        self.planet[old_y, old_x] = 0
        self.robot_positions[robot_num] = (y, x)

    def invalid_move(self, action, robot_num):
        """Return the ``(y, x)`` cell robot ``robot_num`` would reach via ``action`` (no move is made)."""
        y, x = self.robot_positions[robot_num]
        y += ACTIONS[action][0]
        x += ACTIONS[action][1]
        return (y, x)

    @staticmethod
    def is_game_over(fully_covered):
        """Return the ``fully_covered`` flag unchanged.

        Declared static so it can be called on the class or on an instance;
        the original signature had no ``self`` and failed on instance calls.
        """
        return fully_covered

    def give_reward(self, state_history, robot_num, visited_locations):
        """Return 1 if the robot stands on a cell not in ``visited_locations``, else -0.05.

        ``state_history`` is accepted for interface compatibility and is not used.
        """
        location = self.robot_positions[robot_num]
        if location not in visited_locations:
            return 1
        return -0.05

    def get_state_and_reward(self, state_history, robot_num, visited_locations):
        """Return ``(position, reward)`` for robot ``robot_num``."""
        return self.robot_positions[robot_num], self.give_reward(state_history, robot_num, visited_locations)

## Agent Class (Robots)

In [3]:
# Action index -> (dy, dx) grid offset, in the order Up, Down, Left, Right.
ACTIONS = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}


class Agent(object):
    """Tabular Q-learning robot with epsilon-greedy action selection.

    The Q-table has one row per grid cell (row-major index ``y * width + x``)
    and one column per action in ``ACTIONS``.
    """

    def __init__(self, states, alpha=0.15, gamma = 0.9, random_factor=0.2, robot_num=9, planet_width=6):
        """Create an agent with an all-zero Q-table.

        Args:
            states: accepted for interface compatibility; not used.
            alpha: initial learning rate.
            gamma: discount factor applied to the best next-state value.
            random_factor: probability of taking a random (exploratory) action.
            robot_num: index of this robot in the planet's ``robot_positions``.
            planet_width: side length of the grid, used to size the Q-table.
        """
        self.state_history = [((0, 0), 0, 0, (0, 0))] # state, reward, action, next_state
        self.initial_alpha = alpha
        self.alpha = alpha
        self.learning_decay = 1
        self.gamma = gamma
        self.random_factor = random_factor
        self.robot_num = robot_num

        # start the rewards table
        self.qtable = np.zeros(((planet_width*planet_width), 4)).tolist()

        self.seen_locations = []

    def update_state_history(self, state, reward, action, next_state):
        """Append one ``(state, reward, action, next_state)`` transition to the history."""
        self.state_history.append((state, reward, action, next_state))

    def learn(self, state, reward, action, next_state, episode, planet_width):
        """Apply one Q-learning update and decay the exploration and learning rates.

        Q[s][a] += alpha * (reward + gamma * max(Q[s']) - Q[s][a])

        Afterwards the state history is cleared, ``random_factor`` is multiplied
        by 0.9995 (floored at 0.001) and ``alpha`` is set to
        ``initial_alpha / (1 + learning_decay * episode)``.
        """
        a = self.alpha
        g = self.gamma
        y, x = state
        state_index = (y * planet_width) + x
        current_qReward = self.qtable[state_index][action]
        Ny, Nx = next_state
        next_state_index = (Ny * planet_width) + Nx
        self.qtable[state_index][action] = current_qReward + (a * (reward + (g * (max(self.qtable[next_state_index]))) - current_qReward))
        self.state_history = [] # reset the state_history
        # decrease random_factor, but never below the floor
        self.random_factor = max(self.random_factor * 0.9995, 0.001)
        self.alpha = (1/(1+self.learning_decay*episode))*self.initial_alpha

    @staticmethod
    def _free_targets(state, planet, actions):
        """Return ``[(action, (y, x)), ...]`` for actions whose target is on the board and currently free."""
        y, x = state
        edge = planet.planet_edge
        reachable = []
        for action in actions:
            dy, dx = ACTIONS[action]
            target = (y + dy, x + dx)
            on_board = 0 <= target[0] <= edge and 0 <= target[1] <= edge
            if on_board and planet.planet[target] == 0:
                reachable.append((action, target))
        return reachable

    def choose_action(self, state, allowed_moves, state_history, planet, visited_locations):
        """Pick the next action for this robot.

        With probability ``random_factor`` a random action is drawn from
        ``allowed_moves``; otherwise the highest-Q action is taken, preferring
        actions that lead to cells not yet in ``visited_locations``.

        In both branches only actions whose target cell is currently free on
        ``planet.planet`` are considered. ``allowed_moves`` may come from a table
        computed before robots moved, so it is re-checked against the live grid
        to stop a robot stepping onto an occupied cell.

        Returns:
            ``(action, True)`` when a move exists, ``(None, False)`` otherwise.
        """
        # Random (explore)
        if np.random.uniform() < self.random_factor:
            candidates = [action for action, _ in
                          self._free_targets(state, planet, allowed_moves or [])]
            if not candidates:
                return None, False
            return np.random.choice(candidates), True

        # Choose best action (exploit)
        y, x = state
        state_index = (y * planet.planet_width) + x
        free = self._free_targets(state, planet, ACTIONS)
        unvisited = [(action, target) for action, target in free
                     if target not in visited_locations]
        # If no unvisited neighbour exists, fall back to every free neighbour
        pool = unvisited or free
        if not pool:
            return None, False
        # Ties on Q-value resolve to the larger action index (tuple comparison)
        _, next_move = max((self.qtable[state_index][action], action) for action, _ in pool)
        return next_move, True

## Server

In [4]:
class Server(object):
    """Shared coverage record for one episode.

    Attributes:
        fully_covered: flag the driving loop sets to end the episode.
        no_visited_locations: number of distinct cells visited so far.
        visited_locations: list of visited ``(y, x)`` cells.
    """

    def __init__(self):
        # A new server starts in the same state as a freshly reset one
        self.reset_count()

    def reset_count(self):
        """Return every tracked value to its start-of-episode state."""
        self.visited_locations = []
        self.no_visited_locations = 0
        self.fully_covered = False

## Learning Loop

In [None]:
# Training driver: runs `Episodes` coverage episodes on a fresh Planet each time,
# while the agents in `swarm` (and their Q-tables) persist across episodes.
# The names defined here (planet, server, swarm, i, robot, completed_count, ...)
# are module-level and are read again by the evaluation cell below.
if __name__ == '__main__':  
    # Hyper Parameters
    Learning_Rate = 0.001  # passed to Agent as alpha
    Error_Rate = 0.9  # passed to Agent as gamma (see the Agent(...) call below)
    Epsilon = 0.5  # passed to Agent as the initial random_factor
    Episodes = 1000
    
    # Simulation Parameters
    planet_size = 10
    num_robots = 3
    # Budget is counted in rounds: one move attempt per robot per round.
    Movement_Budget = 500 # alternative: planet_size * 100
    completed_count = 0  # episodes that reached full coverage since the last report
    
    print("Hello")
    
    planet = Planet(planet_width=planet_size, num_robots=num_robots)   
    server = Server()
    print("Intialised Planet")
    print(planet.planet)
    
    # Initialise Robots 
    swarm = []
    for r in range(num_robots):
        temp_robot = Agent(planet.planet, alpha=Learning_Rate, gamma=Error_Rate, random_factor=Epsilon, robot_num=r, planet_width=planet.planet_width)
        swarm.append(temp_robot)
    
    moveHistory = []  # rounds used per episode
    epsilonHistory = []  # random_factor of the last robot after each episode
    
    # Possible visitable locations: every cell that is not a trap (value 1)
    visitable_locations = (np.size(planet.planet) - np.count_nonzero(planet.planet == 1))
    
    for i in range(Episodes):
        # Progress report every 500 episodes; the completion counter restarts after each report
        if i % 500 == 0:                    
            print("Episode",i)
            print("Completed", completed_count) 
            completed_count = 0
        
        # Initialise per-episode variables
        movementBudget = Movement_Budget
        server.reset_count()
        # visit all starting locations
        for r in range(num_robots):
            if planet.robot_positions[r] not in server.visited_locations:
                server.visited_locations.append(planet.robot_positions[r])
                server.no_visited_locations += 1
        
        # server.fully_covered doubles as the "episode finished" flag
        # (set on full coverage OR when the budget runs out)
        while not server.fully_covered:  
            for n in range(num_robots):
                robot = swarm[n]
                sHistory = robot.state_history
                state, _ = planet.get_state_and_reward(sHistory, robot.robot_num, server.visited_locations) # get the current state
                action, valid = robot.choose_action(state, planet.allowed_states[state], sHistory, planet, server.visited_locations) # choose an action (explore or exploit)
                # There is a possible move for the robot
                if valid:
                    planet.update_planet(action, robot.robot_num) # update the planet according to the action
                    # Reward is computed before the new cell is recorded as visited
                    next_state, reward = planet.get_state_and_reward(sHistory, robot.robot_num, server.visited_locations) # get the new state and reward    
                    if next_state not in server.visited_locations:
                        server.visited_locations.append(next_state)
                        server.no_visited_locations += 1
                        # The move that completes coverage gets a reward of 10 instead
                        if server.no_visited_locations >= visitable_locations:
                            completed_count += 1
                            reward = 10
                    robot.learn(state, reward, action, next_state, i, planet.planet_width)
                
            planet.steps += 1
            movementBudget -= 1
            if server.no_visited_locations >= visitable_locations: 
                server.fully_covered = True
            elif movementBudget <= 0:
                # End the episode if the movement budget runs out
                server.fully_covered = True        

        moveHistory.append(planet.steps) # get a history of number of steps taken to plot later
        epsilonHistory.append(robot.random_factor)
        
        # Fresh planet for the next episode (robots back at their start cells, steps reset to 0)
        planet = Planet(planet_width=planet_size, num_robots=num_robots) 
        

# Final summary; relies on `i` and `robot` left over from the loops above.
print("Episode",i)
print("Epsilon",robot.random_factor)
print("Completed", completed_count) 
        
print("Visited Locations:", server.no_visited_locations, "Steps:", moveHistory[-1])  
# Rounds per episode on a log-scaled y axis
plt.figure(figsize=(18, 10))
plt.semilogy(moveHistory, color="b")
#plt.semilogy(epsilonHistory, color="r")
plt.show()

## Testing and Simulation

In [None]:
# Evaluation run: replay one episode with the trained swarm on a fresh planet,
# recording coverage, budget use and per-robot positions for the plots and the
# animation below. Relies on names defined by the training cell (swarm, server,
# planet_size, num_robots, Movement_Budget, visitable_locations, completed_count, i).
testHistory = []
planet = Planet(planet_width=planet_size, num_robots=num_robots)
colors = plt.cm.jet(np.linspace(0,1,num_robots))

# Trap Locations
tx, ty = [], []
# Per-robot recordings
locations_array = []
pathData = []
for r in range(num_robots):
    # (recorded y, recorded x, trail y, trail x); the trail lists are filled by animate()
    locations_array.append(([],[],[],[]))
    # (x, y) points used to show paths of robots
    pathData.append([])

for y, row in enumerate(planet.planet):
    for x, col in enumerate(row):
        if planet.planet[(y,x)] == 1:
            tx.append(x)
            ty.append(y)

# Initialise original positions
for r in range(num_robots):
    pathData[r].append((planet.robot_positions[r][1],planet.robot_positions[r][0]))
    locations_array[r][0].append(planet.robot_positions[r][0])
    locations_array[r][1].append(planet.robot_positions[r][1])
    plt.scatter(locations_array[r][1], locations_array[r][0], marker='s', color=colors[r])

plt.xlim([-1, planet_size])
plt.ylim([-1, planet_size])
plt.show()

vist_count = 0
action_history = []
# Initialise variables
movementBudget = Movement_Budget
server.reset_count()
# Graph data
coverData = []
movementData = []
dataLength = 100  # minimum length the plotted series are padded to

# visit all starting locations
for r in range(num_robots):
    if planet.robot_positions[r] not in server.visited_locations:
        server.visited_locations.append(planet.robot_positions[r])
        server.no_visited_locations += 1

coverData.append((server.no_visited_locations/visitable_locations)*100)
movementData.append(((Movement_Budget - movementBudget)/Movement_Budget)*100)

# server.fully_covered doubles as the "run finished" flag (coverage OR budget exhausted)
while not server.fully_covered:
    for n in range(num_robots):
        robot = swarm[n]
        sHistory = robot.state_history
        state, _ = planet.get_state_and_reward(sHistory, robot.robot_num, server.visited_locations) # get the current state
        action, valid = robot.choose_action(state, planet.allowed_states[state], sHistory, planet, server.visited_locations) # choose an action (explore or exploit)
        # There is a possible move for the robot
        if valid:
            planet.update_planet(action, robot.robot_num) # update the planet according to the action
            next_state, reward = planet.get_state_and_reward(sHistory, robot.robot_num, server.visited_locations) # get the new state and reward
            if next_state not in server.visited_locations:
                server.visited_locations.append(next_state)
                server.no_visited_locations += 1
                if server.no_visited_locations >= visitable_locations:
                    completed_count += 1
                    reward = 10
            # `i` is the last episode index left over from the training loop
            robot.learn(state, reward, action, next_state, i, planet.planet_width)

    planet.steps += 1
    movementBudget -= 1
    coverData.append((server.no_visited_locations/visitable_locations)*100)
    movementData.append(((Movement_Budget - movementBudget)/Movement_Budget)*100)
    if server.no_visited_locations >= visitable_locations:
        server.fully_covered = True
    elif movementBudget <= 0:
        # End the episode if the movement budget runs out
        server.fully_covered = True

    # Get locations of robots for plotting
    for r in range(num_robots):
        pathData[r].append((planet.robot_positions[r][1],planet.robot_positions[r][0]))
        locations_array[r][0].append(planet.robot_positions[r][0])
        locations_array[r][1].append(planet.robot_positions[r][1])

# Record the final positions once more after the run ends
for r in range(num_robots):
    pathData[r].append((planet.robot_positions[r][1],planet.robot_positions[r][0]))
    locations_array[r][0].append(planet.robot_positions[r][0])
    locations_array[r][1].append(planet.robot_positions[r][1])

# Pad both series up to dataLength by repeating their final value.
# Only pad when shorter: a run of dataLength rounds or more already exceeds the
# target, and comparing with != there would never terminate. Repeating the last
# value (rather than 100) keeps the coverage plot honest when the run ended on
# budget instead of full coverage.
coverData.extend([coverData[-1]] * max(0, dataLength - len(coverData)))
movementData.extend([movementData[-1]] * max(0, dataLength - len(movementData)))

# cover%/t
plt.figure(figsize=(18, 10))
plt.title("Covered/Time")
plt.xlabel("Time")
plt.ylabel("% Covered")
plt.plot(coverData, color="b")
plt.ylim(0,105)

# budget/t
plt.figure(figsize=(18, 10))
plt.title("Movement/Time")
plt.xlabel("Time")
plt.ylabel("% of Movement Budget Expended")
plt.plot(movementData, color="g")
plt.ylim(0,105)
# Path Plot (disabled: kept as a string literal, not executed)
"""
data = np.asarray(pathData)

for i in range(planet_size):
    # horizontal and vertical background lines
    plt.plot([i,i], [0,(planet_size - 1)], linewidth=0.5, color='black')
    plt.plot([0,(planet_size - 1)], [i,i], linewidth=0.5, color='black')
for r in range(num_robots):
    plt.plot(data[r][:,0], data[r][:,1], color = colors[r]) # line segments
    plt.scatter(data[r][:,0], data[r][:,1], color = colors[r]) # markers

plt.show()
"""

fig, ax = plt.subplots()

def animate(i):
    """Draw frame ``i``: traps, each robot's current cell and its trail up to ``i``.

    The trail lists (slots 2 and 3 of each ``locations_array`` entry) are rebuilt
    from the recorded history on every call instead of being appended to. The
    animation calls this function more than once per frame index (initial draw,
    saving, replays), and appending would duplicate points and show the whole
    path from the first frame on replay.
    """
    print(i)
    plt.clf()
    plt.xlim([-1, planet_size])
    plt.ylim([-1, planet_size])
    plt.scatter(tx, ty, marker = 'x', color = 'r')

    for r in range(num_robots):
        recorded_y, recorded_x, trail_y, trail_x = locations_array[r]
        plt.scatter(recorded_x[i], recorded_y[i], marker = 's', color = colors[r])
        # In-place slice assignment keeps the same list objects inside the tuple
        trail_y[:] = recorded_y[:i + 1]
        trail_x[:] = recorded_x[:i + 1]
        plt.scatter(trail_x, trail_y, marker = '.', color = colors[r])
    return fig

# Every robot has the same number of recorded positions, so robot 0 defines the
# frame count (no reliance on a loop variable left over from an earlier cell).
ani = animation.FuncAnimation(fig, animate, repeat=True,
                                    frames=len(locations_array[0][0]) - 1, interval=500)

# To save the animation using Pillow as a gif
writer = animation.PillowWriter(fps=3,
                             metadata=dict(artist='Me'),
                             bitrate=1800)
ani.save('path.gif', writer=writer)
plt.show()
        

In [None]:
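# Original version of the testing/simulation cell, kept for reference only.
# The whole cell below is a string literal, so none of it is executed.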
"""
testHistory = []
server.reset_count()
planet = Planet(planet_width=planet_size, num_robots=num_robots)
colors = plt.cm.jet(np.linspace(0,1,num_robots))

# Trap Locations
tx, ty = [], []
# Generate Arrays for Robot Locations
locations_array = []
for r in range(num_robots):
    # Appends pasty, pastx, currety, currentx
    locations_array.append(([],[],[],[]))

for y, row in enumerate(planet.planet):
    for x, col in enumerate(row):
        if planet.planet[(y,x)] == 1:
            tx.append(x)
            ty.append(y)
            
# Intialise original positions
for r in range(num_robots):
    locations_array[r][0].append(planet.robot_positions[r][0])
    locations_array[r][1].append(planet.robot_positions[r][1])
    plt.scatter(locations_array[r][1], locations_array[r][0], marker='s', color=colors[r])
     
plt.xlim([-1, planet_size])
plt.ylim([-1, planet_size])   
plt.show()

vist_count = 0
action_history = []
#robot.random_factor = 0 # Only choose the best action
#Intialise Variables
movementBudget = Movement_Budget
server.reset_count()
# vist all starting locations
for r in range(num_robots):
    if planet.robot_positions[r] not in server.visited_locations:
        server.visited_locations.append(planet.robot_positions[r])
        server.no_visited_locations += 1

while not server.fully_covered:  
    for n in range(num_robots):
        robot = swarm[n]
        sHistory = robot.state_history
        state, _ = planet.get_state_and_reward(sHistory, robot.robot_num, server.visited_locations) # get the current state 
        action, valid = robot.choose_action(state, planet.allowed_states[state], sHistory, planet, server.visited_locations) # choose an action (explore or exploit)
        # There is a possible move for the robot
        if valid:
            planet.update_planet(action, robot.robot_num) # update the planet according to the action
            next_state, reward = planet.get_state_and_reward(sHistory, robot.robot_num, server.visited_locations) # get the new state and reward    
            if next_state not in server.visited_locations:
                server.visited_locations.append(next_state)
                server.no_visited_locations += 1
                if server.no_visited_locations >= visitable_locations:
                    completed_count += 1
                    reward = 10
            robot.learn(state, reward, action, next_state, i, planet.planet_width)
            

    planet.steps += 1
    movementBudget -= 1
    if server.no_visited_locations >= visitable_locations: 
        server.fully_covered = True
    elif movementBudget <= 0:
        # End the episode if the movement budget runs out
        server.fully_covered = True       

    # Get locations of robots for plotting    
    for r in range(num_robots):
        locations_array[r][0].append(planet.robot_positions[r][0])
        locations_array[r][1].append(planet.robot_positions[r][1])

for r in range(num_robots):
        locations_array[r][0].append(planet.robot_positions[r][0])
        locations_array[r][1].append(planet.robot_positions[r][1])
    
testHistory.append(planet.steps) # get a history of number of steps taken to plot later

fig, ax = plt.subplots()

def animate(i):
    print(i)
    plt.clf()
    plt.xlim([-1, planet_size])
    plt.ylim([-1, planet_size])
    plt.scatter(tx, ty, marker = 'x', color = 'r') 
    
    for r in range(num_robots):
        plt.scatter(locations_array[r][1][i], locations_array[r][0][i], marker = 's', color = colors[r]) 
        locations_array[r][2].append(locations_array[r][0][i])
        locations_array[r][3].append(locations_array[r][1][i])
        plt.scatter(locations_array[r][3], locations_array[r][2], marker = '.', color = colors[r])        
    return fig

ani = animation.FuncAnimation(fig, animate, repeat=True,
                                    frames=len(locations_array[r][0]) - 1, interval=500) 

# To save the animation using Pillow as a gif
writer = animation.PillowWriter(fps=3,
                             metadata=dict(artist='Me'),
                             bitrate=1800)
ani.save('path.gif', writer=writer)
plt.show()
"""