In [1]:
import numpy as np
import matplotlib.pyplot as plt
import imageio
import cv2
from scipy import ndimage
from scipy import stats
import os
import sys
sys.path.append('..')
import cvutils as cvu
import math
from scipy import interpolate
import time
from scipy.interpolate import griddata
from numba import jit

In [2]:
# Use a 10x10 default figure size for every matplotlib figure created in this notebook.
plt.rcParams['figure.figsize'] = (10, 10)

# Lucas Kanade

### Gaussian Derivatives
We need the derivatives to find the direction of the optical flow. [See this notebook for more details](../local-filters/gaussian.ipynb). We use the `cvu` library to access the functions we need.

In [3]:
def warp_image(im, u, v, xx, yy):
    """Resample `im` at the coordinates `xx`, `yy` displaced by the flow `u`, `v`.

    The displaced coordinates are handed to `interp2`, which does the actual
    interpolation; its result is returned unchanged.
    """
    shifted_x = xx + u
    shifted_y = yy + v
    return interp2(im, shifted_x, shifted_y)

def interp2(im, xx, yy):
    """Resample a 2-D image on a regular grid derived from `xx` and `yy`.

    A bivariate spline is fitted to `im` over integer pixel coordinates
    (rows 0..h-1, columns 0..w-1) and evaluated on an evenly spaced grid.

    Parameters
    ----------
    im : 2-D array
        Image to sample; indexed as im[row, col].
    xx, yy : 2-D arrays of identical shape (n_rows, n_cols)
        Column (x) and row (y) coordinates. Only their minimum, maximum and
        shape are used: the sampling grid runs from min to max + 1 with
        n_cols samples along x and n_rows samples along y.

    Returns
    -------
    2-D array of shape (n_rows, n_cols) with the interpolated values.
    """
    h, w = im.shape
    interp_spline = interpolate.RectBivariateSpline(np.arange(h), np.arange(w), im)

    # xx.shape is (rows, cols); name the two extents accordingly.
    n_rows, n_cols = xx.shape

    # NOTE(review): the grid is built from the coordinate extents only, and its
    # upper end is max + 1 rather than max - confirm this is the intended sampling.
    x = np.linspace(np.min(xx), np.max(xx) + 1, n_cols)
    y = np.linspace(np.min(yy), np.max(yy) + 1, n_rows)

    # The spline was built as f(row, col), so the y axis is passed first.
    return interp_spline(y, x)

def draw_rectangle(im, rect):
    """Return a copy of `im` with a one-pixel rectangle outline painted on it.

    `rect` is ((x1, y1), (x2, y2)); both corners are inclusive. Every outline
    pixel is set to [0, 0, 255]. The input image is left untouched.
    """
    (x1, y1), (x2, y2) = rect
    outline = [0, 0, 255]
    cols = np.arange(x1, x2 + 1)
    rows = np.arange(y1, y2 + 1)

    canvas = im.copy()
    # Horizontal edges: top, then bottom.
    for row in (y1, y2):
        canvas[row, cols, :] = outline
    # Vertical edges: left, then right.
    for col in (x1, x2):
        canvas[rows, col, :] = outline
    return canvas

# Lucas Kanade

In [4]:
def lk_small(im1, im2, Ix_interp, Iy_interp, It_interp, window_size=5):
    """Dense Lucas-Kanade flow: one least-squares solve per pixel.

    For every pixel the spatial and temporal derivatives inside a square
    window centred on it (clipped at the image border) are stacked into the
    over-determined system ``[Ix Iy] @ [u, v] = -It`` and solved with
    ``np.linalg.lstsq``.

    Parameters
    ----------
    im1, im2 : 2-D arrays
        Only their shape and dtype are used, to size the outputs `u` and `v`.
    Ix_interp, Iy_interp, It_interp : callables
        Called as ``f(row_coords, col_coords)`` and expected to return the
        derivative values on that grid (e.g. ``RectBivariateSpline`` objects).
    window_size : int, optional
        Side length of the window; half of it (rounded down) is taken on each
        side of the centre pixel.

    Returns
    -------
    (u, v) : two 2-D arrays with the x and y flow components per pixel.
        Floating-point inputs keep their dtype; integer inputs are promoted to
        a float dtype so the flow is not truncated to whole numbers.
    """
    height, width = im1.shape
    u = np.zeros(im1.shape, dtype=np.result_type(im1.dtype, np.float32))
    v = np.zeros(im2.shape, dtype=np.result_type(im2.dtype, np.float32))

    # `np.int` was removed in NumPy 1.24; the builtin is the exact equivalent.
    ws_half = int(window_size / 2)

    # The derivatives are only ever sampled at integer pixel positions, so
    # evaluate each interpolant once on the full grid instead of per window.
    rows, cols = np.arange(height), np.arange(width)
    Ix = Ix_interp(rows, cols)
    Iy = Iy_interp(rows, cols)
    It = It_interp(rows, cols)

    for i in range(width):
        x_lo, x_hi = max(i - ws_half, 0), min(i + ws_half + 1, width)
        for j in range(height):
            y_lo, y_hi = max(j - ws_half, 0), min(j + ws_half + 1, height)

            Ix_win = Ix[y_lo:y_hi, x_lo:x_hi].reshape((-1, 1))
            Iy_win = Iy[y_lo:y_hi, x_lo:x_hi].reshape((-1, 1))
            It_win = It[y_lo:y_hi, x_lo:x_hi].reshape((-1))
            A = np.hstack((Ix_win, Iy_win))
            b = -It_win

            # rcond=None selects NumPy's current default cutoff explicitly and
            # avoids the FutureWarning older NumPy emits when it is omitted.
            d = np.linalg.lstsq(A, b, rcond=None)[0]
            u[j, i] = d[0]
            v[j, i] = d[1]

    return u, v


def lucas_kanade(im1, im2, window_size=5, sigma=1):
    """Compute dense Lucas-Kanade optical flow between two grayscale frames.

    Parameters
    ----------
    im1, im2 : 2-D arrays of identical shape
        Previous and current frame.
    window_size : int, optional
        Side length of the per-pixel window, forwarded to `lk_small`.
    sigma : number, optional
        Passed to the `cvu` filters below (presumably the Gaussian standard
        deviation - confirm against `cvutils`).

    Returns
    -------
    (u, v) : the per-pixel flow components returned by `lk_small`.
    """
    height, width = im1.shape
    y, x = np.arange(height), np.arange(width)

    # Spatial derivatives of the first frame, wrapped as splines over the
    # (row, col) pixel grid so `lk_small` can sample them.
    Ix, Iy = cvu.gauss_deriv_filter(im1, sigma)
    Ix_interp = interpolate.RectBivariateSpline(y, x, Ix)
    Iy_interp = interpolate.RectBivariateSpline(y, x, Iy)

    # Temporal derivative: difference of the two filtered frames.
    It = cvu.gauss_smooth_filter(im2, sigma) - cvu.gauss_smooth_filter(im1, sigma)
    It_interp = interpolate.RectBivariateSpline(y, x, It)

    return lk_small(im1, im2, Ix_interp, Iy_interp, It_interp, window_size)

In [5]:
# Load two consecutive frames of sequence-01 as float32 grayscale images and
# time one dense Lucas-Kanade run on them. The call is the cell's last
# expression, so the returned (u, v) arrays are echoed as the cell output.
im1ex = cv2.imread("./sequence-01/0150.jpg", cv2.IMREAD_GRAYSCALE).astype(np.float32)
im2ex = cv2.imread("./sequence-01/0151.jpg", cv2.IMREAD_GRAYSCALE).astype(np.float32)
%time lucas_kanade(im1ex, im2ex)

  app.launch_new_instance()


CPU times: user 23.9 s, sys: 12.1 ms, total: 23.9 s
Wall time: 24 s


(array([[-0.87729377,  0.13610645,  0.20062475, ...,  0.03568921,
          0.03646551,  0.03832131],
        [-0.3238559 ,  0.06859834,  0.18568963, ...,  0.02425791,
          0.02681748,  0.03288487],
        [ 0.26725832,  0.2695329 ,  0.19302419, ...,  0.0373462 ,
          0.05668979,  0.09157817],
        ...,
        [ 1.4720231 ,  1.3709668 ,  1.3451769 , ..., -0.1463011 ,
         -0.11070872, -0.00563199],
        [ 0.37461257,  0.29426238,  0.19594777, ..., -0.07942375,
         -0.10088883, -0.07044145],
        [ 0.23740785,  0.18414333, -0.09971307, ..., -0.05329672,
         -0.07812724, -0.06551766]], dtype=float32),
 array([[ 0.1150495 ,  0.16344965,  0.16919759, ..., -0.15108292,
         -0.08819071,  0.08358338],
        [ 0.14495553,  0.17498465,  0.18833737, ...,  0.056097  ,
          0.10883509,  0.22639754],
        [ 0.15853685,  0.18267709,  0.19042805, ...,  0.0838266 ,
          0.02128884, -0.1565484 ],
        ...,
        [ 0.06044991,  0.05992878,  0.0

In [6]:
def _flow_to_bgr(flow):
    """Min-max scale one flow component to 0..255 and return it as a 3-channel uint8 image."""
    flow = np.nan_to_num(np.asarray(flow, dtype=np.float64))
    lo, hi = flow.min(), flow.max()
    # A constant field has no range to stretch; render it as all zeros.
    scale = 255.0 / (hi - lo) if hi > lo else 0.0
    gray = np.clip((flow - lo) * scale, 0, 255).astype(np.uint8)
    return cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)


def lk_apply(in_stream, out_stream):
    """Run dense Lucas-Kanade over a frame stream and display frame, u and v side by side.

    Parameters
    ----------
    in_stream : context manager
        Entering it must yield an iterator of ``(flag, frame)`` pairs, where
        `frame` is a 3-D image array (averaged over axis 2 to get grayscale).
    out_stream : context manager
        Entering it must yield a callable invoked as
        ``ostream(list_of_images, (1, 3))`` (presumably a 1x3 display grid -
        confirm against `cvutils`).

    The first 100 frames are skipped. Each later frame is compared with its
    predecessor, and the two flow components are shown as grayscale images.
    """
    with in_stream as stream, out_stream as ostream:
        for _ in range(100):
            # Ignoring the first 100 frames since nothing happens in them
            next(stream)
        _, frame = next(stream)
        prev_frame = np.mean(frame, 2)
        for _, frame in stream:
            curr_frame = np.mean(frame, 2)

            u, v = lucas_kanade(prev_frame, curr_frame)
            prev_frame = curr_frame

            # The flow is a small signed float field; casting it straight to
            # uint8 truncates almost everything to 0 and wraps negatives, so
            # stretch each component to the full 0..255 range first.
            u_c = _flow_to_bgr(u)
            v_c = _flow_to_bgr(v)
            ostream([frame, u_c, v_c], (1, 3))

In [9]:
# Run the dense-flow visualisation on ./sequence-01.
# NOTE(review): `wait_key` and `scale` are cvu.dir_stream options (presumably the
# per-frame display delay and a resize factor) - confirm against cvutils.
lk_apply(cvu.dir_stream("./sequence-01", wait_key=50, scale=0.2), cvu.window_stream())

  app.launch_new_instance()


# Lucas Kanade for Single Object Tracking

In [None]:
def lucas_kanade_template_tracking(prev_template, curr_template, sigma=1):
    """Estimate one global (u, v) displacement between two template patches.

    Builds the 2x2 Lucas-Kanade normal equations from sums over the whole
    patch,

        [[sum(Ix*Ix), sum(Ix*Iy)],      [u]       [sum(Ix*It)]
         [sum(Ix*Iy), sum(Iy*Iy)]]  @   [v]  = -  [sum(Iy*It)]

    and solves them in the least-squares sense.

    Parameters
    ----------
    prev_template, curr_template : 2-D arrays of identical shape
        The patch in the previous and in the current frame.
    sigma : number, optional
        Passed to the `cvu` filters (presumably the Gaussian standard
        deviation - confirm against `cvutils`). Defaults to 1, the value that
        used to be hard-coded.

    Returns
    -------
    ndarray of shape (2, 1) holding ``[[u], [v]]``.
    """
    # Use the same cvu filters as `lucas_kanade`; the bare names `gaussderiv`
    # and `gaussianfilter` called here before are not defined in this notebook.
    Ix, Iy = cvu.gauss_deriv_filter(prev_template, sigma)
    It = cvu.gauss_smooth_filter(curr_template, sigma) - cvu.gauss_smooth_filter(prev_template, sigma)

    Ixx, Ixy, Iyy = np.sum(Ix * Ix), np.sum(Ix * Iy), np.sum(Iy * Iy)
    Ixt, Iyt = np.sum(Ix * It), np.sum(Iy * It)

    A = np.array([[Ixx, Ixy], [Ixy, Iyy]])
    b = -np.array([[Ixt], [Iyt]])

    # lstsq (rather than solve) still returns a minimum-norm answer when the
    # patch has no texture and A is singular. rcond=None selects NumPy's
    # current default cutoff and avoids the FutureWarning on older NumPy.
    d = np.linalg.lstsq(A, b, rcond=None)[0]

    return d

In [None]:
def lucas_kanade_tracking(in_stream, out_stream, win2track):
    """Track a rectangular window through a frame stream with iterative Lucas-Kanade.

    Parameters
    ----------
    in_stream : context manager
        Entering it must yield an iterator of ``(flag, frame)`` pairs, where
        `frame` is a 3-D image array (averaged over axis 2 to get grayscale).
    out_stream : context manager
        Entering it must yield a callable invoked as ``ostream([image], [1, 1])``.
    win2track : ndarray of shape (2, 2)
        ``[[x_left, y_top], [x_right, y_bottom]]``, both corners inclusive.

    For every frame the window content is sampled from spline interpolants of
    the previous and current image, a displacement is estimated and refined
    for a fixed number of iterations, the window is moved by the accumulated
    displacement, and the frame is shown with the window drawn on it.
    """
    top_left = win2track[0]
    bottom_right = win2track[1]

    # Window extent as (width, height); used to redraw the box every frame.
    dims = bottom_right - top_left

    # Template window absolute pixel positions (1-D axes of the window grid).
    xx = np.arange(top_left[0], bottom_right[0] + 1)
    yy = np.arange(top_left[1], bottom_right[1] + 1)

    with in_stream as stream, out_stream as ostream:
        _, prev_frame = next(stream)
        height, width, _ = prev_frame.shape
        x, y = np.arange(width), np.arange(height)
        prev_im = np.mean(prev_frame, axis=2)
        prev_interp = interpolate.RectBivariateSpline(y, x, prev_im)
        for _, frame in stream:
            curr_im = np.mean(frame, axis=2)
            curr_interp = interpolate.RectBivariateSpline(y, x, curr_im)
            prev_template = prev_interp(yy, xx)
            curr_template = curr_interp(yy, xx)

            u, v = lucas_kanade_template_tracking(prev_template, curr_template)
            total_u, total_v = u, v

            # Iteratively computing optical flow (LK Refinement steps)
            # here: no convergence criterion, we always run 10 iterations
            for _ in range(10):
                xx_u = xx + total_u
                yy_v = yy + total_v
                template_w = prev_interp(yy_v, xx_u)
                u, v = lucas_kanade_template_tracking(template_w, curr_template)

                # Hint: When alternatingly warping and estimating the flow,
                # small errors in either of these steps will propagate through
                # the rest of the process. In order to prevent warping errors from
                # propagating, you should not update your warped version of the
                # image in each step by iteratively warping it with each flow field
                # you compute. Instead, you should first add your flow field to the
                # accumulated flow, and then apply the accumulated flow to the
                # original image. This way, flow estimation will be more stable.
                total_u = total_u + u
                total_v = total_v + v

            # Adding flow to pixels in the win2track
            xx = xx + total_u
            yy = yy + total_v

            # Computing the template position in the next image: minimal x/y
            # coordinate plus template width/height. `np.int` was removed in
            # NumPy 1.24; plain Python ints are also what cv2.rectangle expects.
            x1 = int(np.min(xx))
            y1 = int(np.min(yy))
            x2 = x1 + int(dims[0])
            y2 = y1 + int(dims[1])

            color = (0, 0, 255)
            res_im = cv2.rectangle(frame, (x1, y1), (x2, y2), color=color, thickness=2)

            # The current frame becomes the reference for the next iteration.
            prev_interp = curr_interp

            ostream([res_im], [1, 1])

In [None]:
# Window to track, as two [x, y] corner points: row 0 is the top-left corner and
# row 1 the bottom-right corner (this is how lucas_kanade_tracking unpacks it).
win2track = np.array([
    [294,251],
    [328,279]
])
lucas_kanade_tracking(cvu.dir_stream("./sequence-02", wait_key=1), cvu.window_stream(), win2track)

In [None]:
def lucas_kanade_tracking2(in_stream, out_stream, win2track):
    """Variant of `lucas_kanade_tracking` that samples templates with `griddata`.

    Parameters
    ----------
    in_stream : context manager
        Entering it must yield an iterator of ``(flag, frame)`` pairs, where
        `frame` is a 3-D image array (averaged over axis 2 to get grayscale).
    out_stream : context manager
        Entering it must yield a callable invoked as ``ostream([image], [1, 1])``.
    win2track : ndarray of shape (2, 2)
        ``[[x_left, y_top], [x_right, y_bottom]]``, both corners inclusive.

    The window is kept as full 2-D coordinate grids (`XX`, `YY`). Every
    template is obtained by scattered-data interpolation of the flattened image
    over all pixel coordinates, which is considerably more expensive than the
    spline lookup used by `lucas_kanade_tracking`.
    """
    top_left = win2track[0]
    bottom_right = win2track[1]

    # Window extent as (width, height); used to redraw the box every frame.
    dims = bottom_right - top_left

    # Template window absolute pixel positions, expanded to 2-D grids.
    xx = np.arange(top_left[0], bottom_right[0] + 1)
    yy = np.arange(top_left[1], bottom_right[1] + 1)
    XX, YY = np.meshgrid(xx, yy)

    with in_stream as stream, out_stream as ostream:
        _, prev_frame = next(stream)
        height, width, _ = prev_frame.shape
        x, y = np.arange(width), np.arange(height)
        X, Y = np.meshgrid(x, y)
        # One (x, y) row per pixel, in the same row-major order as the
        # flattened images below.
        points = np.column_stack([X.reshape((-1, 1)), Y.reshape((-1, 1))])
        prev_im = np.mean(prev_frame, axis=2)
        prev_im_flat = prev_im.reshape((-1,))
        for _, frame in stream:
            curr_im = np.mean(frame, axis=2)
            curr_im_flat = curr_im.reshape((-1, ))
            prev_template = griddata(points, prev_im_flat, (XX, YY))
            curr_template = griddata(points, curr_im_flat, (XX, YY))

            u, v = lucas_kanade_template_tracking(prev_template, curr_template)
            total_u, total_v = u, v

            # Iteratively computing optical flow (LK Refinement steps)
            # here: no convergence criterion, we always run 10 iterations
            for _ in range(10):
                XX_u = XX + total_u
                # The vertical flow displaces the y grid (this previously
                # used XX, which sampled the template at wrong rows).
                YY_v = YY + total_v
                template_w = griddata(points, prev_im_flat, (XX_u, YY_v))
                u, v = lucas_kanade_template_tracking(template_w, curr_template)

                # Hint: When alternatingly warping and estimating the flow,
                # small errors in either of these steps will propagate through
                # the rest of the process. In order to prevent warping errors from
                # propagating, you should not update your warped version of the
                # image in each step by iteratively warping it with each flow field
                # you compute. Instead, you should first add your flow field to the
                # accumulated flow, and then apply the accumulated flow to the
                # original image. This way, flow estimation will be more stable.
                total_u = total_u + u
                total_v = total_v + v

            # Adding flow to pixels in the win2track
            XX = XX + total_u
            YY = YY + total_v

            # Computing the template position in the next image: minimal x/y
            # coordinate plus template width/height. `np.int` was removed in
            # NumPy 1.24; plain Python ints are also what cv2.rectangle expects.
            x1 = int(np.min(XX))
            y1 = int(np.min(YY))
            x2 = x1 + int(dims[0])
            y2 = y1 + int(dims[1])

            color = (0, 0, 255)
            res_im = cv2.rectangle(frame, (x1, y1), (x2, y2), color=color, thickness=2)

            # The current frame becomes the reference for the next iteration.
            prev_im_flat = curr_im_flat

            ostream([res_im], [1, 1])

In [None]:
# Window to track, as two [x, y] corner points: row 0 is the top-left corner and
# row 1 the bottom-right corner (this is how lucas_kanade_tracking2 unpacks it).
win2track = np.array([
    [294,251],
    [328,279]
])
lucas_kanade_tracking2(cvu.dir_stream("./sequence-02", wait_key=1), cvu.window_stream(), win2track)

In [None]:
# Sanity check of 2-D cubic spline interpolation on a regular grid.
# RectBivariateSpline is used because interpolate.interp2d was deprecated in
# SciPy 1.10 and removed in SciPy 1.14. `interpolate` and `plt` come from the
# import cell at the top of the notebook.
x = np.arange(-5.01, 5.01, 0.05)
y = np.arange(-5.01, 5.01, 0.05)
xx, yy = np.meshgrid(x, y)
z = np.sin(xx**2+yy**2)
# z has shape (len(y), len(x)), so the row axis (y) is passed first.
f = interpolate.RectBivariateSpline(y, x, z, kx=3, ky=3)

xnew = np.arange(-5.01, 5.01, 1e-2)
ynew = np.arange(-5.01, 5.01, 1e-2)
# Evaluated on the finer grid; the result has shape (len(ynew), len(xnew)),
# the same layout interp2d's f(xnew, ynew) produced.
znew = f(ynew, xnew)
# Compare the first row of the coarse samples with the first row of the
# interpolated surface.
plt.plot(x, z[0, :], 'ro-', xnew, znew[0, :], 'b-')
plt.show()

In [None]:
# Small 5x4 test matrix holding 1..20 in row-major order, plus its column (x)
# and row (y) index axes, used to try out griddata in the next cells.
A = np.arange(1, 21).reshape(5, 4)
x = np.arange(A.shape[1])
y = np.arange(A.shape[0])

In [None]:
# Flatten the toy grid into griddata's scattered-data form: one (row, col)
# coordinate pair per sample, in the same row-major order as the values of A.
X, Y = np.meshgrid(x, y)
points = np.stack((Y.ravel(), X.ravel()), axis=1)
values = A.ravel()

print(points.shape)
print(values.shape)

In [None]:
# Interpolate the toy matrix at four query positions. The first nested list
# holds the row coordinates and the second the column coordinates, matching the
# (row, col) column order of `points`; the last query (row 3, col 0.5) lies
# between two samples.
griddata(points, values, ([[4, 1], [2, 3]], [[3, 2], [0, 0.5]]))

In [None]:
# Affine-warp demo: map three corners of the tracking window onto three corners
# of a 640x480 rectangle and show the frame next to the warped result.
img = cv2.imread('./sequence-02/0001.jpg')
if img is None:
    # cv2.imread signals a missing or unreadable file by returning None.
    raise FileNotFoundError("could not read './sequence-02/0001.jpg'")
rows, cols, ch = img.shape
print(rows, cols)

# Source points: top-left, top-right and bottom-right window corners as (x, y).
pts1 = np.float32([[294,251],[328,251],[328,279]])
pts2 = np.float32([[0,0],[640,0],[640,480]])
M = cv2.getAffineTransform(pts1, pts2)
dst = cv2.warpAffine(img, M, (cols, rows))
print(dst.shape)

# cv2.imread returns channels in BGR order, while matplotlib expects RGB;
# convert for display only so red and blue are not swapped.
plt.subplot(121)
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.title('Input')
plt.subplot(122)
plt.imshow(cv2.cvtColor(dst, cv2.COLOR_BGR2RGB))
plt.title('Output')
plt.show()