In [1]:
import numpy as np
import cv2
from patcher import *
import time

In [2]:
def circular_mask(size, center, min_radius, max_radius, distance=1, forward=True):
    '''
    if not type(size) in (tuple, list): raise ValueError("Argument size is not list or tuple.")
    if not type(center) in (tuple, list): raise ValueError("Argument center is not list or tuple.")
    '''
    
    h=size[0]
    w=size[1]

    Y, X = np.ogrid[:h, :w]
    dist_from_center = np.sqrt((X - center[0])**2 + (Y-center[1])**2)
    maskR=(dist_from_center >= min_radius) & (dist_from_center <= max_radius)
    return maskR

In [3]:
def range_mask(size, center, radius, coord, theta, distance=1, forward=True, right=True):
    '''
    if not type(size) in (tuple, list): raise ValueError("Argument size is not list or tuple.")
    if not type(center) in (tuple, list): raise ValueError("Argument center is not list or tuple.")
    if not type(coord) in (tuple, list): raise ValueError("Argument coord is not list or tuple.")
    '''
    
    h=size[0]
    w=size[1]

    Y, X = np.ogrid[:h, :w]
    
    tc=(distance%(2*np.pi*radius))/radius
    tc=tc if forward else (-tc)%(2*np.pi)
    tc=tc if right else (-tc)%(2*np.pi)
    
    fA=lambda x,y:y>np.tan(max((theta%(2*np.pi)+tc)%(2*np.pi),1e-7))*(x-center[0])+center[1]
    fC=lambda x,y:y<=np.tan(max(theta%(2*np.pi),1e-7))*(x-center[0])+center[1]
    
    maskA=fA(X,Y)
    maskC=fC(X,Y)
    
    tX=(coord[0]-center[0])*np.cos(tc)-(coord[1]-center[1])*np.sin(tc)+center[0]
    tY=(coord[0]-center[0])*np.sin(tc)+(coord[1]-center[1])*np.cos(tc)+center[1]
    
    if not fA(coord[0],coord[1]): maskA=~maskA
    
    if not fC(tX,tY): maskC=~maskC
    
    maskR=maskA & maskC
    
    if tc>np.pi: maskR=~maskR
    if not forward: maskR=~maskR
    if not right: maskR=~maskR
    
    return maskR

In [4]:
def path_mask(pose, direction, steer, distance, size, width, length, resolution=20, forward=True):
    if steer==0:
        tf_direction=direction if forward else (direction+2)%2-1
        
        w=size[0]
        h=size[1]

        Y, X = np.ogrid[:h, :w]
        
        mask=None
        
        if tf_direction==0:
            xS=-width*resolution/2+pose[0]
            xE=width*resolution/2+pose[0]
            yS=-distance+pose[1]
            yE=pose[1]
            
            mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
        elif tf_direction in [-1,1]:
            xS=-width*resolution/2+pose[0]
            xE=width*resolution/2+pose[0]
            yS=pose[1]
            yE=distance+pose[1]
            
            mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
        elif tf_direction==0.5:
            xS=pose[1]
            xE=distance+pose[1]
            yS=-width*resolution/2+pose[1]
            yE=width*resolution/2+pose[1]
            
            mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
        elif tf_direction==-0.5:
            xS=-distance+pose[1]
            xE=pose[1]
            yS=-width*resolution/2+pose[1]
            yE=width*resolution/2+pose[1]
            
            mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
        else:
            theta_x=((tf_direction-0.5)%2)*np.pi
            theta_y=(tf_direction%2)*np.pi
            xG=np.tan(theta_x)
            yG=np.tan(theta_y)
            
            alpha=lambda x,y:((x)*np.cos(theta_y)-(y)*np.sin(theta_y))+pose[0]
            beta=lambda x,y:((x)*np.sin(theta_y)+(y)*np.cos(theta_y))+pose[1]
            
            x1=lambda x:xG*(x-alpha(-width*resolution/2,0))+beta(-width*resolution/2,0)
            x2=lambda x:xG*(x-alpha(width*resolution/2,0))+beta(width*resolution/2,0)
            
            y1=lambda x:yG*(x-alpha(0,-distance))+beta(0,-distance)
            y2=lambda x:yG*(x-alpha(0,0))+beta(0,0)
            
            xS=Y>=x1(X) if tf_direction>0 else Y<=x1(X)
            xE=Y<=x2(X) if tf_direction>0 else Y>=x2(X)
            yS=Y>=y1(X) if abs(tf_direction)<0.5 else Y<=y1(X)
            yE=Y<=y2(X) if abs(tf_direction)<0.5 else Y>=y2(X)
            
            mask=xS & xE & yS & yE
             
        return mask
    else:
        tf_direction=(direction%2)*np.pi

        tf_steer=max(np.radians(steer%360),1e-2)

        rad_M=np.array((length/np.tan(tf_steer),length))*resolution
        rotation_kernel=np.array([[np.cos(tf_direction),-np.sin(tf_direction)],
                                 [np.sin(tf_direction),np.cos(tf_direction)]])
        rad_M=np.matmul(rotation_kernel,rad_M) + pose

        wheels=np.array([
            ((-width/2),0),
            ((width/2),0),
            ((-width/2),length),
            ((width/2),length)
        ])*resolution

        rads=np.sqrt(np.sum((wheels-(np.array((length/np.tan(tf_steer),length))*resolution))**2, axis=-1))
        
        min_rad=np.min(rads)
        max_rad=np.max(rads)
        avg_rad=np.average(rads)
    
        return range_mask(size,rad_M,avg_rad,pose,tf_direction,distance,forward,steer>=0) & circular_mask(size,rad_M,min_rad,max_rad)


In [5]:
def check_collision(bin_map, pose, direction, steer, distance, width, length, resolution=20, forward=True):
    size=np.array((len(bin_map[0]),len(bin_map)))
    
    tf_pose=size/2-np.array([pose[0],-pose[1]])*resolution
    
    coverage=int(distance*resolution*1.1)
    xS, yS= tf_pose-coverage
    xE, yE= tf_pose+coverage
    
    tmp_map=bin_map[int(xS):int(xE), int(yS):int(yE)]
    
    size=np.array((len(tmp_map[0]),len(tmp_map)))
    
    mask_pose=(coverage,coverage)
    
    return tmp_map | path_mask(mask_pose, direction, steer, distance*resolution, size, width, length, resolution, forward)
    
    mask=path_mask(mask_pose, direction, steer, distance*resolution, size, width, length, resolution, forward)
    
    ret=np.sum(tmp_map * mask)
    return ret

In [2]:
class Dynamic_Window_Approach:
    max_distance=2.0
    distance_step=0.5
    max_steer=30
    steer_step=5
    threshold=10
    
    def __init__(self, car_width, car_length, map_size=(2048,2048), resolution=20):
        self.width=car_width
        self.length=car_length
        self.map_size=np.array(map_size)
        self.resolution=resolution
        
    def __call__(self, map, pose, direction):
        dist=np.sqrt(np.sum((np.array((pose[1],-pose[0]))-self.destination)**2))
        print(np.array(pose),dist)
        if dist>=0.5:
            ret_steer=-self.max_steer
            ret_dist=float('inf')
            ret_kernel=None
            ret_colls=0
            sw=False
            if np.sum(self.map_size == np.array(map.shape[:2])) != 2: self.map_size=np.array(map.shape[:2])

            for i, distance in enumerate(np.arange(self.distance_step, self.max_distance+1, self.distance_step)[::-1]):
                for steer in np.arange(-self.max_steer, self.max_steer+1, self.steer_step):
                    colls, dist, kernel = self.check_collision(map, pose, direction, steer, distance, forward=True)
                    if colls<=self.threshold:
                        if ret_dist>dist:
                            sw=True
                            ret_steer=steer
                            ret_dist=dist
                            ret_kernel=kernel
                            ret_colls=colls
                if sw: break

            #imshow("TEST",ret_kernel)
            #print(ret_colls)
            return ret_steer if sw else None
        else:
            return None
        
    @property
    def destination(self):
        return (self.__dest-self.map_size/2)/self.resolution
    
    @destination.setter
    def destination(self, coord):
        if type(coord) in (tuple,list):
            self.__dest=self.map_size/2+np.array(coord)*self.resolution
        else: raise ValueError("Please set a tuple or list.")
        
    def circular_mask(self, size, center, min_radius, max_radius, distance=1, forward=True):
        h=size[0]
        w=size[1]

        Y, X = np.ogrid[:h, :w]
        dist_from_center = np.sqrt((X - center[0])**2 + (Y-center[1])**2)
        maskR=(dist_from_center >= min_radius) & (dist_from_center <= max_radius)
        return maskR
    
    def range_mask(self, size, center, radius, coord, theta, distance=1, forward=True, right=True):
        h=size[0]
        w=size[1]

        Y, X = np.ogrid[:h, :w]

        tc=(distance%(2*np.pi*radius))/radius
        tc=tc if forward else (-tc)%(2*np.pi)
        tc=tc if right else (-tc)%(2*np.pi)

        fA=lambda x,y:y>np.tan(max((theta%(2*np.pi)+tc)%(2*np.pi),1e-7))*(x-center[0])+center[1]
        fC=lambda x,y:y<=np.tan(max(theta%(2*np.pi),1e-7))*(x-center[0])+center[1]

        maskA=fA(X,Y)
        maskC=fC(X,Y)

        tX=(coord[0]-center[0])*np.cos(tc)-(coord[1]-center[1])*np.sin(tc)+center[0]
        tY=(coord[0]-center[0])*np.sin(tc)+(coord[1]-center[1])*np.cos(tc)+center[1]

        if not fA(coord[0],coord[1]): maskA=~maskA

        if not fC(tX,tY): maskC=~maskC

        maskR=maskA & maskC

        if tc>np.pi: maskR=~maskR
        if not forward: maskR=~maskR
        if not right: maskR=~maskR

        return maskR
    
    def path_mask(self, pose, direction, steer, distance, size, forward=True):
        if steer==0:
            tf_direction=direction if forward else (direction+2)%2-1

            w=size[0]
            h=size[1]

            Y, X = np.ogrid[:h, :w]

            mask=None

            if tf_direction==0:
                xS=-self.width*self.resolution/2+pose[0]
                xE=self.width*self.resolution/2+pose[0]
                yS=-distance+pose[1]
                yE=pose[1]

                mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
            elif tf_direction in [-1,1]:
                xS=-self.width*self.resolution/2+pose[0]
                xE=self.width*self.resolution/2+pose[0]
                yS=pose[1]
                yE=distance+pose[1]

                mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
            elif tf_direction==0.5:
                xS=pose[1]
                xE=distance+pose[1]
                yS=-self.width*self.resolution/2+pose[1]
                yE=self.width*self.resolution/2+pose[1]

                mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
            elif tf_direction==-0.5:
                xS=-distance+pose[1]
                xE=pose[1]
                yS=-self.width*self.resolution/2+pose[1]
                yE=self.width*self.resolution/2+pose[1]

                mask=(xS<=X) & (X<=xE) & (yS<=Y) & (Y<=yE)
            else:
                theta_x=((tf_direction-0.5)%2)*np.pi
                theta_y=(tf_direction%2)*np.pi
                xG=np.tan(theta_x)
                yG=np.tan(theta_y)

                alpha=lambda x,y:((x)*np.cos(theta_y)-(y)*np.sin(theta_y))+pose[0]
                beta=lambda x,y:((x)*np.sin(theta_y)+(y)*np.cos(theta_y))+pose[1]

                x1=lambda x:xG*(x-alpha(-self.width*self.resolution/2,0))+beta(-self.width*self.resolution/2,0)
                x2=lambda x:xG*(x-alpha(self.width*self.resolution/2,0))+beta(self.width*self.resolution/2,0)

                y1=lambda x:yG*(x-alpha(0,-distance))+beta(0,-distance)
                y2=lambda x:yG*(x-alpha(0,0))+beta(0,0)

                xS=Y>=x1(X) if tf_direction>0 else Y<=x1(X)
                xE=Y<=x2(X) if tf_direction>0 else Y>=x2(X)
                yS=Y>=y1(X) if abs(tf_direction)<0.5 else Y<=y1(X)
                yE=Y<=y2(X) if abs(tf_direction)<0.5 else Y>=y2(X)

                mask=xS & xE & yS & yE

            return mask
        else:
            tf_direction=(direction%2)*np.pi

            tf_steer=max(np.radians(steer%360),1e-2)

            rad_M=np.array((self.length/np.tan(tf_steer),self.length))*self.resolution
            rotation_kernel=np.array([[np.cos(tf_direction),-np.sin(tf_direction)],
                                     [np.sin(tf_direction),np.cos(tf_direction)]])
            rad_M=np.matmul(rotation_kernel,rad_M) + pose

            wheels=np.array([
                ((-self.width/2),0),
                ((self.width/2),0),
                ((-self.width/2),self.length),
                ((self.width/2),self.length)
            ])*self.resolution

            rads=np.sqrt(np.sum((wheels-(np.array((self.length/np.tan(tf_steer),self.length))*self.resolution))**2, axis=-1))

            min_rad=np.min(rads)
            max_rad=np.max(rads)
            avg_rad=np.average(rads)

            return self.range_mask(size,rad_M,avg_rad,pose,tf_direction,distance,forward,steer>=0) & self.circular_mask(size,rad_M,min_rad,max_rad)

    def check_collision(self, bin_map, pose, direction, steer, distance, forward=True):
        size=np.array((len(bin_map[0]),len(bin_map)))

        tf_pose=size/2-np.array([pose[0],-pose[1]])*self.resolution

        coverage=int(distance*self.resolution*1.1)
        xS, yS= tf_pose-coverage
        xE, yE= tf_pose+coverage

        tmp_map=bin_map[int(xS):int(xE), int(yS):int(yE)]

        size=np.array((len(tmp_map[0]),len(tmp_map)))

        mask_pose=(coverage,coverage)

        mask=self.path_mask(mask_pose, direction, steer, distance*self.resolution, size, forward)

        y,x=np.ogrid[:coverage*2, :coverage*2]
        x=np.array(x,np.float32)
        y=np.array(y,np.float32)
        x+=xS-coverage
        y+=yS-coverage
        
        dist=np.min(np.sqrt((x-self.__dest[0])**2+(y-self.__dest[1])**2)[mask])
        
        #imshow("TEST",np.ones(mask.shape)*(tmp_map|mask)*255)
        ret=np.sum(tmp_map * mask)
        return (ret, dist, np.ones(mask.shape)*(tmp_map|mask)*255)

In [3]:
DWA=Dynamic_Window_Approach(0.55,0.33)
DWA.destination=(0,-5)
DWA.threshold=0
DWA.max_steer=20

In [4]:
DWA.destination=(0,0)

In [4]:
import hector

In [5]:
slam=hector.SLAM()

In [6]:
import rospy
from sensor_msgs.msg import Joy
from threading import Thread, Lock

lock=Lock()

class Controller:
    steer=0
    speed=0

    def __init__(self):
        self._js_signal=None
        self.intc_thread=None
        self.ros_thread=None
        #rospy.init_node('virtual_joy', anonymous = True)
        self.rate = rospy.Rate(10) #10hz

    def __ros_thread(self):
        self.pub = rospy.Publisher('/joy',Joy,queue_size=10)
        
        while not rospy.is_shutdown():
            value = Joy()
            lock.acquire()
            value.axes=[0, self.speed, self.steer, -0.0, -0.0, 0.0]
            lock.release()
            value.buttons=[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
            self.pub.publish(value)
            self.rate.sleep()

    def __call__(self, current_speed=None, current_steer=None):
        if self.ros_thread is None:
            self.ros_thread = Thread(target=self.__ros_thread, daemon=True)
            self.ros_thread.start()
        
        if current_speed is not None:
            lock.acquire()
            self.speed=current_speed
            lock.release()

        if current_steer is not None:
            lock.acquire()
            self.steer=current_steer
            lock.release()

In [7]:
c=Controller()

In [9]:
try:
    while True:
        pose=(-slam.pose['position']['x'], slam.pose['position']['y']) # 0.05m = 1 pixel
        direction=-slam.pose['orientation']['yaw']/np.pi # -1 ~ 1
        print(direction)
        log=time.time()
        steer=DWA(slam.map, pose, direction)
        if steer is not None:
            steer/=-20.
            c(0.4,steer)
            print(-steer,time.time()-log)
        else:
            c(0,0)
        
except KeyboardInterrupt:
    c(0,0)

-0.09255592618753236
[0.08773804 0.30099487] 4.9214749319347915
-0.5 0.06143951416015625
-0.09278405732382733
[0.07721329 0.30067062] 4.9319602433730765
-0.5 0.0411524772644043
-0.09278405732382733
[0.07721329 0.30067062] 4.9319602433730765
-0.5 0.05607199668884277
-0.09248791990573163
[0.08185577 0.3017807 ] 4.927394262667871
-0.5 0.05929827690124512
-0.09287926975739402
[0.08268356 0.30212402] 4.926589070020216
-0.5 0.042244911193847656
-0.09287926975739402
[0.08268356 0.30212402] 4.926589070020216
-0.5 0.03299283981323242
-0.09287926975739402
[0.08268356 0.30212402] 4.926589070020216
-0.5 0.04600262641906738
-0.0928719391882799
[0.08563232 0.30059814] 4.9235524671987845
-0.5 0.05583977699279785
-0.09248968546390497
[0.09230804 0.30214691] 4.916984145483223
-0.5 0.07916784286499023
-0.09093581864797863
[0.12051392 0.30461502] 4.888985043407771
-0.5 0.04151558876037598
-0.10069610485903495
[0.16954041 0.28556442] 4.83889315192483
-0.5 1.2997872829437256
-0.19590863046316784
[0.5627021

In [14]:
c(0,0)

In [9]:
width=0.25 #m
length=0.33 #m

resolution=20

pose=(0.05, 0.00) # 0.05m = 1 pixel
direction=0.1 # -1 ~ 1
steer=0

In [10]:
#mask=path_mask(pose, direction, steer, 1, map_size, width, length, resolution=20, forward=True)
bin_map=np.zeros((1000,1000))
bin_map[500, :]=1
#bin_map[470:527, 451:508]=1
log=time.time()
mask=check_collision(bin_map, pose, -0.5, steer, 2, width, length, resolution=20, forward=True)
print(time.time()-log)
#mask=bin_map[470:527, 451:508] * mask
#print(mask)
imshow("Test",(np.ones(mask.shape)*mask*255),2000,2000)

TypeError: ufunc 'bitwise_or' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

In [13]:
width=0.25 #m
length=0.33 #m

resolution=20

In [14]:


while True:
    pose=(slam.pose['position']['x'], slam.pose['position']['y']) # 0.05m = 1 pixel
    direction=-slam.pose['orientation']['z'] # -1 ~ 1
    steer=15
    #bin_map=np.zeros((1000,1000))
    #bin_map[500, :]=1
    log=time.time()
    for _ in range(50):check_collision(slam.map, pose, direction, steer, 2, width, length, resolution=20, forward=True)
    mask=check_collision(slam.map, pose, direction, steer, 2, width, length, resolution=20, forward=True)
    print(time.time()-log)

    #print(mask)
    mask=(np.ones(mask.shape)*mask*255)
    tf_pose=np.array(mask.shape)/2-np.array([pose[0],-pose[1]])*resolution
    
    mask[int(tf_pose[1]), int(tf_pose[0])]=150
    imshow("Test",mask,2000,2000)
    time.sleep(0.01)


0.09223389625549316


Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

0.08787703514099121
0.08423209190368652
0.08521866798400879
0.0883474349975586
0.09069299697875977
0.08846592903137207
0.08704638481140137
0.08528852462768555
0.08570194244384766
0.08912062644958496
0.09343695640563965
0.09165000915527344


KeyboardInterrupt: 

In [71]:
x, y=np.ogrid[:10,:10]

In [80]:
a=np.zeros((10,10))
a[5:,5:]=True
a=np.where(a,True,False)

In [88]:
d=(-3,5)

In [90]:
b=np.sqrt((x-d[0])**2+(y-d[1])**2)

In [96]:
np.zeros((10,10))[np.array([[True],[True],[True]])]

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0.])

In [134]:
np.arange(0.1,0.5,0.01)

array([0.1 , 0.11, 0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.2 ,
       0.21, 0.22, 0.23, 0.24, 0.25, 0.26, 0.27, 0.28, 0.29, 0.3 , 0.31,
       0.32, 0.33, 0.34, 0.35, 0.36, 0.37, 0.38, 0.39, 0.4 , 0.41, 0.42,
       0.43, 0.44, 0.45, 0.46, 0.47, 0.48, 0.49])