In [1]:
#!pip install -r requirements.txt

import av
import cv2
import numpy as np
import skimage
from skimage.color import rgb2gray
from skimage.exposure import rescale_intensity
from PIL import Image
from matplotlib import pyplot as plt

In [2]:
def load_frames(video_path):
    """Decodes the first video stream of a file into a list of numpy arrays.

    video_path: path (or anything av.open accepts) of the video to read.

    Returns a list with one array per decoded frame, each built from the PIL
    image that frame.to_image() produces.
    """

    frames = []
    # The context manager closes the container (and its file handle) even when
    # decoding fails part-way through.
    with av.open(video_path) as container:
        # Only the video stream is decoded: frames of other stream types
        # (e.g. audio) have no to_image().
        for frame in container.decode(video=0):
            frames.append(np.asarray(frame.to_image()))
    return frames

In [3]:
def equalize_intensity(img, percentiles=(0, 18)):
    """Rescales the intensities of img between two of its percentiles.

    img: image array.
    percentiles: (low, high) percentiles of img, in [0, 100], whose values are
        passed to rescale_intensity as the input range.

    Returns the result of skimage's rescale_intensity for that input range.

    NOTE(review): the bounds used to be stored in locals named p2/p98 although
    the percentiles are (0, 18). The default keeps the values the pipeline
    runs with - confirm they are intentional.
    """
    low, high = np.percentile(img, percentiles)
    return rescale_intensity(img, in_range=(low, high))

In [4]:
def rgb_threshold(im, thresholds):
    """Blacks out the pixels whose channels are not all strictly within bounds.

    im: image array indexed as [row, column, channel]; it is not modified.
    thresholds: one (lower, upper) pair per channel, in channel order. A pixel
        is kept only if lower < value < upper holds for every listed channel.

    Returns a copy of im in which every rejected pixel is set to (0, 0, 0).
    """

    result = im.copy()

    # Seeded with the first channel's lower bound; the loop applies it again.
    keep = result[:, :, 0] > thresholds[0][0]

    for channel, bounds in enumerate(thresholds):
        lower, upper = bounds
        plane = result[:, :, channel]
        keep &= (plane > lower)
        keep &= (plane < upper)

    result[np.logical_not(keep)] = (0, 0, 0)

    return result

In [5]:
def createIndexGrid(image_shape, distance):
    """Creates an evenly spaced grid of coordinates across an image.

    image_shape: shape of the image; only the first two entries are used.
    distance: spacing between grid points along both axes.

    Returns a list of (x, y) tuples covering every multiple of distance
    inside the first two dimensions, ordered by x and then by y.

    Raises ValueError when distance is 0.
    """

    # Step straight through the multiples instead of testing every pixel with
    # a modulo. abs() keeps negative distances working as they did before.
    step = abs(distance)

    return [(xi, yi)
            for xi in range(0, image_shape[0], step)
            for yi in range(0, image_shape[1], step)]

In [6]:
def neighbourInRange(lower, upper, neighbour, _):
    'Region criterion: the neighbour value lies strictly between lower and upper'

    # The fourth argument is accepted but ignored by this criterion.
    above_lower = lower < neighbour
    return above_lower and neighbour < upper

In [7]:
def collectRegion(seed, visited, im, lower_threshold, upper_threshold, fn=neighbourInRange, HIGHEST_PIX_VALUE=255):
    """Returns a region of pixel coordinate neighbours which satisfy the region criterion set by _fn_.

    seed: (x, y) coordinate the region is grown from. It is always part of
        the returned region; the criterion is only applied to neighbours.
    visited: set of coordinates already assigned to a region. It is updated
        in place with every pixel added to the returned region.
    im: 2-D array of pixel values.
    lower_threshold, upper_threshold: bounds forwarded to fn.
    fn: criterion called as fn(lower, upper, neighbour_value, current_value);
        a truthy result adds the neighbour to the region.
    HIGHEST_PIX_VALUE: unused, kept so existing callers keep working.

    Returns the 4-connected region as a list of (x, y) tuples.
    """

    pending = {seed}
    region = set()

    n_rows, n_cols = im.shape

    while pending:

        pix = pending.pop()

        if pix in visited:
            continue

        pix_val = im[pix]

        x, y = pix

        # The four neighbours are listed explicitly. A clamped range with
        # step 2 yields the pixel itself at row/column 0 and never reaches its
        # +1 neighbour, so regions could not grow away from that border.
        for neighbour in ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1)):
            nx, ny = neighbour
            if not (0 <= nx < n_rows and 0 <= ny < n_cols):
                continue
            if neighbour in visited:
                continue
            if fn(lower_threshold, upper_threshold, im[neighbour], pix_val):
                pending.add(neighbour)

        region.add(pix)
        visited.add(pix)

    return list(region)

In [8]:
def collectAllRegions(seeds, im, min_region_size, max_region_size, l_thr, u_thr, fn=neighbourInRange):
    """Runs collectRegion for every seed and returns all connex regions in the image.

    seeds: iterable of (x, y) coordinates to grow regions from.
    im: 2-D array of pixel values.
    min_region_size, max_region_size: inclusive bounds on the number of
        pixels a region needs in order to be kept.
    l_thr, u_thr: bounds forwarded to the region criterion.
    fn: region criterion, forwarded to collectRegion.

    Returns (regions, visited). Indexing regions[i] yields an (n, 2) array of
    coordinates for the i-th kept region. visited is the set of every pixel
    that was assigned to some region, kept or not.
    """

    regions = []
    visited = set()

    for seed in seeds:

        # An earlier region already swallowed this seed.
        if seed in visited:
            continue

        region = collectRegion(seed, visited, im, l_thr, u_thr, fn=fn)

        if min_region_size <= len(region) <= max_region_size:
            regions.append(region)

    if len({len(region) for region in regions}) <= 1:
        # No region, or regions of equal size: the data is rectangular and
        # converts to a regular array.
        return np.array(regions), visited

    # Regions of different sizes are ragged, which np.array() refuses to
    # build implicitly on current NumPy. Store one (n, 2) array per region in
    # an object array so regions[i][:, 0] keeps working.
    ragged = np.empty(len(regions), dtype=object)
    for i, region in enumerate(regions):
        ragged[i] = np.array(region)

    return ragged, visited

In [9]:
def markIndices(im, indices, exaggerate=False):
    """Returns a copy of the image where indices are white.

    im: image array indexed as [x, y, ...]; it is not modified.
    indices: sequence of (x, y) coordinates to set to 255. May be empty.
    exaggerate: when true, the 3x3 neighbourhood of every index is marked as
        well to ease visualization. Neighbours outside the image are skipped.
    """

    tp = im.copy()
    ind = np.asarray(indices)

    # Nothing to mark; an empty array also has no columns to split.
    if ind.size == 0:
        return tp

    x, y = ind[:, 0], ind[:, 1]
    tp[x, y] = 255

    if exaggerate:
        last_x, last_y = tp.shape[0] - 1, tp.shape[1] - 1
        for i in range(-1, 2):
            for j in range(-1, 2):
                # Clipping keeps border neighbourhoods inside the image: an
                # index of -1 would wrap to the opposite edge and an index
                # past the end would raise.
                tp[np.clip(x + i, 0, last_x), np.clip(y + j, 0, last_y)] = 255
    return tp

In [10]:
def rgb_to_binary(im):
    """Converts an RGB image into a 0/1 mask of its non-dark pixels.

    im: RGB image accepted by skimage's rgb2gray; it is not modified.

    Returns a uint8 array holding 1 where the grayscale value scaled by 256
    reaches at least 1, and 0 elsewhere.
    """
    scaled = rgb2gray(im) * 256

    # Compare on the float values instead of casting to uint8 first. A scaled
    # value of 256 does not fit in uint8 and the cast would turn the brightest
    # pixels into background. Below 256 this matches the truncating cast.
    return (scaled >= 1).astype('uint8')

In [11]:
def locate_frame(region):
    """Computes the bounding box of a region.

    region: array with one coordinate pair per row, x in column 0 and y in
        column 1.

    Returns (max_x, max_y, min_x, min_y).
    """
    xs = region[:, 0]
    ys = region[:, 1]

    upper = (max(xs), max(ys))
    lower = (min(xs), min(ys))

    return upper + lower

In [12]:
def draw_bw_frame(im_shape, max_x, max_y, min_x, min_y, thickness=2):
    """Draws the outline of a bounding box on a black canvas.

    im_shape: shape of the canvas to create.
    max_x, max_y, min_x, min_y: bounding box, in the order locate_frame
        returns it.
    thickness: number of pixels drawn on each side of every edge.

    Returns a float array of zeros in which the four edges are set to 255.
    """

    g = np.zeros(im_shape)

    def band(centre):
        # A negative slice start counts from the end of the axis, which made
        # edges next to the top/left border disappear; clamp it to 0.
        return slice(max(centre - thickness, 0), centre + thickness)

    # Horizontal edges (constant x).
    g[band(max_x), min_y:max_y] = 255
    g[band(min_x), min_y:max_y] = 255

    # Vertical edges (constant y).
    g[min_x:max_x, band(min_y)] = 255
    g[min_x:max_x, band(max_y)] = 255

    return g

In [13]:
def gray_to_color(im, color):
    """Paints the non-zero pixels of a 2-D image in a pure RGB color.

    im: 2-D array; every pixel greater than 0 is painted.
    color: 'red', 'green' or 'blue'.

    Returns an integer array of shape (rows, columns, 3) that is black except
    for the painted pixels, whose selected channel is 255.

    Raises AssertionError for an unknown color.
    """
    colors = {'red':0, 'green':1, 'blue':2}
    assert (color in colors)

    rgb_cell = [0,0,0]
    rgb_cell[colors[color]] = 255

    # Allocate the canvas in one call instead of building nested Python lists
    # pixel by pixel. dtype=int matches what np.array infers from int lists.
    frame = np.zeros((im.shape[0], im.shape[1], 3), dtype=int)

    frame[im>0] = rgb_cell

    return frame

In [14]:
def locate_rgb_regions(rgb_im, seed, min_size, max_size, l_thr, u_thr):
    """Finds the connected regions of an RGB image via region growing.

    rgb_im: RGB image; it is not modified.
    seed: spacing of the seed grid handed to createIndexGrid.
    min_size, max_size: inclusive bounds on the size of the regions kept.
    l_thr, u_thr: value bounds of the region criterion, applied to the
        binarized image scaled to 0/255.

    Returns the regions found by collectAllRegions (its visited set is dropped).
    """
    # Binarize, then stretch the 0/1 mask to 0/255 so the thresholds apply.
    binary_255 = rgb_to_binary(rgb_im.copy()) * 255

    grid = createIndexGrid(binary_255.shape, seed)
    found = collectAllRegions(grid, binary_255, min_size, max_size, l_thr, u_thr)

    return found[0]

In [15]:
def overlap_frames(underlying, overlying):
    """Copies the non-zero elements of one frame on top of another.

    underlying: base array; it is not modified.
    overlying: array whose non-zero elements overwrite the base at the same
        indices. The test is per element, so for an RGB overlay only the
        non-zero channels of a pixel are replaced.

    Returns the merged copy of underlying.
    """
    merged = underlying.copy()

    # Index tuple of every non-zero element of the overlay.
    hits = np.nonzero(overlying)
    merged[hits] = overlying[hits]

    return merged

In [16]:
def _fit_span(lo, hi, side, limit):
    'Widens the span [lo, hi) towards side pixels and shifts it back inside an axis of length limit'

    # Pad both ends equally; an odd difference leaves the span one pixel short.
    pad = (side - (hi - lo)) // 2
    lo -= pad
    hi += pad

    if lo < 0:
        # Sticks out below 0: shift the whole span up.
        hi -= lo
        lo = 0
    else:
        overshoot = hi - (limit - 1)
        if overshoot > 0:
            # Sticks out past the last index: shift the whole span down.
            lo -= overshoot
            hi = limit - 1

    # A span longer than the axis would leave lo negative after the shift,
    # and a negative slice start counts from the end of the axis.
    return max(lo, 0), hi


def extract_candidate_frame(im, max_x, max_y, min_x, min_y):
    """Cuts a roughly square patch around a bounding box and resizes it to 32x32.

    im: image array of shape (x, y, channels); it is not modified.
    max_x, max_y, min_x, min_y: bounding box, in the order locate_frame
        returns it.

    The shorter side of the box is widened to the length of the longer one.
    A patch that would leave the image is shifted back inside.

    Returns the patch as an array, resized to 32x32 with PIL.
    """

    x_limit, y_limit = im.shape[:-1]

    width = max(max_x-min_x, max_y-min_y)

    min_x, max_x = _fit_span(min_x, max_x, width, x_limit)
    min_y, max_y = _fit_span(min_y, max_y, width, y_limit)

    patch = im[min_x:max_x, min_y:max_y]

    img = Image.fromarray(patch)
    img = img.resize((32,32))

    return np.array(img)

In [17]:
def output_frames(frames, path, codec='mpeg4', rate=2):
    """Encodes a sequence of RGB frames into a video file.

    frames: non-empty sequence of arrays of shape (height, width, channels),
        passed to av.VideoFrame.from_ndarray as 'rgb24'.
    path: destination of the video file.
    codec: name of the codec of the output stream.
    rate: frame rate of the output stream.

    Raises IndexError when frames is empty.
    """
    # Read the size before opening the container so that an empty frame list
    # fails without leaving an empty file behind.
    (h, w) = frames[0].shape[:-1]

    # The context manager closes the container even when encoding fails.
    with av.open(path, mode='w') as container:

        stream = container.add_stream(codec, rate=rate)

        stream.width = w
        stream.height = h
        stream.pix_fmt = 'yuv420p'

        for f in frames:
            frame = av.VideoFrame.from_ndarray(f, format='rgb24')
            for packet in stream.encode(frame):
                container.mux(packet)

        # Calling encode() without a frame flushes what the encoder buffered.
        for packet in stream.encode():
            container.mux(packet)

In [18]:
def main(src_path='src/robot_parcours_1.avi', dst_path='out/test.avi'):
    """Tracks the red arrow through a video and writes an annotated copy.

    src_path: video to analyse.
    dst_path: where the annotated video is written.

    Every frame is equalized and thresholded for red. The arrow is located by
    region growing and a green rectangle around it is drawn onto the original
    frame.

    Raises ValueError when the video holds no frame, and AssertionError when
    a frame does not contain exactly one arrow candidate.
    """

    # Load frames
    frames = load_frames(src_path)

    n = len(frames)
    print('(main) Loaded {} frames'.format(n))

    if n == 0:
        raise ValueError('No frames could be decoded from {}'.format(src_path))

    # Equalize frames
    eq_frames = [equalize_intensity(f) for f in frames]
    first_frame = eq_frames[0]

    # Threshold frames to find the _red_ arrow
    arrows = [rgb_threshold(f, ((180, 256), (-1,190), (-1,190))) for f in eq_frames]

    # Identify coordinates of the arrows using region growing,
    # then create a black frame with a colored rectangle surrounding the arrow
    arrow_frames = []

    # Extract from the equalized first image the area beneath the vehicle.
    # This snippet will possibly be evaluated by a NN
    symbol_candidates = []

    for i, a in enumerate(arrows):

        arrow_regions = locate_rgb_regions(a, 10, 1000, 3000, 250, 256)
        # The check also fails when several regions qualify, so report the count.
        assert (len(arrow_regions)==1), 'Expected exactly one arrow in frame {}, found {}'.format(i, len(arrow_regions))

        # Draw surrounding frame
        max_x, max_y, min_x, min_y = locate_frame(arrow_regions[0])
        frame = draw_bw_frame(a.shape[:-1], max_x, max_y, min_x, min_y)
        arrow_frames.append(gray_to_color(frame, 'green'))

        # i is zero-based; count from 1 so the last message reads n/n
        print('(main) Identified arrow no. {}/{}'.format(i + 1, n), end='\r')

        # Generate a 32x32 image from the area underneath the arrow
        symbol_candidates.append(extract_candidate_frame(first_frame, max_x, max_y, min_x, min_y))

    print('(main) Successfully identified all arrows!')

    # Add the surrounding rectangles to the original footage
    original_and_arrow_trace = [overlap_frames(f, a) for f, a in zip(frames, arrow_frames)]

    # Output
    output_frames(original_and_arrow_trace, dst_path)

In [19]:
# Run the whole pipeline; main() prints its progress messages as it goes.
main()

(main) Loaded 42 frames
(main) Successfully identified all arrows!
