**Student Name**: Uğur Ali Kaplan  
**Student ID**: 150170042

In [1]:
import moviepy.video.io.VideoFileClip as mpy
import moviepy.editor as mpyeditor
import cv2

import numpy as np
import matplotlib.pyplot as plt

from copy import deepcopy
from sklearn.preprocessing import normalize
from sklearn.metrics import mean_squared_error

## Part 1

In [2]:
def optical_flow(curr_frame, next_frame, center_pt, window_size):
    """
    Optical flow implementation with lucas-kanade.
    """
    assert window_size % 2 == 1
    assert window_size >= 3
    half = window_size // 2
    
    # Convert the frames into grayscale
    curr_img, next_img = deepcopy(curr_frame), deepcopy(next_frame)
    curr_gray, next_gray = cv2.cvtColor(curr_img, cv2.COLOR_BGR2GRAY), cv2.cvtColor(next_img, cv2.COLOR_BGR2GRAY)
    curr_gray, next_gray = np.float32(curr_gray), np.float32(next_gray)
    
    # Create the windows with the given window size
    curr_window = curr_gray[center_pt[1]-half:center_pt[1]+half+1, center_pt[0]-half:center_pt[0]+half+1]
    next_window = next_gray[center_pt[1]-half:center_pt[1]+half+1, center_pt[0]-half:center_pt[0]+half+1]
    
    larger_window = curr_gray[center_pt[1]-half-1:center_pt[1]+half+2, center_pt[0]-half-1:center_pt[0]+half+2]
    
    # Apply Gaussian Blur on the windows
    curr_window = cv2.GaussianBlur(curr_window, (window_size, window_size), 0)
    next_window = cv2.GaussianBlur(next_window, (window_size, window_size), 0)
    larger_window = cv2.GaussianBlur(larger_window, (window_size+2, window_size+2), 0)
    
    # Calculate gradients
    I_x = (larger_window[1:-1, 1:-1] - larger_window[1:-1, 0:-2])
    I_y = (larger_window[1:-1,1:-1] - larger_window[0:-2, 1:-1])
    I_t = next_window - curr_window
    
    # Solve the overdetermined system (close form least squares solution)
    sol = np.matmul(np.linalg.pinv(np.array([[np.sum(I_x**2), np.sum(I_x*I_y)], [np.sum(I_x*I_y), np.sum(I_y**2)]])), -1 * np.array([[np.sum(I_x * I_t)], [np.sum(I_y * I_t)]]))
    return sol

In [3]:
biped_vid = mpy.VideoFileClip("biped_1.avi")
frame_count = biped_vid.reader.nframes
video_fps = biped_vid.fps

walker_frame = list()
for i in range(frame_count):
    walker_frame.append(biped_vid.get_frame(i*1.0/video_fps))

vectored_images = list()

# A point on the hand
coor_0, coor_1 = 400, 330

for i in range(frame_count - 1):
    # Window size 7
    ans = optical_flow(walker_frame[i], walker_frame[i+1], (coor_0, coor_1), 7)
    
    # Make the vector length 30 so OF vector looks nice
    draw_vec = (normalize(ans, axis=0) * 30).astype(np.int)
    cp_image = deepcopy(walker_frame[i])
    vectored_images.append(cv2.arrowedLine(cp_image, (coor_0, coor_1), (coor_0 + draw_vec[0], coor_1 + draw_vec[1]), 255, thickness=2))
    
    # Clip the solution for better tracking
    ans = np.clip(ans, -2, +2)
    
    # Update the point
    coor_0 = int(np.round(coor_0 + ans[0])[0])
    coor_1 = int(np.round(coor_1 + ans[1])[0])
            
clip = mpyeditor.ImageSequenceClip(vectored_images, fps=video_fps)
clip.write_videofile("video_1.mp4", codec="libx264")

t:   5%|▌         | 8/153 [00:00<00:01, 74.36it/s, now=None]

Moviepy - Building video video_1.mp4.
Moviepy - Writing video video_1.mp4



                                                              

Moviepy - Done !
Moviepy - video ready video_1.mp4


## Part 2

In [4]:
def fill_whites(img):
    """
    Source: https://stackoverflow.com/a/51348091
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    ret, thresh = cv2.threshold(gray, 240, 255, cv2.THRESH_BINARY)
    img[thresh == 255] = 0
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    erosion = cv2.erode(img, kernel, iterations = 1)
    return erosion

In [5]:
biped_vid = mpy.VideoFileClip("biped_2.avi")
frame_count = biped_vid.reader.nframes
video_fps = biped_vid.fps

walker_frame = list()
for i in range(frame_count):
    walker_frame.append(biped_vid.get_frame(i*1.0/video_fps))

vectored_images = list()

# A point in hand and corners on the wall
coor_0, coor_1 = 400, 330
pts = [(210, 183), (210, 315), (308, 183), (308, 315)]

# Window size for the wall is 49
w_size = 49

hand_vectors = []
mean_wall_vectors = []

for i in range(frame_count - 1):
    ans = [
        optical_flow(walker_frame[i], walker_frame[i+1], pts[0], w_size),
        optical_flow(walker_frame[i], walker_frame[i+1], pts[1], w_size),
        optical_flow(walker_frame[i], walker_frame[i+1], pts[2], w_size),
        optical_flow(walker_frame[i], walker_frame[i+1], pts[3], w_size),
        optical_flow(fill_whites(deepcopy(walker_frame[i])),
                     fill_whites(deepcopy(walker_frame[i+1])),
                     (coor_0, coor_1), 7)
    ]
    
    draw_vec = [
        (normalize(ans[0], axis=0) * 30).astype(np.int),
        (normalize(ans[1], axis=0) * 30).astype(np.int),
        (normalize(ans[2], axis=0) * 30).astype(np.int),
        (normalize(ans[3], axis=0) * 30).astype(np.int),
        (normalize(ans[4], axis=0) * 30).astype(np.int)
    ]
    
    cp_image = deepcopy(walker_frame[i])
    cv2.arrowedLine(cp_image, pts[0], (pts[0][0] + draw_vec[0][0], pts[0][1] + draw_vec[0][1]), 255, thickness=2)
    cv2.arrowedLine(cp_image, pts[1], (pts[1][0] + draw_vec[1][0], pts[1][1] + draw_vec[1][1]), 255, thickness=2)
    cv2.arrowedLine(cp_image, pts[2], (pts[2][0] + draw_vec[2][0], pts[2][1] + draw_vec[2][1]), 255, thickness=2)
    cv2.arrowedLine(cp_image, pts[3], (pts[3][0] + draw_vec[3][0], pts[3][1] + draw_vec[3][1]), 255, thickness=2)
    cv2.arrowedLine(cp_image, (coor_0, coor_1), (coor_0 + draw_vec[4][0], coor_1 + draw_vec[4][1]), 255, thickness=2)
    
    vectored_images.append(cp_image)
    
    mean_wall_vectors.append(np.mean(ans[0] + ans[1] + ans[2] + ans[3], axis=1))
    hand_vectors.append(ans[4])
    
    ans[4] = np.clip(ans[4], -2, 2)
    coor_0 = int(np.round(coor_0 + ans[4][0])[0])
    coor_1 = int(np.round(coor_1 + ans[4][1])[0])
            
clip = mpyeditor.ImageSequenceClip(vectored_images, fps=video_fps)
clip.write_videofile("video_2.mp4", codec="libx264")

t:   7%|▋         | 10/153 [00:00<00:01, 98.51it/s, now=None]

Moviepy - Building video video_2.mp4.
Moviepy - Writing video video_2.mp4



                                                              

Moviepy - Done !
Moviepy - video ready video_2.mp4


## Part 3

In [6]:
biped_vid = mpy.VideoFileClip("biped_3.avi")
frame_count = biped_vid.reader.nframes
video_fps = biped_vid.fps

walker_frame = list()
for i in range(frame_count):
    walker_frame.append(biped_vid.get_frame(i*1.0/video_fps))

vectored_images = list()

coor_0, coor_1 = 400, 330
pts = [(210, 183), (210, 315), (308, 183), (308, 315)]
w_size = 49

In [20]:
hand_vectors_part3 = []

for i in range(frame_count - 1):
    ans = optical_flow(fill_whites(deepcopy(walker_frame[i])),
                       fill_whites(deepcopy(walker_frame[i+1])),
                       (coor_0, coor_1), 7)
    
    hand_vectors_part3.append(ans)
    
    ans = np.clip(ans, -2, 2)
    coor_0 = int(np.round(coor_0 + ans[0])[0])
    coor_1 = int(np.round(coor_1 + ans[1])[0])

In [21]:
hand_vectors_part3_fixed = []

for i in range(frame_count - 1):
    ans = [
        optical_flow(walker_frame[i], walker_frame[i+1], pts[0], w_size),
        optical_flow(walker_frame[i], walker_frame[i+1], pts[1], w_size),
        optical_flow(walker_frame[i], walker_frame[i+1], pts[2], w_size),
        optical_flow(walker_frame[i], walker_frame[i+1], pts[3], w_size),
        optical_flow(fill_whites(deepcopy(walker_frame[i])),
                     fill_whites(deepcopy(walker_frame[i+1])),
                     (coor_0, coor_1), 7)
    ]
    
    # Difference between wall vectors in part 2 and part 3 must be due to the camera movement
    mean_wall = np.mean(ans[0] + ans[1] + ans[2] + ans[3], axis=1)
    
    translate_by = (mean_wall - mean_wall_vectors[i]).reshape(2,1)
    hand_vectors_part3_fixed.append(ans[4])
    ans[4] = np.clip(ans[4], -2, 2)
    
    # New coordinates are determined by optical flow - camera movement
    coor_0 = int(np.round(coor_0 + ans[4][0] - translate_by[0]/30)[0])
    coor_1 = int(np.round(coor_1 + ans[4][1] - translate_by[1]/30)[0])

In [22]:
part3_mse = mean_squared_error(np.array(hand_vectors).reshape(-1, 1).astype(np.float),
                               np.array(hand_vectors_part3).reshape(-1, 1).astype(np.float))

In [23]:
part3_mse_fixed = mean_squared_error(np.array(hand_vectors).reshape(-1, 1).astype(np.float),
                               np.array(hand_vectors_part3_fixed).reshape(-1, 1).astype(np.float))

In [24]:
print(f"MSE without correction: {part3_mse}\nMSE with correction: {part3_mse_fixed}")

MSE without correction: 156.0046011157686
MSE with correction: 104.53112335557307
