In [None]:
import cv2 as cv
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import time

sns.set_theme()

In [None]:
def normalize(data):
    return (data - np.min(data)) / (np.max(data) - np.min(data))

def plot_vs(u, v, cmap='inferno', title=''):
    
    v = [
        (u, 'u'),
        (v, 'v'),
        (np.sqrt(u**2 + v**2), r"$u^{2} * v^{2}$"), 
        (np.abs(np.arctan(u / (v + 1e-12))), r"$arctan(u/v)$")
    ]
    
    fig = plt.figure(figsize=(5.5*4, 5.1))
    fig.suptitle(title)
    
    i = 1
    for e, t in v:        
        ax = fig.add_subplot(1, 4, i)
        ax.imshow(e, cmap=cmap)
        ax.set_title(t)
        ax.axis('off')
        i += 1

    plt.tight_layout()
    plt.show()

In [None]:
def plot_uv(u, v, im, quivstep=11, scale=4, head_width=0.8, head_length=1, title='', ths=0.8):
    h, w = im.shape[:2]
    
    fig = plt.figure(figsize=(10, 5))
    fig.suptitle(title)
    
    ax1 = fig.add_subplot(1, 2, 1)
    ax2 = fig.add_subplot(1, 2, 2)
    
    ax1.imshow(im, cmap='gray')
    ax2.imshow(np.zeros_like(im, dtype=float), cmap='gray')
       
    for i in range(quivstep //2 , h + quivstep // 2, quivstep):
        for j in range(quivstep//2, w + quivstep // 2, quivstep):
            
            u_mean = np.mean(u[i:i + quivstep + 1, j:j + quivstep + 1]) * 10 * scale
            v_mean = np.mean(v[i:i + quivstep + 1, j:j + quivstep + 1]) * 10 * scale
            
            V = np.sqrt(u_mean ** 2 + v_mean ** 2) 
            
            if V < ths:
                continue
    
            ax1.arrow(
                j, i, v_mean, u_mean, color='red',
                head_width=head_width, head_length=head_length
            )
            ax2.arrow(
                j, i, v_mean, u_mean, color='white',
                head_width=head_width, head_length=head_length
            )

    ax1.axis('off')
    ax2.axis('off')
    
    plt.tight_layout()
    plt.show()

# Flujo Óptico

### Lucas Kanade: Mínimos cuadrados

In [None]:
def lk_pinv(ix, iy, it):
    
    _A = np.zeros((2, 2))
    
    _A[0, 0] = np.sum(ix ** 2)
    _A[1, 0] = _A[0, 1] = np.sum(ix * iy)
    _A[1, 1] = np.sum(iy ** 2)
    
    _b = np.zeros((2, 1))
    
    _b[0, 0] = - np.sum(ix * it)
    _b[1, 0] = - np.sum(iy * it)
    
    _V = np.linalg.pinv(_A) @ _b
     
    return _V

### Lucas Kanade: Directo

In [None]:
def lk_direct(ix, iy, it):
    
    ix2  = np.sum(ix * ix)
    iy2  = np.sum(iy * iy)
    ixiy = np.sum(ix * iy)

    ixit = np.sum(ix * it)
    iyit = np.sum(iy * it)

    d = ix2 * iy2 - ixiy ** 2 + 1e-9

    u = (-iy2 * ixit + ixiy * iyit) / d 
    v = ( ixiy * ixit - ix2 * iyit) / d 
 
    return [u, v]

In [None]:
def gradient_x(im1, im2, ksize=3):
    return (cv.Sobel(im1, cv.CV_32F, 0, 1, ksize=ksize) + cv.Sobel(im2, cv.CV_32F, 0, 1, ksize=ksize)) / 2

def gradient_y(im1, im2, ksize=3):
    return (cv.Sobel(im1, cv.CV_32F, 1, 0, ksize=ksize) + cv.Sobel(im2, cv.CV_32F, 1, 0, ksize=ksize)) / 2

def gradient_t(im1, im2, ksize=3):
    return cv.filter2D(im2, cv.CV_32F, np.ones((ksize, ksize))) - cv.filter2D(im1, cv.CV_32F, np.ones((ksize, ksize)))

In [None]:
def optical_flow_lk(im_prev, im_next, window_size=3, funct=lk_pinv):
    pad = window_size // 2
    
    # Normalize images between (0, 1)
    im0, im1 = im_prev.copy(), im_next.copy()

    # Pad images
    im0, im1 = np.pad(im0, pad), np.pad(im1, pad)

    # Compute Ix, Iy, It
    ix = gradient_x(im0, im1)
    iy = gradient_y(im0, im1)
    it = gradient_t(im0, im1)

    # Allocate space for V = (u, v)
    u, v = [np.zeros_like(ix) for _ in range(2)]
    h, w = ix.shape

    for row in range(pad, h - pad):
        for col in range(pad, w - pad):
            # Calcule slice indexes
            ri, rj = row - pad, row + pad + 1
            ci, cj = col - pad, col + pad + 1
            
            # Slice and flat the matrixes
            _ix = ix[ri:rj, ci:cj]
            _iy = iy[ri:rj, ci:cj]
            _it = it[ri:rj, ci:cj]
            
            # Compute optical flow
            u[row, col], v[row, col] = funct(_ix, _iy, _it)
    
    # Return non-padded (u, v) images.
    return u[pad:h - pad, pad:w - pad], v[pad:h - pad, pad:w - pad]

### Análisis de tiempos y comparación de resultados

In [None]:
_im_prev = cv.imread('images/fr00100.png', 0).astype(np.int16)
_im_next = cv.imread('images/fr00101.png', 0).astype(np.int16)

In [None]:
kernel_sizes = [3, 5, 7, 31, 71, 121, 253]
lk_pinv_times, lk_direct_times = [], []
lk_pinv_results, lk_direct_results = [], []

In [None]:
for k_size in kernel_sizes:
    t = time.time()
    u, v = optical_flow_lk(
        _im_prev, _im_next, window_size=k_size, funct=lk_pinv)
    tf = time.time() - t
    
    lk_pinv_times.append(tf)
    lk_pinv_results.append((u, v))

    t = time.time()
    u, v = optical_flow_lk(
        _im_prev, _im_next, window_size=k_size, funct=lk_direct)
    tf = time.time() - t
    
    lk_direct_times.append(tf)
    lk_direct_results.append((u, v))

In [None]:
x = list(range(len(kernel_sizes)))

y_p = lk_pinv_times
y_d = lk_direct_times

plt.plot(x, y_p, 'o-', label='LK Pinv')
plt.plot(x, y_d, 'o-', label='LK Direct')

plt.ylabel('Tiempo (segs)')
plt.xlabel('Tamaño de la ventana de integración (px)')

plt.xticks(x, kernel_sizes)

plt.tight_layout()

plt.legend()
plt.savefig('lk.svg')
plt.show()

#### Análisis del flujo óptico con diferentes tamaños de ventana de integración

In [None]:
quivstep = 20
scale = 1
ths = 4

# Arrow properties
head_width = 5
head_length = 3

for i, k in enumerate(kernel_sizes):
    u, v = lk_pinv_results[i]
    plot_vs(u, v, title=f'LK Pinv {k}x{k}')
    plot_uv(u, v, _im_prev, quivstep, scale, head_width, 
            head_length, title='', ths=ths)    
    u, v = lk_direct_results[i]
    plot_vs(u, v, title=f'LK Direct {k}x{k}')
    plot_uv(u, v, _im_prev, quivstep, scale, head_width, 
            head_length, title='', ths=ths)

## Horn&Schunck

In [None]:
def compute_error(u, v, ix, iy, it, lamb):
    
    ux, uy = np.gradient(u)
    vx, vy = np.gradient(v)
    
    eps_s = ux**2 + uy**2 + vx**2 + vy**2
    eps = (ix * u + iy * v + it) ** 2

    E = np.sum(eps_s + lamb * eps)
    
    return 1 / E


def compute(ix, iy, it, lamb, n_iter, min_error=0.001):
    
    u, v = np.zeros_like(ix), np.zeros_like(iy)
    errors = []
    
    for _ in range(n_iter):
        # Compute means
        u_avg = cv.GaussianBlur(u, (3, 3), 0)
        v_avg = cv.GaussianBlur(v, (3, 3), 0)
        
        # Common part
        n = ix * u_avg + iy * v_avg + it  
        d = lamb ** 2 + ix ** 2 + iy ** 2
        r = n / d
        
        # Update new (u, v) values
        u = u_avg - ix * r
        v = v_avg - iy * r
        
        # Compute error
        errors.append(
            compute_error(u, v, ix, iy, it, lamb)
        )
        
    return u, v, errors
    

def optical_flow_hs(im_prev, im_next, 
                    lamb=0.001, n_iter=8):
    
    # Normalize images between (0, 1)
    im0, im1 = im_prev.copy() / 255., im_next.copy() / 255.
    
    # Compute Ix, Iy, It
    ix = (np.gradient(im0, axis=0) + np.gradient(im1, axis=0)) / 2
    iy = (np.gradient(im0, axis=1) + np.gradient(im1, axis=1)) / 2
    it = im1 - im0
     
    return compute(ix, iy, it, lamb, n_iter)

In [None]:
def plot_errors(errors_hs, lambdas, threshold=1e-3):
    stab_points = []
    
    fig = plt.figure(figsize=(5.5*2, 5.1*4))
    fig.suptitle(f"Horn Schunk Errors")
    
    i = 1
    for errors, lamb in zip(errors_hs, lambdas):        

        cut = len(errors) - 1
        errs = [errors[i-1] - errors[i] 
                for i in range(1, len(errors))]
        
        for j in range(len(errs)):
            if errs[j] < threshold:
                cut = j
                break
        
        ax = fig.add_subplot(4, 2, i)
        ax.plot(list(range(len(errors))), errors, '.-')
        # ax.plot([cut for _ in errors], errors, 'r--')
        
        ax.set_title(f"lambda={lamb}")
        ax.set_xlabel("Iteraciones")
        ax.set_ylabel(r"Error")
        
        i += 1
    
    plt.tight_layout()
    plt.savefig('errors.svg')
    plt.show()
    
    
def draw_optical_flow_hs(
    results_hs, lambdas, 
    quivstep = 20,
    scale = 1,
    ths = 4,
    head_width = 5,
    head_length = 3):

    for i, k in enumerate(lambdas):
        u, v = results_hs[i]
        plot_vs(u, v, title=f'Horn& Schunk, lambda={k}')
        plot_uv(u, v, _im_prev, quivstep, scale, head_width, 
                head_length, title=f'', ths=ths)

####  Análisis del flujo óptico con diferentes $\lambda$

In [None]:
lambdas = [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1]
n_iter = 20

In [None]:
results_hs, errors_hs = [], []
for lamb in lambdas:
    u, v, errors = optical_flow_hs(_im_prev, _im_next, lamb, n_iter)
    results_hs.append((u, v))
    errors_hs.append(errors)

In [None]:
draw_optical_flow_hs(results_hs, lambdas)

# Pruebas sobre otras imágenes

In [None]:
import os

In [None]:
def compute_optical_flow(im0, im1, im_to_show):
    u_lk, v_lk = optical_flow_lk(im0, im1, window_size=15, funct=lk_direct)
    u_hs, v_hs, _ = optical_flow_hs(im0, im1, lamb=0.001, n_iter=20)
        
    plot_vs(u_lk, v_lk, title=f'Lucas Kanade')
    plot_uv(u_lk, v_lk, im_to_show, quivstep=15, scale=1, 
            head_width=head_width, head_length=head_length, title='', ths=3)
    
    plot_vs(u_hs, v_hs, title=f'Horn&Schunk')
    plot_uv(u_hs, v_hs, im_to_show, quivstep=15, scale=1, 
            head_width=head_width, head_length=head_length, ths=3)

### Sphere

In [None]:
path = './images/sphere/'

_im_prev_rgb, _im_next_rgb = [cv.imread(path + im) 
                              for im in os.listdir(path)[:2]]
compute_optical_flow(
    cv.cvtColor(_im_prev_rgb, cv.COLOR_BGR2GRAY), 
    cv.cvtColor(_im_next_rgb, cv.COLOR_BGR2GRAY), 
    cv.cvtColor(_im_prev_rgb, cv.COLOR_BGR2RGB))

## Cubo de Rubic

In [None]:
path = './images/rubic/'

_im_prev_rgb, _im_next_rgb = [cv.imread(path + im) 
                              for im in os.listdir(path)[:2]]
compute_optical_flow(
    cv.cvtColor(_im_prev_rgb, cv.COLOR_BGR2GRAY), 
    cv.cvtColor(_im_next_rgb, cv.COLOR_BGR2GRAY), 
    cv.cvtColor(_im_prev_rgb, cv.COLOR_BGR2RGB))

### Office

In [None]:
path = './images/office/'

_im_prev_rgb, _im_next_rgb = [cv.imread(path + im) 
                              for im in os.listdir(path)[:2]]
compute_optical_flow(
    cv.cvtColor(_im_prev_rgb, cv.COLOR_BGR2GRAY), 
    cv.cvtColor(_im_next_rgb, cv.COLOR_BGR2GRAY), 
    cv.cvtColor(_im_prev_rgb, cv.COLOR_BGR2RGB))