In [1]:
import numpy as np
import heapq
from numpy.random import random_integers as rnd
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

In [2]:
# randomly generate connected maze
def maze(width=81, height=51, complexity=.75, density =.75):
    # Only odd shapes
    shape = ((height//2)*2+1, (width//2)*2+1)
    # Adjust complexity and density relative to maze size
    complexity = int(complexity*(5*(shape[0]+shape[1])))
    density    = int(density*(shape[0]//2*shape[1]//2))
    # Build actual maze
    Z = np.zeros(shape, dtype=float)
    # Fill borders
    Z[0,:] = Z[-1,:] = 1
    Z[:,0] = Z[:,-1] = 1
    # Make isles
    for i in range(density):
        x, y = rnd(0,shape[1]//2)*2, rnd(0,shape[0]//2)*2
        Z[y,x] = 1
        for j in range(complexity):
            neighbours = []
            if x > 1:           neighbours.append( (y,x-2) )
            if x < shape[1]-2:  neighbours.append( (y,x+2) )
            if y > 1:           neighbours.append( (y-2,x) )
            if y < shape[0]-2:  neighbours.append( (y+2,x) )
            if len(neighbours):
                y_,x_ = neighbours[rnd(0,len(neighbours)-1)]
                if Z[y_,x_] == 0:
                    Z[y_,x_] = 1
                    Z[y_+(y-y_)//2, x_+(x-x_)//2] = 1
                    x, y = x_, y_
    return Z

In [3]:
class Element:
    def __init__(self, key, value1, value2):
        self.key = key
        self.value1 = value1
        self.value2 = value2

    def __eq__(self, other):
        return np.sum(np.abs(self.key - other.key)) == 0
    
    def __ne__(self, other):
        return self.key != other.key

    def __lt__(self, other):
        return ((self.value1, self.value2) < (other.value1, other.value2))

    def __le__(self, other):
        return ((self.value1, self.value2) <= (other.value1, other.value2))

    def __gt__(self, other):
        return ((self.value1, self.value2) > (other.value1, other.value2))

    def __ge__(self, other):
        return ((self.value1, self.value2) >= (other.value1, other.value2))


In [4]:
class Dstar_lite:
    
    def __init__(self, map, x_goal, y_goal, x_start, y_start):
        # initialize
        self.start = np.array([x_start, y_start])
        self.goal = np.array([x_goal, y_goal])
        self.k_m = 0
        # all rhs is set to inf
        self.rhs = np.ones((len(map), len(map[0]))) * np.inf
        self.g = self.rhs.copy()
        # sensed map is initated to zero
        self.global_map = map
        self.sensed_map = np.zeros((len(map), len(map[0])))
        # rhs of goal is set to zero
        self.rhs[self.goal[0], self.goal[1]] = 0
        # enter the goal as the first item in the queue
        self.queue = []
        A = Element(self.goal, *self.CalculateKey(self.goal))
        heapq.heappush(self.queue, A) 
    
    def CalculateKey(self, s):
        key = [0, 0]
        # print(self.g[s[0],s[1]], self.rhs[s[0],s[1]], self.h_estimate(self.start, s), self.k_m)
        key[0] = min(self.g[s[0],s[1]], self.rhs[s[0],s[1]]) + self.h_estimate(self.start, s) + self.k_m
        key[1] = min(self.g[s[0],s[1]], self.rhs[s[0],s[1]])
        return key
    
    def UpdateVertex(self, u):
        print("current state: ", u)
        print("RHS: ", self.rhs)
        print("g: ", self.g)
        print("queue: ", self.queue)
        
        # if u (current state) is not the goal
        if np.sum(np.abs(u - self.goal)) != 0:
            print("vertex 1")
            s_list = self.succ(u)
            min_s = np.inf
            
            # for all successors of the current state
            for s in s_list:
                if self.cost(u, s) + self.g[s[0],s[1]] < min_s:
                    min_s = self.cost(u, s) + self.g[s[0],s[1]]
            # update the rhs based on the min(cost(from current state to successor) + g)
            self.rhs[u[0],u[1]] = min_s
        
        # if the current state is already in the queue
        if Element(u, 0, 0) in self.queue:
            print("vertex 2")
            # remove the state from the queue
            self.queue.remove(Element(u, 0, 0))
            # restrucutre the queue
            heapq.heapify(self.queue)
        
        # if the g of current does not equal rhs of current
        if self.g[u[0],u[1]] != self.rhs[u[0],u[1]]:
            print("vertex 3")
            # add the current to the queue
            heapq.heappush(self.queue, Element(u, *self.CalculateKey(u)))
            
    def ComputeShortestPath(self):

        # while the queue is not empty
        # and the lowest priority key is lower priority than the currenty key
        # and if rhs != g
        while len(self.queue) > 0 and heapq.nsmallest(1, self.queue)[0] < Element(self.start, *self.CalculateKey(self.start)) or self.rhs[self.start[0], self.start[1]] != self.g[self.start[0], self.start[1]]:
            
            # returns the lowest priority node
            k_old = heapq.nsmallest(1, self.queue)[0]
            print("k_old: ", k_old.key)
            print("k_old_value1: ", k_old.value1)
            print("k_old_value2: ", k_old.value2)

            # returns the lowest priority node and remove it from queue
            u = heapq.heappop(self.queue).key
            print("u: ", u)
            
            # create a temporary element based on u (last node)
            temp = Element(u, *self.CalculateKey(u)) 
            print("temp_key: ", temp.key)
            print("temp_value1: ", temp.value1)
            print("temp_value2: ", temp.value2)
            
            # if the lowest priority is less than the temporary, add it to the queue
            if k_old < temp:
                print("1st if")
                heapq.heappush(self.queue, temp)
                
            # if g > rhs: g is set to rhs
            # update all successor of u
            elif self.g[u[0],u[1]] > self.rhs[u[0],u[1]]:
                print("2nd if")
                self.g[u[0],u[1]] = self.rhs[u[0],u[1]]
                s_list = self.succ(u)
                for s in s_list:
                    self.UpdateVertex(s)
            
            # if g < rhs: g is set to infinite
            # update all successors of u
            else:
                print("3rd if")
                self.g[u[0],u[1]] = np.inf
                s_list = self.succ(u)
                s_list.append(u)
                for s in s_list:
                    self.UpdateVertex(s)
    
    # heuristic estimation
    def h_estimate(self, s1, s2):
        #return 0.0
        return np.linalg.norm(s1 - s2)
    
    # fetch successors and predessors
    def succ(self, u):
        s_list = [np.array([u[0]-1,u[1]-1]), np.array([u[0]-1,u[1]]), np.array([u[0]-1,u[1]+1]), np.array([u[0],u[1]-1]),                  np.array([u[0],u[1]+1]), np.array([u[0]+1,u[1]-1]), np.array([u[0]+1,u[1]]), np.array([u[0]+1,u[1]+1])];
        row = len(self.global_map)
        col = len(self.global_map[0])
        real_list = []
        for s in s_list:
            if s[0] >= 0 and s[0] < row and s[1] >= 0 and s[1] < col:
                real_list.append(s)
        return real_list
    
    # calculate cost between nodes
    # cost is either infinite or the h_estimate = norm of difference
    def cost(self, u1, u2):
        if self.sensed_map[u1[0],u1[1]] == np.inf or self.sensed_map[u2[0],u2[1]] == np.inf:
            # print('inf!')
            return np.inf
        else:
            #return 2 * np.linalg.norm(u1 - u2)
            return self.h_estimate(u1, u2)
    
    # sense the surroundings and return their real-time value
    def sense(self, range_s):
        real_list = []
        row = len(self.global_map)
        col = len(self.global_map[0])
        for i in range(-range_s, range_s+1):
            for j in range(-range_s, range_s+1):
                if self.start[0] + i >= 0 and self.start[0] + i < row and self.start[1] + j >= 0 and self.start[1] + j < col:
                    if not (i == 0 and j == 0):
                        real_list.append(np.array([self.start[0]+i,self.start[1]+j]))
        return real_list

In [5]:
def Main(map, x_goal, y_goal, x_start, y_start):
    ds = Dstar_lite(map, x_goal, y_goal, x_start, y_start)
    last = ds.start
    last, curr_update = Scan(ds, last)
    path = [ds.start]
    sensed_map = [ds.sensed_map.copy()]
    updated_points = [curr_update.copy()]
    ds.ComputeShortestPath()
    count = 0
    while np.sum(np.abs(ds.start - ds.goal)) != 0 and count < 100000:
        count += 1
        print("curr_location:", ds.start)
        s_list = ds.succ(ds.start)
        min_s = np.inf
        for s in s_list:
            if ds.cost(ds.start, s) + ds.g[s[0],s[1]] < min_s:
                # print(ds.cost(ds.start, s), ds.g[s[0],s[1]])
                min_s = ds.cost(ds.start, s) + ds.g[s[0],s[1]]
                temp = s
        ds.start = temp.copy()
        path.append(ds.start)
        sensed_map.append(ds.sensed_map.copy())
        last, curr_update = Scan(ds, last)
        updated_points.append(curr_update)
    return path, sensed_map, updated_points

In [6]:
# update map information and replan
def Scan(ds, last):
    
    # detects inf within a range
    # ds.sense(3) returns a list of arrays that are 3 nodes away in all direction
    # ie. ds.sense(1) = the 8 nodes surrounding the current node
    s_list = ds.sense(1) # original 3
    flag = True
    updated_points = []
    
    # if anything in the sensed range is different than the global map, set flag = false
    for s in s_list:
        print("sensed map: ", ds.sensed_map)
        print("global map: ", ds.global_map)
        # the moment there is a difference, break out
        if ds.sensed_map[s[0],s[1]] != ds.global_map[s[0],s[1]]:
            flag = False
            print('See a wall!')
            break
    
    # if flag = false (something was different between sensed and global/actual map)
    if flag == False:
        
        # set km based on distance estimate
        ds.k_m += ds.h_estimate(last, ds.start)
        last = ds.start.copy()
        
        # for all states within range, set sensed map = global map
        for s in s_list:
            if ds.sensed_map[s[0],s[1]] != ds.global_map[s[0],s[1]]:
                updated_points.append(s[0])
                updated_points.append(s[1])
                ds.sensed_map[s[0],s[1]] = ds.global_map[s[0],s[1]]
                ds.UpdateVertex(s)
        
        print()
        print("COMPUTE PATH------------------------------")
        # compute shortest path
        ds.ComputeShortestPath()
        
    return last, np.asarray(updated_points)

In [7]:
Z = maze(width=5, height=5)
Z[Z == 1] = np.inf
print(Z)

[[inf inf inf inf inf]
 [inf  0.  0.  0. inf]
 [inf  0. inf  0. inf]
 [inf  0.  0.  0. inf]
 [inf inf inf inf inf]]


In [8]:
map = Z.copy()
x_goal = 3
y_goal = 3
x_start = 1
y_start = 1

In [9]:
# # ds = Dstar_lite(map, x_goal, y_goal, x_start, y_start)
# path, sensed_map, updated_points = Main(map, x_goal, y_goal, x_start, y_start)
# print(path)

In [10]:
ds = Dstar_lite(map, x_goal, y_goal, x_start, y_start)

In [11]:
last = ds.start
last

array([1, 1])

In [12]:
# get the last node as well as the current view
last, curr_update = Scan(ds, last)

sensed map:  [[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
global map:  [[inf inf inf inf inf]
 [inf  0.  0.  0. inf]
 [inf  0. inf  0. inf]
 [inf  0.  0.  0. inf]
 [inf inf inf inf inf]]
See a wall!
current state:  [0 0]
RHS:  [[inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf  0. inf]
 [inf inf inf inf inf]]
g:  [[inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]]
queue:  [<__main__.Element object at 0x7f97f06c7550>]
vertex 1
current state:  [0 1]
RHS:  [[inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf  0. inf]
 [inf inf inf inf inf]]
g:  [[inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]]
queue:  [<__main__.Element object at 0x7f97f06c7550>]
vertex 1
current state:  [0 2]
RHS:  [[inf inf inf inf inf]
 [inf inf inf inf inf]
 [inf inf inf inf inf]
 [in

In [13]:
last

array([1, 1])

In [14]:
# current view is a list of coordinates of where the walls are
curr_update

array([0, 0, 0, 1, 0, 2, 1, 0, 2, 0, 2, 2])

In [15]:
# start the path list with the starting position
path = [ds.start]
path

[array([1, 1])]

In [16]:
sensed_map = [ds.sensed_map.copy()]
updated_points = [curr_update.copy()]

In [17]:
# updates the g value: ds.g = 2d array
ds.ComputeShortestPath()

In [18]:
ds.g

array([[       inf,        inf,        inf,        inf,        inf],
       [       inf, 3.41421356, 2.41421356,        inf,        inf],
       [       inf, 2.41421356,        inf, 1.        ,        inf],
       [       inf,        inf, 1.        , 0.        ,        inf],
       [       inf,        inf,        inf,        inf,        inf]])

In [19]:
count = 0

In [20]:
# while we have not reached the end and less than a maximum number of counts
while np.sum(np.abs(ds.start - ds.goal)) != 0 and count < 100000:
    count += 1
    print("curr_location:", ds.start)
    
    # get a list of successors from the current node
    s_list = ds.succ(ds.start)
    print("s_list: ", s_list)
    
    # set the minimum s to infinite
    min_s = np.inf
    
    # for every successor in the successor list
    for s in s_list:
        print("successor in question: ", s)
        
        # check if cost from start to s (neighbor) + ds.g < min_s
        # since all the walls are inf, this will return the first node that isnt a wall
        if ds.cost(ds.start, s) + ds.g[s[0],s[1]] < min_s:
            print("succcessor smaller than min: ", s)
            print("successor cost: ", ds.cost(ds.start, s))
            print("succesor g: ", ds.g[s[0],s[1]])
            
            # if a new minimum is found
            min_s = ds.cost(ds.start, s) + ds.g[s[0],s[1]]
            temp = s
    
    # start becomes the successor with the lowest cost + g
    ds.start = temp.copy()
    
    # appendn this to path
    path.append(ds.start)
    
    # append to sensed map
    sensed_map.append(ds.sensed_map.copy())
    
    # scan again
    last, curr_update = Scan(ds, last)
    updated_points.append(curr_update)

curr_location: [1 1]
s_list:  [array([0, 0]), array([0, 1]), array([0, 2]), array([1, 0]), array([1, 2]), array([2, 0]), array([2, 1]), array([2, 2])]
successor in question:  [0 0]
successor in question:  [0 1]
successor in question:  [0 2]
successor in question:  [1 0]
successor in question:  [1 2]
succcessor smaller than min:  [1 2]
successor cost:  1.0
succesor g:  2.414213562373095
successor in question:  [2 0]
successor in question:  [2 1]
successor in question:  [2 2]
sensed map:  [[inf inf inf  0.  0.]
 [inf  0.  0.  0.  0.]
 [inf  0. inf  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]]
global map:  [[inf inf inf inf inf]
 [inf  0.  0.  0. inf]
 [inf  0. inf  0. inf]
 [inf  0.  0.  0. inf]
 [inf inf inf inf inf]]
sensed map:  [[inf inf inf  0.  0.]
 [inf  0.  0.  0.  0.]
 [inf  0. inf  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]]
global map:  [[inf inf inf inf inf]
 [inf  0.  0.  0. inf]
 [inf  0. inf  0. inf]
 [inf  0.  0.  0. inf]
 [inf inf inf inf inf]]
sense

In [21]:
path

[array([1, 1]), array([1, 2]), array([2, 3]), array([3, 3])]

In [22]:
def arrayisin(array, list_of_arrays):
    for a in list_of_arrays:
        if np.array_equal(array, a):
            return True
    return False

# show path
for row in range(Z.shape[0]):
    for column in range(Z.shape[1]):
        if arrayisin(np.array([row, column]), path):
            Z[row, column] = 5
Z

array([[inf, inf, inf, inf, inf],
       [inf,  5.,  5.,  0., inf],
       [inf,  0., inf,  5., inf],
       [inf,  0.,  0.,  5., inf],
       [inf, inf, inf, inf, inf]])