In [1]:
import numpy as np
import cv2
import pickle
from collections import deque
from moviepy.editor import VideoFileClip
from IPython.display import HTML

In [2]:
class LaneTracker:
    """Stateful lane-line tracker applied frame by frame to an RGB video.

    Each call to :meth:`pipeline` undistorts the frame, warps it to a
    top-down view, thresholds it into a binary image, fits one second-order
    polynomial ``x = a*y**2 + b*y + c`` per lane line, smooths the fit with a
    short history buffer and draws the lane back onto the frame.

    The instance keeps state between frames (``detected`` flag and the two
    coefficient buffers), so use one tracker per video.
    """

    def __init__(self):
        # load calibration parameters
        # NOTE(review): pickle.load can execute arbitrary code; only load a
        # 'coeffs.p' that was produced by a trusted calibration run.
        with open('coeffs.p', 'rb') as calibration_file:
            self.coeffs = pickle.load(calibration_file)
        self.mtx = self.coeffs['mtx']
        self.dist = self.coeffs['dist']
        # transform points (source quadrilateral -> destination rectangle)
        self.s_pts = np.array(((550, 480), (740, 480), (1110, 720), (200, 720)), np.float32)
        self.d_pts = np.array(((300, 0), (1000, 0), (1000, 720), (300, 720)), np.float32)
        self.M = cv2.getPerspectiveTransform(self.s_pts, self.d_pts)
        self.Minv = cv2.getPerspectiveTransform(self.d_pts, self.s_pts)
        # line coeffs to check parallel left/right:
        # upper bounds on |a_left - a_right| and |b_left - b_right|, and the
        # allowed (min, max) range for |c_left - c_right|
        self.a2 = 0.0003
        self.b2 = 0.35
        self.c2 = (370, 820)
        # half-width of the sliding window (margin_find) and of the band
        # searched around the previous fit (margin_check)
        self.margin_find = 40
        self.margin_check = 100
        # minimum number of pixels found to recenter window
        self.minpix = 50
        # buffer of recently accepted coefficients (at most 5 per side)
        self.buffer_left_coeffs = deque(maxlen=5)
        self.buffer_right_coeffs = deque(maxlen=5)
        # tracking lines: True when the last frame produced an accepted fit
        self.detected = False
        # curvature
        self.left_curvature = None
        self.right_curvature = None
        # set by curvature(); read by draw_back() for the offset text
        self.xm_per_pix = None

    def undistort(self, img):
        """Return ``img`` corrected with the loaded camera matrix and distortion coefficients."""
        return cv2.undistort(img, self.mtx, self.dist, None, self.mtx)

    def warp(self, img, M):
        """Equalize the luma channel of ``img`` and apply perspective matrix ``M``.

        The output has the same width and height as the input.
        """
        # add contrast correction: histogram-equalize Y only, keep chroma
        img = cv2.cvtColor(img, cv2.COLOR_RGB2YUV)
        img[:, :, 0] = cv2.equalizeHist(img[:, :, 0])
        img = cv2.cvtColor(img, cv2.COLOR_YUV2RGB)
        return cv2.warpPerspective(img, M, (img.shape[1], img.shape[0]))

    @staticmethod
    def _scaled_threshold(values, thresh):
        """Scale non-negative ``values`` to 0..255 and return a 0/1 uint8 mask of entries >= ``thresh``.

        A completely flat input (maximum of 0) yields an all-zero scaled image
        instead of dividing by zero.
        """
        peak = np.max(values)
        if peak > 0:
            scaled = np.uint8(255 * values / peak)
        else:
            scaled = np.zeros(values.shape, dtype=np.uint8)
        binary = np.zeros_like(scaled)
        binary[scaled >= thresh] = 1
        return binary

    # gradient
    def sobelxy(self, img, orient, sobel_kernel, thresh):
        """Binary mask of the absolute Sobel derivative along one axis.

        Args:
            img: RGB image.
            orient: ``'x'`` or ``'y'``, the derivative direction.
            sobel_kernel: Sobel aperture size passed to ``cv2.Sobel``.
            thresh: lower threshold on the derivative scaled to 0..255.

        Returns:
            uint8 array of 0/1 with the same height and width as ``img``.

        Raises:
            ValueError: if ``orient`` is neither ``'x'`` nor ``'y'``.
        """
        if orient == 'x':
            dx, dy = 1, 0
        elif orient == 'y':
            dx, dy = 0, 1
        else:
            raise ValueError("orient must be 'x' or 'y', got {!r}".format(orient))
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        sobel = cv2.Sobel(gray, cv2.CV_64F, dx, dy, ksize=sobel_kernel)
        return self._scaled_threshold(np.absolute(sobel), thresh)

    def magnitude(self, img, sobel_kernel, thresh):
        """Binary mask of the Sobel gradient magnitude scaled to 0..255 and thresholded at ``thresh``."""
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        mag = np.sqrt(sobelx**2 + sobely**2)
        return self._scaled_threshold(mag, thresh)

    # color
    def color_thresh(self, img):
        """Binary mask from the LAB colour space.

        A pixel is set when its L channel is above 245, or when the
        top-hat-filtered B channel lies in (5, 100].
        """
        lab = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
        L = lab[:, :, 0]
        B = lab[:, :, 2]
        # top-hat keeps only structures smaller than the 25x25 kernel
        B = cv2.morphologyEx(B, cv2.MORPH_TOPHAT, np.ones((25, 25), np.uint8))
        thresh_b = (5, 100)
        thresh_l = (245, 255)
        color = np.zeros_like(L)
        cond1 = (L > thresh_l[0]) & (L <= thresh_l[1])
        cond2 = (B > thresh_b[0]) & (B <= thresh_b[1])
        color[cond1 | cond2] = 1
        return color

    def apply_colorgrad(self, img):
        """Combine gradient and colour thresholds into one eroded 0/1 mask."""
        # gradient: (strong in x AND strong in y) OR strong magnitude
        sobelx = self.sobelxy(img, 'x', sobel_kernel=15, thresh=50)
        sobely = self.sobelxy(img, 'y', sobel_kernel=15, thresh=50)
        mag = self.magnitude(img, sobel_kernel=15, thresh=50)
        grad = np.zeros_like(sobelx)
        cond_xy = (sobelx == 1) & (sobely == 1)
        grad[cond_xy | (mag == 1)] = 1
        # color
        color = self.color_thresh(img)
        # combine color and gradient
        combined = np.zeros_like(grad)
        combined[(color == 1) | (grad == 1)] = 1
        # erosion removes blobs thinner than the 7x7 kernel
        combined = cv2.erode(combined, np.ones((7, 7), np.uint8), iterations=1)
        return combined

    @staticmethod
    def _fit_quadratic(x, y):
        """Fit ``x = a*y**2 + b*y + c``; return ``[a, b, c]`` or ``None``.

        ``None`` is returned when fewer than three points are available (a
        second-order fit is then under-determined) or when the least-squares
        solve fails.
        """
        if x.size < 3:
            return None
        try:
            return np.polyfit(y, x, 2)
        except (TypeError, ValueError, np.linalg.LinAlgError):
            return None

    # fit lines
    def find_line(self, img, side, window_height=60):
        """Locate one lane line from scratch with a sliding-window search.

        The start column is the histogram peak of the bottom half of ``img``
        in the left or right half; windows of ``window_height`` rows and
        ``2 * margin_find`` columns are then stacked from the bottom up and
        re-centred on the mean x of their pixels whenever more than
        ``minpix`` pixels are found.

        Args:
            img: 2-D binary image (non-zero = candidate pixel).
            side: ``'left'`` or ``'right'``.
            window_height: height of each search window in rows.

        Returns:
            Array ``[a, b, c]`` of the fit ``x = a*y**2 + b*y + c``, or
            ``None`` when no usable fit could be made.

        Raises:
            ValueError: if ``side`` is neither ``'left'`` nor ``'right'``.
        """
        if side not in ('left', 'right'):
            raise ValueError("side must be 'left' or 'right', got {!r}".format(side))

        # take a histogram of the bottom half of the image
        histogram = np.sum(img[img.shape[0] // 2:, :], axis=0)
        # find the starting point for the requested line
        midpoint = histogram.shape[0] // 2
        if side == 'left':
            x_current = int(np.argmax(histogram[:midpoint]))
        else:
            x_current = int(np.argmax(histogram[midpoint:])) + midpoint

        nwindows = img.shape[0] // window_height

        # identify the x and y positions of all nonzero pixels in the image
        nonzeroy, nonzerox = img.nonzero()

        lane_inds = []
        for window in range(nwindows):
            # window boundaries in x and y
            win_y_low = img.shape[0] - (window + 1) * window_height
            win_y_high = img.shape[0] - window * window_height
            win_x_low = x_current - self.margin_find
            win_x_high = x_current + self.margin_find
            # identify the nonzero pixels in x and y within the window
            good_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) &
                         (nonzerox >= win_x_low) & (nonzerox < win_x_high)).nonzero()[0]
            lane_inds.append(good_inds)

            # if found > minpix pixels, recenter next window on their mean position
            if len(good_inds) > self.minpix:
                # built-in int: the np.int alias was removed from NumPy
                x_current = int(np.mean(nonzerox[good_inds]))

        if not lane_inds:
            # image shorter than one window: nothing was searched
            return None
        lane_inds = np.concatenate(lane_inds)

        # fit a second order polynomial to the selected pixels
        return self._fit_quadratic(nonzerox[lane_inds], nonzeroy[lane_inds])

    def search_margin(self, bnr, cfs, side):
        """Refit a line using only pixels within ``margin_check`` of a previous fit.

        Args:
            bnr: 2-D binary image.
            cfs: previous coefficients ``[a, b, c]``.
            side: unused; kept for interface compatibility.

        Returns:
            New coefficients ``[a, b, c]`` or ``None`` when no usable fit
            could be made.
        """
        y, x = bnr.nonzero()

        center = cfs[0] * (y**2) + cfs[1] * y + cfs[2]
        lane_inds = (x > center - self.margin_check) & (x < center + self.margin_check)

        # fit a second order polynomial to the selected pixels
        return self._fit_quadratic(x[lane_inds], y[lane_inds])

    def parallel_left_right(self, left_coeffs, right_coeffs):
        """Return True when the two fits are close enough in shape and plausibly spaced.

        The quadratic and linear coefficients must differ by less than ``a2``
        and ``b2``, and the constant terms by an amount strictly inside
        ``c2``.
        """
        diff = np.absolute(np.asarray(left_coeffs) - np.asarray(right_coeffs))
        return bool(diff[0] < self.a2 and diff[1] < self.b2
                    and self.c2[0] < diff[2] < self.c2[1])

    def create_line(self, shape, coeffs):
        """Evaluate ``coeffs`` at every row ``0 .. shape[0]-1`` and return the x positions."""
        ploty = np.linspace(0, shape[0] - 1, shape[0])
        line = coeffs[0] * ploty**2 + coeffs[1] * ploty + coeffs[2]
        return line

    def curvature(self, b, x, lane_width):
        """Radius of curvature of a line at the bottom row.

        Args:
            b: image whose ``shape[0]`` gives the number of rows.
            x: x position of the line for every row (pixels).
            lane_width: lane width in pixels; used to derive ``xm_per_pix``.

        Returns:
            Radius of curvature evaluated at the last row, in the scaled
            units (metres according to the constants below).

        Side effects:
            Stores the x scale in ``self.xm_per_pix``.
        """
        ploty = np.linspace(0, b.shape[0] - 1, b.shape[0])
        y_eval = np.max(ploty)
        # conversion in x and y from pixels space to meters
        dashed_line_length = 150  # pixels
        ym_per_pix = 3 / dashed_line_length  # dashed lane line in real world is 3m
        self.xm_per_pix = 3.7 / lane_width  # lane width in real world is 3.7m

        # fit new polynomials to x,y in world space
        cr = np.polyfit(ploty * ym_per_pix, x * self.xm_per_pix, 2)
        # radius of curvature in meters
        curverad = ((1 + (2 * cr[0] * y_eval * ym_per_pix + cr[1])**2)**1.5) / np.absolute(2 * cr[0])
        return curverad

    def draw_back(self, img, b, lines, warp, left, right, left_c, right_c,
                  curv_l, curv_r, offset, left_c_prev=None, right_c_prev=None):
        """Overlay the detected lane, two thumbnails and text onto ``img``.

        Args:
            img: frame to draw on (not modified; a new image is returned).
            b: binary top-down image, used for its shape.
            lines: top-down image with the fitted lines, shown as thumbnail.
            warp: top-down colour image, shown as thumbnail.
            left, right: x positions of the lines for every row of ``b``.
            left_c, right_c, left_c_prev, right_c_prev: unused; kept for
                interface compatibility.
            curv_l, curv_r: curvature radii; their mean is displayed.
            offset: lane centre minus image centre in pixels; converted with
                ``self.xm_per_pix``, so :meth:`curvature` must have run.

        Returns:
            The annotated frame.
        """
        warp_zero = np.zeros_like(b).astype(np.uint8)
        color_warp = np.dstack((warp_zero, warp_zero, warp_zero))
        # recast the x and y points into usable format for cv2.fillPoly()
        ploty = np.linspace(0, b.shape[0] - 1, b.shape[0])
        pts_left = np.array([np.transpose(np.vstack([left, ploty]))])
        # right side is reversed so the polygon outline does not cross itself
        pts_right = np.array([np.flipud(np.transpose(np.vstack([right, ploty])))])
        pts = np.hstack((pts_left, pts_right))
        # draw the lane onto the warped blank image
        pts = np.int_([pts])
        cv2.fillPoly(color_warp, pts, (0, 255, 0))
        # draw the lane lines
        pts_left = np.int_([pts_left])
        pts_left = pts_left.reshape((-1, 1, 2))
        pts_right = np.int_([pts_right])
        pts_right = pts_right.reshape((-1, 1, 2))
        cv2.polylines(color_warp, pts_left, True, (255, 0, 255), 20)
        cv2.polylines(color_warp, pts_right, True, (255, 0, 255), 20)

        # warp the blank back to original image space using inverse perspective matrix (Minv)
        newwarp = cv2.warpPerspective(color_warp, self.Minv, (img.shape[1], img.shape[0]))

        # combine the result with the original image
        result = cv2.addWeighted(img, 1, newwarp, 0.5, 0)

        # draw info images at the top of the frame
        info_lines = cv2.resize(lines, None, fx=0.25, fy=0.25, interpolation=cv2.INTER_LINEAR)
        info_warp = cv2.resize(warp, None, fx=0.25, fy=0.25, interpolation=cv2.INTER_LINEAR)
        height, width, _ = info_lines.shape
        # right-aligned so the slices always match the thumbnail width, even
        # when the frame width is not a multiple of 4 (same placement as
        # 2*width:3*width / 3*width: for a 1280-wide frame)
        result[:height, -2 * width:-width] = info_lines
        result[:height, -width:] = info_warp

        # text
        font = cv2.FONT_HERSHEY_SIMPLEX

        text1 = 'Radius of Curvature:'
        text2 = '{:.0f}m'.format((curv_r + curv_l) / 2)
        cv2.putText(result, text1, (50, 40), font, 1.2, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(result, text2, (50, 80), font, 1.2, (255, 255, 255), 2, cv2.LINE_AA)

        text3 = 'Vehicle is:'
        text4 = ''
        # lane centre to the right of the image centre => vehicle is left of centre
        if offset > 0:
            text4 = '{:.2f}m left of center'.format(offset * self.xm_per_pix)
        elif offset < 0:
            text4 = '{:.2f}m right of center'.format(abs(offset) * self.xm_per_pix)
        elif offset == 0:
            text4 = 'centered in the lane'

        cv2.putText(result, text3, (50, 120), font, 1.2, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(result, text4, (50, 160), font, 1.2, (255, 255, 255), 2, cv2.LINE_AA)

        return result

    @staticmethod
    def _buffer_average(buffer):
        """Weighted mean of the buffered coefficients; weights run n, n-1, ..., 1 from first to last entry."""
        return np.average(np.array(list(buffer)), axis=0,
                          weights=np.arange(len(buffer), 0, -1))

    def _detect_coeffs(self, bnr):
        """Return raw ``(left, right)`` coefficients for this frame (either may be ``None``)."""
        tracking = (self.detected
                    and len(self.buffer_left_coeffs) > 0
                    and len(self.buffer_right_coeffs) > 0)
        if not tracking:
            return self.find_line(bnr, 'left'), self.find_line(bnr, 'right')
        # search only around the most recently buffered fit
        lft = self.search_margin(bnr, self.buffer_left_coeffs[-1], 'left')
        rght = self.search_margin(bnr, self.buffer_right_coeffs[-1], 'right')
        return lft, rght

    def _validate_and_smooth(self, shape, lft_coeffs, rght_coeffs):
        """Sanity-check the raw fits, smooth them and fall back to history.

        Updates ``self.detected`` and may append to / pop from the buffers.

        Returns:
            ``(lft_coeffs, rght_coeffs, lft_line, rght_line)``; all four are
            ``None`` when nothing usable is available.
        """
        lft_line = None
        rght_line = None

        # check if parallel left and right (both sides must have been found)
        accepted = (lft_coeffs is not None and rght_coeffs is not None
                    and self.parallel_left_right(lft_coeffs, rght_coeffs))

        if accepted:
            buffered = len(self.buffer_left_coeffs) > 0 and len(self.buffer_right_coeffs) > 0
            if buffered:
                self.buffer_left_coeffs.append(lft_coeffs)
                self.buffer_right_coeffs.append(rght_coeffs)
                lft_coeffs = self._buffer_average(self.buffer_left_coeffs)
                rght_coeffs = self._buffer_average(self.buffer_right_coeffs)
            lft_line = self.create_line(shape, lft_coeffs)
            rght_line = self.create_line(shape, rght_coeffs)

            # check left/right side: at the bottom row each line must be on
            # its own half of the image
            mid = shape[1] / 2
            if lft_line[-1] > mid or rght_line[-1] < mid:
                accepted = False
                if buffered:
                    # undo this frame's append; a deque cannot be sliced
                    self.buffer_left_coeffs.pop()
                    self.buffer_right_coeffs.pop()

        if accepted:
            self.detected = True
            return lft_coeffs, rght_coeffs, lft_line, rght_line

        # if not found (either side), use buffer
        self.detected = False
        lft_coeffs = rght_coeffs = lft_line = rght_line = None
        if len(self.buffer_left_coeffs) > 0 and len(self.buffer_right_coeffs) > 0:
            lft_coeffs = self._buffer_average(self.buffer_left_coeffs)
            rght_coeffs = self._buffer_average(self.buffer_right_coeffs)
            lft_line = self.create_line(shape, lft_coeffs)
            rght_line = self.create_line(shape, rght_coeffs)
        return lft_coeffs, rght_coeffs, lft_line, rght_line

    def _draw_fitted_lines(self, bnr, lft_line, rght_line):
        """Return a 3-channel copy of ``bnr`` with the left/right fits drawn in green/red."""
        ploty = np.linspace(0, bnr.shape[0] - 1, bnr.shape[0])
        drawn_lines = np.dstack((bnr, bnr, bnr)) * 255
        for line, colour in ((lft_line, (0, 255, 0)), (rght_line, (255, 0, 0))):
            pts = np.array([np.transpose(np.vstack([line, ploty]))])
            pts = np.int_([pts])
            pts = pts.reshape((-1, 1, 2))
            cv2.polylines(drawn_lines, pts, True, colour, 15)
        return drawn_lines

    def pipeline(self, img):
        """Process one RGB frame and return it annotated with the detected lane.

        When no lane can be determined (nothing detected and no history) the
        input frame is returned unchanged.
        """
        und = self.undistort(np.copy(img))
        wrp = self.warp(und, self.M)
        bnr = self.apply_colorgrad(wrp)

        # find lines coeffs, then validate / smooth / fall back to the buffer
        lft_coeffs, rght_coeffs = self._detect_coeffs(bnr)
        lft_coeffs, rght_coeffs, lft_line, rght_line = self._validate_and_smooth(
            bnr.shape, lft_coeffs, rght_coeffs)

        have_lane = lft_coeffs is not None and rght_coeffs is not None

        # draw everything and update
        if have_lane:
            drawn_lines = self._draw_fitted_lines(bnr, lft_line, rght_line)
            lane_width = rght_line[-1] - lft_line[-1]
            offset = lft_line[-1] + lane_width / 2 - img.shape[1] / 2
            self.left_curvature = self.curvature(bnr, lft_line, lane_width)
            self.right_curvature = self.curvature(bnr, rght_line, lane_width)
            # NOTE(review): the overlay is drawn on the raw frame, not on the
            # undistorted one the lines were found in - confirm this is intended.
            img = self.draw_back(img, bnr, drawn_lines, wrp, lft_line, rght_line, lft_coeffs, rght_coeffs,
                                 self.left_curvature, self.right_curvature, offset)
        # never store None in the buffers: the next frame reads buffer[-1]
        if self.detected and have_lane:
            self.buffer_left_coeffs.append(lft_coeffs)
            self.buffer_right_coeffs.append(rght_coeffs)

        return img

In [3]:
# Tracker for the project video. LaneTracker keeps per-video state (coefficient
# buffers, detected flag) and loads the calibration from 'coeffs.p' on construction.
tracker = LaneTracker()

In [4]:
# Run every frame of project_video.mp4 through tracker.pipeline and write the
# annotated frames to out_proj without an audio track.
# NOTE(review): assumes the videos_output/ directory already exists - confirm.
out_proj = 'videos_output/project_video.mp4'
clip_proj = VideoFileClip('project_video.mp4')
project_clip = clip_proj.fl_image(tracker.pipeline)
project_clip.write_videofile(out_proj, audio=False)

[MoviePy] >>>> Building video videos_output/project_video.mp4
[MoviePy] Writing video videos_output/project_video.mp4


100%|█████████▉| 1260/1261 [06:36<00:00,  3.18it/s]


[MoviePy] Done.
[MoviePy] >>>> Video ready: videos_output/project_video.mp4 



In [5]:
# Separate tracker instance for the challenge video, so it starts with empty
# buffers and detected == False.
tracker_challenge = LaneTracker()

In [5]:
# Process challenge_video.mp4 frame by frame with tracker_challenge.pipeline
# and write the annotated frames to out_chal without an audio track.
out_chal = 'videos_output/challenge_video.mp4'
clip_chal = VideoFileClip('challenge_video.mp4')#.subclip(0,5)
challenge_clip = clip_chal.fl_image(tracker_challenge.pipeline)
challenge_clip.write_videofile(out_chal, audio=False)

[MoviePy] >>>> Building video videos_output/challenge_video.mp4
[MoviePy] Writing video videos_output/challenge_video.mp4


100%|██████████| 485/485 [02:38<00:00,  2.99it/s]


[MoviePy] Done.
[MoviePy] >>>> Video ready: videos_output/challenge_video.mp4 



In [6]:
# Separate tracker instance for the harder challenge video, so it starts with
# empty buffers and detected == False.
tracker_harder_challenge = LaneTracker()

In [7]:
# Process harder_challenge_video.mp4 with tracker_harder_challenge.pipeline and
# write the annotated frames to out_hard without an audio track.
# Only subclip(0, 30) is processed - presumably the first 30 seconds; verify
# against the moviepy version in use.
out_hard = 'videos_output/harder_video.mp4'
clip_hard = VideoFileClip('harder_challenge_video.mp4').subclip(0,30)
hard_clip = clip_hard.fl_image(tracker_harder_challenge.pipeline)
hard_clip.write_videofile(out_hard, audio=False)

[MoviePy] >>>> Building video videos_output/harder_video.mp4
[MoviePy] Writing video videos_output/harder_video.mp4


100%|█████████▉| 750/751 [04:01<00:00,  3.14it/s]


[MoviePy] Done.
[MoviePy] >>>> Video ready: videos_output/harder_video.mp4 

