In [None]:
import numpy as np
from PIL import Image
import heapq
import random
import time
from IPython.display import display, clear_output
import cv2

OCCUPIED = 100
ARM = 200

def arm_coordinates(theta1, l1, theta2, l2):
    v1 = np.array([np.cos(theta1*np.pi*2), np.sin(theta1*np.pi*2)])
    v2 = np.array([np.cos(theta2*np.pi*2), np.sin(theta2*np.pi*2)])

    p1 = np.array([0.5, 0.2])
    p2 = p1 + v1*l1
    p3 = p2 + v2*l2
    return p1, p2, p3

def in_bounds(p):
    return (p >=0.0).all() and (p <= 1.0).all()

class GridSpace():
    def __init__(self, dims):
        self.dims = dims
        self.grid = np.zeros(dims, dtype=np.uint8)
        self.grid_vis = np.copy(self.grid)
        
        
    def at(self, point, as_array=False):
        assert len(point) == len(self.dims)
        indices = []
        for i in range(len(self.dims)):
            indices.append(int(self.dims[i]*point[i]))
            if indices[i] == self.dims[i]: indices[i] -= 1
        if as_array: return np.array(indices, dtype=np.int)
        return tuple(indices)
    
    def mark(self, point, mark):
        self.grid_vis[self.at(point)] = mark
    
    
    def occupied(self, point=None):
        for p in point: 
            if p<0.0 or p>1.0: return False
        return self.grid[self.at(point)] == OCCUPIED
    
    def check_arm(self, theta1, l1, theta2, l2):
        assert len(self.dims) == 2
        p1, p2, p3 = arm_coordinates(theta1, l1, theta2, l2)
        return self.check_line(p1, p2) and self.check_line(p2, p3)
    
    def check_line(self, p1, p2):
        assert len(self.dims) == 2
        if not(in_bounds(p1) and in_bounds(p2)): return False
        
        #smallest step size: size of a cell dx,dy
        dx,dy = 1.0/self.dims[0],1.0/self.dims[1]
        d = min(dx,dy)
        v = p2-p1
        v_max = np.max(np.abs(v))
        steps = int(np.ceil(v_max/d))
        v = v / steps
        for i in range(steps+1):
            if self.occupied(p1 + i*v): return False
        return True
        
        
    
    def add_arm(self, theta1, l1, theta2, l2, mark=ARM):
        assert len(self.dims) == 2
        p1, p2, p3 = arm_coordinates(theta1, l1, theta2, l2)
        self.add_line(p1, p2, mark=mark)
        self.add_line(p2, p3, mark=mark)
        
        
    def add_line(self, p1, p2, mark=ARM):
        assert len(self.dims) == 2
        #smallest step size: size of a cell dx,dy
        dx,dy = 1.0/self.dims[0],1.0/self.dims[1]
        d = min(dx,dy)
        v = p2-p1
        v_max = np.max(np.abs(v))
        steps = int(np.ceil(v_max/d))
        v = v / steps
        for i in range(steps):
            p = p1 + i*v
            if not in_bounds(p): return
            self.grid_vis[self.at(p)] = mark
    
    
    def add_rect(self, rect):
        assert len(self.dims) == 2
        start = self.at([rect.x, rect.y])
        end = self.at([rect.x+rect.width, rect.y+rect.height])
        self.grid[start[0]:end[0], start[1]:end[1]] = OCCUPIED
        self.grid_vis[start[0]:end[0], start[1]:end[1]] = OCCUPIED
        
        
    def search_path(self, p1, p2, animate=False):
        assert len(self.dims) == 2
        p1,p2 = np.array(p1),np.array(p2)
        
        class Node:
            def __init__(self, p, parent):
                self.p, self.parent = p, parent
            def __str__(self):
                return str(self.p)
        
        def simple_distance(p1, p2):
            return np.sum(np.abs(p1-p2))
        
        factor = 30.0
        max_distance = factor/min(self.dims)
        diff = int(min(self.dims)/factor)
        visited = np.zeros(self.dims, dtype=np.bool)
        
        d1 = simple_distance(p1, p2)
        p1 = Node(p1, None)
        nodes = [(d1,p1)]
        frontier_heap = [(d1, p1)]
        repick = False
        reached_goal = False
        while not reached_goal:
            if animate: self.display()
            time.sleep(0.1)
            
            if len(frontier_heap) == 0:
                repick = True
                frontier_heap = nodes[:]
                #print('Repicking')
                
            _,p = frontier_heap.pop(0)
            
            #expand direction (roughly)
            if repick or p.parent is None:
                direction = np.array([0,0])
            else:
                direction = p.p - p.parent.p
                
            for i in range(15):
                v = direction + (np.random.rand(2)-0.5) * max_distance
                new_p = p.p+v
                if not in_bounds(new_p): continue
                new_indices = self.at(new_p)
                if not visited[new_indices] and self.check_line(p.p, new_p):
                    p_new = Node(new_p, p)
                    if self.check_line(new_p, p2): 
                        reached_goal = True
                        break
                        
                    distance = simple_distance(new_p, p2)
                    heapq.heappush(frontier_heap, (distance, p_new))
                    heapq.heappush(nodes, (distance, p_new))
                    ix,iy = new_indices[0],new_indices[1]
                    ix_min, iy_min = max(ix-diff, 0), max(iy-diff, 0)
                    ix_max, iy_max = min(ix+diff, self.dims[0]), min(iy+diff, self.dims[1])
                    visited[ix_min:ix_max,iy_min:iy_max] = True
                    #self.grid_vis[ix-diff:ix+diff,iy-diff:iy+diff] = 10
                    if animate: self.mark(new_p, 255)
        
        path = [p2]
        self.add_line(p_new.p, p2, mark=50)
        while p_new.parent is not None:
            path.insert(0, p_new.p)
            self.add_line(p_new.p, p_new.parent.p, mark=50)
            p_new = p_new.parent
        path.insert(0,p1.p)
        self.display()
        return path
    
    
    def reset_vis(self):
        self.grid_vis = np.copy(self.grid)
        
    def animate_arm(self, path, l1, l2, video_file=None):
        
        if video_file is not None:
            fourcc = cv2.VideoWriter_fourcc(*'MPEG')
            writer = cv2.VideoWriter(video_file, fourcc, 30.0, (500,500))
        
        secs_per_point = 1
        hertz = 30
        frames = hertz * secs_per_point
        dt = 1./frames
        
        for i in range(len(path)-1):
            p1, p2 = path[i], path[i+1]
            v = p2 - p1
            
            for frame in range(frames):
                self.reset_vis()
                p = p1 + v*frame*dt
                self.add_arm(p[0], l1, p[1], l2, mark=200)
                img = np.array(self.display())
                if video_file is not None:
                    img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
                    writer.write(img)
                time.sleep(dt)
                
        if video_file is not None:
            writer.release()
        
        
        
        
        
    def display(self, path=None):
        img_a = np.swapaxes(255-self.grid_vis, 1, 0)
        img = Image.fromarray(img_a, 'L')
        img = img.resize((500,500))
        clear_output(wait=True)
        display(img)
        if path is not None: img.save(path)
        return img
        

        
class Rect():
    def __init__(self, x, y, width, height):
        self.x, self.y, self.width, self.height = x, y, width, height
        
        
def create_arm_c_space(space, l1, l2, dims=[500,500]):
    assert len(dims) == 2 and len(space.dims) == 2
    c_space = GridSpace(dims)
    for t1 in range(dims[0]):
        for t2 in range(dims[1]):
            theta1 = float(t1)/dims[0]
            theta2 = float(t2)/dims[1]
            free = space.check_arm(theta1, l1, theta2, l2)
            #print(theta1, theta2, free)
            if not free: 
                c_space.grid[t1,t2] = OCCUPIED
                c_space.grid_vis[t1,t2] = OCCUPIED
    return c_space
            
            
    
space = GridSpace([100,100])
space.add_rect(Rect(0., 0., 1.1, 0.1))
space.add_rect(Rect(0.1, 0.18, 0.3, 0.15))
space.add_rect(Rect(0.45, 0.41, 0.1, 0.4))
space.add_rect(Rect(0.7, 0.18, 0.2, 0.15))

theta1_start, theta2_start = 0.1, 0.2
theta1_end, theta2_end = 0.35, 0.3
l1, l2 = 0.2, 0.2

c_space = create_arm_c_space(space, l1, l2, dims=space.dims)

#print( space.check_line(np.array([0.0,0.0]), np.array([0.2, 0.2])) )
#space.add_line(np.array([0.0,0.0]), np.array([0.5, 0.3]))
#print(space.check_arm(theta1, l1, theta2, l2))
space.add_arm(theta1_start, l1, theta2_start, l2, 200)
space.add_arm(theta1_end, l1, theta2_end, l2, 125)


In [None]:
c_space.reset_vis()
c_space.mark([theta1_start, theta2_start], 200)
c_space.mark([theta1_end, theta2_end], 125)
path = c_space.search_path((theta1_start, theta2_start), 
                           (theta1_end, theta2_end), animate=False)


In [None]:
c_space.display('c_space.png')
i=space.display('space.png')      

In [None]:
space.animate_arm(path, l1, l2, video_file='path.avi')