# Advanced Lane Finding Project - Pipeline Implementation

In this notebook, I implement an advanced lane-finding pipeline and run it on an example video.

**Pipeline steps:**

1. Undistort image with saved camera calibrations
2. "Binarize" image using color and gradient
3. Fit lane lines using A. (scanning) or B. (look-ahead filter) approaches
    1. Scan image for lane pixels using windowed approach
    2. Search for lane pixels around existing fit
4. Perform sanity check on lane-line fits
    1. Ensure lines are a reasonable distance apart
    2. Ensure lines are roughly parallel
5. Draw lane fit onto image


- When fitting lane lines, approach B. (look-ahead filter) is used if a previous good fit exists in memory.
- For drawing the detected lane, the output is smoothed over the previous 30 frames.
- If the sanity check fails for a fit, the last good fit is used. If the sanity check fails for 15 consecutive frames, the pipeline falls back to searching for a fit using approach A. (scanning).

---

In [2]:
from pathlib import Path
import os.path as osp
import glob
import pickle

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

from moviepy.editor import VideoFileClip
from IPython.display import HTML

In [2]:
# import helper functions from utils.py
from utils import get_binary_img, warper, draw_detected_lane
from utils import fit_lane_lines, fit_lane_lines_from_previous_fit

In [3]:
from collections import deque

# Scale factors used to convert pixel measurements into meters.
YM_PER_PIX = 30.0 / 720.0  # meters per pixel along the y axis
XM_PER_PIX = 3.7 / 700.0  # meters per pixel along the x axis

# Hard-coded frame size.
# NOTE(review): presumably (width, height) of the video frames -- confirm
# against the output size of `warper`.
IMSHAPE = (1280, 720)

class Line():
    """Rolling record of the polynomial fits accepted for one lane line.

    A fit is a sequence of three coefficients ``(a, b, c)`` that is evaluated
    as ``x = a*y**2 + b*y + c`` in pixel coordinates. The last `buffer_size`
    fits are kept so that the drawn lane can be smoothed by averaging them.

    All derived properties (`best_fit`, `bestx`, `radius_of_curvature`,
    `line_base_pos`) are only meaningful while `detected` is True, and `allx`
    only after `update_fit` has been called.
    """

    def __init__(self, buffer_size=30):
        # number of previous fits to store
        self.buffer_size = buffer_size

        # Coefficients of the last `buffer_size` fits. Despite the attribute
        # name, these are coefficient triples, not x values.
        self.recent_xfitted = deque(maxlen=self.buffer_size)

        # Polynomial coefficients of the most recent fit; holds a placeholder
        # until `update_fit` is called.
        self.current_fit = [np.array([False])]

        # One y value per image row. IMSHAPE is (width, height), so the number
        # of rows is IMSHAPE[1]; using IMSHAPE[0] evaluated the fits far below
        # the bottom of the image.
        # NOTE(review): assumes the warped image is IMSHAPE[1] rows high
        # (consistent with YM_PER_PIX = 30 / 720) -- confirm against `warper`.
        n_rows = IMSHAPE[1]
        self.ally = np.linspace(0, n_rows - 1, n_rows)

    def update_fit(self, fit):
        """Record `fit` as the current fit and add it to the smoothing buffer."""
        self.current_fit = fit
        self.recent_xfitted.append(fit)

    def clear(self):
        """Forget every stored fit so that `detected` becomes False again."""
        self.recent_xfitted.clear()
        self.current_fit = [np.array([False])]

    @property
    def detected(self):
        """True while at least one fit is stored in the buffer."""
        return len(self.recent_xfitted) > 0

    @property
    def best_fit(self):
        """Polynomial coefficients averaged over the buffered fits."""
        return np.mean(np.array(self.recent_xfitted), axis=0)

    @property
    def allx(self):
        """x values of the most recent fit, one per entry of `ally`."""
        a, b, c = self.current_fit[0], self.current_fit[1], self.current_fit[2]
        return a*self.ally**2 + b*self.ally + c

    @property
    def bestx(self):
        """x values of the averaged fit, one per entry of `ally`."""
        a, b, c = self.best_fit
        return a*self.ally**2 + b*self.ally + c

    @property
    def radius_of_curvature(self):
        """Radius of curvature of the averaged fit at the last row, in meters."""
        y_eval = np.max(self.ally)
        # Re-fit the averaged line after scaling both axes from pixels to meters
        fit_cr = np.polyfit(self.ally*YM_PER_PIX, self.bestx*XM_PER_PIX, 2)
        # R = (1 + (2*A*y + B)**2)**1.5 / |2*A| for x = A*y**2 + B*y + C
        slope = 2*fit_cr[0]*y_eval*YM_PER_PIX + fit_cr[1]
        curverad = ((1 + slope**2)**1.5) / np.absolute(2*fit_cr[0])
        return curverad

    @property
    def line_base_pos(self):
        """x position, in pixels, of the averaged fit at the last row of `ally`."""
        y_eval = np.max(self.ally)
        a, b, c = self.best_fit
        return a*y_eval**2 + b*y_eval + c

In [4]:
# One tracker per lane boundary; both are read and updated by `process_image`
left_line = Line()
right_line = Line()

# number of consecutive frames whose fits failed the sanity check
n_invalid_fits = 0

# Camera matrix and distortion coefficients, passed to cv2.undistort.
# The file is opened in a `with` block so the handle is closed after loading.
# NOTE: pickle can execute arbitrary code -- only load a calibration file
# that you created yourself.
with open('camera_cal.pkl', 'rb') as calibration_file:
    mtx, dist = pickle.load(calibration_file)

def sanity_check_fits(left_fit, right_fit, lane_width_bounds=(2.5, 5.0), parallel_thresh=100,
                      y_bottom=None, y_top=0):
    """Perform the following sanity checks on a pair of lane-line fits:
        1. The lane width at the bottom of the image, converted to meters,
           lies within `lane_width_bounds` (inclusive).

        2. The lines are roughly parallel: the lane width in pixels at the
           top and at the bottom of the image differ by at most
           `parallel_thresh`.

    Each fit is a sequence ``(a, b, c)`` evaluated as ``x = a*y**2 + b*y + c``.

    `y_bottom` and `y_top` are the rows at which the widths are measured.
    `y_bottom` defaults to the last image row, ``IMSHAPE[1] - 1``; the
    previous hard-coded value of 1279 was the image *width* minus one, which
    extrapolated the fits far below the image.
    NOTE(review): this assumes IMSHAPE is (width, height); the default
    thresholds may have been tuned against the old row and are worth
    re-checking on the video.

    Returns `True` if all sanity checks are met. Fits that produce NaN
    widths are rejected.
    """
    if y_bottom is None:
        y_bottom = IMSHAPE[1] - 1

    def x_at(fit, y):
        # evaluate the second-order polynomial at row y
        return fit[0]*y**2 + fit[1]*y + fit[2]

    bot_diff = x_at(right_fit, y_bottom) - x_at(left_fit, y_bottom)
    top_diff = x_at(right_fit, y_top) - x_at(left_fit, y_top)

    # lane width in meters
    lane_width = bot_diff * XM_PER_PIX

    # difference between lane width (in pixels) at top of image
    # and lane width at bottom of image
    parallel_diff = top_diff - bot_diff

    min_width, max_width = lane_width_bounds
    # Written as positive conditions so that NaN fails both checks
    width_ok = min_width <= lane_width <= max_width
    parallel_ok = abs(parallel_diff) <= parallel_thresh

    return bool(width_ok and parallel_ok)

def process_image(image, reset_thresh=15):
    """Run the lane-finding pipeline on one video frame.

    The frame is undistorted, binarized and warped, then lane lines are
    fitted: by searching around the averaged previous fit when both lines
    have stored fits, otherwise by a fresh search. Only fits that pass
    `sanity_check_fits` are stored in `left_line` / `right_line`; a rejected
    fit is discarded so the drawn lane keeps using the previous good fits.
    After `reset_thresh` consecutive rejections the stored fits are cleared,
    which forces a fresh search on the next frame.

    Reads and updates the module-level `left_line`, `right_line` and
    `n_invalid_fits`.

    Returns the undistorted frame with the detected lane, radius of
    curvature and lane-center offset drawn on it, or with an error message
    when no fit is available.
    """
    global n_invalid_fits

    # undistort image
    image = cv2.undistort(image, mtx, dist, None, mtx)

    binary_img = get_binary_img(image)
    binary_warped_img = warper(binary_img)

    if left_line.detected and right_line.detected:
        # previous good fits exist, search around their average
        left_fit, right_fit = fit_lane_lines_from_previous_fit(binary_warped_img,
            left_line.best_fit, right_line.best_fit)
    else:
        # no stored fits, search the image from scratch
        left_fit, right_fit, _ = fit_lane_lines(binary_warped_img)

    # Sanity check
    fit_is_sane = sanity_check_fits(left_fit, right_fit)

    if fit_is_sane:
        # if the check passes, store the fit
        left_line.update_fit(left_fit)
        right_line.update_fit(right_fit)
        n_invalid_fits = 0 # reset
    else:
        n_invalid_fits += 1

        if n_invalid_fits == reset_thresh:
            n_invalid_fits = 0
            # if the check fails `reset_thresh` consecutive times, clear cache
            left_line.clear()
            right_line.clear()

    if left_line.detected and right_line.detected:
        # The fits are deliberately NOT stored again here: doing so added
        # rejected fits to the smoothing buffer and stored accepted fits twice.
        out_img = draw_detected_lane(image, binary_warped_img,
             left_line.ally, left_line.bestx, right_line.bestx)

        # Write text on output image
        avg_curverad = (left_line.radius_of_curvature + right_line.radius_of_curvature) / 2
        rad_str = 'Radius of Curvature = %dm' % np.round(avg_curverad)

        # offset of the lane center from the horizontal center of the image
        lane_center_px = (left_line.line_base_pos + right_line.line_base_pos) / 2
        lane_center_offset_px = lane_center_px - (binary_warped_img.shape[1] / 2)
        lane_center_offset_m = lane_center_offset_px * XM_PER_PIX
        offset_str = 'Vehicle is %.2fm off center of lane' % lane_center_offset_m

        cv2.putText(out_img, rad_str, (40, 75), cv2.FONT_HERSHEY_SIMPLEX,
            1, (255, 255, 255), 2)
        cv2.putText(out_img, offset_str, (40, 150), cv2.FONT_HERSHEY_SIMPLEX,
            1, (255, 255, 255), 2)

    else:
        out_img = image
        cv2.putText(out_img, 'Unable to fit lane lines!', (40, 75),
            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

    return out_img

In [5]:
# Process the project video: apply `process_image` to every frame of the
# input clip and write the result (without audio) to `output_vid`.
output_vid = 'project_video_output.mp4'
input_clip = VideoFileClip('project_video.mp4')
processed_clip = input_clip.fl_image(process_image) # NOTE: process_image expects color frames
%time processed_clip.write_videofile(output_vid, audio=False)

[MoviePy] >>>> Building video output_images/project_video_output.mp4
[MoviePy] Writing video output_images/project_video_output.mp4


100%|█████████▉| 1260/1261 [11:29<00:00,  1.91it/s]


[MoviePy] Done.
[MoviePy] >>>> Video ready: output_images/project_video_output.mp4 

CPU times: user 13min 16s, sys: 1min 9s, total: 14min 25s
Wall time: 11min 30s


In [5]:
# Embed the processed video (path taken from `output_vid`) in the notebook
HTML("""
<video width="960" height="540" controls>
  <source src="{0}">
</video>
""".format(output_vid))