In [1]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import random
import cv2

In [2]:
def getEdge(img, kernel_size=(5, 5), low_thr=50, high_thr=150):
    # Define a kernel size and apply Gaussian smoothing
    blured = cv2.GaussianBlur(img, kernel_size, 0)

    edges = cv2.Canny(blured, low_thr, high_thr)

    return edges

def getLines(pts_img_like, threshold=1, min_line_length=20, max_line_gap=5):
    # Define the Hough transform parameters
    # Make a blank the same size as our image to draw on
    rho = 1           # pixel resolution: 1 pixel
    theta = np.pi/180 # angle resolution: 1 degree
    
    # Run Hough on edge detected image
    lines = cv2.HoughLinesP(pts_img_like, rho, theta, threshold, np.array([]),
                                min_line_length, max_line_gap)
    return lines

def getLineLength(line):
    return np.sqrt((line[:,0] - line[:,2])**2 + (line[:,1] - line[:,3])**2)

def distanceToLine(line, pt):
    st = line[:,0:2]
    end = line[:,2:]
    vec0 = end - st
    vec1 = pt - st
    area = vec0[:,0]*vec1[:,1] - vec0[:,1]*vec1[:,0]
    length = area / getLineLength(line)
    return length

def calHoughSpace(line, center):
    theta = np.arctan((line[:,3]-line[:,1])/(line[:,2]-line[:,0]))
    r = distanceToLine(line, center)
    return np.stack((theta, r), axis = 1)


In [3]:
def perp( a ) :
    b = np.empty_like(a)
    b[0] = -a[1]
    b[1] = a[0]
    return b

# line segment a given by endpoints a1, a2
# line segment b given by endpoints b1, b2
# return 
def seg_intersect(a1,a2, b1,b2) :
    da = a2-a1
    db = b2-b1
    dp = a1-b1
    dap = perp(da)
    denom = np.dot( dap, db)
    num = np.dot( dap, dp )
    return (num / denom.astype(float))*db + b1

In [4]:
class RoadEstimator:
    def __init__(self, imgSize):
        self.ScatterScale = 0.5        # Map -1.57~1.57 to 0~resolution
        self.ScatterResolution = 100   # theta * Scale * Resoultion
        self.ScatterDecay = 0.9
        self.houghScatter = np.zeros((self.ScatterResolution, 3), dtype=np.float)
        self.imgSize = imgSize
        self.center = (self.imgSize[0]//2, 300)
        self.topLimit = 200
        self.weightedBounds = np.ones((4,2)) * -1
        self.trustable = np.array([False, False])
    
    def HoughPolarToXYLine(self, houghPoint):
        center = self.center
        th_angle = (houghPoint[0]-self.ScatterResolution/2)/self.ScatterResolution/self.ScatterScale
        l = houghPoint[2]/np.sin(th_angle)
        lowerPoint = (int(center[0]+l), center[1])
        upperPoint = (int(center[0]+l-(center[1]-self.topLimit)/np.tan(th_angle)), self.topLimit)
        return lowerPoint, upperPoint
    
    def calMainWeight(self, lines, houghPoints):
        # factors
        lineLength = getLineLength(lines)
        lineDisX = abs(lines[:,0]+lines[:,2])/2 - self.imgSize[0]/2
        lineDisY = self.imgSize[1] - abs(lines[:,1]+lines[:,3])/2
        lineDis = np.sqrt(lineDisX**2 + lineDisY**2)
        
        # Calculate weight for determining main line
        #factorA = houghPoints[:, 0] * houghPoints[:, 1] / np.mean(houghPoints[:, 1])
        factorB = lineLength / np.mean(lineLength)
        factorC = lineDisY / np.mean(lineDisY)
        factorD = -(lineDis / np.mean(lineDis))
        
        weight = houghPoints[:, 0] * (factorB+factorC+factorD) * 10
        return weight
    
    def calHoughScaledArray(self, lines):
        houghPoints = calHoughSpace(lines, self.center)
        
        # Filter nan values
        m = np.logical_not(np.isnan(houghPoints[:,1]))
        houghPoints = houghPoints[m]
        
        weight = self.calMainWeight(lines, houghPoints)
        
        # change into array size
        x = (houghPoints[:, 0] * self.ScatterScale*self.ScatterResolution).astype(int)
        
        # Clip into array size
        x = x.clip(-self.ScatterResolution//2, self.ScatterResolution//2-1)
        
        # parameters reset
        self.houghScatter[:, 1] = 0    # real r
        self.houghScatter[:, 2] = 0    # number of r samples
        for (dx, dw, dr) in zip(x, weight, houghPoints[:, 1]):
            self.houghScatter[dx+self.ScatterResolution//2, 0] += dw
            self.houghScatter[dx+self.ScatterResolution//2, 1] += dr
            self.houghScatter[dx+self.ScatterResolution//2, 2] += 1
        # samples = 0
        self.houghScatter[self.houghScatter[:, 2] <= 0.001, 2] = 1000000
        # average r
        self.houghScatter[:, 1] /= self.houghScatter[:, 2]
        
        # Filter minimux & maximum
        self.houghScatter[0, 0]=0
        self.houghScatter[-1, 0]=0
        mean = np.mean(self.houghScatter[:,0])
        #std = np.std(self.houghScatter[:,0])
        # concate all coulumns -> [ theta, weight, real_r, number_of_r_samples]
        scaled = np.column_stack((np.arange(self.ScatterResolution, dtype=np.int16), 
                                  self.houghScatter[:,0] / mean, 
                                  self.houghScatter[:,1:]))
        # decay
        self.houghScatter[:,0] *= self.ScatterDecay
        return scaled
        
    """
    Input: Hought Scaled array with weight
    Output: True/False Array
    """
    def getMainRoadPoints(self, scaled):
        criteria = np.logical_and(np.absolute(scaled[:,1])>15, scaled[:,2]>0.001)
        points = scaled[criteria]
        return points
    
    def drawSummary(self, img, scaled, points, mainPoints, roiRect, intersect):
        for p in scaled:
            startP = (int( p[0]-50+width//2 ), 200)
            cv2.line(img, startP, (startP[0], int(200-int(abs(p[1])))), (0, 0, 255), 1)
            cv2.line(img, startP, (startP[0], int(200+int(abs(p[2])))), (0, 255, 0), 1)
            
            
        for p in points:
            cv2.circle(img, (int(p[0])-50+width//2, 200), 3, (0, 255, 255), 1)
            
        for p in mainPoints:
            cv2.circle(img, (int(p[0])-50+width//2, 200), 5, (255, 255, 0), 2)
        """
        # left max
        cv2.circle(img, (int(mainPoints[0,0])-50+width//2, 200), 5, (255, 255, 0), 2)
        # Left avr
        cv2.circle(img, (int(mainPoints[0,1])-50+width//2, 200) , 5, (255, 0, 0), 2)
        # Right max  
        cv2.circle(img, (int(mainPoints[1,0])-50+width//2, 200), 5, (255, 255, 0), 2)
        # Right avr
        cv2.circle(img, (int(mainPoints[1,1])-50+width//2, 200) , 5, (255, 0, 0), 2)
        """
        #cv2.line(img, frameBound[0][0], frameBound[0][1], (255, 255, 255), 4)
        #cv2.line(img, frameBound[1][0], frameBound[1][1], (0, 0, 0), 4)
        if (np.all(self.trustable)):
            for i in range(4):
                cv2.line(img,tuple(roiRect[-1+i].astype(int)),tuple(roiRect[i].astype(int)),(0,255,0),2)

            cv2.circle(img, tuple(intersect.astype(int)), 5, (0, 255, 0), 3)
        return img
    
    def analyzePoints(self, img, points):
        lefts = points[points[:,0]<self.ScatterResolution//2]
        rights = points[points[:,0]>self.ScatterResolution//2]
        
        mainPoint = []
        # tl, tr, br, bl
        if(lefts.shape[0] > 0):
            self.trustable[0] = True
            leftMax = lefts[np.argmax(np.absolute(lefts), axis=0)[1]]
            mainPoint.append(leftMax)
            #mainPoint.append(np.mean(lefts, axis=0)[0])
            
            bounds = self.HoughPolarToXYLine(leftMax)
            cv2.line(img, bounds[0], bounds[1], (255, 255, 255), 4)
            
            if(np.any(self.weightedBounds[0] == -1) or np.any(self.weightedBounds[3] == -1)):
                self.weightedBounds[3] = bounds[0]
                self.weightedBounds[0] = bounds[1]
            else:
                self.weightedBounds[3] = self.weightedBounds[3]*0.9 + np.array(bounds[0])*0.1
                self.weightedBounds[0] = self.weightedBounds[0]*0.9 + np.array(bounds[1])*0.1
                
        if(rights.shape[0] > 0):
            self.trustable[1] = True
            rightMax = rights[np.argmax(np.absolute(rights), axis=0)[1]]
            mainPoint.append(rightMax)
            #mainPoint.append(np.mean(rights, axis=0)[0])
            
            bounds = self.HoughPolarToXYLine(rightMax)
            cv2.line(img, bounds[0], bounds[1], (0, 0, 0), 4)
            
            if(np.any(self.weightedBounds[2] == -1) or np.any(self.weightedBounds[1] == -1)):
                self.weightedBounds[2] = bounds[0]
                self.weightedBounds[1] = bounds[1]
            else:
                self.weightedBounds[2] = self.weightedBounds[2]*0.9 + np.array(bounds[0])*0.1
                self.weightedBounds[1] = self.weightedBounds[1]*0.9 + np.array(bounds[1])*0.1
        
        #print(self.weightedBounds)
        return mainPoint
    
    def getRoadRect(self):
        roadHeight = 400
        roadFlatten = 40
        
        #if self.trustable:
        intersect = seg_intersect(self.weightedBounds[1], self.weightedBounds[2], 
                                  self.weightedBounds[0], self.weightedBounds[3])

        #minWidth = self.weightedBounds[1,0] - self.weightedBounds[0,0]
        #if(minWidth > 0):

        roiRect = np.copy(self.weightedBounds)
        roiRect[0] = intersect + np.array((-roadFlatten, -10))
        roiRect[1] = intersect + np.array((roadFlatten, -10))
        roiRect[2] = self.weightedBounds[2] + np.array((roadFlatten, 0))
        roiRect[3] = self.weightedBounds[3] - np.array((roadFlatten, 0))

        

        maxWidth = roiRect[2,0] - roiRect[3,0]
        maxHeight = roiRect[2,1] - roiRect[1,1]
        """
        dst = np.array([
            [0, 0],
            [maxWidth - 1, 0],
            [maxWidth - 1, maxHeight - 1],
            [0, maxHeight - 1]], dtype = "float32")
        """
        dst = np.array([
            [0, 0],
            [maxWidth - 1, 0],
            [maxWidth - 1, roadHeight - 1],
            [0, roadHeight - 1]], dtype = "float32")
        imgSize = (int(maxWidth), int(roadHeight))
        
        return roiRect, dst, imgSize, intersect

In [7]:
# Read in and grayscale the image
"""
img = mpimg.imread('exit-ramp.jpg')
height = img.shape[0]
width = img.shape[1]
"""
cap = cv2.VideoCapture('highway_record4.mp4')

width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))  # float
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # float
fps = cap.get(cv2.CAP_PROP_FPS)

roi_base = (0, height//2)
roi_size = (width, height*3//4)

roadFrame = RoadEstimator(imgSize=(width, height))
while(cap.isOpened()):
#for i in range(3):
    ret, frame = cap.read()
    #frame = img
    roi = np.zeros([height,width], dtype=np.uint8)
    roi[height*3//5:height*4//5,0:width] = 255
    
    img = cv2.cvtColor(frame, cv2.COLOR_RGB2HLS)
    roi=cv2.add(img, np.zeros(np.shape(frame), dtype=np.uint8), mask=roi)
    
    #for idx in range(3):
    idx = 1
    channel = roi[:,:,idx]
    channel = cv2.morphologyEx(channel, cv2.MORPH_OPEN, (3, 3))
    edges = getEdge(channel)
    
    lines = np.squeeze(getLines(edges, threshold=1, min_line_length=10), axis=1)

    name = 'edge'+str(idx)
    cv2.imshow(name,roi)
    
    line_image = np.copy(frame) #creating a blank to draw lines on
    
    for x1,y1,x2,y2 in lines:
        cv2.line(line_image,(x1,y1),(x2,y2),(0,255,0),2)
    
    scaled = roadFrame.calHoughScaledArray(lines)
    points = roadFrame.getMainRoadPoints(scaled)
    mainPoints = roadFrame.analyzePoints(line_image, points)
    
    wrap_image = np.copy(frame)
    test = np.copy(frame)
    
    if(np.all(roadFrame.trustable)):
        roiRect, dst, imgSize, intersect = roadFrame.getRoadRect()
        
        M = cv2.getPerspectiveTransform(np.array(roiRect, dtype="float32"), dst)
        wrap_image = cv2.warpPerspective(wrap_image, M, imgSize)
        
        #cv2.polylines(img=test, pts=roiRect, isClosed=True, color=(0, 0, 255), thickness=-1 )
        
        #roi=cv2.add(img, np.zeros(np.shape(frame), dtype=np.uint8), mask=roi)
        wrap_gray = cv2.cvtColor(wrap_image, cv2.COLOR_RGB2GRAY)
        wrap_edge = getEdge(wrap_gray, kernel_size=(3, 3), low_thr=1, high_thr=200)
        #wrap_image = cv2.warpPerspective(wrap_image, M, imgSize, cv2.WARP_INVERSE_MAP)
        edge_lines = np.squeeze(getLines(wrap_edge, threshold=5, min_line_length=5), axis=1)
        for x1,y1,x2,y2 in edge_lines:
            cv2.line(wrap_image,(x1,y1),(x2,y2),(0,255,0),2)
            
        cv2.imshow('name', wrap_image)
        cv2.imshow('test', test)
        
    
        line_image = roadFrame.drawSummary(line_image, scaled, points, mainPoints, roiRect, intersect)
    
    cv2.imshow('frame',line_image)
    if cv2.waitKey(100) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

