In [5]:
from skimage.io import imread, imsave
from pprint import pprint
import numpy as np
import copy
import random
import time
import heapq

class Agent_Frontier:
    def __init__(self):
        self.explorable_nodes=[]
    
    def num_of_nodes(self):
        return len(self.explorable_nodes)
    
    def add_node(self, priority, node):
        heapq.heappush(self.explorable_nodes, (priority,node))
    
    def get_lowest_node(self):
        return heapq.heappop(self.explorable_nodes)[1]
    
    def add_and_get_lowest(self,priority,node):
        return heapq.heappushpop(self.explorable_nodes, (priority,node))
    
class A_star_details():
    #sets general details used across all three algorithms
    def __init__(self,*args,**kwargs):        
        #Sets the colour of the blocker in [R,G,B] format
        self.blocker_colour=kwargs.get('blocker_colour',[100,100,100])
        
        #Sets the visited nodes colour in [R,G,B] format
        self.visited_colour=kwargs.get('visited_colour',[255,0,0])
        
        #Sets the path colour in [R,G,B] format
        self.path_colour=kwargs.get('path_colour',[0,0,0])
        
class A_star_methods():
    #This contains methods general to various tests
    def __init__(self):
        pass
        
    def path_reconstruction(self,*args, **kwargs):
        #reconstructing the agents path to the goal
        add_to_name=kwargs.get('add_to_name','')
        current=self.goal
        self.path=[]
        while current!=self.agent_start:
            self.path.append(current)
            current=self.pathing_nodes[tuple(current)]
        self.path.append(self.agent_start)
        self.path.reverse()
        
        #print the path length in nodes as well as the amount of nodes visited
        print('Path Node Length',len(self.path))
        print('Total nodes visited: ',len(self.pathing_nodes))
        
    def determine_neighbours(self,agent_loc):
        #determines valid neighbours of the current node the agent is on
        #agent_loc stores pixel location as [x,y]
        x=agent_loc[0]
        y=agent_loc[1]
        neighbours=[]
        
        #make list of adjacent and diagonal neighbours
        for i in range(3):
            for j in range(3):
                if 0<x-1+i<len(self.blockers) and 0<y-1+j<len(self.blockers[0]):
                    neighbours.append([x-1+i,y-1+j])
                    
        #remove current node from neighbour list
        neighbours.remove([x,y])
        
        #prune neighours if they are blocked
        temp=[]
        for i in neighbours:
            if self.blockers[i[0]][i[1]]!=0:
                temp.append(i)
        for i in temp:
            neighbours.remove(i)
        return neighbours
        
    def euclid_dist(self,agent_loc,target):
        #calculates the euclidean distance between the ants location and the target location
        dist=((agent_loc[0]-target[0])**2+(agent_loc[1]-target[1])**2)**0.5
        return dist

class map_management(A_star_details):
    def __init__(self,map_name,start_pos,goal,blocker_percentage,*args,**kwargs):
        self.image_file=map_name
        self.agent_start=start_pos
        self.goal=goal
        self.blocker_percent=blocker_percentage
        A_star_details.__init__(self)
        
        self.load_image()
        self.create_blockers()
        
    def load_image(self,*args, **kwargs):
        #Loads image to be tested with
        self.map=imread(self.image_file)
        
    def change_pixel_colour(self,pixel,colour):
        #Changes colour of pixel to given [R,G,B] array colour
        self.map[pixel[0],pixel[1]]=colour
        
    def colour_compare(self,input_p,colour_to_check):
        #compares pixel colours
        dR=abs(input_p[0]-colour_to_check[0])
        dG=abs(input_p[1]-colour_to_check[1])
        dB=abs(input_p[2]-colour_to_check[2])
        dp=dR+dG+dB
        return dp==0
        
    def create_blockers(self):
        #creates random blockers to occupy given percent of map
        #initializes blocker array, with 0 being unblocked, 1 being blocked
        self.blockers=np.zeros((256,256))
        
        #make counter, and find termination number of blockers
        num_blockers=0
        goal_num_blockers=0.01*self.blocker_percent*256*256
        
        #create blockers
        while num_blockers<goal_num_blockers:
            x=random.randint(0,255)
            y=random.randint(0,255)
            
            #ensure goal and start node are not blocked
            if self.colour_compare(self.map[x][y],self.blocker_colour)==False and [x,y]!=self.goal or self.agent_start:
                self.change_pixel_colour([x,y],self.blocker_colour)
                self.blockers[x][y]=1
                num_blockers+=1
                
        #save base map, with only blockers on the map
        self.save_map_state_base('Base Map ')
        
    def save_map_state_base(self,name):
        #Saves map with given name, details can be added as needed
        imsave('Results/'+name+self.image_file,self.map)
        
class GBFS(A_star_details,A_star_methods):
    def __init__(self,map_name,blockers,start_pos,goal,*args,**kwargs):
        self.image_file=map_name
        self.blockers=blockers
        self.agent_start=start_pos
        self.goal=goal
        
    def greedy_best_first_base(self):
        #initialize frontier
        agent_frontier=Agent_Frontier()
        agent_frontier.add_node(0,self.agent_start)
        
        #initialize list of previous nodes
        previous_nodes={}
        previous_nodes[tuple(self.agent_start)]=None
        while agent_frontier.num_of_nodes()!=0:
            current_node=agent_frontier.get_lowest_node()
            
            #break if goal found
            if current_node==self.goal:
                print('GBFS Goal Found')
                break
            
            #determine neighbours
            unvisited_neighbour_nodes=self.determine_neighbours(current_node)
            
            #analyze neighbouring nodes
            for i in unvisited_neighbour_nodes:
                if tuple(i) not in previous_nodes:
                    priority=self.euclid_dist(self.goal,i)
                    agent_frontier.add_node(priority,i)
                    previous_nodes[tuple(i)]=current_node
                    
        self.pathing_nodes=previous_nodes
        
    def run_GBFS_test(self):
        self.greedy_best_first_base()
        #check if goal node is in pathing_nodes (path was found)
        if tuple(self.goal) in self.pathing_nodes:
            self.path_reconstruction(add_to_name='Completed Path ')
            
        #if no path was found
        else:
            self.pathing_nodes=False
            print('Did not reach goal')
            
class dijkstra(A_star_details,A_star_methods):
    def __init__(self,map_name,blockers,start_pos,goal,*args,**kwargs):
        self.image_file=map_name
        self.blockers=blockers
        self.agent_start=start_pos
        self.goal=goal
        
    def dijkstra_base(self):
        #initialize frontier
        agent_frontier=Agent_Frontier()
        agent_frontier.add_node(0,self.agent_start)

        #initialize previous nodes, costs
        previous_nodes={}
        previous_nodes[tuple(self.agent_start)]=None
        total_cost={}
        total_cost[tuple(self.agent_start)]=0
        while agent_frontier.num_of_nodes()!=0:
            current_node=agent_frontier.get_lowest_node()
            
            if current_node==self.goal:
                print('Dijkstra Found Goal')
                break
            
            #determine neighbours
            unvisited_neighbour_nodes=self.determine_neighbours(current_node)
            
            #analyze neighbouring nodes
            for i in unvisited_neighbour_nodes:
                new_cost=total_cost[tuple(current_node)]+self.euclid_dist(current_node,i)
                
                #determine if the node has been visited, or if path is shorter than others
                if tuple(i) not in previous_nodes or new_cost<total_cost[tuple(i)]:
                    total_cost[tuple(i)]=new_cost
                    priority=new_cost
                    agent_frontier.add_node(priority,i)
                    previous_nodes[tuple(i)]=current_node
        
        self.pathing_nodes=previous_nodes
        
    def run_DA_test(self):
        self.dijkstra_base()
        #check if goal node is in pathing_nodes (path was found)
        if tuple(self.goal) in self.pathing_nodes:
            self.path_reconstruction(add_to_name='Completed Path ')

        #if no path was found
        else:
            self.pathing_nodes=False
            print('Did not reach goal')
    
class A_star(A_star_details,A_star_methods):
    def __init__(self,map_name,blockers,start_pos,goal,*args,**kwargs):
        self.image_file=map_name
        self.blockers=blockers
        self.agent_start=start_pos
        self.goal=goal

    def Astar_base(self):
        #initialize frontier
        agent_frontier=Agent_Frontier()
        agent_frontier.add_node(0,self.agent_start)
        self.visited=copy.deepcopy(self.blockers)
        
        #initialize previous nodes, costs
        previous_nodes={}
        previous_nodes[tuple(self.agent_start)]=None
        total_cost={}
        total_cost[tuple(self.agent_start)]=0
        while agent_frontier.num_of_nodes()!=0:
            current_node=agent_frontier.get_lowest_node()
            
            if current_node==self.goal:
                print('Goal Reached!')
                break
            
            #find unexpanded neighbours
            unvisited_neighbour_nodes=self.determine_neighbours(current_node)
            
            for i in unvisited_neighbour_nodes:
                new_cost=total_cost[tuple(current_node)]+self.euclid_dist(current_node,i)
                if tuple(i) not in total_cost or new_cost<total_cost[tuple(i)]:
                    total_cost[tuple(i)]=new_cost
                    priority=new_cost+self.euclid_dist(self.goal,i)
                    agent_frontier.add_node(priority,i)
                    previous_nodes[tuple(i)]=current_node

        self.total_cost=total_cost
        self.pathing_nodes=previous_nodes

    def run_test(self):
        self.Astar_base()
        #check if goal node is in pathing_nodes (path was found)
        if tuple(self.goal) in self.pathing_nodes:
            self.path_reconstruction(add_to_name='Completed Path ')
        #if no path was found
        else:
            self.pathing_nodes=False
            print('Did not reach goal')
        
class path_data_out(A_star_details):
    #this finds the path of the agent to the goal, and changes the colour of the visited and final path nodes
    def __init__(self,map_name,map_used,examined_nodes,agent_path,*args,**kwargs):
        self.image_file=map_name
        
        #uses deepcopy to ensure that maps are not overwriting each other
        self.map=copy.deepcopy(map_used)
        self.visited_nodes=examined_nodes
        self.agent_path=agent_path
        A_star_details.__init__(self)
        
        #General detail tag added to each image file name to describe each test
        self.details=kwargs.get('details','')
        
    def save_map_state(self,name):
        #Saves map with given name, details can be added as needed
        imsave('Results/'+self.details+name+self.image_file,self.map)
        
    def change_pixel_colour(self,pixel,colour):
        #Changes colour of pixel to given [R,G,B] array colour
        self.map[pixel[0],pixel[1]]=colour
        
    def colour_compare(self,input_p,colour_to_check):
        #compares pixel colours
        dR=abs(input_p[0]-colour_to_check[0])
        dG=abs(input_p[1]-colour_to_check[1])
        dB=abs(input_p[2]-colour_to_check[2])
        dp=dR+dG+dB
        return dp==0
    
    def show_pathing_nodes(self):
        for i in self.visited_nodes:
            self.change_pixel_colour(i,self.visited_colour)
        self.save_map_state('Visited Nodes ')
        for i in self.agent_path:
            self.change_pixel_colour(i,self.path_colour)
        self.save_map_state('Completed Path ')


#set start, goal nodes
start=[1,1]
goal=[254,254]

#sets percentage of the map occupied by blockers
blocker_percentage=40

#creates map
test_map=map_management('Blank_map.png',start,goal,blocker_percentage)

#Run A* Test
print('A*')
test=A_star('Blank_map.png',test_map.blockers,start,goal)
test.run_test()

#Run GBFS Test
print('GBFS')
testGBFS=GBFS('Blank_map.png',test_map.blockers,start,goal)
testGBFS.run_GBFS_test()

#Run DA Test
print('Dijkstra')
testDA=dijkstra('Blank_map.png',test_map.blockers,start,goal)
testDA.run_DA_test()

#check if the goal was reachable or not, and then output images
if test.pathing_nodes!=False:
    data_out=path_data_out(test_map.image_file,test_map.map,test.pathing_nodes,test.path,details='A_Star ')
    data_out.show_pathing_nodes()

if testGBFS.pathing_nodes!=False:
    data_out=path_data_out(test_map.image_file,test_map.map,testGBFS.pathing_nodes,testGBFS.path,details='GBFS ')
    data_out.show_pathing_nodes()
    
if testGBFS.pathing_nodes!=False:
    data_out=path_data_out(test_map.image_file,test_map.map,testDA.pathing_nodes,testDA.path,details='Dijkstra ')
    data_out.show_pathing_nodes()

A*
Goal Reached!
Path Node Length 292
Total nodes visited:  15124
GBFS
GBFS Goal Found
Path Node Length 313
Total nodes visited:  925
Dijkstra
Dijkstra Found Goal
Path Node Length 292
Total nodes visited:  43636
