In [1]:
# import libraries and utilities
import sys 
sys.path.insert(1, './object_detection')
from models import *
from utils import *
import os, sys, time, datetime, random
import cv2
import torch 
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.autograd import Variable
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image

In [2]:
# video utilities class 
class VideoUtils: 
    # extract frames from input video
    def extract_frames(inputvid_filename):
        video = cv2.VideoCapture(inputvid_filename)
        frames = []
        valid, frame = video.read()
        while valid: 
            frames.append(frame)
            valid, frame = video.read()
        video.release()
        return frames
    
    # generate a video from an array of frames 
    def compile_frames(frames, outputvid_filename):
        height, width, layers = frames[0].shape
        size = (width, height)
        fps = 60
        output_video = cv2.VideoWriter(outputvid_filename, cv2.VideoWriter_fourcc(*'DIVX'), fps, size)
        for frame in frames: 
            output_video.write(frame) 

In [3]:
# lane detection utilities class 
class LaneDetectionUtils: 
    # convert a color frame to a grayscale one
    def _grayscale_frame(frame):
        grayscale_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        return grayscale_frame
    
    # mask a frame to only include a region of interest
    def _mask_frame(frame):
        frame_height = frame.shape[0]
        frame_width = frame.shape[1]
        polygons = np.array([ 
            [(frame_width * 3 // 11, frame_height), (frame_width * 9 // 11, frame_height), (frame_width * 5 // 8, frame_height // 5 * 4), (frame_width * 3 // 7, frame_height // 5 * 4)] 
        ])
        mask = np.zeros_like(frame) 
        cv2.fillPoly(mask, polygons, 255)
        masked_frame = cv2.bitwise_and(frame, mask)
        return masked_frame
    
    # threshold frame to emphasize lane lines 
    def _threshold_frame(frame):
        ret, thresholded_frame = cv2.threshold(frame, 40, 145, cv2.THRESH_BINARY)
        return thresholded_frame
    
    # determine start and end points for lane marker
    def _mark_lane_points(frame, params):
        slope, intercept = params[0], params[1]
        y1 = frame.shape[0]
        y2 = int(y1 * 4 / 5)
        x1 = int((y1 - intercept) / slope)
        x2 = int((y2 - intercept) / slope)
        return np.array([x1, y1, x2, y2])
    
    # find both lane lines and and their marker points
    def _find_lane_lines(frame, lines):
        left_lane_fit, right_lane_fit = [], []
        if lines is None: 
            return None
        for line in lines:
            x1, y1, x2, y2 = line.reshape(4)
            parameters = np.polyfit((x1, x2), (y1, y2), 1)
            slope, intercept = parameters[0], parameters[1]
            if slope < 0: 
                left_lane_fit.append((slope, intercept))
            else: 
                right_lane_fit.append((slope, intercept))
        left_lane_avgfit = np.average(left_lane_fit, axis=0)
        if np.any(np.isnan(left_lane_avgfit)): 
            return None
        left_lane_line = LaneDetectionUtils._mark_lane_points(frame, left_lane_avgfit)
        right_lane_avgfit = np.average(right_lane_fit, axis=0)
        if np.any(np.isnan(right_lane_avgfit)): 
            return None
        right_lane_line = LaneDetectionUtils._mark_lane_points(frame, right_lane_avgfit)
        return np.array([left_lane_line, right_lane_line])
    
    # draw lane lines on the image
    def _draw_lane_lines(frame, lane_lines): 
        lane_line_frame = np.zeros_like(frame)
        if lane_lines is not None: 
            for x1, y1, x2, y2 in lane_lines:
                try: 
                    cv2.line(lane_line_frame, (x1, y1), (x2, y2), (252, 173, 76), 10)
                except: 
                    return None
        return lane_line_frame
    
    # return an array of frames with lanes detected
    def detect_lanes(frames):
        lane_detected_frames = []
        for frame in frames: 
            grayscaled_frame = LaneDetectionUtils._grayscale_frame(frame)
            masked_frame = LaneDetectionUtils._mask_frame(grayscaled_frame)
            thresholded_frame = LaneDetectionUtils._threshold_frame(masked_frame)
            detected_lines = cv2.HoughLinesP(thresholded_frame, 2, np.pi / 180, 100, np.array([]), minLineLength = 10, maxLineGap = 5)
            lane_lines = LaneDetectionUtils._find_lane_lines(frame, detected_lines)
            if lane_lines is None: 
                lane_detected_frames.append(frame)
                continue
            annotated_frame = LaneDetectionUtils._draw_lane_lines(frame, lane_lines)
            if annotated_frame is None: 
                lane_detected_frames.append(frame)
                continue
            combined_frame = cv2.addWeighted(frame, 0.8, annotated_frame, 1, 1)
            lane_detected_frames.append(combined_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        return lane_detected_frames

In [4]:
# configure object detection model 
config_path = 'object_detection/config/yolov3.cfg'
weights_path = 'object_detection/config/yolov3.weights'
class_path = 'object_detection/config/coco.names'
image_size = 416
conf_thres = 0.8
nms_thres = 0.4

In [5]:
# load model and weights
model = Darknet(config_path, img_size=image_size)
model.load_weights(weights_path)
if torch.cuda.is_available():
    model.cuda()
model.eval()
classes = load_classes(class_path)
Tensor = torch.FloatTensor



In [6]:
# road entity detection utilities class 
class RoadEntityDetectionUtils:
    # road entities to detect
    road_entities = set([
        'person', 
        'bicycle', 
        'car', 
        'truck', 
        'motorcycle', 
        'bus', 
        'train', 
        'boat', 
        'skis',
        'snowboard'
        'skateboard',
        'surfboard'
    ])

    # preprocess image 
    def _preprocess_image(image):
        ratio = min(image_size / image.size[0], image_size / image.size[1])
        image_width = round(image.size[0] * ratio)
        image_height = round(image.size[1] * ratio)
        image_transforms = transforms.Compose([
            transforms.Resize((image_height, image_width)),
            transforms.Pad(
                (
                    max(int((image_height - image_width) / 2), 0), 
                    max(int((image_width - image_height) / 2), 0), 
                    max(int((image_height - image_width) / 2), 0),
                    max(int((image_width - image_height) / 2), 0)
                ), 
                (128, 128, 128)
            ),
            transforms.ToTensor()
         ])
        image_tensor = image_transforms(image).float()
        image_tensor = image_tensor.unsqueeze_(0)
        input_image = Variable(image_tensor.type(Tensor))
        return input_image
    
    # find road entities in image 
    def _find_road_entities(image):
        input_image = RoadEntityDetectionUtils._preprocess_image(image)
        with torch.no_grad(): 
            detections = model(input_image)
            detections = non_max_suppression(detections, 80, conf_thres, nms_thres)
        return detections[0]
    
    # detect road entities in frame
    def detect_road_entities(frames):
        road_entity_detected_frames = []
        for frame in frames: 
            image = Image.fromarray(frame)
            detections = RoadEntityDetectionUtils._find_road_entities(image)
            image = np.array(image)
            padding_x = max(image.shape[0] - image.shape[1], 0) * (image_size / max(image.shape))
            padding_y = max(image.shape[1] - image.shape[0], 0) * (image_size / max(image.shape))
            unpadded_height, unpadded_width = image_size - padding_y, image_size - padding_x
            if detections is not None:
                tracked_road_entities = detections.cpu().detach().numpy()
                unique_labels = detections[:, -1].cpu().unique()
                num_class_preds = len(unique_labels.detach().cpu().numpy())
                for x1, y1, x2, y2, _, _, class_pred in tracked_road_entities: 
                    box_height = int(((y2 - y1) / unpadded_height) * image.shape[0])
                    box_width = int(((x2 - x1) / unpadded_width) * image.shape[1])
                    y1 = int(((y1 - padding_y // 2) / unpadded_height) * image.shape[0])
                    x1 = int(((x1 - padding_x // 2) / unpadded_width) * image.shape[1])
                    class_name = classes[int(class_pred)]
                    if class_name in RoadEntityDetectionUtils.road_entities: 
                        collision_risk = False
                        if collision_risk:
                            color = (0, 0, 255, 1.0)
                        else: 
                            color = (103, 230, 114, 1.0)
                        cv2.rectangle(frame, (x1, y1), (x1 + box_width, y1 + box_height), color, 4)
                        cv2.rectangle(frame, (x1, y1 - 35), (x1 + len(class_name) * 19, y1), color, -1)
                        cv2.putText(frame, class_name, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 3)
            road_entity_detected_frames.append(frame)
        return road_entity_detected_frames            

In [7]:
# CarCam computer vision pipeline
def carcam_pipeline(inputvid_filename, outputvid_filename):
    print('Input video: {}'.format(inputvid_filename))
    print('Executing CarCam computer vision pipeline. Please wait...')
    frames = VideoUtils.extract_frames(inputvid_filename)
    lane_detection_frames = LaneDetectionUtils.detect_lanes(frames)
    road_entity_detection_frames = RoadEntityDetectionUtils.detect_road_entities(lane_detection_frames)
    VideoUtils.compile_frames(road_entity_detection_frames, outputvid_filename)
    cv2.destroyAllWindows()
    print('Output video: {}'.format(outputvid_filename))
    print('All done, check out your output video!')

In [8]:
# Execute pipeline
carcam_pipeline('input.mov', 'output.mp4')

Input video: input.mov
Executing CarCam computer vision pipeline. Please wait...


  avg = a.mean(axis)
  ret = ret.dtype.type(ret / rcount)


Output video: output.mp4
All done, check out your output video!
