# UE Computer Vision | Project : Visual tracking of video objects
# Baseline models

In [41]:
import numpy as np
import cv2
from skimage import io
import os, glob
from pathlib import Path
from scipy.interpolate import interp2d
from scipy.ndimage import convolve
from skimage import filters
from skimage.color import rgb2gray
import matplotlib.pyplot as plt
from tqdm import tqdm

In [22]:
## Experiment parameters
# Name prefix of the frame / mask files that are processed.
sequence_name = "swan"
# Sigma handed to the Gaussian smoothing applied to every frame.
gaussian_filter_sigma = 1.5

# Optical-flow estimator: "HS" and "LK" are matched explicitly by
# compute_flow; any other value (e.g. "Fa") selects Farneback.
method_name = "HS"
# method_name = "LK"
# method_name = "Fa"

# Constant added to the denominator of the Horn-Schunck update.
HS_lambda = 0.01

In [14]:
# Frame indices run from 1 up to the number of PNG files found for the
# selected sequence.
im_begin = 1
files = glob.glob("../data/sequences-train/"+sequence_name+"-*.png")
im_end = len(files)

## Utility functions

In [15]:
def estimate_derivatives(img1, img2):
    """Estimate spatial and temporal derivatives of a pair of frames.

    The spatial derivatives are computed on the average of the two frames
    with 2x2 difference kernels; the temporal term is the 2x2 box average
    of ``img1 - img2``.

    Args:
        img1: first frame, a 2-D array.
        img2: second frame, same shape as ``img1``.

    Returns:
        Tuple ``(fx, fy, ft)`` of arrays shaped like the inputs: ``fx``
        differences along axis 0 (rows), ``fy`` along axis 1 (columns),
        ``ft`` is the smoothed frame difference.
    """
    kernelX = np.array([[-1, -1], [1, 1]])   # difference along axis 0
    kernelY = np.array([[-1, 1], [-1, 1]])   # difference along axis 1
    kernelT = np.ones((2, 2)) * .25          # 2x2 box average

    # Parenthesised on purpose: `img1 + img2 / 2` would only halve img2
    # and bias the gradients towards img1.
    mean_frame = (img1 + img2) / 2

    fx = convolve(mean_frame, kernelX)
    fy = convolve(mean_frame, kernelY)
    ft = convolve(img1 - img2, kernelT)
    return fx, fy, ft

In [16]:
def propagate_mask(flow, img_current, mask_begin):
    """Build the mask of the current frame by looking up the initial mask.

    For every pixel ``(x, y)`` of the current frame (``x`` = row index,
    ``y`` = column index) the flow gives a displaced position
    ``(x + flow[x, y, 1], y + flow[x, y, 0])``, rounded to the nearest
    integer. The output pixel is set to 255 when that position lies inside
    the image and ``mask_begin`` is positive there, and to 0 otherwise.

    Args:
        flow: array of shape (H, W, 2); channel 1 is the row displacement,
            channel 0 the column displacement.
        img_current: current frame; only its first two dimensions (H, W)
            are used.
        mask_begin: 2-D mask indexed as ``mask_begin[row, col]``.

    Returns:
        ``np.uint8`` array of shape (H, W) with values 0 or 255.
    """
    height, width = img_current.shape[:2]
    rows, cols = np.meshgrid(np.arange(height), np.arange(width), indexing="ij")

    # Displaced (source) coordinates for every pixel at once.
    src_rows = np.rint(rows + flow[:height, :width, 1]).astype(int)
    src_cols = np.rint(cols + flow[:height, :width, 0]).astype(int)

    # Only positions that fall inside the image may be looked up.
    inside = (
        (src_rows >= 0) & (src_rows < height)
        & (src_cols >= 0) & (src_cols < width)
    )

    selected = np.zeros((height, width), dtype=bool)
    selected[inside] = np.asarray(mask_begin)[src_rows[inside], src_cols[inside]] > 0

    new_mask = np.zeros((height, width), dtype=np.uint8)
    new_mask[selected] = 255
    return new_mask

In [33]:
def flow_concatenation(unary_flow, to_ref_flow):
    """Chain a flow field with a second flow sampled at the displaced points.

    For every pixel ``(x, y)`` (``x`` = row, ``y`` = column) the result is
    ``unary_flow[x, y] + to_ref_flow`` interpolated at
    ``(x + unary_flow[x, y, 1], y + unary_flow[x, y, 0])``. Each channel of
    ``to_ref_flow`` is interpolated with a bicubic spline over the pixel
    grid.

    ``scipy.interpolate.interp2d`` (used previously) is deprecated and
    removed from recent SciPy; ``RectBivariateSpline`` is its documented
    replacement for regular grids and is evaluated here for all pixels in
    one call instead of once per pixel.

    Args:
        unary_flow: array of shape (H, W, 2); channel 1 is the row
            displacement, channel 0 the column displacement.
        to_ref_flow: array with the same layout, covering at least the
            (H, W) grid of ``unary_flow``.

    Returns:
        ``float64`` array of shape (H, W, 2) holding the chained flow.

    Note:
        The cubic spline needs at least 4 samples along each axis.
        Query points outside the grid are left to the spline evaluator
        (FITPACK clamps them to the grid border, which is also how
        ``interp2d`` behaved by default -- NOTE(review): confirm on the
        SciPy version in use).
    """
    from scipy.interpolate import RectBivariateSpline

    height, width = unary_flow.shape[0], unary_flow.shape[1]
    rows = np.arange(0, height)
    cols = np.arange(0, width)

    # One interpolant per flow channel, defined on the (row, col) grid.
    row_disp = RectBivariateSpline(rows, cols, to_ref_flow[:height, :width, 1])
    col_disp = RectBivariateSpline(rows, cols, to_ref_flow[:height, :width, 0])

    # Positions reached after applying the unary flow to every pixel.
    grid_rows, grid_cols = np.meshgrid(rows, cols, indexing="ij")
    query_rows = (grid_rows + unary_flow[:, :, 1]).ravel()
    query_cols = (grid_cols + unary_flow[:, :, 0]).ravel()

    flow = np.zeros((height, width, 2), dtype=np.float64)
    flow[:, :, 1] = unary_flow[:, :, 1] + row_disp.ev(query_rows, query_cols).reshape(height, width)
    flow[:, :, 0] = unary_flow[:, :, 0] + col_disp.ev(query_rows, query_cols).reshape(height, width)
    return flow

## Flow functions for Horn-Schunck (HS), Lucas-Kanade (LK) and Farneback (Fa)

In [17]:
def lucas_kanade(img1, img2, window_size=21, tau=1e-2):
    """Estimate a dense flow field with a windowed least-squares solve.

    For every pixel whose full window fits inside the image, the
    derivatives inside the window form the system ``A @ nu = It`` with
    ``A = [Ix, Iy]``; it is solved with the pseudo-inverse whenever the
    smallest eigenvalue magnitude of ``A.T @ A`` is at least ``tau``.
    Pixels closer than ``window_size // 2`` to the border, or failing the
    eigenvalue test, keep a zero flow.

    Args:
        img1: first frame, a 2-D array.
        img2: second frame, same shape as ``img1``.
        window_size: side length of the square window (its half-size
            ``int(window_size / 2)`` is what is actually used).
        tau: threshold on the smallest eigenvalue magnitude of
            ``A.T @ A``.

    Returns:
        Array of shape (H, W, 2): channel 0 holds ``-v`` and channel 1
        holds ``-u``, where ``u`` / ``v`` are the solution components
        paired with ``fx`` / ``fy`` of ``estimate_derivatives``.
    """
    w = int(window_size/2)
    fx, fy, ft = estimate_derivatives(img1, img2)
    u = np.zeros(img1.shape)
    v = np.zeros(img1.shape)
    for i in range(w, img1.shape[0]-w):
        for j in range(w, img1.shape[1]-w):
            window = (slice(i-w, i+w+1), slice(j-w, j+w+1))
            Ix = fx[window].flatten()
            Iy = fy[window].flatten()
            It = ft[window].flatten()
            A = np.vstack((Ix, Iy)).T
            if np.min(abs(np.linalg.eigvals(np.matmul(A.T, A)))) >= tau:
                # 1-D right-hand side -> 1-D solution, so plain scalars are
                # stored (assigning size-1 arrays to an element is
                # deprecated in NumPy).
                nu = np.matmul(np.linalg.pinv(A), It)
                u[i, j] = nu[0]
                v[i, j] = nu[1]
    flow = np.zeros((u.shape[0], u.shape[1], 2))
    flow[:, :, 0], flow[:, :, 1] = -v, -u
    return flow

In [18]:
def farneback(img1, img2):
    """Compute a dense flow field with OpenCV's Farneback estimator.

    Both frames are scaled by 255 and cast to ``uint8`` before being handed
    to ``cv2.calcOpticalFlowFarneback`` together with a zero-filled
    (H, W, 2) array as the flow argument.

    Args:
        img1: first frame (assumed to hold values in [0, 1] -- TODO confirm).
        img2: second frame, same shape as ``img1``.

    Returns:
        Whatever ``cv2.calcOpticalFlowFarneback`` returns for the pair.
    """
    height, width = img1.shape[0], img1.shape[1]
    initial_flow = np.zeros((height, width, 2))
    first_u8 = (img1*255.).astype(np.uint8)
    second_u8 = (img2*255.).astype(np.uint8)
    return cv2.calcOpticalFlowFarneback(
        first_u8,
        second_u8,
        initial_flow,
        pyr_scale=0.5,
        levels=3,
        winsize=7,
        iterations=3,
        poly_n=5,
        poly_sigma=1.2,
        flags=0,
    )

In [19]:
def horn_schunck(img1, img2, lambda_=HS_lambda, Niter=200):
    """Estimate a dense flow field with an iterative Horn-Schunck scheme.

    Starting from a zero flow, each iteration replaces the flow by its
    3x3 weighted neighbourhood average minus a correction proportional to
    the derivative residual ``fx*u + fy*v + ft``.

    Args:
        img1: first frame, a 2-D array.
        img2: second frame, same shape as ``img1``.
        lambda_: constant added to ``fx**2 + fy**2`` in the denominator of
            the update.
        Niter: number of update iterations.

    Returns:
        Array of shape (H, W, 2); channel 1 is the component paired with
        ``fx`` and channel 0 the one paired with ``fy``.
    """
    side, corner = 1./6, 1./12
    kernelA = np.array(
        [[corner, side, corner],
         [side, 0, side],
         [corner, side, corner]],
        float,
    )
    fx, fy, ft = estimate_derivatives(img1, img2)
    flow = np.zeros((img1.shape[0], img1.shape[1], 2))

    # The denominator does not depend on the flow, so it is built once.
    denom = fx*fx + fy*fy + lambda_

    for _ in range(Niter):
        mean_u = convolve(flow[:, :, 1], kernelA)
        mean_v = convolve(flow[:, :, 0], kernelA)

        # Residual shared by both component updates.
        residual = fx*mean_u + fy*mean_v + ft

        flow[:, :, 1] = mean_u - fx * residual / denom
        flow[:, :, 0] = mean_v - fy * residual / denom

    return flow

In [20]:
def compute_flow(img1, img2):
    """Run the flow estimator selected by the global ``method_name``.

    ``"HS"`` selects ``horn_schunck``, ``"LK"`` selects ``lucas_kanade``
    and every other value falls back to ``farneback``.

    Args:
        img1: first frame.
        img2: second frame.

    Returns:
        The flow field returned by the selected estimator.
    """
    if method_name == "HS":
        estimator = horn_schunck
    elif method_name == "LK":
        estimator = lucas_kanade
    else:
        estimator = farneback
    return estimator(img1, img2)

## Direct integration

In [25]:
# Output folder for the masks produced by direct propagation; it is created
# (with any missing parents) if it does not exist yet.
save_dir = "../data/mask-outputs/direct-"+method_name+"_"+sequence_name
os.makedirs(save_dir, exist_ok=True)

In [None]:
# First frame (scaled to [0, 1], converted to grayscale and smoothed) and
# the mask stored alongside it.
img_begin = io.imread('../data/sequences-train/'+sequence_name+'-%0*d.bmp'%(3,im_begin))
img_begin = img_begin.astype(np.float32)/255.
mask_begin = io.imread('../data/sequences-train/'+sequence_name+'-%0*d.png'%(3,im_begin))
img_begin = rgb2gray(img_begin)
img_begin = filters.gaussian(img_begin, gaussian_filter_sigma)

print("Processing sequence " + sequence_name + " with direct propagation using " + method_name)
for im in tqdm(range(im_begin+1,im_end+1)):
    # Current frame, pre-processed like the first one.
    img_current_full = io.imread('../data/sequences-train/'+sequence_name+'-%0*d.bmp'%(3,im))
    img_current_full = img_current_full.astype(np.float32)/255.
    img_current = filters.gaussian(rgb2gray(img_current_full), gaussian_filter_sigma)

    # The flow is always estimated between the first frame and the current
    # one, then used to look up the first-frame mask.
    flow = compute_flow(img_begin, img_current)
    propagation_mask = propagate_mask(flow, img_current, mask_begin)

    plt.imsave(save_dir+'/%0*d.png'%(3,im), propagation_mask, cmap="gray")

    plt.show()

## Sequential integration

In [34]:
# Output folder for the masks produced by sequential propagation; it is
# created (with any missing parents) if it does not exist yet.
save_dir = "../data/mask-outputs/seq-"+method_name+"_"+sequence_name
os.makedirs(save_dir, exist_ok=True)

In [44]:
from skimage import filters
from skimage.color import rgb2gray

# First frame (scaled to [0, 1], converted to grayscale and smoothed) and
# the mask stored alongside it.
img_begin = io.imread('../data/sequences-train/'+sequence_name+'-%0*d.bmp'%(3,im_begin))
img_begin = img_begin.astype(np.float32)/255.
mask_begin = io.imread('../data/sequences-train/'+sequence_name+'-%0*d.png'%(3,im_begin))
img_begin = rgb2gray(img_begin)
img_begin = filters.gaussian(img_begin, gaussian_filter_sigma)

# The accumulated flow starts at zero and is extended with every
# frame-to-frame flow.
flow_shape = (img_begin.shape[0], img_begin.shape[1], 2)
to_ref_flow = np.zeros(flow_shape)
img_previous = img_begin

print("Processing sequence " + sequence_name + " with sequential propagation using " + method_name)
for im in tqdm(range(im_begin+1,im_end+1)):
    # Current frame, pre-processed like the first one.
    img_current_full = io.imread('../data/sequences-train/'+sequence_name+'-%0*d.bmp'%(3,im))
    img_current_full = img_current_full.astype(np.float32)/255.
    img_current = filters.gaussian(rgb2gray(img_current_full), gaussian_filter_sigma)

    # Flow between consecutive frames; the current frame becomes the
    # "previous" one for the next iteration.
    flow = compute_flow(img_previous, img_current)
    img_previous = img_current

    # Chain the new flow with the accumulated one and use the result to
    # look up the first-frame mask.
    to_ref_flow = flow_concatenation(flow, to_ref_flow)
    propagation_mask = propagate_mask(to_ref_flow, img_current, mask_begin)

    plt.imsave(save_dir+'/%0*d.png'%(3,im), propagation_mask, cmap="gray")

    plt.show()

Processing sequence swan with sequential propagation using HS


  0%|          | 0/49 [00:05<?, ?it/s]


KeyboardInterrupt: 