PIXELFINDV2

This script takes in an image of a road, and attepts to use a pixel matching algorithm to find the road area within the image. A mask is created that could be used to crop the image leaving only the road, however this is not done with this script

Pandas - maths library  
Numpy - Array management  
Matplotlib - plotting and image dispalying  
requests - URL requests  
cv2 - Image processing library  
queue - create and manage queue objects  
itertools - iteration over more complex structures  
pynput - allows input from user  

'currentImage' defines the starting point for iteration through the folder of images  
'initP' defines the initial pixel where the algorithm starts

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import cv2
import queue
import itertools
from pynput import keyboard

currentImage = 1
initP = (700,450)

This function defines the inital starting point for the algorithm. The 'xRange' and 'yRange' variables define the size of this starting area, which is centred around the inital pixel set above. All of the pixels within this area are added to a list , which is then iterated through to obtain the average H,S,V and R,G,B values of the entire area. These are saved as the mean and variance, and allow the algorithm to have a solid starting point rather than relying on a single pixel

In [None]:
def initColors(imgBGR, imgHSV):
    yRange = 30
    xRange = 30
    xList = []
    yList = []
    H,S,V,B,G,R = [],[],[],[],[],[]
    for i in range(-xRange,2*xRange):
        x = initP[0] + i
        y = initP[1] + i
        xList.append(x)
        yList.append(y)

    neighbours = list(itertools.product(xList, yList))

    for index,pixel in enumerate(neighbours):
        H.append(np.int_(imgHSV[pixel[0],pixel[1]][0]))
        S.append(np.int_(imgHSV[pixel[0],pixel[1]][1]))
        V.append(np.int_(imgHSV[pixel[0],pixel[1]][2]))

        B.append(np.int_(imgBGR[pixel[0],pixel[1]][0]))
        G.append(np.int_(imgBGR[pixel[0],pixel[1]][1]))
        R.append(np.int_(imgBGR[pixel[0],pixel[1]][2]))
    
    meanH = np.mean(H)
    meanS = np.mean(S)
    meanV = np.mean(V)
    meanB = np.mean(B)
    meanG = np.mean(G)
    meanR = np.mean(R)
    varH = np.var(H)
    varS = np.var(S)
    varV = np.var(V)
    varB = np.var(B)
    varG = np.var(G)
    varR = np.var(R)
    return meanH,meanS,meanV,meanB,meanG,meanR,varH,varS,varV,varB,varG,varR

This is the start of the main function for this script. It begins by setting up three unique sets that will be used to contain groups of pixels later on.  
'prevP' is the set of all pixels that have previously been run through the algorithm  
'failed' is the set of pixels that have failed the algorithm test and cannot be added to the queue again.  
'inQueue' is the set of pixels that have not been tested yet, but have been identified as neighbours of pixels in the 'prevP' set, and are therefor in the queue to be tested  
A few other variables are also set up here:  
Hprev etc store the previous H,S,V and R,G,B values so the current number can be checked against them  
'img' and 'img2' hold the image that is being run against the algorithm currently  
'edges' uses the edge detection function within OpenCV to find the edges of the image
'lines' uses the Hough line detection function within OpenCV to find straight lines within the image which are then written onto the image using a for loop  
A HSV copy of the image is created as well, and the color averages are initialised using the above function. As the final step of the setup the weights of the 6 color values are set, which should be changed to affect how the algorithm spreads

In [None]:
def runImage(imageNum):
    basePath = "images\"
    
    qP = queue.Queue()
    prevP = set()
    failed = set()
    inQueue = set()
    Hprev,Sprev,Vprev,Bprev,Gprev,Rprev = 0,0,0,0,0,0
    img = cv2.imread(basePath +"img"+str(imageNum)+".png")
    img2 = cv2.imread(basePath +"img"+str(imageNum)+".png")
    edges = cv2.Canny(img,0,500)
    lines = cv2.HoughLines(edges, 1, np.pi/90, 200)
    try:
        for line in lines:
            rho,theta = line[0]
            # skip near-vertical lines
            if (abs(theta-np.pi/45) < np.pi/9) or (abs(np.pi/2-np.pi/15 <theta < np.pi/2+np.pi/15)):
                continue
            a = np.cos(theta)
            b = np.sin(theta)
            x0 = a*rho
            y0 = b*rho
            x1 = int(x0 + 10000*(-b))
            y1 = int(y0 + 10000*(a))
            x2 = int(x0 - 10000*(-b))
            y2 = int(y0 - 10000*(a))
            #cv2.line(img,(x1,y1),(x2,y2),(255,255,255),2)
            cv2.line(img2,(x1,y1),(x2,y2),(255,255,255),2)
    except:
        pass

    edges = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)



    imgHSV = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    imgHSV = cv2.add(imgHSV,edges)
    imgBGR = cv2.add(img,edges)

    meanH,meanS,meanV,meanB,meanG,meanR,varH,varS,varV,varB,varG,varR = initColors(imgBGR,imgHSV)
    img2 = cv2.add(img2,edges)
    
    width,height = imgHSV.shape[:2]
    print(width)
    print(height)

    Hweight = 6
    Sweight = 0.2
    Vweight = 0.2
    Bweight = 0.5
    Gweight = 0.6
    Rweight = 0.5


    qP.put(initP)
    count = 0

This while loop is the core of the function and keeps iterating until the queue of pixels is complete. The queue is seeded with the initial pixel originally, and finds the neighbouring pixels of this starting point. The 6 color values of these neighbours are compared to the average color values of all the pixels calculated so far (initially using the starting area as described above). The difference in value for all 6 color variables is then multiplied with the variable weights set above, and summed together. If this total sum is less than the threshold the neighbouring pixel is accepted and added to the 'inQueue' set as long as it is not already within one of the three sets described in the setup. It is then added to the end of the 'qP' queue.If it is not within any of the sets but fails the threshold test it is instead placed into the 'failed' set and not added to the queue. This set process ensures that failed pixels are never added to the queue and that no pixel gets placed into the queue twice.  
Finally, the current pixel gets popped off the front of the queue and added to the 'prevP' set to indicate it has already been through the algorithm and shouldnt be tested again. The queue then moves onto the next pixel in the queue. This process allows each pixel in the image to be checked to see if its color values match the road, while also allowing the average of the road colors to change over time as the algorithm gathers more data

In [None]:
while not qP.empty():
        currP = qP.get()
        count = count +1

        prevP.add(currP)
        currX = currP[0]
        currY = currP[1]

        nX = currX -1
        nY = currY -1
        pX = currX +1
        pY = currY +1

        cH = np.int_(imgHSV[currX,currY][0])
        cS = np.int_(imgHSV[currX,currY][1])
        cV = np.int_(imgHSV[currX,currY][2])

        

        img2[currX,currY] = [0,255,0]
        cB = np.int_(imgBGR[currX,currY][0])
        cG = np.int_(imgBGR[currX,currY][1])
        cR= np.int_(imgBGR[currX,currY][2])

        Hprev = Hprev + cH
        Sprev= Sprev + cS
        Vprev= Vprev + cV
        Bprev= Bprev + cB
        Gprev= Gprev + cG
        Rprev= Rprev + cR

        avgH = (meanH + Hprev)/(count+1)
        avgS = (meanS + Sprev)/(count+1)
        avgV = (meanV + Vprev)/(count+1)
        avgB = (meanB + Bprev)/(count+1)
        avgG = (meanG + Gprev)/(count+1)
        avgR = (meanR + Rprev)/(count+1)
        
        group1 = [currX,nX,pX]
        group2 = [currY,nY,pY]

        neighbours = list(itertools.product(group1, group2))
        for nP in neighbours:
            if (0 <nP[0] < width) and (0 <nP[1] < height) and not(nP in prevP) and not (nP in failed) and not (nP in inQueue):
                
                H = imgHSV[nP[0],nP[1]][0]
                S = imgHSV[nP[0],nP[1]][1]
                V = imgHSV[nP[0],nP[1]][2]

                B = imgBGR[nP[0],nP[1]][0]
                G = imgBGR[nP[0],nP[1]][1]
                R = imgBGR[nP[0],nP[1]][2]

                HfromAvg = abs(avgH-H) * Hweight
                SfromAvg = abs(avgS-S) * Sweight
                VfromAvg = abs(avgV-V) * Vweight
                BfromAvg = abs(avgB-B) * Bweight
                GfromAvg = abs(avgG-G) * Gweight
                RfromAvg = abs(avgR-R) * Rweight

                totalFromAverage = HfromAvg + SfromAvg + VfromAvg + BfromAvg+ GfromAvg+ RfromAvg

                test = (abs(avgB-B) + abs(avgG-G) + abs(avgR-R))/3
                testB = abs(test - abs(avgB-B))
                testG = abs(test - abs(avgG-G))
                testR = abs(test - abs(avgR-R))


                #HfromVari = abs(vH-H)
                #SfromVari = abs(vS-S)
                #VfromVari = abs(vV-V)
                #BfromVari = abs(vB-B)
                #GfromVari = abs(vG-G)
                #RfromVari = abs(vR-R)

                #totalFromVariance = HfromVari + SfromVari + VfromVari + BfromVari +GfromVari +RfromVari


                #if (avgH - diffH) <= H <= (avgH + diffH) and (avgS - diffS) <= S <= (avgS + diffS) and (avgV - diffV) <= V <= (avgV + diffV):    
                if not (totalFromAverage > 80):
                    img2[nP[0],nP[1]] = [255,0,0]
                    inQueue.add(nP)
                    qP.put(nP)
                else:
                    failed.add(nP)
                    img2[nP[0],nP[1]] = [255,255,255]

        if (count % 10000)==0:
            cv2.imshow("img", img2)
            cv2.waitKey(1)
            print (currP)

cv2.imshow("img", img2)
cv2.waitKey(0)

Below are helper functions for testing that allow the image to be cycled by pressing the arrow keys. This was used to help finetune the weightings by making it easier to go through multiple images

In [None]:
def on_press(key):
    try:
        print('alphanumeric key {0} pressed'.format(
            key.char))
            
    except AttributeError:
        print('special key {0} pressed'.format(
            key))

In [None]:
def on_release(key):
    global currentImage
    print('{0} released'.format(
        key))
    if key == keyboard.Key.esc:
        # Stop listener
        return False
    if key == keyboard.Key.left:
        if currentImage > 0:
            currentImage = currentImage -1
            runImage(currentImage)
        else:
            print("At First Image")
    if key == keyboard.Key.right:
        if currentImage  < 4500:
            currentImage = currentImage +1
            runImage(currentImage)
        else:
            print("At Last Image")

In [None]:
with keyboard.Listener(
        on_press=on_press,
        on_release=on_release) as listener:
    listener.join()
