In [None]:
import cv2
import glob
import pickle
import numpy as np
import matplotlib.image as mpimg
import matplotlib.pyplot as plt

from matplotlib.backends.backend_pdf import PdfPages
from moviepy.editor import VideoFileClip
from multiprocessing import Pool

%matplotlib inline

xm_per_pix = 3.7 / 700
ym_per_pix = 30 / 720

draw_plots_flag = False

In [None]:
def grayscale(img):
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

In [None]:
def draw_plots(images):
    amount = len(images)
    max_cols = 4
    if amount > max_cols:
        rows = amount // max_cols + 1
        cols = amount // rows + amount % rows
    else:
        rows = 1
        cols = amount

    if draw_plots_flag == True:
        f, ax_tuple = plt.subplots(rows, cols, figsize = (24, 9), squeeze = False)
        f.tight_layout()

        for i in range(amount):
            x = i // cols
            y = i % cols

            ax_tuple[x][y].imshow(images[i]['img'])
            ax_tuple[x][y].set_title(images[i]['title'], fontsize = 50)        

        plt.subplots_adjust(left = 0., right = 1., top = 1.5, bottom = 0.)

In [None]:
def save_plots(images, filename):
    amount = len(images)
    with PdfPages(filename) as pdf:
        for i in range(amount):
            fig = plt.figure()
            plt.title(images[i]['title'], fontsize = 50)
            plt.imshow(images[i]['img'])
            pdf.savefig(fig)

In [None]:
def camera_calibration():
    # prepare object points, like (0,0,0), (1,0,0), (2,0,0) ....,(6,5,0)
    objp = np.zeros((6*9,3), np.float32)
    objp[:,:2] = np.mgrid[0:9,0:6].T.reshape(-1,2)

    # Arrays to store object points and image points from all the images.
    objpoints = [] # 3d points in real world space
    imgpoints = [] # 2d points in image plane.

    # Make a list of calibration images
    images = glob.glob('camera_cal/calibration*.jpg')

    # Step through the list and search for chessboard corners
    for fname in images:
        img = cv2.imread(fname)
        gray = grayscale(img)

        # Find the chessboard corners
        ret, corners = cv2.findChessboardCorners(
            image = gray,
            patternSize = (9,6),
            corners = None
        )

        # If found, add object points, image points
        if ret == True:
            objpoints.append(objp)
            imgpoints.append(corners)

            # Draw and display the corners
            img = cv2.drawChessboardCorners(
                image = img,
                patternSize = (9,6),
                corners = corners,
                patternWasFound = ret)
    
    ret, mtx, dist, rvec, tvecs = cv2.calibrateCamera(
        objectPoints = objpoints,
        imagePoints = imgpoints,
        imageSize = gray.shape[::-1],
        cameraMatrix = None,
        distCoeffs = None
    )

    cam_calib = {"cam_matrix": mtx, "dist_coeffs": dist}
    with open("output_images/cam_calib.p", "wb") as f:
        pickle.dump(
            obj = cam_calib,
            file = f
        )  

In [None]:
def undistort_image(img):
    dist_pickle = pickle.load( open("output_images/cam_calib.p", "rb") )
    mtx = dist_pickle["cam_matrix"]
    dist = dist_pickle["dist_coeffs"]
    dst = cv2.undistort(
        src = img,
        cameraMatrix = mtx,
        distCoeffs = dist,
        dst = None,
        newCameraMatrix = mtx
    
    )
    return dst

In [None]:
def warper(img):

    src = np.float32([[200, 720], [1100, 720], [595, 450], [685, 450]])
    dst = np.float32([[300, 720], [980, 720], [300, 0], [980, 0]])

    M = cv2.getPerspectiveTransform(src, dst)
    M_inv = cv2.getPerspectiveTransform(dst, src)
    warped = cv2.warpPerspective(
        src = img,
        M = M,
        dsize = (img.shape[1], img.shape[0]),
        flags = cv2.INTER_LINEAR
    )

    draw_plots((
        {"img": img, "title": "Original"},
        {"img": warped, "title": "Warped"}
    ))

    return warped, M_inv

In [None]:
def apply_sobel(gray_img, direction):
    return cv2.Sobel(
        src = gray_img,
        ddepth = cv2.CV_64F,
        dx = int('x' in direction),
        dy = int('y' in direction)
    )

In [None]:
def abs_sobel(gray_img, direction):
    return (apply_sobel(gray_img, 'x') ** 2 + apply_sobel(gray_img, 'y') ** 2) ** 0.5 if direction == 'xy' else np.absolute(apply_sobel(gray_img, direction)) 

In [None]:
def scaled_sobel(gray_img, direction):
    abs_sob = abs_sobel(gray_img, direction)
    return np.uint8(255 * abs_sob / np.max(abs_sob))

In [None]:
def binary(range_param, threshold):
    binary = np.zeros_like(range_param)
    binary[(threshold[0] <= range_param) & (range_param <= threshold[1])] = 1
    return binary

In [None]:
def binary_image(img):

    #TODO: Place for improvement. Need to find proper combination of binaries to be able to pass the challenge_ and harder_challenge_video
    
    threshold = (50, 255)
    gray = grayscale(img)

    xbin = binary(
        range_param = scaled_sobel(gray, 'x'),
        threshold = threshold
    )

    magnitude_bin = binary(
        range_param = scaled_sobel(gray, 'xy'),
        threshold = threshold
    )

    direction_bin = binary(
        range_param = np.arctan2(abs_sobel(gray, 'y'), abs_sobel(gray, 'x')),
        threshold = (.7, 1.3)
    )

    hls = cv2.cvtColor(img, cv2.COLOR_BGR2HLS)
    
    sbin = binary(
        range_param = hls[:,:,2],
        threshold = (180, 255)
    )

    combined = np.zeros_like(direction_bin)
    combined[((xbin == 1) | ((magnitude_bin == 1) & (direction_bin == 1))) | (sbin == 1)] = 1

    draw_plots((
        {"img": img, "title": "Original"},
        {"img": xbin, "title": "Sobel X"},
        {"img": magnitude_bin, "title": "Sobel XY"},
        {"img": direction_bin, "title": "Directions"},
        {"img": sbin, "title": "HLS -> S"},
        {"img": combined, "title": "Combined"}
    ))

    return combined

In [None]:
def find_lane_pixels(warped_binary_img):
    histogram = np.sum(warped_binary_img[warped_binary_img.shape[0] // 2:, :], axis = 0)
    out_img = np.dstack((warped_binary_img, warped_binary_img, warped_binary_img))
    midpoint = np.int(histogram.shape[0] // 2)
    left_x_base = np.argmax(histogram[:midpoint])
    right_x_base = np.argmax(histogram[midpoint:]) + midpoint

    nwindows = 9
    margin = 100
    minpix = 50

    window_height = np.int(warped_binary_img.shape[0] // nwindows)

    nonzero = warped_binary_img.nonzero()
    nonzero_y = np.array(nonzero[0])
    nonzero_x = np.array(nonzero[1])

    left_x_current = left_x_base
    right_x_current = right_x_base

    left_lane_indices = []
    right_lane_indices = []

    for window in range(nwindows):
        win_y_low = warped_binary_img.shape[0] - (window + 1) * window_height
        win_y_high = warped_binary_img.shape[0] - window * window_height

        win_x_left_low = left_x_current - margin
        win_x_left_high = left_x_current + margin
        win_x_right_low = right_x_current - margin
        win_x_right_high = right_x_current + margin

        cv2.rectangle(
            img = out_img,
            pt1 = (win_x_left_low, win_y_low),
            pt2 = (win_x_left_high, win_y_high),
            color = (0, 255, 0),
            thickness = 2
        )

        cv2.rectangle(
            img = out_img,
            pt1 = (win_x_right_low, win_y_low),
            pt2 = (win_x_right_high, win_y_high),
            color = (0, 255, 0),
            thickness = 2
        )

        good_left_indices = (   (win_y_low <= nonzero_y)
                                &(nonzero_y < win_y_high)
                                &(win_x_left_low <= nonzero_x)
                                &(nonzero_x < win_x_left_high)
                            ).nonzero()[0]

        good_right_indices = (  (win_y_low <= nonzero_y)
                                &(nonzero_y < win_y_high)
                                &(win_x_right_low <= nonzero_x)
                                &(nonzero_x < win_x_right_high)
                            ).nonzero()[0]

        left_lane_indices.append(good_left_indices)
        right_lane_indices.append(good_right_indices)

        if len(good_left_indices) > minpix:
            left_x_current = np.int(np.mean(nonzero_x[good_left_indices]))

        if len(good_right_indices) > minpix:
            right_x_current = np.int(np.mean(nonzero_x[good_right_indices]))

    left_lane_indices = np.concatenate(left_lane_indices)
    right_lane_indices = np.concatenate(right_lane_indices)

    left_x = nonzero_x[left_lane_indices]
    left_y = nonzero_y[left_lane_indices]
    right_x = nonzero_x[right_lane_indices]
    right_y = nonzero_y[right_lane_indices]

    left_fit = np.polyfit(left_y, left_x, 2)
    right_fit = np.polyfit(right_y, right_x, 2)

    draw_plots((
        {"img": warped_binary_img, "title": "Warped"},
        {"img": out_img, "title": "Output"}
    ))

    return left_fit, right_fit, nonzero_x, nonzero_y, left_lane_indices, right_lane_indices, out_img

In [None]:
def detect_lanes(warped_binary_img, left_fit, right_fit):
    margin = 100

    nonzero = warped_binary_img.nonzero()
    nonzero_x = np.array(nonzero[1])
    nonzero_y = np.array(nonzero[0])

    left_poly = left_fit[0] * (nonzero_y ** 2) + left_fit[1] * nonzero_y + left_fit[2]
    right_poly = right_fit[0] * (nonzero_y ** 2) + right_fit[1] * nonzero_y + right_fit[2]

    left_lane_indices = ((left_poly - margin < nonzero_x) & (nonzero_x < left_poly + margin))
    right_lane_indices = ((right_poly - margin < nonzero_x) & (nonzero_x < right_poly + margin))

    left_x = nonzero_x[left_lane_indices]
    left_y = nonzero_y[left_lane_indices]
    right_x = nonzero_x[right_lane_indices]
    right_y = nonzero_y[right_lane_indices]

    left_fit = np.polyfit(left_y, left_x, 2)
    right_fit = np.polyfit(right_y, right_x, 2)

    return left_fit, right_fit, nonzero_x, nonzero_y, left_lane_indices, right_lane_indices

In [None]:
def measure_curvation_real(warped_binary_img, left_lane_indices, right_lane_indices, nonzero_x, nonzero_y):

    #TODO: Need to be tested if the math here is proper

    y_eval = ym_per_pix * warped_binary_img.shape[0]

    left_x = xm_per_pix * nonzero_x[left_lane_indices]
    right_x = xm_per_pix * nonzero_x[right_lane_indices]

    left_y = ym_per_pix * nonzero_y[left_lane_indices]
    right_y = ym_per_pix * nonzero_y[right_lane_indices]

    left_fit_cr = np.polyfit(left_y, left_x, 2)
    right_fit_cr = np.polyfit(right_y, right_x, 2)

    '''
    R_curve = ((1 + (2Ay + B)^2 )^1.5) / |2A|
    '''
    left_curverad = ((1 + (2 * left_fit_cr[0] * y_eval + left_fit_cr[1]) ** 2) ** 1.5) / (np.absolute(2 * left_fit_cr[0])) 
    right_curverad = ((1 + (2 * right_fit_cr[0] * y_eval + right_fit_cr[1]) ** 2) ** 1.5) / (np.absolute(2 * right_fit_cr[0])) 

    return left_curverad, right_curverad

In [None]:
def vehicle_offset(undist, left_fit, right_fit):
    bottom_y = undist.shape[0] - 1
    bottom_x_left = left_fit[0] * (bottom_y ** 2) + left_fit[1] * bottom_y + left_fit[2]
    bottom_x_right = right_fit[0] * (bottom_y ** 2) + right_fit[1] * bottom_y + right_fit[2]
    offset = xm_per_pix * (undist.shape[1] / 2 - (bottom_x_left + bottom_x_right) / 2)

    return offset

In [None]:
def visualize(undist, left_fit, right_fit, M_inv, left_curve, right_curve, offset):
    ploty = np.linspace(0, undist.shape[0] - 1, undist.shape[0])
    left_fit_x = left_fit[0] * ploty ** 2 + left_fit[1] * ploty + left_fit[2]
    right_fit_x = right_fit[0] * ploty ** 2 + right_fit[1] * ploty + right_fit[2]

    color_warp = np.zeros_like(undist)

    pts_left = np.array([np.transpose(np.vstack([left_fit_x, ploty]))])
    pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fit_x, ploty])))])

    pts = np.hstack((pts_left, pts_right))

    cv2.fillPoly(
        img = color_warp,
        pts = np.int_([pts]),
        color = (0, 255, 0)
    )

    unwarped = cv2.warpPerspective(
        src = color_warp,
        M = M_inv,
        dsize = (undist.shape[1], undist.shape[0]),
        flags = cv2.INTER_LINEAR
    )

    result = cv2.addWeighted(
        src1 = undist,
        alpha = 1,
        src2 = unwarped,
        beta = .3,
        gamma = 0
    )

    curve = (left_curve + right_curve) / 2
    label = "Radius of curvation: {:.1f} m".format(curve)
    result = cv2.putText(
        img = result,
        text = label,
        org = (30, 40),
        fontFace = 0,
        fontScale = 1,
        color = (0, 0, 0),
        thickness = 2,
        lineType = cv2.LINE_AA
    )

    label = "Offset from lane center: {:.1f} m".format(offset)
    result = cv2.putText(
        img = result,
        text = label,
        org = (30, 70),
        fontFace = 0,
        fontScale = 1,
        color = (0, 0, 0),
        thickness = 2,
        lineType = cv2.LINE_AA
    )

    draw_plots((
        {"img": undist, "title": "Undistorted"},
        {"img": color_warp, "title": "Color warped"},
        {"img": unwarped, "title": "Color unwarped"},
        {"img": result, "title": "Output"}
    ))

    return result

In [None]:
class RoadLine():
    def __init__(self, fps):
        #Curvation formula: x = Ay^2 + By + C
        self._A = []
        self._B = []
        self._C = []
        self._fps = fps

    def get_curvation_params(self):
        return (np.mean(self._A), np.mean(self._B), np.mean(self._C))

    def add_curvation_params(self, tup):
        (A, B, C) = tup

        if len(self._A) >= self._fps:
            self._A.pop(0)
            self._B.pop(0)
            self._C.pop(0)

        self._A.append(A)
        self._B.append(B)
        self._C.append(C)

    curvation = property(get_curvation_params, add_curvation_params)

In [None]:
def process_image(img, **kwargs):
    undist = undistort_image(img)
    bin_img = binary_image(undist)
    warp_img, M_inv = warper(bin_img)

    calculate_fit = True
    if 'lines' in kwargs:
        left_line = kwargs['lines']['left']
        right_line = kwargs['lines']['right']

        if left_line.curvation[0] > 0. and right_line.curvation[0] > 0.:
            calculate_fit = False
            left_fit = left_line.curvation
            right_fit = right_lane.curvation

    if calculate_fit == True:
        left_fit, right_fit, nonzero_x, nonzero_y, left_lane_indices, right_lane_indices, poly_img = find_lane_pixels(warp_img)
    else:
        left_fit, right_fit, nonzero_x, nonzero_y, left_lane_indices, right_lane_indices = detect_lanes(warp_img, left_fit, right_fit)

    left_curverad, right_curverad = measure_curvation_real(warp_img, left_lane_indices, right_lane_indices, nonzero_x, nonzero_y)
    offset = vehicle_offset(undist, left_fit, right_fit)
    result_img = visualize(undist, left_fit, right_fit, M_inv, left_curverad, right_curverad, offset)

    draw_plots((
        {"img": img, "title": "Original"},
        {"img": result_img, "title": "Output"}
    ))
            
    if 'filename' in kwargs:
        output_filename_plots = "output_images/plots_" + kwargs['filename'] + ".pdf"
        save_plots((
            {"img": img, "title": "Original"},
            {"img": undist, "title": "Undistorted"},
            {"img": bin_img, "title": "Binary"},
            {"img": warp_img, "title": "Bird eye view"},
            {"img": poly_img, "title": "Poly"},
            {"img": result_img, "title": "Result"}
        ), filename = "output_images/plots_" + kwargs['filename'] + ".pdf")

    return result_img

In [None]:
def process_picture(fname):
    filename = fname[fname.rfind('/', 0) + 1:fname.rfind('.', 0)]
    img = cv2.imread(fname)
    process_image(img, filename = filename)
    print("Image '{}' is finished".format(filename), flush = True)

In [None]:
def picture_test():
    global draw_plots_flag
    camera_calibration()
    images = glob.glob("test_images/*.jpg")
    draw_plots_flag = False
    p = Pool()
    result = p.map_async(process_picture, images)
    result.wait()
    print("Image pool done")

In [None]:
def process_video_image(img):
    fps = 5
    left_line = RoadLine(fps)
    right_line = RoadLine(fps)
    result_img = process_image(img, lines = {"left": left_line, "right": right_line})
    return result_img

In [None]:
def process_video(fname):
    video = VideoFileClip(fname)
    annot_video = video.fl_image(process_video_image)
    video_name = fname[:fname.rfind('.', 0)]
    annot_video.write_videofile("output_videos/" + video_name + "_out.mp4", audio = False)
    print("Video '{}' is finished".format(video_name), flush = True)

In [None]:
def video_test():
    global draw_plots_flag
    camera_calibration()
    videos = glob.glob("*.mp4")
    draw_plots_flag = False
    p = Pool()
    result = p.map_async(process_video, videos)
    result.wait()
    print("Video pool done")

In [None]:
picture_test()
video_test()