In [1]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d
import scipy.ndimage as ndimage
from scipy.signal import convolve2d
import torch

In [20]:
class Environment():
    def __init__(self, pos, size, bound, voxel_size = np.array([0.1,0.1]), eps=1, order=1):
        """
        Args: 
            pos: np array with shape [N,2], with N being number of obstacles, indicating coordinate of obstacle'slower left corner
            size: np array with shape [N,2], with N being number of obstacles, indicating width and hight of obstacles
            bound: np array with shape [2,], upper boundary of the work space. Lower bound is implicitly (0,0)
        """
        self.pos = pos.astype(int)
        self.size = size.astype(int)
        self.bound = bound.astype(int)
        self.voxel_size = voxel_size
        self.ob_num = pos.shape[0]
        self.max_ob = np.max(size, axis = 0)
        self.eps = eps
        self.order = order 
        
        self.obstacle = self.obstacle()
        self.dis = self.dis() 
        self.dis_der = self.dis_der()
        self.dis_fun = self.dis_fun1()
        self.dis_der_fun = self.dis_der_fun1()
        self.cost_fun = self.cost_fun1()
        self.cost_der_fun = self.cost_der_fun1()
                
    def obstacle(self):
        """
        geometric shape of the environment 
        Returns: 
            obstacle: a boolean numpy array same shape as attribute bound, True indicates obstacle, False indicates free 
        """
        pos = self.pos
        size = self.size 
        bound = self.bound
        obstacle = np.zeros(bound,dtype = bool)
        for i in range(pos.shape[0]):
            low_left = pos[i]
            up_right = low_left + size[i]
            obstacle[tuple(map(slice, low_left, up_right))] = True
        return obstacle 
    
    def dis(self):
        """
        create nearest distance field, negative indicates inside obstacle
        Returns: 
            dis: a float numpy array same shape as attribute bound
        """
        bound = self.bound
        voxel_size = self.voxel_size 
        
        im = self.obstacle
            
        pad = np.ones(self.bound+2, dtype=bool)
        pad[1:bound[0]+1,1:bound[1]+1] = im    
            
        dis = ndimage.distance_transform_edt(-pad.astype(int) + 1, sampling=voxel_size)
        dis1 = ndimage.distance_transform_edt(pad.astype(int), sampling=voxel_size)
        dis[pad] = - dis1[pad]  # Add interior information
            
        dis = dis[1:bound[0]+1,1:bound[1]+1]
        return dis
    
    def dis_der(self):
        """
        applying sobel filter to nearest distance field to get and x and y gradient field 
        Returns: 
            dis_der: a float numpy array with shape [2,bound[0],bound[1]], x gradient and y gradient 
        """
        dis_der = np.zeros((2,self.bound[0],self.bound[1]),dtype=np.float64)
        for d in range(2):  # Treat image boundary like obstacle
            dis_der[d, ...] = ndimage.sobel(self.dis, axis=d, mode='constant', cval=0)/self.voxel_size[d]
        return dis_der
    
    def dis_fun1(self):
        """
        interpolate the nearest distance to get distance function
        Returns: 
            dis_fun: a function whose input is float numpy array with shape [N,2], N is number of inquiry points
                                      output is float numpy array with shape [N,], respecting cost of each inquiry points
        """ 
        factor = 1/self.voxel_size
        im = self.dis
        def dis_fun(x):
            x = np.multiply(x,factor)-0.5
            out = ndimage.map_coordinates(im, coordinates=x.T, order=self.order, mode='nearest')
            return out          
        return dis_fun
    
    def dis_der_fun1(self):
        """
        interpolate the x and y gradient field to get distance gradient function
        Returns: 
            dis_der_fun: a function whose input is float numpy array with shape [N,2], N is number of inquiry points
                                      output is float numpy array with shape [N,2], respecting x and y gradient of each point
        """ 
        der = self.dis_der
        factor = 1/self.voxel_size
        def dis_der_fun(x):
            x = np.multiply(x,factor)-0.5
            gx = ndimage.map_coordinates(der[0,...], coordinates=x.T, order=self.order, mode='nearest')
            gy = ndimage.map_coordinates(der[1,...], coordinates=x.T, order=self.order, mode='nearest')
            return np.stack((gx,gy),axis=0).T
        return dis_der_fun
    
    def cost_fun1(self):
        """
        assign cost to nearest distance field
        Returns: 
            cost_fun: a function whose input is float numpy array with shape [N,2], N is number of inquiry points
                                       output is float numpy array with shape [N,], cost of each point
        """
        eps = self.eps
        def cost_fun(x):
            dis = self.dis_fun(x)
            cost = np.zeros(dis.shape,dtype=np.float64)
            cost[dis>eps] = 0
            cost[np.logical_and(dis>0,dis<=eps)] = np.square(dis[np.logical_and(dis>0,dis<=eps)]-eps)/(2*eps)
            cost[dis<=0] = 1/(2*eps) - dis[dis<=0]
            return cost
        return cost_fun

    def cost_der_fun1(self):
        """
        assign cost gradient
        Returns: 
            cost_der_fun: a function whose input is float numpy array with shape [N,2], N is number of inquiry points
                                           output is float numpy array with shape [N,2], x and y cost gradient of each point
        """
        eps = self.eps
        def cost_der_fun(x):
            dis = self.dis_fun(x)
            dis_der = self.dis_der_fun(x)
            der = cost = np.zeros((len(dis),2),dtype=np.float64)
            der[dis<=0] = 0
            der[np.logical_and(dis>0,dis<=eps)] = np.multiply((dis[np.logical_and(dis>0,dis<=eps)]-eps),dis_der[np.logical_and(dis>0,dis<=eps)].T).T/eps
            der[dis<=0] = - dis_der[dis<0]
            return der 
        return cost_der_fun

      

        
class Objective():
    def __init__(self,start, end, opt_num, sp_num,env,w):
        """
        Args: 
            start: np array with shape [2,], start point of the robot
            end: np: array with shape [2,], end point of the robot
            opt_num: number of optimization points 
            sp_num: number of sampling points on line segements between two optimization points for calculating objective
            env: environment the objective function based on
            w: weight term for length objective 
            ob_fun: objective function 
            ob_der_fun: derivative of objective function
        """
        self.start = start 
        self.end = end
        self.opt_num = opt_num 
        self.sp_num = sp_num
        self.env = env
        self.w = w  #length weight 
        
        self.ob_fun = self.ob_fun1()
        self.ob_der_fun = self.ob_der_fun1()
        
    def ob_fun1(self):
        """
        objective function
        Returns: 
            ob_fun: a function whose input is float numpy array with shape [opt_num, 2]
                                              output is float numpy scalar the objective value
        """
        
        env = self.env
        w = self.w
        start = self.start
        end = self.end
        def ob_fun(x):
            x1 = self.all_points(x)
            return np.mean(env.cost_fun(x1)) + w*np.sum(np.diff(np.insert(x,(0,x.shape[0]),(start,end),axis=0),axis=0)**2)
        return ob_fun
    
    def ob_der_fun1(self):
        """
        derivative of objective function
        Returns: 
            ob_der_fun: a function whose input is a float numpy array with shape [opt_num, 2]
                                         output is a float numpy scalar with shape [opt_num,2], the derivative 
        """
        env = self.env
        w = self.w
        opt_num = self.opt_num
        sp_num = self.sp_num
        def ob_der_fun(x):
            ### gradient of obstacle cost ###
            x1 = self.all_points(x)
            x1 = np.delete(x1,0,0)
            x1 = np.delete(x1,x1.shape[0]-1,0)
            x1 = torch.Tensor(x1).reshape(1,1,x1.shape[0],x1.shape[1])
            kernel1 = np.append(np.arange(1,sp_num+2,1),np.arange(sp_num,0,-1))/(sp_num+1)
            kernel1 = torch.Tensor(kernel1).reshape(1,1,kernel1.shape[0],1)
            re1 = torch.nn.functional.conv2d(x1,kernel1,stride=(sp_num+1,1))
            re1 = re1/(opt_num+(opt_num+1)*sp_num)
            re1 = torch.squeeze(torch.squeeze(re1,0),0).numpy()
            ### gradient of length cost ###
            x2 = np.insert(x,(0,x.shape[0]),(start,end),axis=0)
            x2 = torch.Tensor(x2).reshape(1,1,x2.shape[0],x2.shape[1])
            kernel2 = torch.Tensor([-1,2,-1]).reshape(1,1,3,1)
            re2 = 2*w*torch.nn.functional.conv2d(x2,kernel2,stride=1)
            re2 = torch.squeeze(torch.squeeze(re2,0),0).numpy()
            return re1+re2
        return ob_der_fun
    
    def all_points(self,x):
        """
        combine all start, end, optimization and subsampling points
        Args:
            x: float numpy array with shape [opt_num,2], optimization points
        Returns: 
            x1: float numpy array with shape [opt_num+2+sp_num*(opt_num+1), 2]
        """
        start = self.start 
        end = self.end 
        sp_num = self.sp_num
        x1 = np.insert(x,(0,x.shape[0]),(start,end),axis=0)
        for i in range(x1.shape[0]-1):
            x2 = np.linspace(x1[i+(sp_num)*i],x1[i+1+(sp_num)*i],sp_num+1,endpoint=False)
            x2 = np.delete(x2,0,0)
            x1 = np.insert(x1,i+1+(sp_num)*i,x2,axis=0)
        return x1
    
    def initial(self):
        """
        initialize the trajectory by connecting start, end point and uniform sampling
        Returns: 
            x0: float numpy array with shape [opt_num, 2], initial optimization points
        """
        x0 = np.linspace(self.start,self.end,self.opt_num+1,endpoint=False)
        x0 = np.delete(x0,0,0)
        return x0
        

In [21]:
pos = np.array([[10,10],[40,50]])
size = np.array([[20,20],[10,30]])
bound = np.array([64,64])
start = np.array([0.1,0.1])
end = np.array([6,6])
opt_num = 10
sp_num = 5
w = 1


### objective demo ###
env = Environment(pos,size,bound)
obj = Objective(start, end, opt_num, sp_num,env,w)
k = obj.ob_fun
kk = obj.ob_der_fun
print(k(obj.initial()))
print(kk(obj.initial()))

### objective demo ###


### environment demo ###
a = env.obstacle
b = env.dis
c = env.dis_der
d = env.dis_fun
f = env.dis_der_fun
g = env.cost_fun
h = env.cost_der_fun
#plt.matshow(b)
#print(c.shape)
x = np.array([[1.1,1.2],[1.2,2],[3,4]])
y = np.array([[1.1,1.2]])
#print(d(x))
### environment demo


### cost visualization ###
xx = np.linspace(0, 6.4, 30)
yy = np.linspace(0, 6.4, 30)
XX,YY = np.meshgrid(xx,yy)
coords=np.array((XX.ravel(), YY.ravel())).T
zz = g(coords)
#ax = plt.axes(projection='3d')
#ax.scatter3D(coords.T[0], coords.T[1], zz, c=zz, cmap='Greens')
### cost visualization ###

6.776776743432687
[[0.05874126 0.05874126]
 [0.10825151 0.10825151]
 [0.1577627  0.1577627 ]
 [0.20727272 0.20727272]
 [0.25678322 0.25678322]
 [0.30629373 0.30629373]
 [0.3558042  0.3558042 ]
 [0.40531468 0.40531468]
 [0.45482516 0.45482516]
 [0.50433564 0.50433564]]
