In [1]:
import cv2
import numpy as np
from collections import deque

import math
import time

from tqdm.notebook import tqdm, tnrange

<!-- __/!\ /!\ /!\__ Ne possède ni l'intertie de la direction ni la jauge de vitesse __/!\ /!\ /!\__ -->

Le fichier *line_prediction.py* dans le dossier titaniumcar reprend le code créé ici.

# Prise de decision faible

Après un pre-process commun (gray scale + blur + edge detection + region of interest), la fonction "HoughLinesP" est appliquée.

Avec ces lignes, le programme récupère le point de croisement entre la droite et le haut de l'image. Une moyenne pondérée par la longueur de la ligne est faite. Ce point résultant détermine la direction et la vitesse.

Cf partie 5 du rapport

### Segmentation du code

 1. Définition des constantes
 2. Création de la chaine de traitement
     * Pre-process
     * Hough lines
 3. Modèle pour la prise de décision
 5. Application à une vidéo

# Définition des constantes

In [2]:
WIDTH, HEIGHT = 456, 228

FOLDER_PATH = "../data/videos/"
OUTPUT_PATH = "../data/outputs/"

# Création de la chaine de traitement

In [3]:
class ProcessChain:
    def canny_trsf(self, image):
        """
        Apply a Gaussian blur to smooth the image
        Edge detection from an RGB image

        @param image: a RGB OpenCV image
        @return: the edges of image
        """
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        blur = cv2.GaussianBlur(gray, (3, 3), 0)
        canny = cv2.Canny(blur, 20, 100)
        return canny

    def region_of_interest(self, image):
        """
        Set to (0, 0, 0) each pixel outside the roi

        @param image: a grayscale OpenCV image
        @return: the image with only roi
        """
        height = image.shape[0]
        # coordinates of the roi
        poly = np.array([
            (0, 131),
            (0, height),
            (454, height),
            (454, 131),
            (300, 94),
            (150, 94)
        ])
        mask = np.zeros_like(image)
        cv2.fillPoly(mask, (poly,), 255)
        masked_image = cv2.bitwise_and(image, mask)
        return masked_image
        
    def detect_lines(self, image):
        """
        Find the segments in the picture
        
        @param image: a grayscale OpenCV image with only bound
        @return: lines in a Numpy array  
        """
        lines = cv2.HoughLinesP(image, 3, np.pi/180, 100, np.array([]), minLineLength=37, maxLineGap=37)
        return lines
        
    def line_process(self, lines):
        """
        Find the point of convergence of lines
        The lines are filtered to keep only relevant lines
        
        @param lines: list or Numpy array with coordinates of lines
        @return: None if all lines are not relevant else float
        """
        if lines is None:
            return None

        lenghts = np.array([])
        origins = np.array([])
        nb_ignored = 0
        
        for line in lines:
            # get lenght of the line
            # and coordinate of intersection of line and abscissa
            x1, y1, x2, y2 = line.reshape(4)
            if y1 == y2:
                continue
            
            a = (x1-x2)/(y1-y2)
            b = x1 - a*y1
            
            lenght = np.sqrt(np.square(x1-x2)+np.square(y1-y2))
            # if intersection point if too away, remove it
            if 0-300 < b < 456+300:
                origins = np.append(origins, b)
                lenghts = np.append(lenghts, lenght)
            # when length is too small, the uncertainty of the direction is too large 
            # so, just increment counter
            elif lenght > 75:
                nb_ignored += 1
        
        if len(origins) == 0:
            return None
        else:
            pt_mean = np.average(origins, weights=lenghts)  
            
            # If the convergence point is on the sides,
            # the ignored lines could be revelant  
            if pt_mean > 0.5:
                pt_mean *= 1 + nb_ignored / 3
            return  pt_mean

    def transform(self, image):
        """
        Apply all transformations
        
        @param image: a OpenCV image of dimension (456, 228, 3)
        @return: mean direction of the edges of the circuit (float)
        """
        image = self.canny_trsf(image)
        image = self.region_of_interest(image)
        lines = self.detect_lines(image)
        
        return self.line_process(lines)

# Model pour la prise de décision

In [4]:
class Image2Prediction():
    """
    Wrap the whole process from frame to apply predicted speed and direction
    """
    def __init__(self):
        """
        Initialization of the attributes
        and create preprocess pipeline with ProcessChain class
        """
        self.process = ProcessChain()
        self.queue = deque([0 for _ in range(12)])
    
    def analyze(self, frame):
        """
        For each frame, this method is called
        The steps are :
         * get the mean direction of the edges of the circuit  
         * predict the direction and the speed
        
        @param frame: a Numpy array usable like a OpenCV image
        
        @return dir_prediction: the predicted direction (between -1 and 1)
        @return speed_prediction: the predicted speed (between -1 and 1)
        """
        pt = self.process.transform(frame)
        if pt is not None:
            dir_prediction, speed_prediction = self.predict(pt)
        else:
            dir_prediction, speed_prediction = None, 0.33
        return dir_prediction, speed_prediction

    def predict(self, x, shape=(HEIGHT, WIDTH)):
        """
        Predict the speed and the direction with the target point and the previous ones
        
        If the direction is straightforward for a while,
        the speed can be increase!
        
        @param x: the taget point of the car
        @param shape: video frame dimensions
        
        @return p_dir: the desired direction (between -1 and 1)
        @return p_speed: the desired speed (between -1 and 1)
        """
        x = (x-shape[1]/2)/shape[1]
        p_dir = 2.5 * np.arctan(x)
        
        self.queue.appendleft(p_dir)
        self.queue.pop()
        
        if p_dir > 1:
            p_dir = 1
        elif p_dir < -1:
            p_dir = -1
        
        dA = np.mean([self.queue[i] for i in range(4)])
        dB = np.mean([self.queue[-i] for i in range(4)])
        p_speed = 1 - (np.abs(dA)*0.9)
                
        return p_dir, p_speed

# Application à une vidéo

In [5]:
cap = cv2.VideoCapture(FOLDER_PATH + 'move_by_hand1.mp4')
out = cv2.VideoWriter(
    OUTPUT_PATH + 'hough_lines_prediction.mp4',
    cv2.VideoWriter_fourcc('M','J','P','G'),
    20, (WIDTH, HEIGHT)
)

if (cap.isOpened()== False): 
    print("Error opening video stream or file")
    
# Progress bar
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
pbar = tqdm(total=length, desc="Frames")

model = Image2Prediction()

while(cap.isOpened()):
    # Capture frame-by-frame

    ret, frame = cap.read()
    if ret == True:
        dir_prediction, speed_prediction = model.analyze(frame)
        
        if dir_prediction:
            # translate the prediction to coordinates
            val_dir = dir_prediction*WIDTH/2
            val_dir += WIDTH/2
            val_dir = int(val_dir)
            val_speed = speed_prediction*HEIGHT/2
            val_speed += HEIGHT/2
            val_speed = HEIGHT - val_speed
            val_speed = int(val_speed)
            
        # show prediction in the frame
            cv2.circle(frame, (val_dir, val_speed), 10, (255, 0, 0), -1)
        # else draw red circle in the frame center
        else:
            cv2.circle(frame, (456//2, 228//2), 10, (0, 0, 255), -1)
        out.write(frame)
        pbar.update()
        
    # break at the end of the video
    else: 
        break

pbar.close()
out.release()
cap.release()

Frames:   0%|          | 0/3603 [00:00<?, ?it/s]