Import

In [1]:
import carla, cv2
import numpy as np
from skimage.measure import LineModelND, ransac

Setup

In [2]:
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)

world = client.get_world()
spectator = world.get_spectator()

Funzioni base

In [3]:
def move_spectator_to(transform, distance=5.0, x=0, y=0, z=4, yaw=0, pitch=-30, roll=0):
    back_location = transform.location - transform.get_forward_vector() * distance
    back_location.x += x
    back_location.y += y
    back_location.z += z
    transform.rotation.yaw += yaw
    transform.rotation.pitch = pitch
    transform.rotation.roll = roll
    spectator_transform = carla.Transform(back_location, transform.rotation)
    spectator.set_transform(spectator_transform)

def spawn_vehicle(vehicle_index=0, spawn_index=0, pattern='vehicle.mercedes.coupe_2020', rotation=None):
    blueprint_library = world.get_blueprint_library()
    vehicle_bp = blueprint_library.filter(pattern)[vehicle_index]
    spawn_point = world.get_map().get_spawn_points()[spawn_index]
    if rotation:
        spawn_point.rotation = rotation
    vehicle = world.spawn_actor(vehicle_bp, spawn_point)
    return vehicle

def draw_on_screen(world, transform, content='O', color=carla.Color(0, 255, 0), life_time=20):
    world.debug.draw_string(transform.location, content, color=color, life_time=life_time)

def spawn_camera(attach_to=None, transform=carla.Transform(carla.Location(x=1.2, z=1.2), carla.Rotation(pitch=-10)), fov=90.0, width=800, height=600, sensor_tick=0.0):
    camera_bp = world.get_blueprint_library().find('sensor.camera.rgb')
    camera_bp.set_attribute('image_size_x', str(width))
    camera_bp.set_attribute('image_size_y', str(height))
    camera_bp.set_attribute('fov', str(fov))
    camera_bp.set_attribute('sensor_tick', str(sensor_tick))
    camera = world.spawn_actor(camera_bp, transform, attach_to=attach_to)
    return camera

def remove_all(world):
    for a in world.get_actors().filter('vehicle.*'):
        a.destroy()
    for a in world.get_actors().filter('sensor.*'):
        a.destroy()

def proportional_spaced_array(pts, min_value, max_value):
    delta_dist = max_value - min_value
    pts_max = pts[0]
    pts_min = pts[len(pts) - 1]
    delta_pts = pts_max - pts_min

    buffer = []
    for i in range(0, len(pts)):
        buffer.append(delta_dist * (pts_max - pts[i]) / delta_pts + min_value)

    return buffer

Warp immagine

In [4]:
class ImageWarp():

    def __init__(self,img_h=240, img_w=320, offset=150, src=[[50, 240], [200, 240], [0, 0], [320, 0]],dst=[[135, 240], [150, 240], [0, 0], [320, 0]]):
        self.img_h = img_h
        self.img_w = img_w
        self.warp_offset = offset
        self.src = np.float32(src)
        self.dst = np.float32(dst)
        self.wmat = cv2.getPerspectiveTransform(self.src, self.dst)
        self.wmat_inv = cv2.getPerspectiveTransform(self.dst,self.src)

    def img_warp(self, iimg,inv=False, offset=False):
        """
        Warps an image based on the input parameters

        Args:
            iimg ([type]): RGB / Gray image
            inv (bool, optional): invers transformation. Defaults to False.
            offset (bool, optional): use offset for warping the image. Defaults to False.

        Returns:
            [type]: warped image
        """

        ret = []
        timg = None

        if offset == True:
            timg = iimg[self.warp_offset:self.warp_offset+self.img_h, 0:self.img_w]
        else:
            timg = iimg

        if inv == False:
            ret = cv2.warpPerspective(timg,self.wmat , (self.img_w, self.img_h))
        else:
            ret = cv2.warpPerspective(timg,self.wmat_inv , (self.img_w, self.img_h))
        return ret

    def pts_unwarp(self,pts):
        """
        Backprojects points from warped image to un-warped image

        Args:
            pts ([type]): points to backproject

        Returns:
            [type]: backprojected points
        """
        return cv2.perspectiveTransform (pts, self.wmat_inv)

Interpolatore linee

In [5]:
class Interpolator():

    def __init__(self,max_poly_degree=3):
        self.max_poly_degree=max_poly_degree

    def interpolate(self,indata=dict(), ip_params={'start':0, 'stop':240, 'steps':20},key='mid', nr_interpolated_points=240, equ_selector=False, debug=False):
        """
        Takes detected points, find the corresponding polynom fits the data.

        Args:
            indata ([type], optional): detected points. Defaults to dict().
            ip_params (dict, optional): interpolation info. Defaults to {'start':0, 'stop':240, 'steps':20}.
            key (str, optional): Name of the line. Defaults to 'mid'.
            nr_interpolated_points (int, optional): Max number of interpolation points. Defaults to 240.
            equ_selector (bool, optional): search the best fitting equation (line / curve, etc.). Defaults to False.
            debug (bool, optional): Collects debug info. Defaults to False.

        Returns:
            [type]: interpolated points
        """
        data = np.array(indata[0][key])

        x_coord = data[:,0]
        y_coord = data[:,1]

        min_mse_pos = self.max_poly_degree

        # polynomial degree selector
        if equ_selector == True:
            # find the best fit
            best_poly = []
            best_fit = []
            for i in range(0,self.max_poly_degree):
                pfit = np.polyfit(y_coord,x_coord,i)
                polynom = np.poly1d(pfit)
                test_y = polynom(x_coord)
                difference= y_coord-test_y
                st_d = np.std(difference)
                best_poly.append(st_d)
                best_fit.append((pfit, polynom))

            # select best poly
            min_mse_pos = np.argmin(np.array(best_poly))

        # order start from 1, position from 0
        pfit = np.polyfit(y_coord,x_coord,min_mse_pos)
        polynom = np.poly1d(pfit)
        # pfit, polynom = best_fit[min_mse_pos]

        y_ipp = np.float32(np.linspace(ip_params['start'],ip_params['stop'],ip_params['steps']))
        x_ipp = polynom(y_ipp)

        ply_coords = np.column_stack((x_ipp,y_ipp))
        return {key:ply_coords}

    def echidistant_lane(self, warp_obj, unwarped_pts=None, lane_side=1):
        buffer = []
        dist = 115
        dist = proportional_spaced_array(unwarped_pts[0][:, 1], 115, 250)
        for i in range(0, len(unwarped_pts[0])):
            x = unwarped_pts[0][i][0]
            y = unwarped_pts[0][i][1]

            nx = x
            nx = nx - dist[i] * lane_side

            norm_pts = np.float32(np.column_stack((nx, y)))
            buffer.append(norm_pts)

        buffer = np.array(buffer, dtype=np.float32)
        buffer = cv2.perspectiveTransform(buffer, warp_obj.wmat)

        return np.array(buffer, dtype=np.float32)

Detector

In [6]:
class Detector():
    def __init__(self, scan_range={'start':0,'stop':240,'steps':20},scan_window={'height':15,'max_adjust':10}):
        self.scan_range = scan_range
        self.scan_window = scan_window
        self.model = LineModelND()
        self.debug = []

    def img_filter(self,iimg):
        """
        Filters an RGB image to get the lane boundaries

        Args:
            iimg ([type]): RGB image

        Returns:
            [type]: RGB image
        """
        iimg = cv2.cvtColor(iimg,cv2.COLOR_BGR2GRAY)
        ciimg = cv2.Canny(iimg,50,250)
        ciimg = cv2.morphologyEx(ciimg,cv2.MORPH_DILATE,(1,1),iterations = 5)

        siimg = cv2.Sobel(iimg,cv2.CV_8U,1,0,ksize=1)
        siimg = cv2.morphologyEx(siimg,cv2.MORPH_DILATE,(1,1),iterations = 5)
        ret,siimg= cv2.threshold(siimg,50,255,cv2.THRESH_OTSU)
        siimg = cv2.morphologyEx(siimg, cv2.MORPH_CLOSE, (3,3))

        iimg= cv2.bitwise_and(ciimg,ciimg,mask=siimg)

        return iimg

    def get_lane_detections(self,iimg,start={'x':105, 'y':230},stop={'x':135, 'y':230},label='mid', use_RANSAC=True, debug=False):
        """
        Parses the input image, with virtual sensors, detects the peeks and save the points

        Args:
            iimg ([type]): 1 channel gray image
            start (dict, optional): detection area start. Defaults to {'x':105, 'y':230}.
            stop (dict, optional): detection area start. Defaults to {'x':135, 'y':230}.
            label (str, optional): name of the line. Defaults to 'mid'.
            use_RANSAC (bool, optional): Use RANSAC. Defaults to True.
            debug (bool, optional): collect debug info. Defaults to False.

        Returns:
            [type]: detections coordinates
        """

        adjust = 0
        minx = min(start['x'],stop['x'])
        maxx = max(start['x'],stop['x']) + adjust
        detections = []
        for i in range (self.scan_range['start'],self.scan_range['stop'],self.scan_range['steps']):
            # detections y coordinate
            y = start['y']-i

            # get detections from a line
            det_line = iimg[y:y+self.scan_window['height'], minx:maxx]

            # scan an image segment, sum detection
            hist = np.sum(det_line, axis=0)

            # get peek location
            peek = np.argmax(hist)
            # peaks, _ = find_peaks(hist, height=np.average(hist))

            # define threshold = average, find peeks, 
            # update 'sensor location'= peek centered
            if hist[peek] > np.average(hist):
            # if len(peaks) > 0:
            #     # Select the central peak
            #     peak = peaks[len(peaks) // 2]

                x1 = minx+peek
                y1 = y
                det_mid_x = minx+len(hist)//2

                adjust = x1 - det_mid_x

                # apply adjust only if in defined range
                if np.abs(adjust) >= self.scan_window['max_adjust']:
                    sing = np.sign(adjust)
                    adjust = sing * self.scan_window['max_adjust']

                minx += adjust
                maxx += adjust

                #self.buffer.append({label:[x1,y1]})
                detections.append([x1,y1])
                if debug == True:
                    self.debug.append({'detection':[x1,y1], 'detection_mid':[det_mid_x,y1],'rectangle': [minx,y1,minx+len(hist),y1+self.scan_window['height']]}) 

        if use_RANSAC == True:
            _, inliers = self.filter_outliers(detections)
            if inliers is not None:
                detections = np.array(detections)[inliers]

        return detections
        # return {label:detections}

    def filter_outliers(self,data):
        """
        apply RUNSAC

        Args:
            data ([type]): data points

        Returns:
            [type]: filtered list
        """
        model_robust = None
        inliers = None

        data = np.array(data)
        self.model.estimate(data)
        try:
            model_robust, inliers = ransac(data, LineModelND, min_samples=2, residual_threshold=1, max_trials=200)
        except:
            pass

        return model_robust,inliers

    def draw_detections(self, iimg,data):
        """
        Visualize detections

        Args:
            iimg ([type]): RGB image
            data ([type]): points detected

        Returns:
            [type]: RGB image
        """
        limg = iimg.copy()
        for v in data:
            cv2.circle(limg,(v[0],v[1]),2,[255],-1)
        if len(self.debug)>0:
            for v in self.debug:
                # window center
                cv2.circle(limg, (v['detection_mid'][0],v['detection_mid'][1]+5),1,[255])
                # detection rectangle
                cv2.rectangle(limg,(v['rectangle'][0],v['rectangle'][1]),(v['rectangle'][2],v['rectangle'][3]),[255],1)

        return limg


Settaggi per il processing

In [7]:
src = [[70, 240], [430, 240], [0, 0], [500, 0]]
dst = [[155, 270], [165, 270], [0, 0], [320, 0]]
scan_range= {'start': 0, 'stop': 240, 'steps': 10}
scan_window= {'height': 8, 'max_adjust': 8}
offset = 150
image_width = 500

lanes = [
    {'label': 'mid', 'detections': {'start': {'x': 120, 'y': 230}, 'stop': {'x': 160, 'y': 230}}},
    {'label': 'right', 'detections': {'start': {'x': 160, 'y': 230}, 'stop': {'x': 200, 'y': 230}}}
]

Oggetti per il processing

In [8]:
warper = ImageWarp(img_w=image_width, offset=offset, src=src, dst=dst)
detector = Detector(scan_range=scan_range, scan_window=scan_window)
interpolator = Interpolator(max_poly_degree=2)

Processing dell'immagine

In [9]:
def process_image(image, res):
    img = image.copy()
    f_img = detector.img_filter(img)
    f_w_img = warper.img_warp(f_img,offset=True)
    try:
        img = image.copy()
        # img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

        f_img = detector.img_filter(img)
        f_w_img = warper.img_warp(f_img,offset=True)

        color = [1,1,1]

        img_detected_points = f_w_img.copy()
        # img_line_fitted = f_w_img.copy()
        debug = f_w_img.copy()

        detected_points = {}
        for i, lane in enumerate(lanes):
            detected_points[lane['label']] = detector.get_lane_detections(f_w_img, start=lane['detections']['start'], stop=lane['detections']['stop'], label=lane['label'], use_RANSAC=True, debug=True)

            # Disegna l'area di valutazione delle corsie
            start_x = lane['detections']['start']['x']
            start_y = lane['detections']['start']['y']
            stop_x = lane['detections']['stop']['x']
            stop_y = lane['detections']['stop']['y']
            cv2.rectangle(debug, (start_x, start_y), (stop_x, stop_y), (255, 255, 255), 2)

        lane = None
        if detected_points['mid'].shape[0] > detected_points['right'].shape[0]:
            lane = lanes[0]
        else:
            lane = lanes[1]

        img_detected_points = detector.draw_detections(img_detected_points, detected_points[lane['label']])

        interpolated_points = interpolator.interpolate([detected_points], key=lane['label'], equ_selector=False, debug=False)

        pts = np.array([interpolated_points[lane['label']]])
        cv2.polylines(debug, [np.int32(pts)], False, [255], 2)

        unwarped_pts = np.int32(warper.pts_unwarp(pts))
        unwarped_pts_offset = np.add(unwarped_pts, [0, offset])

        color[i] = 255
        cv2.polylines(img, [unwarped_pts_offset], False, color, 2)

        # ESTIMATE FROM BEST LANE
        ed_pts = np.float32(interpolator.echidistant_lane(warp_obj=warper, unwarped_pts=unwarped_pts, lane_side=1 if lane['label'] == 'right' else -1))
        cv2.polylines(debug, [np.int32(ed_pts)], False, [255], 2)

        ed_unwarped_pts = np.int32(warper.pts_unwarp(ed_pts))
        ed_unwarped_pts_offset = np.add(ed_unwarped_pts, [0, offset])

        cv2.polylines(img, [ed_unwarped_pts_offset], False, color, 2)

        cv2.rectangle(debug, (160, 220), (160, 250), (255, 255, 255), 2)
        concat_img = cv2.hconcat([img, cv2.cvtColor(debug[:, :320], cv2.COLOR_GRAY2RGB)])


        res.put(concat_img)
    except Exception as e:
        print(e)
        start_x = lane['detections']['start']['x']
        start_y = lane['detections']['start']['y']
        stop_x = lane['detections']['stop']['x']
        stop_y = lane['detections']['stop']['y']
        cv2.rectangle(f_w_img, (start_x, start_y), (stop_x, stop_y), (255, 255, 255), 2)
        concat_img = cv2.hconcat([image, cv2.cvtColor(f_w_img[:, :320], cv2.COLOR_GRAY2RGB)])
        res.put(concat_img)

Import per il main

In [10]:
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import manual_control
from importlib import reload
reload(manual_control)

pygame 2.6.1 (SDL 2.28.4, Python 3.7.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


<module 'manual_control' from 'c:\\Users\\emanu\\Desktop\\universita\\Smart-vehicular-systems\\lane-departure-prevention-CARLA\\manual_control.py'>

Setup per il main

In [11]:
camera_width = 800
camera_height = 600

crop_width = 500
crop_height = 240
height_adjust = 50

In [12]:
remove_all(world)

processed_output = Queue()

game_loop = manual_control.setup()

vehicle = world.get_actors().filter('vehicle.*')[0]
front_camera = spawn_camera(attach_to=vehicle, transform=carla.Transform(
        carla.Location(x=0.3, y=0.0, z=1.5), # specchietto retrovisore -> x=0.3, y=0.0, z=1.5), logo = x=2.3, z=0.7
        carla.Rotation(pitch=-10.0)
    ),
    sensor_tick=0.1,
    width=camera_width, height=camera_height
)

video_output = np.zeros((camera_height, camera_width, 4), dtype=np.uint8)

cv2.namedWindow('Processed', cv2.WINDOW_NORMAL)

def camera_callback(image):
    global video_output

    try:
        video_output = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))
        video_output = video_output[:, :, :3]

        # Crop the image (zoom)
        start_x = (video_output.shape[1] - crop_width) // 2
        start_y = (video_output.shape[0] - crop_height) // 2 - height_adjust
        cropped_img = video_output[start_y:start_y + crop_height, start_x:start_x + crop_width]
        try:
            with ThreadPoolExecutor() as executor:
                executor.submit(
                    lambda: process_image(cropped_img, processed_output),
                )
        except Exception as e:
            print(e)

    except Exception as e:
        print(e.with_traceback())

INFO: listening to server 127.0.0.1:2000



Welcome to CARLA manual control.

Use ARROWS or WASD keys for control.

    W            : throttle
    S            : brake
    A/D          : steer left/right
    Q            : toggle reverse
    Space        : hand-brake

    L            : toggle next light type
    Z/X          : toggle right/left blinker

    TAB          : change sensor position
    ` or N       : next sensor
    [1-9]        : change to sensor [1-9]
    C            : change weather (Shift+C reverse)

    F1           : toggle HUD
    ESC          : quit



Esecuzione

In [13]:
front_camera.listen(lambda image: camera_callback(image))

try:
    game_loop.start(processed_output, autopilot=True)
except KeyboardInterrupt:
    print('\nCancelled by user. Bye!')
    cv2.destroyAllWindows()
finally:
    cv2.destroyAllWindows()




attempt to get argmax of an empty sequence
'list' object has no attribute 'shape'




'list' object has no attribute 'shape'
'list' object has no attribute 'shape'
At least 2 input points needed.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at least 2D.
Input data must be at leas