In [1]:
import cv2
import os.path
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict

%matplotlib inline

In [2]:
class Camera:
    """Calibrated frame source and sink for either an image directory or a video.

    On construction the camera is calibrated from chessboard images found in
    ``calibrate_dir``; afterwards ``read()`` yields frames from ``source_dir``
    and ``write()`` stores results in ``output_dir``.

    Attributes:
        video: True when source/output are video files rather than directories.
        source: directory path (image mode) or ``cv2.VideoCapture`` (video mode).
        output: directory path (image mode) or ``cv2.VideoWriter`` (video mode).
        mtx, dist, rvecs, tvecs: values returned by ``cv2.calibrateCamera``.
    """

    def __init__(self, calibrate_dir, source_dir=None, output_dir=None, is_video=False, nx=9, ny=6):
        """Calibrate from ``calibrate_dir`` and then attach the source and output.

        Args:
            calibrate_dir: directory holding chessboard calibration images.
            source_dir: image directory, or video file path when ``is_video``.
            output_dir: output directory, or video file path when ``is_video``.
            is_video: treat ``source_dir``/``output_dir`` as video files.
            nx, ny: chessboard pattern size handed to ``calibrate``.
        """
        # Calibration always reads still images, so force image mode first.
        self.video = False
        self.set_source(calibrate_dir)
        # Forward the caller's pattern size instead of a hard-coded 9x6.
        self.mtx, self.dist, self.rvecs, self.tvecs = self.calibrate(nx=nx, ny=ny)
        self.video = is_video
        self.set_source(source_dir)
        self.set_output(output_dir)

    def calibrate(self, nx=9, ny=6):
        """Compute calibration data from the chessboard images in the current source.

        Args:
            nx, ny: pattern size passed to ``cv2.findChessboardCorners``.

        Returns:
            ``(mtx, dist, rvecs, tvecs)`` as returned by ``cv2.calibrateCamera``.

        Raises:
            ValueError: if no image in the source yields a chessboard detection.
        """
        obj_list = []
        img_list = []

        # One (x, y, 0) grid point per chessboard corner, shared by every image.
        object_points = np.zeros((nx*ny, 3), np.float32)
        object_points[:, :2] = np.mgrid[0:nx, 0:ny].T.reshape(-1, 2)

        image_shape = None
        for _, image in self.read(False):
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            image_shape = gray.shape
            found, corners = cv2.findChessboardCorners(gray, (nx, ny), None)
            if found:
                img_list.append(corners)
                obj_list.append(object_points)

        if not img_list:
            raise ValueError(
                "no {}x{} chessboard found in calibration source {!r}".format(nx, ny, self.source))

        # calibrateCamera is given the size reversed from numpy's (rows, cols).
        _, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(
            obj_list, img_list, image_shape[::-1], None, None)
        return mtx, dist, rvecs, tvecs

    def set_video(self, is_video):
        """Switch between video mode (True) and image-directory mode (False)."""
        self.video = is_video

    def set_source(self, source_dir):
        """Attach the input: a ``cv2.VideoCapture`` in video mode, else the path itself."""
        if self.video:
            self.source = cv2.VideoCapture(source_dir)
        else:
            self.source = source_dir

    def set_output(self, output_dir):
        """Attach the output: a ``cv2.VideoWriter`` in video mode, else the path itself.

        The writer is created with the XVID codec, 24 fps and a fixed
        1280x720 colour frame size.
        """
        if self.video:
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            fps = 24
            frame = (1280, 720)
            is_color = 1
            self.output = cv2.VideoWriter(output_dir, fourcc, fps, frame, is_color)
        else:
            self.output = output_dir

    def write(self, filename, image):
        """Write one frame.

        In video mode the frame is appended to the writer and ``filename`` is
        ignored; in image mode it is saved as ``filename`` inside the output
        directory.

        Returns:
            Whatever the underlying OpenCV write call returns.
        """
        if self.video:
            ret = self.output.write(image)
        else:
            # join() works whether or not the output directory ends with a separator.
            ret = cv2.imwrite(os.path.join(self.output, filename), image)
        return ret

    def read(self, distorted=True):
        """Yield ``(filename, image)`` pairs from the current source.

        Args:
            distorted: when True each frame is passed through ``undistort``
                before being yielded; pass False to get the raw frame.

        Yields:
            ``("", frame)`` in video mode, ``(file_name, image)`` in image mode.
        """
        if self.video:
            ret, img = self.source.read()
            while ret:
                if distorted:
                    img = self.undistort(img)
                yield "", img
                ret, img = self.source.read()
        else:
            # Sorted so that the processing order does not depend on the filesystem.
            for file in sorted(os.listdir(self.source)):
                img = cv2.imread(os.path.join(self.source, file))
                if img is None:
                    # imread returns None for entries it cannot decode; skip them.
                    continue
                if distorted:
                    img = self.undistort(img)
                yield file, img

    def close(self):
        """Release any OpenCV handles held as output or source.

        Safe to call in image mode, where both attributes are plain paths.
        """
        for handle in (getattr(self, 'output', None), getattr(self, 'source', None)):
            release = getattr(handle, 'release', None)
            if callable(release):
                release()

    def undistort(self, image):
        """Return ``image`` undistorted with the stored calibration data."""
        return cv2.undistort(image, self.mtx, self.dist, None, self.mtx)

In [3]:
class Perspective_Transform:
    """Forward and inverse perspective warp between two sets of four points.

    Attributes:
        src, dst: float32 arrays of the four source / destination points.
        M: matrix from ``cv2.getPerspectiveTransform(src, dst)``.
        Minv: matrix from ``cv2.getPerspectiveTransform(dst, src)``.
    """

    # The list defaults are only read: np.array() copies them, so they are
    # never mutated between instances.
    def __init__(self, src = [[570, 450],[730, 450],[260,600],[1055,600]],
        dst = [[50, 50],[1230, 50],[50,700],[1230,700]]):
        """Store both point sets as float32 arrays and build the matrices."""
        self.src = np.array(src, np.float32)
        self.dst = np.array(dst, np.float32)
        self.set_M()

    def set_map(self, pnt_map, src=True):
        """Replace one point set and rebuild both matrices.

        Args:
            pnt_map: four (x, y) points; any array-like is accepted.
            src: True to replace the source points, False for the destination.
        """
        # Coerce exactly as __init__ does so plain lists work here too.
        points = np.array(pnt_map, np.float32)
        if src:
            self.src = points
        else:
            self.dst = points
        self.set_M()

    def set_M(self):
        """Recompute ``M`` and ``Minv`` from the current points; returns True."""
        self.M = cv2.getPerspectiveTransform(self.src, self.dst)
        self.Minv = cv2.getPerspectiveTransform(self.dst, self.src)
        return True

    def apply(self, image, reverse=False):
        """Warp ``image`` with ``M`` (or ``Minv`` when ``reverse``), keeping its size."""
        # warpPerspective takes (width, height); numpy shape is (rows, cols, ...).
        img_size = (image.shape[1], image.shape[0])
        matrix = self.Minv if reverse else self.M
        return cv2.warpPerspective(image, matrix, img_size, flags=cv2.INTER_LINEAR)

In [4]:
class Lane:
    """Tracks the left and right lane lines over the most recent frames.

    ``search_lines`` appends one set of line points per frame and keeps at
    most the last 10; the remaining methods fit, measure and draw the lane
    from that history.

    Attributes:
        left_y, left_x, right_y, right_x: per-frame uint32 point arrays.
        width_start, width_end: per-frame pixel distance between the lines at
            the first and last scanned row.
        curve, curve_rad: latest value computed by ``find_stats``.
        lane_offset: latest offset computed by ``find_stats``.
        center: pixel distance between the two bases found by the last
            ``search_lines`` call (a width, despite the name).
    """

    def __init__(self):
        """Start with an empty history."""
        self.left_y = list()
        self.left_x = list()
        self.right_y = list()
        self.right_x = list()
        self.width_start = list()
        self.width_end = list()
        self.curve_rad = 0

    def find_base(self, image, base_l, base_r):
        """Locate the starting column of each line in the lower half of ``image``.

        The column sums of the lower half are searched for their maximum in
        the left third and in the right third.

        Args:
            image: 2-D mask to search.
            base_l, base_r: unused; kept for interface compatibility.

        Returns:
            ``(left, right)`` column indices.
        """
        hist = np.sum(image[image.shape[0]//2:, :], axis=0)
        hist_split = hist.shape[0]//3
        left = np.argmax(hist[:hist_split])
        # The right slice starts at len - hist_split, which differs from
        # 2*hist_split whenever the width is not a multiple of three.
        right = np.argmax(hist[-hist_split:]) + (hist.shape[0] - hist_split)
        return left, right

    def _window_shift(self, image, row_lo, row_hi, base, chk_rng):
        """Return the integer column shift suggested by the window around ``base``."""
        frame = image[row_lo:row_hi,
                      max(base-chk_rng, 0):min(base+chk_rng, image.shape[1]-1)]
        hist = np.sum(frame[frame.shape[0]//2:, :], axis=0)
        if hist.size == 0:
            # Empty window: nothing to follow, keep the base where it is.
            return 0
        # Column offsets relative to the window centre, weighted by the column sums.
        offsets = np.arange(hist.shape[0], dtype=np.float32) - (hist.shape[0]//2)
        return int(np.mean(hist * offsets))

    def search_lines(self, image):
        """Trace both lines from the bottom of ``image`` upwards and record them.

        Starting at the bases from ``find_base``, each row band shifts the two
        bases by the weighted offset found in a +/-20 column window. When only
        one side produces a shift, the other side copies it.

        Args:
            image: 2-D warped binary mask.

        Returns:
            An array shaped like ``image`` with a 10-column band set to 1
            around each traced line.
        """
        chk_rng = 20
        i_shape = image.shape
        # At least one row per step, so images under 100 rows do not give a zero step.
        h_step = max(i_shape[0]//100, 1)
        base_l, base_r = self.find_base(image, None, None)
        self.center = base_r - base_l
        line_blank = np.zeros_like(image)

        # Bases are kept this far from either image edge.
        lo_col = chk_rng + 1
        hi_col = i_shape[1] - (chk_rng + 1)

        left_y = list()
        left_x = list()
        right_y = list()
        right_x = list()
        for i in range(i_shape[0]-1, 0, -h_step):
            row_lo = max(i-h_step, 0)
            row_hi = min(i, i_shape[0])
            shift_l = self._window_shift(image, row_lo, row_hi, base_l, chk_rng)
            shift_r = self._window_shift(image, row_lo, row_hi, base_r, chk_rng)

            if shift_l == 0:
                shift_l = shift_r
            elif shift_r == 0:
                shift_r = shift_l

            # Clamp on both sides; the outermost bound is the one the original
            # single-sided clamp enforced, so it wins on very narrow images.
            base_l = max(min(base_l + shift_l, hi_col), lo_col)
            base_r = min(max(base_r + shift_r, lo_col), hi_col)

            line_blank[row_lo:row_hi,
                       max(base_l-5, 0):min(base_l+5, i_shape[1]-1)] = 1
            line_blank[row_lo:row_hi,
                       max(base_r-5, 0):min(base_r+5, i_shape[1]-1)] = 1

            left_y.append(i)
            left_x.append(base_l)
            right_y.append(i)
            right_x.append(base_r)
        self.left_y.append(np.asarray(left_y, np.uint32))
        self.left_x.append(np.asarray(left_x, np.uint32))
        self.right_y.append(np.asarray(right_y, np.uint32))
        self.right_x.append(np.asarray(right_x, np.uint32))
        self.width_start.append(right_x[0] - left_x[0])
        self.width_end.append(right_x[-1] - left_x[-1])

        # Keep only the 10 most recent frames.
        if len(self.left_x) > 10:
            for history in (self.left_y, self.left_x, self.right_y, self.right_x,
                            self.width_start, self.width_end):
                history.pop(0)
        return line_blank

    def line_test(self, warp, y_, x_):
        """Paint a 20x20 block of 255 into ``warp`` at each (y, x) that fits inside it.

        ``warp`` is modified in place and also returned.
        """
        shape = warp.shape
        for y, x in zip(y_, x_):
            # Built-in int(): the np.int alias was removed in NumPy 1.24.
            y = int(y)
            x = int(x)
            if (x-10 >= 0) and (x+10 < shape[1]) and (y-10 >= 0) and (y+10 < shape[0]):
                warp[y-10:y+10, x-10:x+10] = 255
        return warp

    def compile_line(self, y_, x_, single=False):
        """Reduce a per-frame history to one set of line points.

        Args:
            y_, x_: lists of per-frame coordinate arrays.
            single: when True return the newest frame's points unchanged.

        Returns:
            ``(y, x)``. When ``single`` is False, each frame is fitted with
            ``fit_line``, evaluated on y = 0..719 and the x values are
            averaged across frames.
        """
        if single:
            return y_[-1], x_[-1]
        y_out = np.arange(720)
        x_list = [self.line_to_points(y_out, self.fit_line(y, x))
                  for y, x in zip(y_, x_)]
        x = np.mean(np.column_stack(x_list), axis=1)
        return y_out, x

    def fit_line(self, y, x):
        """Return the coefficients of a 2nd-degree polynomial fit of x as a function of y."""
        return np.polyfit(y, x, 2)

    def find_stats(self, single=False):
        """Compute the mean curve radius and the lane offset from the history.

        Pixel coordinates are scaled with ``100/1280`` along y and
        ``12/mean_width`` along x before fitting.

        Args:
            single: forwarded to ``get_points``.

        Returns:
            ``(curve, lane_offset)``; both are also stored on the instance.
        """
        curves = list()
        width = np.mean(self.width_start + self.width_end)

        yft_per_pix = 100./1280. # Estimated lane length captured in images.
        xft_per_pix = 12/width # Estimated lane width captured in images.

        left_y = [a * yft_per_pix for a in self.left_y]
        left_x = [a * xft_per_pix for a in self.left_x]
        left_y, left_x, left_coef = self.get_points(left_y, left_x, single)

        right_y = [a * yft_per_pix for a in self.right_y]
        right_x = [a * xft_per_pix for a in self.right_x]
        right_y, right_x, right_coef = self.get_points(right_y, right_x, single)

        # NOTE(review): when single is False, compile_line() resamples on
        # y = 0..719 regardless of the scaling above, so y_eval below spans
        # 0..719 - confirm this is intended.
        for coef, y in [(left_coef, left_y), (right_coef, right_y)]:
            for y_eval in y:
                curves.append(((1 + (2*coef[0]*y_eval + coef[1])**2)**1.5) / np.absolute(2*coef[0]))
        self.curve = np.mean(curves)
        # Keep the attribute initialised in __init__ in step with the result.
        self.curve_rad = self.curve

        lane_center = left_x[0] + (right_x[0] - left_x[0])/2
        self.lane_offset = (1280*xft_per_pix)/2 - lane_center
        return self.curve, self.lane_offset

    def draw_stats(self, image, single=False):
        """Write the curve and offset from ``find_stats`` onto ``image`` and return it."""
        curve, offset = self.find_stats(single)
        lane_curve = 'Lane Curve: {:06.0f} ft.'.format(curve)
        lane_offset = 'Lane Offset: {:06.2f} ft.'.format(offset)
        cv2.putText(image, lane_curve, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
        cv2.putText(image, lane_offset, (50,100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
        return image

    def line_to_points(self, y, coef):
        """Evaluate ``coef[0]*y**2 + coef[1]*y + coef[2]``."""
        return coef[0]*y**2 + coef[1]*y + coef[2]

    def get_points(self, y_, x_, single=False):
        """Compile the history into one line, fit it and evaluate the fit.

        Returns:
            ``(y, x, coef)`` where ``x`` is the fitted polynomial evaluated at ``y``.
        """
        y, x_ = self.compile_line(y_, x_, single)
        coef = self.fit_line(y, x_)
        x = self.line_to_points(y, coef)
        return y, x, coef

    def draw(self, warp, single=False):
        """Return a 3-channel uint8 image with the lane area filled in green.

        Args:
            warp: 2-D array whose shape sets the size of the output.
            single: forwarded to ``get_points``.
        """
        left_y, left_x, left_coef = self.get_points(self.left_y, self.left_x, single)
        right_y, right_x, right_coef = self.get_points(self.right_y, self.right_x, single)

        # Recast the x and y points into usable format for cv2.fillPoly()
        left = np.column_stack([left_x, left_y]).astype(np.int32)
        right = np.column_stack([right_x, right_y]).astype(np.int32)
        top = np.asarray([left[0], right[0]]).astype(np.int32)
        bottom = np.asarray([right[-1], left[-1]]).astype(np.int32)
        pts = np.vstack([left, top, right, bottom]).astype(np.int32)

        # Create an image to draw the lines on and draw the lane onto it
        warp_zero = np.zeros_like(warp).astype(np.uint8)
        color_warp = np.dstack((warp_zero, warp_zero, warp_zero))

        cv2.fillPoly(color_warp, [pts], (0,255,0))
        return color_warp

In [5]:
def mask_and(mask1, mask2):
    """Return a mask shaped like ``mask1`` that is 1 where both inputs exceed 0.5."""
    both_set = np.logical_and(mask1 > 0.5, mask2 > 0.5)
    combined = np.zeros_like(mask1)
    combined[both_set] = 1
    return combined

def mask_or(mask1, mask2):
    """Return a mask shaped like ``mask1`` that is 1 where either input exceeds 0.5."""
    any_set = np.logical_or(mask1 > 0.5, mask2 > 0.5)
    combined = np.zeros_like(mask1)
    combined[any_set] = 1
    return combined

def absolute_sobel(img, orient='x', kernel=3, thresh=(0, 255)):
    """Threshold the rescaled absolute Sobel gradient of ``img`` into a binary mask.

    The image is converted to grayscale, histogram-equalised, differentiated
    along one axis, and the absolute response is stretched to 0..255.

    Args:
        img: BGR image.
        orient: ``'x'`` for the x gradient; any other value selects y.
        kernel: Sobel kernel size.
        thresh: inclusive ``(low, high)`` bounds on the rescaled gradient.

    Returns:
        uint8 mask that is 1 where the rescaled gradient lies within ``thresh``.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # Convert to single channel (grayscale)
    gray = cv2.equalizeHist(gray)
    dx, dy = (1, 0) if orient == 'x' else (0, 1)
    sobel = np.absolute(cv2.Sobel(gray, cv2.CV_64F, dx, dy, ksize=kernel))
    scaled = sobel - np.min(sobel)
    peak = np.max(scaled)
    if peak > 0:
        # Skip the stretch for a gradient-free image, where it would divide by zero.
        scaled = scaled * (255. / peak)
    scaled = scaled.astype(np.uint8)
    mask = np.zeros_like(scaled) # Create binary mask
    mask[(scaled >= thresh[0]) & (scaled <= thresh[1])] = 1 # Populate binary mask
    return mask

def hsl_select(img, channel='l', thresh=(0, 255)):
    """Threshold one plane of the HLS conversion of ``img`` into a binary mask.

    Args:
        img: BGR image.
        channel: ``'h'`` or 0 selects plane 0, ``'s'`` or 1 selects plane 1,
            anything else (including the default ``'l'``) selects plane 2.
        thresh: inclusive ``(low, high)`` bounds on the selected plane.

    Returns:
        Mask with the plane's dtype that is 1 where the value lies within ``thresh``.
    """
    # NOTE(review): OpenCV orders the converted planes H, L, S, so with this
    # mapping 'l' reads plane 2 and 's' reads plane 1. The mapping is kept
    # as-is because existing thresholds were chosen against it - confirm
    # before relabelling.
    if (channel == 'h') or (channel == 0):
        plane_index = 0
    elif (channel == 's') or (channel == 1):
        plane_index = 1
    else:
        plane_index = 2
    # Frames elsewhere in this file are treated as BGR (see COLOR_BGR2GRAY),
    # so convert from BGR; only the hue plane depends on the R/B order.
    hls = cv2.cvtColor(img, cv2.COLOR_BGR2HLS)
    plane = hls[:, :, plane_index]
    mask = np.zeros_like(plane)
    mask[(plane >= thresh[0]) & (plane <= thresh[1])] = 1
    return mask

In [6]:
def image_pipeline(cam=None, pers_trans=None, curr_lane=None):
    """Run lane detection on every frame from ``cam`` and write the annotated result.

    Each frame is thresholded (Sobel x AND Sobel y, OR the HLS selection),
    warped, searched for lane lines, and the filled lane is warped back and
    blended onto the frame together with the curve/offset text.

    Args:
        cam: frame source/sink; defaults to a ``Camera`` calibrated from
            ``./camera_cal`` that reads ``./test_images/`` and writes to
            ``./output_images/``.
        pers_trans: perspective transform; defaults to ``Perspective_Transform()``.
        curr_lane: lane tracker; defaults to a fresh ``Lane()``.

    Returns:
        None.
    """
    # Defaults are built per call: constructing them in the signature would run
    # the calibration at definition time and share one Lane history across calls.
    if cam is None:
        cam = Camera('./camera_cal', './test_images/', './output_images/')
    if pers_trans is None:
        pers_trans = Perspective_Transform()
    if curr_lane is None:
        curr_lane = Lane()

    for f, image in cam.read():
        abs_x = absolute_sobel(image, orient='x', kernel=13, thresh=(20,120))
        abs_y = absolute_sobel(image, orient='y', kernel=17, thresh=(16,128))
        hsl_l = hsl_select(image, channel = 'l', thresh=(110,255))
        img_s = mask_or(mask_and(abs_x, abs_y), hsl_l)
        pers_img = pers_trans.apply(img_s)
        img = curr_lane.search_lines(pers_img)
        draw_img = curr_lane.draw(img)
        unwarp = pers_trans.apply(draw_img, reverse=True)
        # draw_stats writes the text onto the frame itself and returns it.
        stats = curr_lane.draw_stats(image)

        # Combine the result with the original image and write to disk
        result = cv2.addWeighted(stats, 1, unwarp, 0.3, 0)
        cam.write(f, result)
        
# Calibrate from ./camera_cal, then open the project video for reading and
# an .avi file for the annotated output.
cam = Camera(
    calibrate_dir='./camera_cal',
    source_dir='./project_video.mp4',
    output_dir='./output_images/project_video.avi',
    is_video=True,
)


In [7]:
# Process the whole video; close the camera even if a frame fails so the
# output writer is always released.
try:
    ret = image_pipeline(cam)
finally:
    cam.close()