# Algorithm Test Bench
Replace the `Search Map` block with any RRT variant to test it against randomly generated obstacles.

In [1]:
import numpy as np
import cv2
import heapq
import time
import math

## Define robot parameters

In [2]:
# Radius of the robot, expressed in the same units as the map coordinates.
# NOTE(review): the original comment mentioned "/1000" and "220mm"; how those
# relate to the value 50 is not visible here - confirm.
ROBOT_RADIUS = 50

# Total clearance kept from the map border: a margin of 5 plus the robot radius
clearance = ROBOT_RADIUS + 5

## Generate a random map 
Generate a random map of obstacles of different types

In [3]:
# Colours used on the canvas (3-channel tuples passed to the OpenCV drawing calls)
free_space = (255, 255, 255)
obstacle_color = (0, 0, 0)
clearance_color = (0, 255, 255)
robot_radius_color = (254, 105, 180)
start_color = (254, 0, 0)
goal_color = (0, 0, 254)

# Size of the map and the factor by which it is scaled down
width, height = 1000, 1000
scale = 1

# Size of the canvas after scaling
width_resized, height_resized = width // scale, height // scale


# Define a function to generate random obstacles
def obstacles():
    """Generate a random obstacle map together with a start and a goal point.

    The canvas is black with a white (free) rectangle inset by `clearance`
    on every side; 100 black rectangles are then drawn at random positions.
    The start is sampled from the bottom-left corner of the free rectangle
    and the goal from its top-right corner, both on free pixels.

    Returns:
        (canvas, start, goal): the 3-channel uint8 image (resized to
        `width_resized` x `height_resized`) and two `(x, y)` integer tuples.

    Raises:
        RuntimeError: if no free pixel is found for the start or the goal
            after a bounded number of attempts.
    """
    sq_size = 25           # half side of the fixed-size squares
    num_obstacles = 100
    corner_size = 100      # side of the corner regions used for start/goal
    max_attempts = 10000   # bound on the rejection sampling below

    # Black canvas with a white free-space rectangle inside the clearance border
    canvas = np.zeros((height, width, 3), dtype="uint8")
    canvas = cv2.rectangle(canvas, (clearance, clearance), (width-clearance, height-clearance), free_space, -1)

    for i in range(num_obstacles):
        # Random obstacle centre inside the free rectangle
        x = np.random.randint(clearance, width-clearance)
        y = np.random.randint(clearance, height-clearance)

        if i % 2 == 0:
            # Even obstacles are squares of fixed half-size
            x_len = y_len = sq_size
        else:
            # Odd obstacles are rectangles with random half-extents
            x_len = np.random.randint(15, 40)
            y_len = np.random.randint(15, 40)
        canvas = cv2.rectangle(canvas, (x-x_len, y-y_len), (x+x_len, y+y_len), obstacle_color, -1)

    def _sample_free_point(x_low, x_high, y_low, y_high):
        """Return a random free pixel with x in [x_low, x_high) and y in [y_low, y_high)."""
        for _ in range(max_attempts):
            px = int(np.random.randint(x_low, x_high))
            py = int(np.random.randint(y_low, y_high))
            # At this point the canvas only holds white (free) and black pixels
            if canvas[py, px, 0] != 0:
                return px, py
        raise RuntimeError("Could not find a free point for the start/goal; "
                           "the corner region is fully blocked")

    x_min, x_max = clearance, width - clearance
    y_min, y_max = clearance, height - clearance

    # Start: bottom-left corner of the free rectangle. The regions are taken
    # inside the free rectangle (the border strip is all obstacle) and are
    # clamped so the ranges stay valid for any clearance.
    start = _sample_free_point(x_min, min(x_min + corner_size, x_max),
                               max(y_max - corner_size, y_min), y_max)

    # Goal: top-right corner of the free rectangle
    goal = _sample_free_point(max(x_max - corner_size, x_min), x_max,
                              y_min, min(y_min + corner_size, y_max))

    # Draw the start and goal on the canvas.
    # NOTE: goal_color has a zero first channel, so code that tests only
    # channel 0 for obstacles will see the goal marker as an obstacle.
    canvas = cv2.circle(canvas, start, 5, start_color, 5)
    canvas = cv2.circle(canvas, goal, 5, goal_color, 5)

    # Scale the canvas down to the desired size
    canvas = cv2.resize(canvas, (width_resized, height_resized), interpolation=cv2.INTER_NEAREST)
    return canvas, start, goal

Display the generated map

In [None]:
# Get a random obstacle map
# NOTE: the name `map` shadows Python's builtin map(); the later cells in this
# file read this name, so it is kept as is.
map, start_node, goal_node = obstacles()
# Work on a copy so the generated map itself is left untouched
canvas = map.copy()

# Show the map, wait for a key press, then close the window
cv2.imshow("Obstacles", canvas)
cv2.waitKey(0)
cv2.destroyAllWindows()

## Implement a search algorithm

### RRT*

In [None]:
# Fresh working copy of the generated map, plus the start and goal coordinates
canvas = map.copy()
(x_start, y_start), (x_goal, y_goal) = start_node, goal_node

# Planner parameters
step_size = 50
distance_threshold = 1
search_radius = 70


def adjust(x, threshold):
    """Map a coordinate to its index in the visited space.

    The value is rounded to the nearest half, truncated to an integer,
    divided by `threshold` and truncated again.
    """
    return int(int(round(x*2)/2)/threshold)


# Visited cells, keyed by adjusted (x, y); the start is visited from the outset
visited = {}
visited[(adjust(x_start, distance_threshold), adjust(y_start, distance_threshold))] = 1

# Tree bookkeeping, all keyed by (x, y): the start is its own parent,
# has no children yet and costs nothing to reach
parent = {(x_start, y_start): (x_start, y_start)}
children = {(x_start, y_start): []}
cost = {(x_start, y_start): 0}

# Function to check if the node is valid
def valid_node(x, y):
    """Return True if (x, y) can be added to the tree.

    A node is valid when it lies inside the canvas, is not on an obstacle
    pixel and its adjusted cell has not been visited yet.
    """
    # Adjust the values for canvas
    x_cvs = int(round(x*2)/2)
    y_cvs = int(round(y*2)/2)

    # Reject points outside the image: negative indices would silently wrap
    # around to the other side and too-large ones would raise IndexError
    rows, cols = canvas.shape[:2]
    if not (0 <= x_cvs < cols and 0 <= y_cvs < rows):
        return False

    # Obstacles are black, i.e. zero in every channel. Testing only the first
    # channel would also reject coloured markers such as (0, 0, 254).
    if not canvas[y_cvs, x_cvs].any():
        return False

    # Adjust the value for visited nodes
    x_vis = adjust(x, distance_threshold)
    y_vis = adjust(y, distance_threshold)
    return (x_vis, y_vis) not in visited

# Function to check if the child node and parent node are not intersecting with the obstacle
def valid_edge(x_parent, y_parent, x_child, y_child):
    """Return True if the straight segment parent -> child crosses no obstacle.

    The segment is sampled roughly once per pixel (end points excluded), so
    thin obstacles and corners are not skipped. A sample is an obstacle
    only if the pixel is zero in every channel.
    """
    # One sample per unit of length, plus the two end points that are dropped
    length = np.sqrt((x_child-x_parent)**2 + (y_child-y_parent)**2)
    n_points = int(np.ceil(length)) + 1
    x_intermediate = np.linspace(x_parent, x_child, n_points)[1:-1]
    y_intermediate = np.linspace(y_parent, y_child, n_points)[1:-1]

    # Adjust the values for canvas (round to the nearest half, then truncate)
    cols = (np.round(x_intermediate*2)/2).astype(int)
    rows = (np.round(y_intermediate*2)/2).astype(int)

    # Every sampled pixel must have at least one non-zero channel
    return bool(canvas[rows, cols].any(axis=1).all())


def _propagate_cost_reduction(root, reduction):
    """Subtract `reduction` from the cost of every descendant of `root`."""
    stack = [root]
    while stack:
        node = stack.pop()
        for child in children.get(node, []):
            cost[child] -= reduction
            stack.append(child)


reached = False
iterations = 0
c_best = float('inf')
start = time.time()

# `iterations` only advances once the goal has been reached, so the budget
# below is the number of iterations spent improving the first solution
while iterations < 1000:

    # Get a new node
    x_node = np.random.randint(clearance, width-clearance)
    y_node = np.random.randint(clearance, height-clearance)
    if not valid_node(x_node, y_node):
        continue

    # Find the nearest node in the tree to get the projected node
    x_parent, y_parent = min(parent, key=lambda p: (p[0]-x_node)**2 + (p[1]-y_node)**2)

    # Step from the nearest node towards the sample
    theta = np.arctan2(y_node-y_parent, x_node-x_parent)
    x_new = int(x_parent + step_size*np.cos(theta))
    y_new = int(y_parent + step_size*np.sin(theta))

    if reached:
        iterations += 1

    # Check if the new node is valid
    if not valid_node(x_new, y_new):
        continue

    # Collect all the nodes in the search radius
    neighbours = [(x, y) for x, y in parent
                  if np.sqrt((x-x_new)**2 + (y-y_new)**2) < search_radius]

    # Find the node with the minimum cost to get to the new node. The edge
    # from the nearest node is charged its real length: after the integer
    # truncation above it is generally not exactly step_size.
    min_cost = cost[(x_parent, y_parent)] + np.sqrt((x_parent-x_new)**2 + (y_parent-y_new)**2)
    for x, y in neighbours:
        new_cost = cost[(x, y)] + np.sqrt((x-x_new)**2 + (y-y_new)**2)
        if new_cost < min_cost:
            min_cost = new_cost
            x_parent, y_parent = x, y

    # Check if the edge between the parent and child nodes is valid
    if not valid_edge(x_parent, y_parent, x_new, y_new):
        continue

    # Add the cost of the new node
    cost[(x_new, y_new)] = min_cost

    # Check if rewiring to the newly added node will reduce the cost of the neighbours
    for x, y in neighbours:
        new_cost = cost[(x_new, y_new)] + np.sqrt((x-x_new)**2 + (y-y_new)**2)
        if new_cost >= cost[(x, y)] or not valid_edge(x, y, x_new, y_new):
            continue

        cost_reduction = cost[(x, y)] - new_cost
        cost[(x, y)] = new_cost

        # Move the neighbour from its current parent to the new node
        children[parent[(x, y)]].remove((x, y))
        parent[(x, y)] = (x_new, y_new)
        children.setdefault((x_new, y_new), []).append((x, y))

        # The whole subtree below the neighbour gets cheaper by the same amount
        _propagate_cost_reduction((x, y), cost_reduction)

    x_achieved, y_achieved = x_new, y_new

    # Add the new node to the visited nodes
    visited[(adjust(x_new, distance_threshold), adjust(y_new, distance_threshold))] = 1

    # Add the new node to the parent and children dictionaries
    parent[(x_new, y_new)] = (x_parent, y_parent)
    children.setdefault((x_parent, y_parent), []).append((x_new, y_new))

    # Check if the new node is close to the goal and gives a cheaper path to
    # it. The comparison uses the goal's current cost (rewiring may have
    # lowered it since c_best was stored) and includes the final leg.
    goal_key = (x_goal, y_goal)
    goal_distance = np.sqrt((x_new-x_goal)**2 + (y_new-y_goal)**2)
    goal_cost = cost[(x_new, y_new)] + goal_distance
    on_goal = (x_new, y_new) == goal_key
    if on_goal or (goal_distance < 50 and goal_cost < cost.get(goal_key, float('inf'))):
        end = time.time()
        print("Goal reached: ", end-start, "seconds")

        # If the new node is the goal itself it is already part of the tree;
        # linking it again would make the goal its own parent
        if not on_goal:
            if goal_key in parent:
                # Detach the goal from its previous parent so no stale child
                # link is left behind, and pass the saving on to its subtree
                children[parent[goal_key]].remove(goal_key)
                _propagate_cost_reduction(goal_key, cost[goal_key] - goal_cost)

            # Add the final node as a child of the current node to complete the path
            children.setdefault((x_new, y_new), []).append(goal_key)
            parent[goal_key] = (x_new, y_new)
            cost[goal_key] = goal_cost

            # Mark the goal cell as visited so no other node is created on top of it
            visited[(adjust(x_goal, distance_threshold), adjust(y_goal, distance_threshold))] = 1

        # Update cost of c_best
        c_best = cost[goal_key]

        # Save the final node
        x_final, y_final = x_goal, y_goal

        print("Current Cost: ", cost[(x_goal, y_goal)])

        # Set the reached flag to True
        reached = True

print("Final cost: ", cost[(x_final, y_final)])

### RRT*-SMART

In [None]:
# Working copy of the map for this run
canvas = map.copy()

# Start and goal coordinates
(x_start, y_start), (x_goal, y_goal) = start_node, goal_node

# Planner parameters
step_size, distance_threshold, search_radius = 50, 1, 70


def adjust(x, threshold):
    """Adjust the value of x to the visited space.

    Rounds x to the nearest half, truncates it to an integer, then divides
    by `threshold` and truncates again.
    """
    return int(int(round(x*2)/2)/threshold)


# The start cell is the only visited cell at the beginning
visited = {(adjust(x_start, distance_threshold), adjust(y_start, distance_threshold)): 1}

# Parent of each node (the start is its own parent)
parent = {(x_start, y_start): (x_start, y_start)}

# Children of each node
children = {(x_start, y_start): []}

# Cost of reaching each node from the start
cost = {(x_start, y_start): 0}

# Function to check if the node is valid
def valid_node(x, y):
    """Return True if (x, y) can be added to the tree.

    A node is valid when it lies inside the canvas, is not on an obstacle
    pixel and its adjusted cell has not been visited yet.
    """
    # Adjust the values for canvas
    x_cvs = int(round(x*2)/2)
    y_cvs = int(round(y*2)/2)

    # Points outside the image are never valid; without this check negative
    # indices wrap around and too-large ones raise IndexError
    rows, cols = canvas.shape[:2]
    if not (0 <= x_cvs < cols and 0 <= y_cvs < rows):
        return False

    # Obstacles are zero in every channel; a first-channel-only test would
    # also reject coloured markers such as (0, 0, 254)
    if not canvas[y_cvs, x_cvs].any():
        return False

    # Adjust the value for visited nodes
    x_vis = adjust(x, distance_threshold)
    y_vis = adjust(y, distance_threshold)
    return (x_vis, y_vis) not in visited

# Function to check if the child node and parent node are not intersecting with the obstacle
def valid_edge(x_parent, y_parent, x_child, y_child):
    """Return True if the straight segment parent -> child crosses no obstacle.

    The segment is sampled roughly once per pixel, excluding both end
    points. A sample is an obstacle only if the pixel is zero in every
    channel, so coloured markers drawn on the map do not block edges.
    """
    # One sample per unit of length, plus the two end points that are dropped
    length = np.sqrt((x_child-x_parent)**2 + (y_child-y_parent)**2)
    n_sample = int(np.ceil(length)) + 1
    x_intermediate = np.linspace(x_parent, x_child, n_sample)[1:-1]
    y_intermediate = np.linspace(y_parent, y_child, n_sample)[1:-1]

    # Adjust the values for canvas (round to the nearest half, then truncate)
    cols = (np.round(x_intermediate*2)/2).astype(int)
    rows = (np.round(y_intermediate*2)/2).astype(int)

    # Every sampled pixel must have at least one non-zero channel
    return bool(canvas[rows, cols].any(axis=1).all())

def compute_path(x_start, y_start, x_goal, y_goal):
    """Return the tree path from the start node to the goal node.

    The path is found by following the `parent` links from
    (x_goal, y_goal) back to (x_start, y_start).

    Returns:
        A list of (x, y) tuples ordered from start to goal, both included.
    """
    # Walk back from the node passed in as the goal (not from module-level state)
    path = [(x_goal, y_goal)]
    while path[-1] != (x_start, y_start):
        path.append(parent[path[-1]])
    path.reverse()
    return path

def sample():
    """Return a random sample point (x_node, y_node).

    Once the goal has been reached, on even iteration counts and when
    beacons are available, the point is drawn from the square of half-side
    `r_beacon` around a randomly chosen beacon. Otherwise it is drawn from
    the whole map inside the clearance border.
    """
    sample_near_beacon = reached and iterations % 2 == 0 and beacons
    if not sample_near_beacon:
        # Plain sampling over the map
        x_plain = np.random.randint(clearance, width-clearance)
        y_plain = np.random.randint(clearance, height-clearance)
        return x_plain, y_plain

    # Pick one beacon at random and sample around it
    index = np.random.randint(0, len(beacons))
    x_centre, y_centre = beacons[index]
    x_near = np.random.randint(x_centre-r_beacon, x_centre+r_beacon)
    y_near = np.random.randint(y_centre-r_beacon, y_centre+r_beacon)
    return x_near, y_near


def _propagate_cost_reduction(root, reduction):
    """Subtract `reduction` from the cost of every descendant of `root`."""
    stack = [root]
    while stack:
        node = stack.pop()
        for child in children.get(node, []):
            cost[child] -= reduction
            stack.append(child)


reached = False
iterations = 0
c_best = float('inf')
n = -1 # Iteration at which the goal is reached
N = 1000/1 # Number of iterations
b = 4 # Constant that determines number of times to sample around beacons
r_beacon = 30 # Radius around the beacon to sample
beacons = [] # Nodes of the current solution path used for biased sampling
start = time.time()

# `iterations` only advances once the goal has been reached, so N is the
# number of iterations spent improving the first solution
while iterations < N:

    # Get a new node
    x_node, y_node = sample()
    if not valid_node(x_node, y_node):
        continue

    # Find the nearest node in the tree to get the projected node
    x_parent, y_parent = min(parent, key=lambda p: (p[0]-x_node)**2 + (p[1]-y_node)**2)

    # Step from the nearest node towards the sample
    theta = np.arctan2(y_node-y_parent, x_node-x_parent)
    x_new = int(x_parent + step_size*np.cos(theta))
    y_new = int(y_parent + step_size*np.sin(theta))

    if reached:
        iterations += 1

    # Check if the new node is valid
    if not valid_node(x_new, y_new):
        continue

    # Collect all the nodes in the search radius
    neighbours = [(x, y) for x, y in parent
                  if np.sqrt((x-x_new)**2 + (y-y_new)**2) < search_radius]

    # Find the node with the minimum cost to get to the new node. The edge
    # from the nearest node is charged its real length: after the integer
    # truncation above it is generally not exactly step_size.
    min_cost = cost[(x_parent, y_parent)] + np.sqrt((x_parent-x_new)**2 + (y_parent-y_new)**2)
    for x, y in neighbours:
        new_cost = cost[(x, y)] + np.sqrt((x-x_new)**2 + (y-y_new)**2)
        if new_cost < min_cost:
            min_cost = new_cost
            x_parent, y_parent = x, y

    # Check if the edge between the parent and child nodes is valid
    if not valid_edge(x_parent, y_parent, x_new, y_new):
        continue

    # Add the cost of the new node
    cost[(x_new, y_new)] = min_cost

    # Check if rewiring to the newly added node will reduce the cost of the neighbours
    for x, y in neighbours:
        new_cost = cost[(x_new, y_new)] + np.sqrt((x-x_new)**2 + (y-y_new)**2)
        if new_cost >= cost[(x, y)] or not valid_edge(x, y, x_new, y_new):
            continue

        cost_reduction = cost[(x, y)] - new_cost
        cost[(x, y)] = new_cost

        # Move the neighbour from its current parent to the new node
        children[parent[(x, y)]].remove((x, y))
        parent[(x, y)] = (x_new, y_new)
        children.setdefault((x_new, y_new), []).append((x, y))

        # The whole subtree below the neighbour gets cheaper by the same amount
        _propagate_cost_reduction((x, y), cost_reduction)

    x_achieved, y_achieved = x_new, y_new

    # Add the new node to the visited nodes
    visited[(adjust(x_new, distance_threshold), adjust(y_new, distance_threshold))] = 1

    # Add the new node to the parent and children dictionaries
    parent[(x_new, y_new)] = (x_parent, y_parent)
    children.setdefault((x_parent, y_parent), []).append((x_new, y_new))

    # Path Optimization: once the goal is reached, walk the solution path from
    # the goal and connect each node to its grandparent whenever that edge is valid
    if reached:
        path = compute_path(x_start, y_start, x_final, y_final)

        # Start from the goal node
        node = path[-1]
        parent_node = parent[node]
        grandparent_node = parent[parent_node]

        while node != (x_start, y_start):
            if valid_edge(grandparent_node[0], grandparent_node[1], node[0], node[1]):
                # Rewire the node from its parent to its grandparent
                children[parent_node].remove(node)
                parent[node] = grandparent_node
                children.setdefault(grandparent_node, []).append(node)

                # Update the cost of the node and of everything below it
                previous_cost = cost[node]
                cost[node] = cost[grandparent_node] + np.sqrt((node[0]-grandparent_node[0])**2 + (node[1]-grandparent_node[1])**2)
                _propagate_cost_reduction(node, previous_cost - cost[node])

                node = grandparent_node
            else:
                node = parent_node

            parent_node = parent[node]
            grandparent_node = parent[parent_node]

        # Recompute the path after the shortcuts so the beacons are the nodes
        # of the optimized path, not of the path as it was before optimization
        path = compute_path(x_start, y_start, x_final, y_final)
        beacons = path[1:-1]

        # Keep the best cost in step with the shortened path
        c_best = cost[(x_final, y_final)]

    # Check if the new node is close to the goal and gives a cheaper path to
    # it. The comparison uses the goal's current cost, which rewiring and
    # path optimization may have lowered since c_best was stored.
    goal_key = (x_goal, y_goal)
    goal_distance = np.sqrt((x_new-x_goal)**2 + (y_new-y_goal)**2)
    goal_cost = cost[(x_new, y_new)] + goal_distance
    on_goal = (x_new, y_new) == goal_key
    if on_goal or (goal_distance < 50 and goal_cost < cost.get(goal_key, float('inf'))):
        end = time.time()
        print("Goal reached: ", end-start, "seconds")

        beacons = []

        # If the new node is the goal itself it is already part of the tree;
        # linking it again would make the goal its own parent
        if not on_goal:
            if goal_key in parent:
                # Detach the goal from its previous parent so no stale child
                # link is left behind, and pass the saving on to its subtree
                children[parent[goal_key]].remove(goal_key)
                _propagate_cost_reduction(goal_key, cost[goal_key] - goal_cost)

            # Add the final node as a child of the current node to complete the path
            children.setdefault((x_new, y_new), []).append(goal_key)
            parent[goal_key] = (x_new, y_new)
            cost[goal_key] = goal_cost

            # Mark the goal cell as visited so no other node is created on top of it
            visited[(adjust(x_goal, distance_threshold), adjust(y_goal, distance_threshold))] = 1

        # Update the best cost
        c_best = cost[goal_key]

        # Save the final node
        x_final, y_final = x_goal, y_goal

        # Save the iteration at which the goal is reached
        n = iterations

        print("Current Cost: ", cost[(x_final, y_final)])

        # Set the reached flag to True
        reached = True

print("Final cost: ", cost[(x_final, y_final)])

### Informed-RRT*

In [None]:
# Copy of the map that this run checks against
canvas = map.copy()

# Unpack the start and goal coordinates
x_start, y_start = start_node[0], start_node[1]
x_goal, y_goal = goal_node[0], goal_node[1]

# Planner parameters
step_size = 50
distance_threshold = 1
search_radius = 70


def adjust(x, threshold):
    """Adjust the value of x to the visited space.

    x is rounded to the nearest half and truncated to an integer; the
    result is divided by `threshold` and truncated again.
    """
    half_rounded = int(round(x*2)/2)
    return int(half_rounded/threshold)


# Visited cells (adjusted coordinates); initially only the start
visited = dict()
visited[(adjust(x_start, distance_threshold), adjust(y_start, distance_threshold))] = 1

# Parent, children and cost of every tree node, seeded with the start node
parent = {(x_start, y_start): (x_start, y_start)}
children = {(x_start, y_start): []}
cost = {(x_start, y_start): 0}

# Function to check if the node is valid
def valid_node(x, y):
    """Return True if (x, y) can be added to the tree.

    A node is valid when it lies inside the canvas, is not on an obstacle
    pixel and its adjusted cell has not been visited yet.
    """
    # Adjust the values for canvas
    x_cvs = int(round(x*2)/2)
    y_cvs = int(round(y*2)/2)

    # Check the boundaries on the indices that are actually used, against
    # the shape of the array that is actually indexed
    rows, cols = canvas.shape[:2]
    if not (0 <= x_cvs < cols and 0 <= y_cvs < rows):
        return False

    # Obstacles are zero in every channel; a first-channel-only test would
    # also reject coloured markers such as (0, 0, 254)
    if not canvas[y_cvs, x_cvs].any():
        return False

    # Adjust the value for visited nodes
    x_vis = adjust(x, distance_threshold)
    y_vis = adjust(y, distance_threshold)
    return (x_vis, y_vis) not in visited

# Function to check if the child node and parent node are not intersecting with the obstacle
def valid_edge(x_parent, y_parent, x_child, y_child):
    """Return True if the straight segment parent -> child crosses no obstacle.

    The segment is sampled roughly once per pixel (end points excluded), so
    thin obstacles and corners are not skipped. A sample is an obstacle
    only if the pixel is zero in every channel.
    """
    # One sample per unit of length, plus the two end points that are dropped
    length = np.sqrt((x_child-x_parent)**2 + (y_child-y_parent)**2)
    n_points = int(np.ceil(length)) + 1
    x_intermediate = np.linspace(x_parent, x_child, n_points)[1:-1]
    y_intermediate = np.linspace(y_parent, y_child, n_points)[1:-1]

    # Adjust the values for canvas (round to the nearest half, then truncate)
    cols = (np.round(x_intermediate*2)/2).astype(int)
    rows = (np.round(y_intermediate*2)/2).astype(int)

    # Every sampled pixel must have at least one non-zero channel
    return bool(canvas[rows, cols].any(axis=1).all())

def sample_unit_ball():
    """Draw a random point inside the unit disc.

    Two random numbers are drawn; the larger one is used as the radius and
    the ratio smaller/larger as the fraction of a full turn.

    Returns:
        A 3x1 numpy array [[x], [y], [0]].
    """
    first = np.random.random()
    second = np.random.random()

    # Larger draw -> radius, smaller draw -> numerator of the angle fraction
    radius = max(first, second)
    smaller = min(first, second)

    angle = 2 * np.pi * smaller / radius
    return np.array([[radius * np.cos(angle)],
                     [radius * np.sin(angle)],
                     [0]])

# Function to sample a node for Informed-RRT*
def sample(x_start, y_start, x_goal, y_goal, c_max):
    """Return a sample point (x_rand, y_rand) for Informed-RRT*.

    While no solution exists (c_max is infinite) the point is drawn from
    the whole map inside the clearance border. Afterwards it is drawn from
    the ellipse whose foci are the start and the goal and whose major axis
    has length c_max.

    Args:
        x_start, y_start: start coordinates (first focus).
        x_goal, y_goal: goal coordinates (second focus).
        c_max: cost of the best path found so far, or infinity.

    Returns:
        A tuple of two ints. In the ellipse case the point may lie outside
        the map; nothing here clips it.
    """
    if not c_max < float('inf'):
        x_rand = np.random.randint(clearance, width-clearance)
        y_rand = np.random.randint(clearance, height-clearance)
        return x_rand, y_rand

    # Straight-line distance between the foci
    c_min = math.hypot(x_start - x_goal, y_start - y_goal)

    # Semi-axes of the ellipse. The difference is clamped at zero so that a
    # cost slightly below the straight-line distance cannot make sqrt fail.
    semi_major = c_max / 2.0
    semi_minor = math.sqrt(max(c_max ** 2 - c_min ** 2, 0.0)) / 2.0

    # Unit vector along the start-goal axis; any direction works when the
    # two foci coincide because the ellipse is then a circle
    if c_min > 0:
        cos_t = (x_goal - x_start) / c_min
        sin_t = (y_goal - y_start) / c_min
    else:
        cos_t, sin_t = 1.0, 0.0

    # Scale a unit-disc sample to the ellipse axes, rotate it onto the
    # start-goal axis and move it to the midpoint of the foci. In 2-D this
    # plain rotation replaces a per-sample SVD.
    ball = sample_unit_ball()
    along = semi_major * ball[0, 0]
    across = semi_minor * ball[1, 0]

    x_rand = int((x_start + x_goal) / 2.0 + along * cos_t - across * sin_t)
    y_rand = int((y_start + y_goal) / 2.0 + along * sin_t + across * cos_t)

    return x_rand, y_rand

def _propagate_cost_reduction(root, reduction):
    """Subtract `reduction` from the cost of every descendant of `root`."""
    stack = [root]
    while stack:
        node = stack.pop()
        for child in children.get(node, []):
            cost[child] -= reduction
            stack.append(child)


reached = False
iterations = 0 # Number of iterations to improve the solution
c_best = float('inf') # Best cost so far
start = time.time()

# `iterations` only advances once the goal has been reached
while iterations < 100*5:

    # Get a new node; once a solution exists, sampling is restricted by c_best
    x_node, y_node = sample(x_start, y_start, x_goal, y_goal, c_best)
    if not valid_node(x_node, y_node):
        continue

    # Find the nearest node in the tree to get the projected node
    x_parent, y_parent = min(parent, key=lambda p: (p[0]-x_node)**2 + (p[1]-y_node)**2)

    # Step from the nearest node towards the sample
    theta = np.arctan2(y_node-y_parent, x_node-x_parent)
    x_new = int(x_parent + step_size*np.cos(theta))
    y_new = int(y_parent + step_size*np.sin(theta))

    if reached:
        # Update c_best: rewiring may have shortened the path to the goal
        if cost[(x_final, y_final)] < c_best:
            c_best = cost[(x_final, y_final)]
            print("New best cost: ", c_best)
        iterations += 1

    # Check if the new node is valid
    if not valid_node(x_new, y_new):
        continue

    # Collect all the nodes in the search radius
    neighbours = [(x, y) for x, y in parent
                  if np.sqrt((x-x_new)**2 + (y-y_new)**2) < search_radius]

    # Find the node with the minimum cost to get to the new node. The edge
    # from the nearest node is charged its real length: after the integer
    # truncation above it is generally not exactly step_size.
    min_cost = cost[(x_parent, y_parent)] + np.sqrt((x_parent-x_new)**2 + (y_parent-y_new)**2)
    for x, y in neighbours:
        new_cost = cost[(x, y)] + np.sqrt((x-x_new)**2 + (y-y_new)**2)
        if new_cost < min_cost:
            min_cost = new_cost
            x_parent, y_parent = x, y

    # Check if the edge between the parent and child nodes is valid
    if not valid_edge(x_parent, y_parent, x_new, y_new):
        continue

    # Add the cost of the new node
    cost[(x_new, y_new)] = min_cost

    # Check if rewiring to the newly added node will reduce the cost of the neighbours
    for x, y in neighbours:
        new_cost = cost[(x_new, y_new)] + np.sqrt((x-x_new)**2 + (y-y_new)**2)
        if new_cost >= cost[(x, y)] or not valid_edge(x, y, x_new, y_new):
            continue

        cost_reduction = cost[(x, y)] - new_cost
        cost[(x, y)] = new_cost

        # Move the neighbour from its current parent to the new node
        children[parent[(x, y)]].remove((x, y))
        parent[(x, y)] = (x_new, y_new)
        children.setdefault((x_new, y_new), []).append((x, y))

        # The whole subtree below the neighbour gets cheaper by the same amount
        _propagate_cost_reduction((x, y), cost_reduction)

    x_achieved, y_achieved = x_new, y_new

    # Add the new node to the visited nodes
    visited[(adjust(x_new, distance_threshold), adjust(y_new, distance_threshold))] = 1

    # Add the new node to the parent and children dictionaries
    parent[(x_new, y_new)] = (x_parent, y_parent)
    children.setdefault((x_parent, y_parent), []).append((x_new, y_new))

    # Check if the new node is close to the goal and gives a cheaper path to
    # it. The comparison uses the goal's current cost, which rewiring in this
    # very iteration may already have lowered below c_best.
    goal_key = (x_goal, y_goal)
    goal_distance = np.sqrt((x_new-x_goal)**2 + (y_new-y_goal)**2)
    goal_cost = cost[(x_new, y_new)] + goal_distance
    on_goal = (x_new, y_new) == goal_key
    if on_goal or (goal_distance < 50 and goal_cost < cost.get(goal_key, float('inf'))):
        end = time.time()
        print("Goal reached: ", end-start, "seconds")

        # If the new node is the goal itself it is already part of the tree;
        # linking it again would make the goal its own parent
        if not on_goal:
            if goal_key in parent:
                # Detach the goal from its previous parent so no stale child
                # link is left behind, and pass the saving on to its subtree
                children[parent[goal_key]].remove(goal_key)
                _propagate_cost_reduction(goal_key, cost[goal_key] - goal_cost)

            # Add the final node as a child of the current node to complete the path
            children.setdefault((x_new, y_new), []).append(goal_key)
            parent[goal_key] = (x_new, y_new)
            cost[goal_key] = goal_cost

            # Mark the goal cell as visited so no other node is created on top of it
            visited[(adjust(x_goal, distance_threshold), adjust(y_goal, distance_threshold))] = 1

        # Update the best cost
        c_best = cost[goal_key]

        # Save the final node
        x_final, y_final = x_goal, y_goal

        print("Current Cost: ", cost[(x_goal, y_goal)])

        # Set the reached flag to True
        reached = True

print("Final cost: ", cost[(x_final, y_final)])

## Generate optimal path

In [None]:
# Get the path from the parent dictionary: follow the parent links from the
# final node back to the start, then put the path in start-to-goal order
path = [(x_final, y_final)]
while path[-1] != (x_start, y_start):
    path.append(parent[path[-1]])
path.reverse()

# Add up the length of every segment of the path. The result is stored as
# total_distance so that the builtin sum() is not shadowed in later cells.
total_distance = math.fsum(
    math.hypot(x2 - x1, y2 - y1)
    for (x1, y1), (x2, y2) in zip(path, path[1:])
)

print("Total distance: ", total_distance)
# A non-zero difference means the stored cost disagrees with the real path length
print("Difference in cost: ", np.round(cost[(x_final, y_final)] - total_distance, 2))

## Plot the graph

In [None]:
# Start from a clean copy of the map. This has to happen before anything is
# drawn, otherwise the drawings are thrown away by the copy.
canvas = map.copy()

# Plot a line between the parent and child nodes
for x, y in parent:
    x_parent, y_parent = parent[(x, y)]
    canvas = cv2.line(canvas, (x, y), (x_parent, y_parent), (0, 255, 0), 1)
    # Draw a purple circle at the parent node
    canvas = cv2.circle(canvas, (x_parent, y_parent), 5, (255, 0, 255), 2)

# Plot a circle at all the child points
for x, y in children:
    for child in children[(x, y)]:
        canvas = cv2.circle(canvas, child, 5, (255, 0, 255), 2)

# Plot the path
for (x1, y1), (x2, y2) in zip(path, path[1:]):
    canvas = cv2.line(canvas, (x1, y1), (x2, y2), (0, 0, 255), 2)

# Plot the start and goal nodes last so they stay visible on top of the tree
canvas = cv2.circle(canvas, (x_start, y_start), 5, (0, 0, 254), 10)
canvas = cv2.circle(canvas, (x_goal, y_goal), 5, (0, 0, 254), 10)

# Resize the canvas by a factor of scale
canvas_resized = cv2.resize(canvas, (width_resized, height_resized))

cv2.imshow("Canvas", canvas_resized)
cv2.waitKey(0)
cv2.destroyAllWindows()

Optionally, save the map

In [None]:
# Save the canvas, with captions for the different algorithms.
# Only one of the lines below is meant to be active: the one matching the
# algorithm whose cell was run last.
# cv2.imwrite("RRT_star5.png", canvas_resized)

# cv2.imwrite("RRT_star-SMART5.png", canvas_resized)

# Writes the plotted (resized) canvas to the current working directory
cv2.imwrite("Informed-RRT_star5.png", canvas_resized)