In [14]:
import numpy as np
import matplotlib.pyplot as plt

"""
Simulation environment
"""
LINK_LENGTH = [1, 1]    # lengths of arm links

# environment 1
OBSTACLES = [[1.75, 0.75, 0.6], [-.5, 1.5, 0.5], [0, -1, 0.7]] # circular obstacles [x, y, r]
START = (1.0, 0.0)      # start joint configuration
GOAL = (-3.0, -1.0)     # goal joint configuration

# environment 2
#OBSTACLES = [[1.75, 0.75, 0.6], [-.55, 1.5, 0.5], [0, -1, 0.7], [-2, -0.5, 0.6]]
#START = (-3.0, 1.0)
#GOAL = (-0.5, 0.5)

"""
RRT parameters
"""
MAX_NODES = 500    # maximum number of nodes to sample
BIAS = 0.1          # probability that a sampled node is manually set to the goal
DELTA = 0.2       # length of tree branches
EDGE_INC = 0.02     # edge sampling increment for collision detection

"""
Class for an n-link arm
"""
class NLinkArm(object):
    def __init__(self, link_lengths, joint_angles):
        self.n_links = len(link_lengths)
        if self.n_links != len(joint_angles):
            raise ValueError()

        self.link_lengths = np.array(link_lengths)
        self.joint_angles = np.array(joint_angles)
        #There are three points here
        self.points = [[0, 0] for _ in range(self.n_links + 1)]

        self.lim = sum(link_lengths)
        self.update_points()

    def update_joints(self, joint_angles):
        self.joint_angles = np.array(joint_angles)
        self.update_points()

    def update_points(self):
        for i in range(1, self.n_links + 1):
            #get the point1 and point2 location
            self.points[i][0] = self.points[i - 1][0] + \
                self.link_lengths[i - 1] * \
                np.cos(np.sum(self.joint_angles[:i]))
            self.points[i][1] = self.points[i - 1][1] + \
                self.link_lengths[i - 1] * \
                np.sin(np.sum(self.joint_angles[:i]))

        self.end_effector = np.array(self.points[self.n_links]).T

"""
Utility functions
"""
def detect_collision(arm, config):
    """
    :param arm: NLinkArm object
    :param config: Configuration (joint angles) of the arm
    :return: True if any part of arm collides with obstacles, False otherwise
    """
    arm.update_joints(config)
    points = arm.points
    for k in range(len(points) - 1):
        for circle in OBSTACLES:
            a_vec = np.array(points[k])
            b_vec = np.array(points[k+1])
            c_vec = np.array([circle[0], circle[1]])
            radius = circle[2]

            line_vec = b_vec - a_vec
            line_mag = np.linalg.norm(line_vec)
            circle_vec = c_vec - a_vec
            proj = circle_vec.dot(line_vec / line_mag)

            if proj <= 0:
                closest_point = a_vec
            elif proj >= line_mag:
                closest_point = b_vec
            else:
                closest_point = a_vec + line_vec * proj / line_mag

            if np.linalg.norm(closest_point - c_vec) <= radius:
                return True

    return False


def closest_euclidean(q, qp):
    """
    :param q, qp: Two 2D vectors in S1 x S1
    :return: qpp, dist. qpp is transformed version of qp so that L1 Euclidean distance between q and qpp
    is equal to toroidal distance between q and qp. dist is the corresponding distance.
    """
    q = np.array(q)
    qp = np.array(qp)

    A = np.meshgrid([-1,0,1], [-1,0,1])
    qpp_set = qp + 2*np.pi*np.array(A).T.reshape(-1,2)
    distances = np.linalg.norm(qpp_set-q, 1, axis=1)
    ind = np.argmin(distances)
    dist = np.min(distances)

    return qpp_set[ind], dist

def clear_path(arm, q1, q2):
    """
    :param arm: NLinkArm object
    :param q1, q2: Two configurations in S1 x S1
    :return: True if edge between q1 and q2 sampled at EDGE_INC increments collides with obstacles, False otherwise
    """
    q2p, dist = closest_euclidean(q1,q2)  #from q1 to q1p(q2prime) is the shorter segment 
    q_angle = np.arctan2(q2p[1]-q1[1], q2p[0]-q1[0])
    #print(f"q2p is {q2p}, dist is {dist}, q_angle is {q_angle}")
    
    #continue add EDGE_INC
    config = np.array(q1)
    while (EDGE_INC < dist) :
        config[0] += np.cos(q_angle) * EDGE_INC
        config[1] += np.sin(q_angle) * EDGE_INC
        if detect_collision(arm, config) == True:
            return True
        else:
            dist -= EDGE_INC
    if detect_collision(arm, q2p) == True:
        return True
    else:
        return False

def find_qnew(arm, tree, qrand):
    """
    :param tree: RRT dictionary {(node_x, node_y): (parent_x, parent_y)}
    :param qrand: Randomly sampled configuration
    :return: qnear in tree, qnew between qnear and qrand with distance DELTA from qnear
    """
    #dists includes (dist, key)
    dists = [(closest_euclidean(qrand, key)[1], key) for key in tree.keys()]
    qnear = sorted(dists, key = lambda x: x[0])[0][1]
    qrandp, dist = closest_euclidean(qnear, qrand)
    qnew = [0,0]
    if dist < DELTA:
        qnew = qrandp
    else:
        q_angle = np.arctan2(qrandp[1]-qnear[1], qrandp[0]-qnear[0])
        qnew[0] = qnear[0] + np.cos(q_angle) * DELTA
        qnew[1] = qnear[1] + np.sin(q_angle) * DELTA
    
    if clear_path(arm, qnear, qnew):
        qnew = None
    else:
        if qnew[0] > np.pi:
            qnew[0] -= 2*np.pi
        elif qnew[0] < -np.pi:
            qnew[0] += 2*np.pi 
        if qnew[1] > np.pi:
            qnew[1] -= 2*np.pi
        elif qnew[1] < -np.pi:
            qnew[1] += 2*np.pi
        qnew = tuple(qnew)
    
    return qnear, qnew

def find_qnew_greedy(arm, tree, qrand):
    """
    :param arm: NLinkArm object
    :param tree: RRT dictionary {(node_x, node_y): (parent_x, parent_y)}
    :param qrand: Randomly sampled configuration
    :return: qnear in tree, qnew between qnear and qrand as close as possible to qrand in increments of DELTA
    """
    #dists includes (dist, key)
    qnear, qnew = find_qnew(arm, tree, qrand)
    final_qnew = [0,0] #final_qnew is used to store the latest qnew which will be returned
    if qnew == None:
        final_qnew = None
        return qnear, final_qnew
    else:
        qrandp, dist = closest_euclidean(qnew, qrand)
        q_angle = np.arctan2(qrandp[1]-qnew[1], qrandp[0]-qnew[0])
        while True:
            new_qnew = [0,0] #new_qnew is the next qnew after the current qnew, clear_path will be used afterwards
            if dist < DELTA:
                new_qnew = qrandp
                if clear_path(arm, qnew, new_qnew):
                    final_qnew = qnew
                    break
                else:
                    final_qnew = qrandp
                    break
            else:  
                new_qnew[0] = qnew[0] + np.cos(q_angle) * DELTA
                new_qnew[1] = qnew[1] + np.sin(q_angle) * DELTA
            if clear_path(arm, qnew, new_qnew):
                final_qnew = qnew
                break
            else:
                qnew = new_qnew
                dist -= DELTA
    if final_qnew[0] > np.pi:
        final_qnew[0] -= 2*np.pi
    elif final_qnew[0] < -np.pi:
        final_qnew[0] += 2*np.pi 
    if final_qnew[1] > np.pi:
        final_qnew[1] -= 2*np.pi
    elif final_qnew[1] < -np.pi:
        final_qnew[1] += 2*np.pi
    final_qnew = tuple(final_qnew)
        
        
    return qnear, final_qnew

def construct_tree(arm):
    """
    :param arm: NLinkArm object
    :return: roadmap: Dictionary of nodes in the constructed tree {(node_x, node_y): (parent_x, parent_y)}
    :return: path: List of configurations traversed from start to goal
    """
    reach_goal = False
    tree = {START:None}
    for i in range(1,MAX_NODES):
        np.random.seed()
        if np.random.rand() > BIAS:
            qrand = (np.random.uniform(-np.pi, np.pi), np.random.uniform(-np.pi, np.pi))
        else:
            qrand = GOAL
        #change the function between(find_qnew, find_qnew_greedy) to see the different strategies
        qnear, qnew = find_qnew_greedy(arm, tree, qrand)
        #qnear, qnew = find_qnew(arm, tree, qrand)
        if qnew == None:
            continue
        _,dist2goal = closest_euclidean(qnew, GOAL)
        if dist2goal <= DELTA:
            #set qnew = GOAL
            if clear_path(arm, qnear, GOAL):
                continue
            else:
                tree[GOAL] = qnear
                reach_goal = True
                break
        else:
            tree[qnew] = qnear
            
    if reach_goal == True:
        route = [GOAL]
        tree_temp = tree.copy()
        while True:
            parent = tree_temp.pop(route[-1])
            route.append(parent) 
            if parent == START:
                break
        route = route[::-1]
        return tree, route
    else:
        return tree,[]

"""
Plotting and visualization functions
"""

def get_occupancy_grid(arm, M):
    grid = [[0 for _ in range(M)] for _ in range(M)]
    theta_list = [2 * i * np.pi / M for i in range(-M // 2, M // 2 + 1)]
    for i in range(M):
        for j in range(M):
            grid[i][j] = int(detect_collision(arm, [theta_list[i], theta_list[j]]))
    return np.array(grid)

def plot_roadmap(ax, roadmap):
    for node, parent in roadmap.items():
        if parent is not None:
            euc_parent, _ = closest_euclidean(node, parent)
            euc_node, _ = closest_euclidean(parent, node)
            ax.plot([node[0], euc_parent[0]], [node[1], euc_parent[1]], "-k")
            ax.plot([euc_node[0], parent[0]], [euc_node[1], parent[1]], "-k")
        ax.plot(node[0], node[1], ".b")

def plot_arm(plt, ax, arm):
    for obstacle in OBSTACLES:
        circle = plt.Circle((obstacle[0], obstacle[1]), radius=obstacle[2], fc='k')
        plt.gca().add_patch(circle)

    for i in range(arm.n_links + 1):
        if i is not arm.n_links:
            ax.plot([arm.points[i][0], arm.points[i + 1][0]], [arm.points[i][1], arm.points[i + 1][1]], 'r-')
        ax.plot(arm.points[i][0], arm.points[i][1], 'k.')

def visualize_spaces(arm):
    plt.subplots(1, 2)

    plt.subplot(1, 2, 1)
    grid = get_occupancy_grid(arm, 200)
    plt.imshow(np.flip(grid.T, axis=0))
    plt.xticks([0,50,100,150,200], ["-\u03C0", "-\u03C0/2", "0", "\u03C0/2", "\u03C0"])
    plt.yticks([0,50,100,150,200], ["\u03C0", "\u03C0/2", "0", "-\u03C0/2", "-\u03C0"])
    plt.title("Configuration space")
    plt.xlabel('joint 1')
    plt.ylabel('joint 2')

    ax = plt.subplot(1, 2, 2)
    arm.update_joints(START)
    plot_arm(plt, ax, arm)
    plt.title("Workspace")
    plt.xlabel('x')
    plt.ylabel('y')
    plt.axis('scaled')
    plt.xlim(-3.0, 3.0)
    plt.ylim(-3.0, 3.0)
    plt.show()

def animate(arm, roadmap, route):
    ax1 = plt.subplot(1, 2, 1)
    plot_roadmap(ax1, roadmap)
    if route:
        plt.plot(route[0][0], route[0][1], "Xc")
        plt.plot(route[-1][0], route[-1][1], "Xc")
    plt.title("Configuration space")
    plt.xlabel('joint 1')
    plt.ylabel('joint 2')
    plt.axis('scaled')
    plt.xlim(-3.2, 3.2)
    plt.ylim(-3.2, 3.2)

    ax2 = plt.subplot(1, 2, 2)
    arm.update_joints(START)
    plot_arm(plt, ax2, arm)
    plt.title("Workspace")
    plt.xlabel('x')
    plt.ylabel('y')
    plt.axis('scaled')
    plt.xlim(-3.0, 3.0)
    plt.ylim(-3.0, 3.0)
    plt.pause(1)

    for config in route:
        arm.update_joints([config[0], config[1]])
        ax1.plot(config[0], config[1], "xr")
        ax2.lines = []
        plot_arm(plt, ax2, arm)
        # Uncomment here to save the sequence of frames
        #plt.savefig('frame{:04d}.png'.format(i))
        plt.pause(0.3)

    plt.show()

"""
Main function
"""
def main():
    ARM = NLinkArm(LINK_LENGTH, [0,0])
    visualize_spaces(ARM)
    roadmap, route = construct_tree(ARM)
    if not route:
        print("No path found!")
    print(route)
    animate(ARM, roadmap, route)

if __name__ == '__main__':
    main()