In [1]:
import numpy as np
import cv2
from skimage.feature import hog
import time
import pickle
from collections import deque
from sklearn.svm import LinearSVC
from sklearn.preprocessing import StandardScaler
from scipy.ndimage.measurements import label
from sklearn.externals import joblib
from moviepy.editor import VideoFileClip
from IPython.display import HTML

In [2]:
class VehicleTracker:
    def __init__(self):
        # classifier
        self.data = joblib.load('clf_scaler.p')
        self.X_scaler = self.data['scaler']
        self.svc = self.data['clf']
        # HOG parameters
        self.color_space = 'YUV'
        self.orient = 24
        self.pix_per_cell = 16
        self.cell_per_block = 2
        self.channel = 0
        # 64x64 windows
        self.y_start_small = 400
        self.y_stop_small = 528
        self.scale_small = 1.0
        # 96x96 windows
        self.y_start_med = 400
        self.y_stop_med = 592
        self.scale_med = 1.5
        # 128x128 windows
        self.y_start_big = 404
        self.y_stop_big = 660
        self.scale_big = 2.0
        # buffer
        self.all_windows = []

    def convert_color(self, img, conv):
        color = 'cv2.COLOR_RGB2{}'.format(conv)
        color = eval(color)
        return cv2.cvtColor(img, color)

    def find_cars(self, img, ystart, ystop, scale):    
        img_tosearch = img[ystart:ystop,:,:]
        ctrans_tosearch = self.convert_color(img_tosearch, self.color_space)
        # resize in order to fit in 64x64 patches (because clf was trained that way)
        if scale != 1:
            imshape = ctrans_tosearch.shape
            ctrans_tosearch = cv2.resize(ctrans_tosearch, (np.int(imshape[1]/scale), np.int(imshape[0]/scale)))
            
        ch = ctrans_tosearch[:,:,self.channel]
        
        # compute HOG features for the image
        hog_features = hog(ch, self.orient, (self.pix_per_cell, self.pix_per_cell),
                           (self.cell_per_block, self.cell_per_block),
                           transform_sqrt=True, feature_vector=False)

        # define blocks and steps
        nxblocks = (ctrans_tosearch.shape[1] // self.pix_per_cell) - self.cell_per_block + 1
        nyblocks = (ctrans_tosearch.shape[0] // self.pix_per_cell) - self.cell_per_block + 1 

        window = 64 # 64 was the orginal sampling rate, with 8 cells and 8 pix per cell
        nblocks_per_window = (window // self.pix_per_cell) - self.cell_per_block + 1
        cells_per_step = 2
        nxsteps = (nxblocks - nblocks_per_window) // cells_per_step
        nysteps = (nyblocks - nblocks_per_window) // cells_per_step

        windows_list = []

        for xb in range(nxsteps):
            for yb in range(nysteps):
                ypos = yb * cells_per_step
                xpos = xb * cells_per_step
                # extract HOG for this patch
                hog_feat = hog_features[ypos:ypos+nblocks_per_window, xpos:xpos+nblocks_per_window].ravel()
                
                xleft = xpos * self.pix_per_cell
                ytop = ypos * self.pix_per_cell
                
                # get features for classifier and predict
                features = self.X_scaler.transform(hog_feat.reshape(1, -1)) 
                prediction = self.svc.predict(features)

                if prediction:
                    xwindow_left = np.int(xleft*scale)
                    ytop_left = np.int(ytop*scale)
                    length = np.int(window*scale)
                    windows_list.append(((xwindow_left, ytop_left+ystart), 
                                         (xwindow_left+length, ytop_left+length+ystart)))
        return windows_list

    def add_heat(self, heatmap, box_list):
        for i, box in enumerate(box_list):
            # add 1 for all pixels inside each box
            # parameter 'i' defines weight of a box 
            # so newer boxes have more impact
            heatmap[box[0][1]:box[1][1], box[0][0]:box[1][0]] += 0.1*i
        return heatmap

    def heat_threshold(self, heatmap, threshold=2):
        # zero out pixels below the threshold
        heatmap[heatmap <= threshold] = 0
        return heatmap
    
    def draw_labeled_bboxes(self, img, labels):
        # iterate through all detected cars
        for car_number in range(1, labels[1]+1):
            # find pixels with each car_number label value
            nonzero = (labels[0] == car_number).nonzero()
            # identify x and y values of those pixels
            nonzeroy = np.array(nonzero[0])
            nonzerox = np.array(nonzero[1])
            # define a bounding box based on min/max x and y
            bbox = ((np.min(nonzerox), np.min(nonzeroy)), (np.max(nonzerox), np.max(nonzeroy)))
            # draw the box on the image
            cv2.rectangle(img, bbox[0], bbox[1], (255, 255, 0), 6)
        return img
    
    def vehicle_tracking(self, img):
        # search windows 64x64, 96x96 and 128x128
        windows_small = self.find_cars(img, self.y_start_small, self.y_stop_small, self.scale_small)
        windows_med = self.find_cars(img, self.y_start_med, self.y_stop_med, self.scale_med)
        windows_big = self.find_cars(img, self.y_start_big, self.y_stop_big, self.scale_big)
        windows = windows_small + windows_med + windows_big                    
        
        # if something found
        if len(windows) > 0:
            self.all_windows.append(windows)
            # wait for a few first frames to get more detections
            if len(self.all_windows) >= 5:
                heat = np.zeros_like(img[:,:,0]).astype(np.float)
                list_windows = [window for sublist in self.all_windows for window in sublist]
                heat = self.add_heat(heat, list_windows)
                heat = self.heat_threshold(heat, 1+int(len(list_windows)*1.7)) 
                heatmap = np.clip(heat, 0, 255)
                labels = label(heatmap)
                result = self.draw_labeled_bboxes(np.copy(img), labels)
                # keep size of buffer from growing (no more than 18 frames)
                if len(self.all_windows) > 18:
                    self.all_windows.pop(0)
                return result
        elif len(self.all_windows) > 0:
            self.all_windows.pop(0)
        return img

In [3]:
class LaneTracker:
    def __init__(self):
        # load calibration parameters
        self.coeffs = pickle.load(open('coeffs.p', 'rb'))
        self.mtx = self.coeffs['mtx']
        self.dist = self.coeffs['dist']
        # transform points
        self.s_pts = np.array(((550, 480), (740, 480), (1110, 720), (200, 720)), np.float32)
        self.d_pts = np.array(((300, 0), (1000, 0), (1000, 720), (300, 720)), np.float32)
        self.M = cv2.getPerspectiveTransform(self.s_pts, self.d_pts)
        self.Minv = cv2.getPerspectiveTransform(self.d_pts, self.s_pts)
        # line coeffs to check parallel left/right
        self.a2 = 0.0003
        self.b2 = 0.35
        self.c2 = (370, 820)
        # margin for searching lines on next frame
        self.margin_find = 40
        self.margin_check = 100
        # minimum number of pixels found to recenter window
        self.minpix = 50
        # buffer
        self.buffer_left_coeffs = deque(maxlen=5)
        self.buffer_right_coeffs = deque(maxlen=5)
        # tracking lines
        self.detected = False
    
    def undistort(self, img):
        return cv2.undistort(img, self.mtx, self.dist, None, self.mtx)

    def warp(self, img, M):
        # add contrast correction
        img = cv2.cvtColor(img, cv2.COLOR_RGB2YUV)
        img[:,:,0] = cv2.equalizeHist(img[:,:,0])
        img = cv2.cvtColor(img, cv2.COLOR_YUV2RGB)
        return cv2.warpPerspective(img, M, (img.shape[1], img.shape[0]))

    def magnitude(self, img, sobel_kernel, thresh):
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        abs_sobelx = np.absolute(sobelx)
        abs_sobely = np.absolute(sobely)
        mag = np.sqrt(abs_sobelx**2 + abs_sobely**2)
        scaled_mag = np.uint8(255 * mag / np.max(mag))
        binary = np.zeros_like(scaled_mag)
        binary[scaled_mag >= thresh] = 1
        return binary

    # color
    def color_thresh(self, img):
        lab = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
        L = lab[:,:,0]
        B = lab[:,:,2]
        B = cv2.morphologyEx(B, cv2.MORPH_TOPHAT, np.ones((25,25),np.uint8))
        thresh_b = (5, 100)
        thresh_l = (245, 255)
        color = np.zeros_like(L)
        cond1 = (L > thresh_l[0]) & (L <= thresh_l[1])
        cond2 = (B > thresh_b[0]) & (B <= thresh_b[1])
        color[cond1 | cond2] = 1
        return color
    
    def apply_colorgrad(self, img):     
        grad = self.magnitude(img, sobel_kernel=15, thresh=50)
        color = self.color_thresh(img)
        combined = np.zeros_like(grad)
        combined[(color == 1) | (grad == 1)] = 1
        combined = cv2.erode(combined, np.ones((7,7),np.uint8), iterations=1)
        return combined

    # fit lines
    def find_line(self, img, side, window_height=80):
        # take a histogram of the bottom half of the image
        histogram = np.sum(img[img.shape[0]//2:,:], axis=0)
        # find the starting point for the left and right lines
        midpoint = histogram.shape[0] // 2
        
        if side == 'left':
            x_initial = np.argmax(histogram[:midpoint])
        if side == 'right':
            x_initial = np.argmax(histogram[midpoint:]) + midpoint

        nwindows = img.shape[0] // window_height

        # identify the x and y positions of all nonzero pixels in the image
        nonzeroy = np.array(img.nonzero()[0])
        nonzerox = np.array(img.nonzero()[1])
        # current positions to be updated for each window
        x_current = x_initial
                
        lane_inds = []
        for window in range(nwindows):
            # window boundaries in x and y (and right and left)
            win_y_low = img.shape[0] - (window + 1) * window_height
            win_y_high = img.shape[0] - window * window_height
            win_x_low = x_current - self.margin_find
            win_x_high = x_current + self.margin_find
            # identify the nonzero pixels in x and y within the window
            good_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
            (nonzerox >= win_x_low) &  (nonzerox < win_x_high)).nonzero()[0]
            lane_inds.append(good_inds)
            
            # if found > minpix pixels, recenter next window on their mean position
            if len(good_inds) > self.minpix:
                x_current = np.int(np.mean(nonzerox[good_inds]))
            
        lane_inds = np.concatenate(lane_inds)

        # extract left and right line pixel positions
        x = nonzerox[lane_inds]
        y = nonzeroy[lane_inds] 
        
        # fit a second order polynomial to each
        coeffs = None
        if x.any() is not None:
            try:
                coeffs = np.polyfit(y, x, 2)
            except:
                return None
        return coeffs
    
    def search_margin(self, bnr, cfs, side):
        y = np.array(bnr.nonzero()[0])
        x = np.array(bnr.nonzero()[1])
                
        lane_inds = ((x > (cfs[0]*(y**2) + cfs[1]*y + cfs[2] - self.margin_check)) & 
                     (x < (cfs[0]*(y**2) + cfs[1]*y + cfs[2] + self.margin_check))) 

        # extract left and right line pixel positions
        x = x[lane_inds]
        y = y[lane_inds] 
        # fit a second order polynomial to each
        coeffs = None
                        
        if x.all() is not None:
            try:
                coeffs = np.polyfit(y, x, 2)
            except:
                return None
        return coeffs
                
    def parallel_left_right(self, left_coeffs, right_coeffs):
        diff = np.absolute(left_coeffs - right_coeffs)
        if diff[0] < self.a2 and diff[1] < self.b2 and diff[2] > self.c2[0] and diff[2] < self.c2[1]:
            return True
        return False

    def create_line(self, shape, coeffs):
        ploty = np.linspace(0, shape[0]-1, shape[0])
        line = coeffs[0]*ploty**2 + coeffs[1]*ploty + coeffs[2]
        return line
    
    def draw_back(self, img, b, left, right):
        warp_zero = np.zeros_like(b).astype(np.uint8)
        color_warp = np.dstack((warp_zero, warp_zero, warp_zero))
        # recast the x and y points into usable format for cv2.fillPoly()
        ploty = np.linspace(0, b.shape[0]-1, b.shape[0])
        pts_left = np.array([np.transpose(np.vstack([left, ploty]))])
        pts_right = np.array([np.flipud(np.transpose(np.vstack([right, ploty])))])
        pts = np.hstack((pts_left, pts_right))
        # draw the lane onto the warped blank image
        pts = np.int_([pts])
        cv2.fillPoly(color_warp, pts, (0, 255, 0))
        
        # warp the blank back to original image space using inverse perspective matrix (Minv)
        newwarp = cv2.warpPerspective(color_warp, self.Minv, (img.shape[1], img.shape[0]))         
        return newwarp
    
    def pipeline(self, img):
        copy = np.copy(img)
        und = self.undistort(copy)
        wrp = self.warp(und, self.M)
        bnr = self.apply_colorgrad(wrp)
        
        lft_coeffs = None
        rght_coeffs = None
        lft_line = None
        rght_line = None
        
        # find lines coeffs
        if not self.detected:
            lft_coeffs = self.find_line(bnr, 'left')
            rght_coeffs = self.find_line(bnr, 'right')     
        else:
            lft_coeffs_prev = self.buffer_left_coeffs[-1]
            lft_coeffs = self.search_margin(bnr, lft_coeffs_prev, 'left')
            rght_coeffs_prev = self.buffer_right_coeffs[-1]
            rght_coeffs = self.search_margin(bnr, rght_coeffs_prev, 'right')
        
        # check if parallel left and right
        if lft_coeffs is not None and rght_coeffs is not None:
            if self.parallel_left_right(lft_coeffs, rght_coeffs):
                if len(self.buffer_left_coeffs) > 0 and len(self.buffer_right_coeffs) > 0:
                    self.buffer_left_coeffs.append(lft_coeffs)
                    self.buffer_right_coeffs.append(rght_coeffs)
                    lft_coeffs = np.average(self.buffer_left_coeffs, axis=0, 
                                            weights=range(len(self.buffer_left_coeffs),0,-1))
                    rght_coeffs = np.average(self.buffer_right_coeffs, axis=0, 
                                             weights=range(len(self.buffer_right_coeffs),0,-1))
                lft_line = self.create_line(bnr.shape, lft_coeffs)
                rght_line = self.create_line(bnr.shape, rght_coeffs)
                self.detected = True
            else:
                lft_coeffs = None
                rght_coeffs = None
        
        # check left/right side
        if lft_line is not None and rght_line is not None:
            mid = bnr.shape[1] / 2
            if lft_line[-1] > mid or rght_line[-1] < mid:
                lft_coeffs = None
                rght_coeffs = None
                lft_line = None
                rght_line = None
                self.buffer_left_coeffs = self.buffer_left_coeffs[:-1]
                self.buffer_right_coeffs = self.buffer_right_coeffs[:-1]
                self.detected = False
                
        # if not found, use buffer
        if lft_coeffs is None and rght_coeffs is None:
            if len(self.buffer_left_coeffs) > 0 and len(self.buffer_right_coeffs) > 0:
                lft_coeffs = np.average(self.buffer_left_coeffs, axis=0, 
                                        weights=range(len(self.buffer_left_coeffs),0,-1))
                rght_coeffs = np.average(self.buffer_right_coeffs, axis=0, 
                                        weights=range(len(self.buffer_right_coeffs),0,-1))
                lft_line = self.create_line(bnr.shape, lft_coeffs)  
                rght_line = self.create_line(bnr.shape, rght_coeffs)
                self.detected = False
        
        # draw and update
        if lft_coeffs is not None and rght_coeffs is not None:
            img = self.draw_back(img, bnr, lft_line, rght_line)
        if self.detected:
            self.buffer_left_coeffs.append(lft_coeffs)
            self.buffer_right_coeffs.append(rght_coeffs)
        
        return img

In [4]:
vehicle_tracker = VehicleTracker()
lane_tracker = LaneTracker()

def vehicle_and_lane(img):
    vehicle = vehicle_tracker.vehicle_tracking(np.copy(img))
    lane = lane_tracker.pipeline(np.copy(img))
    # combine vehicle detection and lane finding
    return cv2.addWeighted(vehicle, 1, lane, 0.5, 0)

In [5]:
test_vid_out_name = 'videos_output/out_video.mp4'
test_clip = VideoFileClip('project_video.mp4')#.subclip(5,35)
test_out_clip = test_clip.fl_image(vehicle_and_lane)
%time test_out_clip.write_videofile(test_vid_out_name, audio=False)

[MoviePy] >>>> Building video videos_output/out_video.mp4
[MoviePy] Writing video videos_output/out_video.mp4


100%|█████████▉| 1260/1261 [07:10<00:00,  2.95it/s]


[MoviePy] Done.
[MoviePy] >>>> Video ready: videos_output/out_video.mp4 

CPU times: user 8min 40s, sys: 16.5 s, total: 8min 56s
Wall time: 7min 11s
