In [None]:
import numpy as np
import random
from matplotlib import pyplot as plt
from matplotlib.widgets import Slider, Button
from operator import itemgetter, attrgetter
from datetime import date
import json

In [None]:
# Constants

# Integer codes stored in the grids. generateEnvironment writes NOT_VISITED,
# UNBLOCKED, BLOCKED, START and END into the world grid; AStar writes START,
# KNOWN_VISITED and KNOWN_BLOCKED into the agent's knowledge grid.
# KNOWN_UNBLOCKED, KNOWN_DEADEND and KNOWN_OPENLIST are not written by any
# cell visible in this notebook.
NOT_VISITED = -1
UNBLOCKED = 0
BLOCKED = 1
START = 2
END = 3
KNOWN_UNBLOCKED = 4
KNOWN_BLOCKED = 5
KNOWN_DEADEND = 6
KNOWN_VISITED = 7
KNOWN_OPENLIST = 8

# RGB colour used to draw each cell code (see showGrid and the slider cell).
# NOTE(review): two pairs of codes share a colour and therefore cannot be told
# apart in the rendered images: BLOCKED/KNOWN_BLOCKED and
# KNOWN_DEADEND/KNOWN_VISITED. Confirm whether that is intended.
COLORMAP = {
    NOT_VISITED : np.array([127,127,127]),#grey
    UNBLOCKED : np.array([255,255,255]),#white
    BLOCKED : np.array([0,0,0]),#black
    START : np.array([255,0,0]),#red
    END : np.array([0,127,255]),#blue
    KNOWN_UNBLOCKED : np.array([248,131,121]),#pink
    KNOWN_BLOCKED : np.array([0,0,0]),#black (same RGB as BLOCKED)
    KNOWN_DEADEND : np.array([255,255,0]),#yellow (same RGB as KNOWN_VISITED)
    KNOWN_VISITED : np.array([255,255,0]),#yellow
    KNOWN_OPENLIST : np.array([0,255,255])#cyan
}

# TEST_WORLDS is not referenced by any cell visible in this notebook.
TEST_WORLDS = 5
# Side length of the square world built by generateEnvironment(TEST_WORLD_SIZE).
TEST_WORLD_SIZE = 15

# showGrid saves its PNG files under this prefix, with the date in the file name.
OUTPUT_BASE_PATH = "Outputs/"
# Evaluated once, when this cell runs.
TODAY = date.today()

In [None]:
class MinHeap:
    """Array-backed binary min-heap ordered by the first item of each entry.

    Entries are indexable objects such as ``(priority, payload)``. Only
    ``entry[0]`` is ever compared, so payloads do not need to be orderable.
    Comparisons are strict (``<``), so entries with equal priority are never
    swapped with one another.

    Attributes:
        elements: list holding the heap in level order (root at index 0).
        size: number of stored entries, kept in sync with ``len(elements)``.
    """

    def __init__(self):
        self.elements = []
        self.size = 0

    def heapifyUpwards(self, i):
        """Sift the entry at index ``i`` towards the root."""
        while i != 0:
            p = (i - 1) // 2
            # Once the parent is not larger the heap property holds again;
            # every ancestor above was already in order, so stop here.
            if not (self.elements[i][0] < self.elements[p][0]):
                break
            self.elements[i], self.elements[p] = self.elements[p], self.elements[i]
            i = p

    def heapifyDownwards(self, i):
        """Sift the entry at index ``i`` towards the leaves."""
        while True:
            left = (2 * i) + 1
            right = left + 1
            if left >= self.size:
                # No children: ``i`` is a leaf.
                break
            # Pick the smaller child; on a tie the right child is chosen.
            m = left
            if right < self.size and not (self.elements[left][0] < self.elements[right][0]):
                m = right
            if not (self.elements[m][0] < self.elements[i][0]):
                # Both subtrees below are valid heaps already.
                break
            self.elements[i], self.elements[m] = self.elements[m], self.elements[i]
            i = m

    def length(self):
        """Return the number of entries currently stored."""
        return self.size

    def insert(self, k):
        """Add entry ``k`` (ordered by ``k[0]``) to the heap."""
        self.elements.append(k)
        self.size += 1
        self.heapifyUpwards(self.size - 1)

    def peek(self):
        """Return the smallest entry without removing it.

        Raises:
            IndexError: if the heap is empty.
        """
        if self.size == 0:
            raise IndexError("peek from an empty MinHeap")
        return self.elements[0]

    def pop(self):
        """Remove and return the smallest entry.

        Raises:
            IndexError: if the heap is empty.
        """
        if self.size == 0:
            raise IndexError("pop from an empty MinHeap")
        k = self.elements[0]
        # Move the last leaf to the root, drop the duplicate, then restore order.
        self.elements[0] = self.elements[-1]
        del self.elements[-1]
        self.size -= 1
        self.heapifyDownwards(0)
        return k

In [None]:
def calculateAllNeighbours(location, t_size):
    """Return the in-grid cells that share an edge with ``location``.

    The 3x3 window around ``location`` is clipped to ``[0, t_size)`` on both
    axes and scanned in row-major order. Only cells at manhattan distance
    exactly one are kept, which drops ``location`` itself and the diagonals.
    The result is therefore ordered (x-1, y), (x, y-1), (x, y+1), (x+1, y),
    minus whichever of those fall outside the grid.
    """
    cx, cy = location[0], location[1]
    xs = range(max(cx - 1, 0), min(cx + 2, t_size))
    ys = range(max(cy - 1, 0), min(cy + 2, t_size))

    neighbours = []
    for x in xs:
        for y in ys:
            if abs(cx - x) + abs(cy - y) == 1:
                neighbours.append((x, y))
    return neighbours

In [None]:
def getRandomCoordinates(t_size):
    """Return a random ``(i, j)`` tuple with both indices in ``[0, t_size - 1]``."""
    highest = t_size - 1
    first = random.randint(0, highest)
    second = random.randint(0, highest)
    return (first, second)

def getRandomCoordinatesInQuarter(t_size, quarter):
    """Return a random ``(i, j)`` tuple restricted to one end of each axis.

    With ``t = t_size - 1``, a coordinate whose ``quarter`` flag equals 0 is
    drawn from ``[0, int(t/4)]``; any other flag draws it from
    ``[int(3*t/4), t]``. ``quarter[0]`` controls the first coordinate and
    ``quarter[1]`` the second.
    """
    t = t_size - 1

    def axisBounds(flag):
        # Inclusive (low, high) bounds for one coordinate.
        if flag == 0:
            return 0, int(t / 4)
        return int(3 * t / 4), t

    x_low, x_high = axisBounds(quarter[0])
    y_low, y_high = axisBounds(quarter[1])
    return (random.randint(x_low, x_high), random.randint(y_low, y_high))

In [None]:
def generateEnvironment(t_size):
    """Build a random ``t_size`` x ``t_size`` grid world.

    Every cell starts as NOT_VISITED. A random seed cell is marked BLOCKED and
    a depth-first flood fill (stack frontier) then visits every reachable
    cell, marking each one BLOCKED when ``random.random() > 0.7`` and
    UNBLOCKED otherwise. Finally a START cell and an END cell are stamped onto
    the grid, overwriting whatever was generated there.

    Args:
        t_size: side length of the square grid.

    Returns:
        ``(grid, start, reach)``: the ``np.int8`` grid and the coordinate
        tuples of the START and END cells.
    """
    # 1 marks blocked, 0 marks unblocked, -1 for not visited
    grid = np.full(shape = (t_size, t_size), fill_value = NOT_VISITED, dtype = np.int8)

    # Seed the flood fill at a random cell. Marking it BLOCKED also marks it
    # as processed, so the fill below never revisits it.
    begin = getRandomCoordinates(t_size)
    grid[begin[0], begin[1]] = BLOCKED

    # A stack as the frontier gives a depth-first traversal of the grid.
    frontier = [begin]
    while frontier:
        current = frontier.pop()
        for nx, ny in calculateAllNeighbours(current, t_size):
            if grid[nx, ny] == NOT_VISITED:
                # Expand from this cell later, whether or not it ends up blocked.
                frontier.append((nx, ny))
                grid[nx, ny] = BLOCKED if random.random() > 0.7 else UNBLOCKED

    # Place the endpoints relative to the grid that was actually built. Using
    # the global TEST_WORLD_SIZE here put them in the wrong place, or out of
    # range, whenever t_size differed from it.
    start = getRandomCoordinatesInQuarter(t_size, (0, 0))
    reach = getRandomCoordinatesInQuarter(t_size, (1, 1))
    grid[start] = START
    grid[reach] = END
    return grid, start, reach

In [None]:
def showGrid(grid, name):
    """Draw ``grid`` with the COLORMAP colours and save it as a PNG.

    Each cell code is looked up in COLORMAP to build an RGB image, which is
    drawn without axis ticks on the current pyplot figure. The figure is saved
    to ``OUTPUT_BASE_PATH + <dd-mm-YYYY of TODAY> + "-" + name + ".png"``.

    Args:
        grid: 2-D iterable of cell codes that are keys of COLORMAP.
        name: suffix used in the output file name.
    """
    import os  # local import: only needed to prepare the output folder

    plt.xticks([])
    plt.yticks([])
    plt.imshow(X = [[COLORMAP[e] for e in row] for row in grid])

    outputFile = OUTPUT_BASE_PATH + TODAY.strftime("%d-%m-%Y") + "-" + name + ".png"
    # savefig does not create missing folders, so a fresh checkout without the
    # output directory would otherwise fail with FileNotFoundError.
    outputDir = os.path.dirname(outputFile)
    if outputDir:
        os.makedirs(outputDir, exist_ok=True)
    plt.savefig(outputFile, bbox_inches="tight")

In [None]:
def initAgentKnowlegde(grid, start, reach):
    """Return the agent's initial map for ``grid``.

    The map is an ``np.int8`` array with the same shape as ``grid`` in which
    every cell is NOT_VISITED except ``start`` (START) and ``reach`` (END).
    """
    knowledge = np.full(grid.shape, NOT_VISITED, dtype = np.int8)
    # START is written first, so END wins if both coordinates coincide.
    for cell, marker in ((start, START), (reach, END)):
        knowledge[cell] = marker
    return knowledge

In [None]:
def manhattanDistance(pos1, pos2):
    """Return the manhattan distance between two coordinates.

    Only the first two components of ``pos1`` and ``pos2`` are used.
    """
    second_axis_gap = abs(pos2[1] - pos1[1])
    first_axis_gap = abs(pos2[0] - pos1[0])
    return second_axis_gap + first_axis_gap

In [None]:
def maxEvaluatedNeighbour(LeafNodes, evaluationFunction, environment):
    """Rank ``LeafNodes`` by their evaluation score, highest first.

    Args:
        LeafNodes: iterable of ``(i, j)`` coordinates.
        evaluationFunction: object indexed as ``evaluationFunction[i, j]``.
        environment: accepted but not used.

    Returns:
        List of ``(i, j, score)`` tuples sorted by descending score; entries
        with equal scores keep their order from ``LeafNodes``.
    """
    scored = [(i, j, evaluationFunction[i, j]) for i, j in LeafNodes]
    scored.sort(key = lambda triple: triple[2], reverse = True)
    return scored

In [None]:
def AStar(environment, agentKnowledge, start, reach, optimize):
    """Interactive best-first search from ``start`` to ``reach``.

    Nodes are expanded in order of ``g + h``, where ``g`` is the step count
    copied from the expanding parent and ``h`` is the manhattan distance to
    ``reach``. Whenever the step budget is used up the user is prompted via
    ``input()`` for how many more steps to run; a non-numeric answer aborts
    the search.

    ``agentKnowledge`` is modified in place: each expanded cell is marked
    START for the snapshot and then KNOWN_VISITED, and its blocked neighbours
    are marked KNOWN_BLOCKED.

    NOTE(review): a neighbour that is already waiting in the open list is
    inserted again with a freshly overwritten g-cost, and popped entries are
    not checked against the closed list, so with ``optimize=False`` a cell can
    be expanded more than once. That behaviour is kept unchanged here.

    Args:
        environment: grid of cell codes; assumed square, because its first
            dimension is used as the size for neighbour clipping.
        agentKnowledge: the agent's map, updated in place.
        start: coordinate tuple to search from.
        reach: coordinate tuple to search for.
        optimize: when true the open list is discarded after every pop, so
            only the neighbours of the node just expanded compete next.

    Returns:
        A 9-tuple on every exit path:
        ``(found, message, agentKnowledgeVisuals, priorityOpenList, closeList,
        hueristicFunction, evaluationFunction, stepsTaken, path)`` where
        ``agentKnowledgeVisuals`` holds one copy of ``agentKnowledge`` per
        expansion and ``path`` lists the expanded cells in expansion order.
    """
    # Derive the neighbourhood bounds from the grid that was passed in rather
    # than from the global TEST_WORLD_SIZE.
    worldSize = environment.shape[0]
    gcostFunction = np.full(shape = environment.shape, fill_value = 0, dtype = np.int64)
    hueristicFunction  = np.full(shape = environment.shape, fill_value = 0, dtype = np.int64)
    evaluationFunction = np.full(shape = environment.shape, fill_value = 0, dtype = np.int64)
    priorityOpenList = MinHeap()
    path = []
    closeList = []
    gcostFunction[start[0],start[1]] = 0
    hueristicFunction[start[0],start[1]] = manhattanDistance(start,reach)
    evaluationFunction[start[0],start[1]] = gcostFunction[start[0],start[1]] + hueristicFunction[start[0],start[1]]
    priorityOpenList.insert((evaluationFunction[start[0],start[1]],start))
    runSteps = 0
    stepsTaken = 0
    agentKnowledgeVisuals = []
    while priorityOpenList.length()>0:
        if runSteps==0:
            runSteps = input("Enter number of steps to run:")
            # isdecimal() only accepts what int() can parse; isnumeric() also
            # accepted characters such as superscripts that int() rejects.
            if not runSteps.isdecimal():
                # Same 9-item shape as every other exit, so callers can index
                # the path unconditionally.
                return False, "Ran out of steps", agentKnowledgeVisuals, priorityOpenList, closeList, hueristicFunction, evaluationFunction, stepsTaken, path
            runSteps = int(runSteps)
            print("Running {} steps".format(runSteps))
        runSteps-=1
        stepsTaken+=1
        current = priorityOpenList.pop()
        path.append(current[1])
        # Show the node being expanded in the START colour in this snapshot.
        agentKnowledge[current[1]]=START
        agentKnowledgeVisuals.append(np.copy(agentKnowledge))
        closeList.append(current[1])
        # Optimize parameter added as flag to allow nodes to not include openlist of parents in priorityOpenList
        if optimize:
            priorityOpenList = MinHeap()
        agentKnowledge[current[1]] = KNOWN_VISITED
        if current[1] == reach:
            return True, "Success,", agentKnowledgeVisuals, priorityOpenList, closeList, hueristicFunction, evaluationFunction, stepsTaken, path
        neighbours = calculateAllNeighbours(current[1], worldSize)
        # Traversable neighbours that have not been expanded yet.
        LeafNodes = list(filter(lambda x: (environment[x]==UNBLOCKED or environment[x]==END or environment[x]==START) and x not in closeList, neighbours))
        blocked = list(filter(lambda x: environment[x]==BLOCKED, neighbours))
        for nx,ny in blocked:
            agentKnowledge[nx,ny]=KNOWN_BLOCKED
        for nx,ny in LeafNodes:
            gcostFunction[nx,ny] = gcostFunction[current[1]] + 1
            hueristicFunction[nx,ny] = manhattanDistance((nx,ny),reach)
            evaluationFunction[nx,ny] = gcostFunction[nx,ny] + hueristicFunction[nx,ny]
            priorityOpenList.insert((evaluationFunction[nx,ny],(nx,ny)))
    return False, "Ran out of nodes to explore", agentKnowledgeVisuals, priorityOpenList, closeList, hueristicFunction, evaluationFunction, stepsTaken, path

In [None]:
def repeatedForwardAStar(environment, agentKnowledge, start, reach):
    """Repeatedly run the greedy A* walk, restarting from dead ends.

    Each round calls ``AStar(..., optimize=True)`` from the current restart
    cell and appends the cells it expanded, skipping cells already recorded,
    to the overall path. When a newly added cell has exactly three blocked
    neighbours it is treated as a dead end: it becomes the next restart cell
    and the round ends. Rounds continue until the last recorded cell is
    ``reach``, or until a round adds nothing new.

    NOTE(review): the original dead-end branch contained a walk guarded by
    ``while current in visited`` with ``visited`` freshly emptied, so it could
    never run and its ``repeated_path`` was always empty. That unreachable
    code is omitted; behaviour at a dead end is unchanged (restart from the
    dead-end cell). The intended backtracking still needs to be designed.

    Args:
        environment: grid of cell codes; assumed square.
        agentKnowledge: ignored on entry; a fresh map is built internally.
        start: coordinate tuple to start from.
        reach: coordinate tuple to reach.

    Returns:
        The list of cells recorded so far. The target was reached only if the
        list is non-empty and its last element equals ``reach``.
    """
    # Clip neighbourhoods to the grid that was passed in, not TEST_WORLD_SIZE.
    worldSize = environment.shape[0]

    # Initialize the agent's knowledge, the full repeated AStar path and set the current node to the start node
    agentKnowledge = initAgentKnowlegde(environment, start, reach)
    showGrid(agentKnowledge, "agent_starts_with")
    final_path = []
    current = start
    # While the node hasn't reached its target
    while len(final_path) == 0 or reach != final_path[-1]:
        AStar_output = AStar(environment, agentKnowledge, current, reach, True)
        # AStar may hand back a shorter tuple without the path when the user
        # stops supplying steps; treat that as an empty round.
        full_path = AStar_output[8] if len(AStar_output) > 8 else []

        madeProgress = False
        for cell in full_path:
            if cell in final_path:
                continue
            final_path.append(cell)
            madeProgress = True
            neighbours = calculateAllNeighbours(cell, worldSize)
            blocked = [n for n in neighbours if environment[n] == BLOCKED]
            # When 3 neighbours are blocked it is a deadend: replan from here.
            if len(blocked) == 3:
                current = cell
                break

        if not madeProgress:
            # The restart cell only changes when a new cell is recorded, so the
            # next round would replay this one forever. Give up instead.
            break
    return final_path
# repeatedForwardAStar(environment, agentKnowledge, start, reach)

In [None]:
def repeatedBackwardAStar(environment, agentKnowledge, start, reach):
    """Run the repeated search from the target back to the start.

    Delegates to repeatedForwardAStar with the two endpoints exchanged.
    """
    searchFrom, searchTo = reach, start
    return repeatedForwardAStar(environment, agentKnowledge, searchFrom, searchTo)
# repeatedBackwardAStar(environment, agentKnowledge, start, reach)

In [None]:
# Build a random world of the configured size and save a picture of it.
environment, start, reach = generateEnvironment(t_size = TEST_WORLD_SIZE)
showGrid(grid = environment, name = "base_environment")

In [None]:
# with open(OUTPUT_BASE_PATH+"environment", 'r') as f:
#     env, start_, reach_ = json.load(f)
# environment = np.array(env)
# start = (start_[0], start_[1])
# reach = (reach_[0], reach_[1])

In [None]:
# with open(OUTPUT_BASE_PATH+"environment_1", 'w') as f:
#     json.dump((environment.tolist(), start, reach) , f)

In [None]:
%%time

# Start from a blank map that only knows the start and the target cell.
agentKnowledge = initAgentKnowlegde(environment, start, reach)
showGrid(agentKnowledge, "agent_starts_with")

# Single search run; optimize=False keeps the open list between expansions.
ans = AStar(environment, agentKnowledge, start, reach, False)
if not ans[0]:
    print("No path found")
else:
    # ans[7] is the number of expansions the search performed.
    print("Steps taken : {}".format(ans[7]))

# The search overwrites both endpoint cells while expanding, so restore their
# markers before drawing the final state.
agentKnowledge[start] = START
agentKnowledge[reach] = END
showGrid(agentKnowledge, "final_state")

In [None]:
%matplotlib widget

#https://stackoverflow.com/questions/55401246/pyplot-imshow-3d-array-with-a-slider

# ans[2] is the list of agent-knowledge snapshots AStar recorded, one per
# expanded node, in expansion order.
agentVisual = ans[2]

# Draw the first snapshot without axis ticks; `l` is the image artist that the
# slider callback repaints.
idx0 = 0
plt.xticks([])
plt.yticks([])
l = plt.imshow(X = [[COLORMAP[e] for e in row] for row in agentVisual[idx0]])

# Separate narrow axes hosting a vertical slider that ranges over the snapshot
# indices 0 .. len(agentVisual)-1. `slidx` has to stay referenced for the
# widget to keep responding.
axidx = plt.axes([0.1, 0.25, 0.0225, 0.63])
slidx = Slider(axidx, 'Step Number', 0, len(agentVisual)-1, valinit=idx0, valfmt='%d', orientation='vertical')

def update(val):
    # Slider callback: repaint the image with the snapshot at the slider position.
    # No valstep is configured, so the value is truncated to an index with int().
    idx = slidx.val
    l.set_data([[COLORMAP[e] for e in row] for row in agentVisual[int(idx)]])
slidx.on_changed(update)

plt.show()