In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
from skimage.feature import canny
from skimage.transform import hough_line, hough_line_peaks
from skimage.measure import LineModelND, ransac
from copy import deepcopy
from sklearn.linear_model import LinearRegression

# Some of my functions just skip it

# Functions from Compare Algos

In [2]:
class FrameBuffer:
    def __init__(self, delta = 5, period = 2000):
        self.data = []
        self.delta = delta
        self.period = period
        self.maxLen = period/delta
        self.len = 0
    def store(self, frame):
        self.data.append(frame)
        self.len+=1
        if self.len>self.maxLen:
            self.data = self.data[1:]
            self.len-=1
    def express(self, distance):
        if distance//self.delta>self.len-2:
            out = self.data[0], self.data[-1]
        else:
            out = self.data[-1-distance//self.delta], self.data[-1]
        return out

In [3]:
class DetectedObject:#thread unsafe private class
    def __init__(self, x1, y1, x2, y2, time):
        self.x1, self.y1, self.x2, self.y2 = x1, y1, x2, y2
        self.center = (np.array([x1, y1], dtype='float64')+ np.array([x2, y2], dtype='float64'))/2#[(x1+x2)/2, (y1+y2)/2]
        self.xyxy_story =[[[self.x1, self.y1],[self.x2, self.y2]]]
        self.center_story = [self.center]
        self.time = [time]
    def update(self, x1, y1, x2, y2, time):
        self.x1, self.y1, self.x2, self.y2 = x1, y1, x2, y2
        self.center = (np.array([x1, y1])+np.array([x2, y2]))/2
        self.xyxy_story.append([[self.x1, self.y1],[self.x2, self.y2]])
        self.center_story.append(self.center)
        self.time.append(time)
    def print(self):
        print('xyxy       :', self.x1, self.y1, self.x2, self.y2)

In [4]:
class ObjectStore:
    def __init__(self):
        self.data = [] #iteration on it without mutex is thread unsafe
        self.deleted_data = []
        self.thresh = 0.5
        self.time = None
    def print(self):
        k = 0
        for i in self.data:
            print('######', k, '######')
            k+=1
            i.print()
        
        print('######END######')
    def find(self, x1, y1, x2, y2):#private function
        r = None
        for i in self.data:
            xx1 = max(x1, i.x1)
            xx2 = min(x2, i.x2)
            yy1 = max(y1, i.y1)
            yy2 = min(y2, i.y2)
            xxx1 = min(x1, i.x1)
            xxx2 = max(x2, i.x2)
            yyy1 = min(y1, i.y1)
            yyy2 = max(y2, i.y2)
            ww = xx2-xx1
            hh = yy2-yy1
            w = xxx2-xxx1
            h = yyy2-yyy1
            #print(ww, hh, w, h, ww*hh/(w*h))
            if ww<=0 or hh<=0:
                continue
            if ww*hh/(w*h)>self.thresh:
                return i, True
        return None, False
    def store(self, x1, y1, x2, y2, time):
        self.time = time
        i, F = self.find(x1, y1, x2, y2)
        if F:
            i.update(x1, y1, x2, y2, time)
        else:
            i = DetectedObject(x1, y1, x2, y2, time)
            self.data.append(i)
    def delete_old(self,delta=5):
        to_del = []
        centers = []
        for i, b in enumerate(self.data):
            if abs(b.time[-1]-self.time)>delta:
                to_del += [i]
                centers.append(deepcopy(b.center_story))
        for i in sorted(to_del, reverse=True):
            self.deleted_data +=[self.data[i]]
            del self.data[i]
        return centers

In [5]:
def prepareFrame(frame, crop=False, crop_v=[0, 200, 0, 800], ksize_val=5):
    if crop:
        frame = frame[crop_v[0]:crop_v[1], crop_v[2]:crop_v[3]]
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    frame = cv2.GaussianBlur(src=frame, ksize=(ksize_val, ksize_val), sigmaX=0)
    return frame
def compareFrames(frame1, frame2, dilate=True, dil_val=5, erode=True, erode_val=5, thresh_val=30):
    diff = cv2.absdiff(src1=frame1, src2=frame2)
    #cv2.imshow('Diff',diff)
    #diff = cv2.adaptiveThreshold(diff,255,cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
    #        cv2.THRESH_BINARY,171,25)
    #cv2.imshow('Adaptive',diff)
   # _,diff_ = cv2.threshold(diff,100,255,cv2.THRESH_BINARY)
    #cv2.imshow('Binary',diff)
    if dilate:
        kernel = np.ones((dil_val, dil_val))
        diff = cv2.dilate(diff, kernel, 1)
        #cv2.imshow('Dilate',diff)
    if erode:
        kernel = np.ones((erode_val, erode_val))
        diff = cv2.erode(diff, kernel, 1)
        #cv2.imshow('Erode',diff)
    diff = cv2.threshold(src=diff, thresh=thresh_val, maxval=255, type=cv2.THRESH_BINARY)[1]
    contours, _ = cv2.findContours(image=diff, mode=cv2.RETR_EXTERNAL, method=cv2.CHAIN_APPROX_SIMPLE)
    return diff, contours
def filterContours(contours, area_val=5, wh_val=10):
    con_f = []
    
    for contour in contours:
        area = cv2.contourArea(contour)
        if area < area_val:
            continue
        (x, y, w, h) = cv2.boundingRect(contour)
        if w*h<wh_val:
            continue
        con_f.append(contour)
    return con_f
def drawContours(im, contours, crop=False, crop_v=[0, 200, 0, 800], color=(0, 255, 255), thickness=1, with_rectangles=False, color_rectangles=(255,0,255), thickness_rectangles=2):
    if crop and (crop_v[0]!=0 or crop_v[2]!=0):
        contours = contours.copy()
        for contour in contours:
            contour += [crop_v[0], crop_v[2]]
    cv2.drawContours(image=im, contours=contours, contourIdx=-1, color=color, thickness=thickness, lineType=cv2.LINE_AA)
    if with_rectangles:
        for contour in contours:
            (x, y, w, h) = cv2.boundingRect(contour)
            cv2.rectangle(img=im, pt1=(x, y), pt2=(x + w, y + h), color=color_rectangles, thickness=thickness_rectangles)

In [6]:
def compare_two_frames(prev, curr, out, objectStore, time, area_val=50, wh_val=100,  crop=False, crop_v=[0, 200, 0, 800], draw=True, color=(0, 255, 255), color_super=(0, 0, 0), thickness=1, with_rectangles=False, color_rectangles=(255,0,0), thickness_rectangles=2):
    prev, curr = prepareFrame(prev, crop=crop, crop_v=crop_v, ksize_val=3), prepareFrame(curr, crop=crop, crop_v=crop_v, ksize_val=3)
    diff, cnt = compareFrames(prev, curr, dilate=True, dil_val=31, erode=True, erode_val=31, thresh_val=25)
    con_f = filterContours(cnt, area_val=area_val, wh_val=wh_val)
    for contour in con_f:
            (x, y, w, h) = cv2.boundingRect(contour)
            objectStore.store(x, y, x+w, y+h, time)
    if draw:
        drawContours(out, con_f, crop=crop, crop_v=crop_v, color=color, thickness=thickness, with_rectangles=with_rectangles, color_rectangles=color_rectangles, thickness_rectangles=thickness_rectangles)
    return con_f, diff

In [7]:
def drawObjectRectangles(pic, objectStore):
    for b in objectStore.data:
        cv2.rectangle(pic, pt1=(int(b.x1), int(b.y1)), pt2=(int(b.x2), int(b.y2)), color=(0, 255, 0), thickness=5)

# Functions to projects detected cars speed

In [8]:
def get_background(video, num_frames=50):
    frames_num = video.get(cv2.CAP_PROP_FRAME_COUNT) * np.random.uniform(size=num_frames)
    frames_num = frames_num.astype('int')
    frames = []
    for frame_num in frames_num:
        video.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
        ret, frame = video.read()
        frames.append(frame)
    backgroundFrame = np.median(frames, axis=0).astype('int')
    return backgroundFrame

In [9]:
# two auxilary functions
def get_line(a, b, x_lim, y_lim=None):
    x = np.linspace(*x_lim)
    y = a * x + b
    if y_lim is not None:
        y = y[y>=y_lim[0]]
        y = y[y<=y_lim[1]]
    return x, y

def hough_thetaro_to_ab(theta, ro):
    a = -1 /np.tan(theta)
    b = ro / np.sin(theta)
    return a, b

def show(img):
    plt.figure(figsize=(16,16))
    if len(img.shape) == 3:
        img = img.astype('uint8')
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        plt.imshow(img)    
    else:
        plt.imshow(img, cmap='gray')

In [10]:
def get_lines(img, sigma=2):
    img = img.astype('uint8')
    if len(img.shape) == 3: 
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    else:
        gray = img.copy()
    edges_img = canny(gray, sigma=3)
    
    h_all, theta_all, ro_all = hough_line(edges_img)
    h_selected, theta_selected, ro_selected = hough_line_peaks(h_all, theta_all, ro_all)
    
    ab_selected = np.array(
        [hough_thetaro_to_ab(theta, ro)
         for theta, ro in zip(theta_selected, ro_selected)])
    model, inliers = ransac(ab_selected, LineModelND, min_samples=3, residual_threshold=0.05)
    y_max, x_max = edges_img.shape
    plt.figure(figsize = (8,5), dpi=300)
    a_mean = 0
    b_mean = 0
    for i, (theta, ro) in enumerate(zip(theta_selected, ro_selected)):
        if inliers[i]:
            a, b = hough_thetaro_to_ab(theta, ro)
            #plt.plot(*get_line(a, b, [0, x_max]), 'g', lw=2)
            a_mean += a
            b_mean += b
    a_mean /= len(theta_selected)
    b_mean /= len(theta_selected)
    
    return a_mean, b_mean, x_max

In [11]:
def project(xs, ys, dot):
    start = np.array([xs[0], ys[0]])
    end = np.array([xs[-1], ys[-1]])
    if np.linalg.norm(start - end) < 0.1:
        return dot
    t = np.sum((dot - start) * (end - start)) / (np.linalg.norm(end - start)**2)
    #t = max(0, min(1, t))
    projection = start + t * (end - start)
    return projection

In [12]:
def get_lines(img, sigma=2):
    img = img.astype('uint8')
    if len(img.shape) == 3: 
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    else:
        gray = img.copy()
    edges_img = canny(gray, sigma=3)
    
    h_all, theta_all, ro_all = hough_line(edges_img)
    h_selected, theta_selected, ro_selected = hough_line_peaks(h_all, theta_all, ro_all)
    
    ab_selected = np.array(
        [hough_thetaro_to_ab(theta, ro)
         for theta, ro in zip(theta_selected, ro_selected)])
    model, inliers = ransac(ab_selected, LineModelND, min_samples=3, residual_threshold=0.05)
    y_max, x_max = edges_img.shape
    plt.figure(figsize = (8,5), dpi=300)
    a_mean = 0
    b_mean = 0
    for i, (theta, ro) in enumerate(zip(theta_selected, ro_selected)):
        if inliers[i]:
            a, b = hough_thetaro_to_ab(theta, ro)
            #plt.plot(*get_line(a, b, [0, x_max]), 'g', lw=2)
            a_mean += a
            b_mean += b
    a_mean /= len(theta_selected)
    b_mean /= len(theta_selected)
    
    return (a_mean, b_mean, x_max)

In [13]:
# Use this functions with projected centers
def get_speed(centers):
    speed = np.array([np.linalg.norm(abs(centers[i+1] - centers[i])) for i in range(len(centers) - 1)])
    return speed

In [14]:
def get_centers(cap):
    
    centers = []
    
    skip = 1
    delta_compare = 4
    frameBuffer = FrameBuffer(delta=skip, period=delta_compare * 3)
    objectStore = ObjectStore()
    crop=False
    crop2=False
    crop_v2 = [0, 170, 0, 1000]
    time = 0
    prev, curr = None, None
    while(cap.isOpened()):
        ret, frame = cap.read()
        time += 1
        if time % skip != 0:
            continue
        if ret == True:
            out_pic = frame.copy()
            frameBuffer.store(frame)
            prev, curr = frameBuffer.express(delta_compare)
            con_f, diff = compare_two_frames(prev, curr, out_pic, objectStore, time, area_val=1000, wh_val=2000, crop=crop2, crop_v=crop_v2, draw=True, color=(0, 255, 255), thickness=3, with_rectangles=False, color_rectangles=(255,0,255), thickness_rectangles=5)
            
            centers_temp = objectStore.delete_old(20)
            if centers_temp != []:
                centers.append(deepcopy(centers_temp))
        elif ret == False:
            break
    cap.release()
    return centers

In [15]:
def process_centers(centers):
    # Obtaining the line on which the projection will occur
    video = cv2.VideoCapture(source)
    background = get_background(video)
    (a, b, x_max) = get_lines(background)
    (xs, ys) = get_line(a, b, [0, x_max])
    
    # Get normal array of centers of cars
    centers_reshaped = []
    for step in centers:
        for center in step:
            if len(center) >  4: 
                centers_reshaped.append(center)
                
    # Project each point of centers on the line
    centers_projected = []
    for center in centers_reshaped:
        temp_center = []
        for point in center:
            temp_center.append(project(xs, ys, point))
        centers_projected.append(temp_center)
         
    speed_arr = []
    coord_arr = []
    for center in centers_projected:
        center = np.array(center)
        coord_arr.append(center.mean(axis=0))
        speed = get_speed(center)
        speed_arr.append(speed.mean())

    coord_arr = np.array(coord_arr)
    speed_arr = np.array(speed_arr)
    model = LinearRegression()
    model.fit(coord_arr, speed_arr)
    predictions = model.predict(coord_arr)
    err = np.array([predictions[i] - speed_arr[i] for i in range(len(predictions))])
    std = err.std()
    
    return model, std, xs, ys

In [77]:
def jet_process_centers(centers, frame, model, std, xs, ys):
    centers_reshaped = []
    for center in centers:
        # Add only arrays, which have more than 4 entries
        if len(center) > 3:
            centers_reshaped.append(center)
    # Return if center_reshaped is empty
    if len(centers_reshaped) == 0:
        return frame
    
    # Project each point of centers on the line
    centers_projected = []
    for center in centers_reshaped:
        temp_center = []
        for point in center:
            temp_center.append(project(xs, ys, point))
        centers_projected.append(temp_center)
        
    speed_arr = []
    coord_arr = []
    for center in centers_projected:
        center = np.array(center)
        coord_arr.append(center.mean(axis=0))
        speed = get_speed(center)
        speed_arr.append(speed.mean())
        
    coord_arr = np.array(coord_arr)
    speed_arr = np.array(speed_arr)
    
    predictions = model.predict(coord_arr)
    err = np.array([(speed_arr[i] - predictions[i]) for i in range(len(predictions))])
    
    plot_red = []
    plot_yellow = []
    for ind, error in enumerate(err):
        if error >= 3 * std:
            plot_red.append(coord_arr[ind].astype('int'))
        elif error >= 2 * std:
            plot_yellow.append(coord_arr[ind].astype('int'))
    for red in plot_red:
        overlay = frame.copy()
        cv2.rectangle(overlay, (red[0]-150, red[1]-150), (red[0]+150, red[1] + 150), (0,0,255), -1)
        alpha = 0.4
        frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
    for yellow in plot_yellow:
        overlay = frame.copy()
        cv2.rectangle(overlay, (yellow[0]-150, yellow[1]-150), (yellow[0] + 150, yellow[1] + 150), (0,255,255), -1) 
        alpha = 0.4
        frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
    return frame

# Centers extraction from video "centers"

In [73]:
source = './vids/light1.mp4'
video = cv2.VideoCapture(source)

In [74]:
# Get centers of detected cars from video
centers = get_centers(video)

In [75]:
model, std, xs, ys = process_centers(centers)

<Figure size 2400x1500 with 0 Axes>

In [86]:
source = './vids/light1.mp4'
cap = cv2.VideoCapture(source)
skip = 1
delta_compare = 4
frameBuffer = FrameBuffer(delta=skip, period=delta_compare * 3)
objectStore = ObjectStore()
crop=False
crop2=False
crop_v2 = [0, 170, 0, 1000]
time = 0
prev, curr = None, None
while(cap.isOpened()):
    ret, frame = cap.read()
    time += 1
    if time % skip != 0:
        continue
    if ret == True:
        out_pic = frame.copy()
        frameBuffer.store(frame)
        prev, curr = frameBuffer.express(delta_compare)
        con_f, diff = compare_two_frames(prev, curr, out_pic, objectStore, time, area_val=1000, wh_val=2000, crop=crop2, crop_v=crop_v2, draw=True, color=(0, 255, 255), thickness=3, with_rectangles=False, color_rectangles=(255,0,255), thickness_rectangles=5)
        out_pic = frame.copy()
        centers_temp = objectStore.delete_old(20)
        if centers_temp != []:
            out_pic = jet_process_centers(centers_temp, out_pic, model, std, xs, ys)
        cv2.imshow('Frame', out_pic)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            
    elif ret == False:
        break
cap.release()

cv2.destroyAllWindows()