In [1]:
import cv2
import numpy as np
from numba import jit
import matplotlib.pyplot as plt

In [2]:
# Source clip shared by every demo below.
# To use a webcam instead, open the capture with cv2.VideoCapture(0).
video_path = 'res/vtest.avi'
cap = cv2.VideoCapture(video_path)

# Read the first frame once; later cells convert it to grayscale and use it
# as the reference ("previous") frame for their first flow estimate.
ret, prev_frame = cap.read()
if not ret:
    # Fail with a descriptive exception instead of a deliberate 0/0, and do
    # not leave a half-open capture handle behind.
    cap.release()
    raise IOError(f"Error: Could not read video file: {video_path!r}")

### Optical flow using the Lucas-Kanade method

In [3]:
def lucas_kanade_optical_flow(I1, I2, keypoints, window_size=5):
    """
    Implement Lucas-Kanade optical flow for sparse feature tracking.

    For each keypoint the brightness-constancy constraint
    ``Ix * u + Iy * v = -It`` is solved in the least-squares sense over a
    square window centred on the keypoint (coordinates are truncated to
    integers).

    Parameters
    ----------
    I1, I2 : array-like
        Previous and next single-channel frames of identical shape. They are
        converted to float64 internally.
    keypoints : iterable of (x, y)
        Points in ``I1`` whose motion is estimated.
    window_size : int, optional
        The window extends ``window_size // 2`` pixels on every side of the
        keypoint, i.e. it covers ``2 * (window_size // 2) + 1`` pixels per
        axis. Windows are clipped to the image bounds.

    Returns
    -------
    numpy.ndarray
        Float array of shape ``(len(keypoints), 2)`` holding ``(u, v)`` per
        keypoint. Keypoints whose window lies completely outside the image,
        or whose system cannot be solved, get ``(0, 0)``.
    """
    # Work in floating point: subtracting uint8 frames wraps modulo 256 and
    # corrupts the temporal derivative wherever brightness increased.
    I1 = np.asarray(I1, dtype=np.float64)
    I2 = np.asarray(I2, dtype=np.float64)

    # The un-normalised 3x3 Sobel kernel responds with 8x the per-pixel
    # derivative; scale by 1/8 so Ix/Iy share units with It and the solved
    # flow is expressed in pixels.
    Ix = cv2.Sobel(I1, cv2.CV_64F, 1, 0, ksize=3, scale=1.0 / 8.0)
    Iy = cv2.Sobel(I1, cv2.CV_64F, 0, 1, ksize=3, scale=1.0 / 8.0)
    It = I2 - I1

    flow = []

    half_w = window_size // 2
    height, width = I1.shape[:2]

    for x, y in keypoints:
        x, y = int(x), int(y)

        # Clip the window to the image. A negative slice start would wrap
        # around and silently select an empty or unrelated region.
        top, bottom = max(y - half_w, 0), min(y + half_w + 1, height)
        left, right = max(x - half_w, 0), min(x + half_w + 1, width)
        if top >= bottom or left >= right:
            # Keypoint has drifted entirely off the frame: no evidence.
            flow.append((0.0, 0.0))
            continue

        # Extract the gradients in the window around the keypoint
        Ix_win = Ix[top:bottom, left:right].ravel()
        Iy_win = Iy[top:bottom, left:right].ravel()
        It_win = It[top:bottom, left:right].ravel()

        # Construct A and b matrices for Ax = b
        A = np.column_stack((Ix_win, Iy_win))
        b = -It_win

        # Solve for the flow vector using least squares
        try:
            nu = np.linalg.lstsq(A, b, rcond=None)[0]
            flow.append((nu[0], nu[1]))
        except np.linalg.LinAlgError:
            flow.append((0.0, 0.0))

    # reshape keeps the (N, 2) contract even when there are no keypoints.
    return np.array(flow, dtype=np.float64).reshape(-1, 2)

In [4]:
# Parameters for Shi-Tomasi corner detection
feature_params = {
    "maxCorners" : 100,
    "qualityLevel" : 0.3,
    "minDistance" : 7,
    "blockSize" : 7
}

grey_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
corners = cv2.goodFeaturesToTrack(grey_frame, mask=None, **feature_params)

# goodFeaturesToTrack yields None when no corner passes the quality test;
# fall back to an empty (0, 2) array so the loop below still plays the video.
if corners is None:
    keypoints = np.empty((0, 2), dtype=np.float32)
else:
    keypoints = corners.reshape(-1, 2).astype(np.float32)

# One fixed, reproducible colour per tracked keypoint
np.random.seed(42)
colors = np.random.randint(0, 255, (len(keypoints), 3))

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Compute optical flow using Lucas-Kanade from scratch.
        # reshape guards the in-place add below when there are no keypoints.
        flow_vectors = lucas_kanade_optical_flow(
            grey_frame, frame_gray, keypoints, window_size=10
        ).reshape(-1, 2)

        # Update the keypoints with flow
        keypoints += flow_vectors

        # Draw the optical flow: a segment from the previous position to the
        # new one, plus a dot on the new position.
        for i, (new, (dx, dy)) in enumerate(zip(keypoints, flow_vectors)):
            x, y = (int(v) for v in new.ravel())
            x_prev, y_prev = int(x - dx), int(y - dy)
            line_color = colors[i].tolist()
            frame = cv2.line(frame, (x_prev, y_prev), (x, y), line_color, 2)
            frame = cv2.circle(frame, (x, y), 3, line_color, -1)

        # Display the result
        cv2.imshow('Lucas-Kanade Optical Flow (From Scratch)', frame)

        # Exit on pressing 'q'
        if cv2.waitKey(30) & 0xFF == ord('q'):
            break

        # Update for the next iteration
        grey_frame = frame_gray.copy()
finally:
    # Always clean up, even if the loop raised.
    # prev_frame already holds frame 0, so park the reader on frame 1: the
    # next demo cell then pairs frame 0 with frame 1 rather than with itself.
    cap.set(cv2.CAP_PROP_POS_FRAMES, 1)
    cv2.destroyAllWindows()

In [5]:
# Parameters for Shi-Tomasi corner detection
feature_params = {
    "maxCorners" : 100,
    "qualityLevel" : 0.3,
    "minDistance" : 7,
    "blockSize" : 7
}

# Parameters for Lucas-Kanade optical flow
lk_params = {
    "winSize" : (15, 15),
    "maxLevel" : 2,
    "criteria" : (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
}

# Colors for drawing tracks (one row per possible corner, see maxCorners)
np.random.seed(42)
color = np.random.randint(0, 255, (100, 3))

grey_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
p0 = cv2.goodFeaturesToTrack(grey_frame, mask=None, **feature_params)
# Palette row owned by each live track, so a track keeps its colour even
# after other tracks are dropped.
track_ids = np.arange(0 if p0 is None else len(p0))

# Persistent canvas on which the track trails accumulate
mask = np.zeros_like(prev_frame)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Every track was lost (or none was ever found): look for fresh
        # corners instead of handing an empty point set to the tracker.
        if p0 is None or len(p0) == 0:
            p0 = cv2.goodFeaturesToTrack(grey_frame, mask=None, **feature_params)
            track_ids = np.arange(0 if p0 is None else len(p0))

        if p0 is not None and len(p0) > 0:
            p1, st, _ = cv2.calcOpticalFlowPyrLK(grey_frame, frame_gray, p0, None, **lk_params)
            # st flags the points whose flow was found
            keep = st.ravel() == 1
            good_new = p1[keep].reshape(-1, 2)
            good_old = p0[keep].reshape(-1, 2)
            track_ids = track_ids[keep]
        else:
            good_new = good_old = np.empty((0, 2), dtype=np.float32)

        for track, new, old in zip(track_ids, good_new, good_old):
            a, b = (int(v) for v in new)
            c, d = (int(v) for v in old)
            track_color = color[track].tolist()
            mask = cv2.line(mask, (a, b), (c, d), track_color, 2)
            frame = cv2.circle(frame, (a, b), 5, track_color, -1)

        output = cv2.add(frame, mask)

        cv2.imshow('Lucas-Kanade Optical Flow', output)

        if cv2.waitKey(30) & 0xFF == ord('q'):
            break

        # Update previous frame and points
        grey_frame = frame_gray.copy()
        p0 = good_new.reshape(-1, 1, 2)
finally:
    # Always clean up, even if the loop raised.
    # prev_frame holds frame 0, so leave the reader positioned on frame 1.
    cap.set(cv2.CAP_PROP_POS_FRAMES, 1)
    cv2.destroyAllWindows()


### Vector field visualization

In [6]:
def draw_optical_flow_arrows(flow, step=16):
    """Render a sparse arrow plot of a flow field on a black canvas.

    Args:
        flow: Array indexed as ``flow[y, x]`` that yields an ``(fx, fy)``
            displacement pair for each pixel.
        step: Spacing, in pixels, between sampled grid points on both axes.

    Returns:
        A ``uint8`` BGR image with the same height and width as ``flow``,
        containing one green arrow per sampled grid point.
    """
    rows, cols = flow.shape[:2]
    canvas = np.zeros((rows, cols, 3), dtype=np.uint8)
    green = (0, 255, 0)

    # Sampling lattice in row-major order: all columns of a row, then the next row.
    lattice = [
        (col, row)
        for row in range(0, rows, step)
        for col in range(0, cols, step)
    ]

    for col, row in lattice:
        dx, dy = flow[row, col]
        # Arrow runs from the grid point to the grid point displaced by the flow.
        tip = (int(col + dx), int(row + dy))
        cv2.arrowedLine(canvas, (col, row), tip, green, 1, tipLength=0.3)

    return canvas

In [7]:
# Reference frame for the first dense-flow estimate
grey_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

try:
    while cap.isOpened():

        ret, frame = cap.read()
        if not ret:
            break

        grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Dense flow between the previous and the current frame
        flow = cv2.calcOpticalFlowFarneback(
            grey_frame, grey, None,
            0.5,   # pyr_scale
            3,     # levels
            15,    # winsize
            3,     # iterations
            5,     # poly_n
            1.2,   # poly_sigma
            0,     # flags
        )

        arrows = draw_optical_flow_arrows(flow, step=16)

        # Blend the arrow layer over the video frame
        overlay = cv2.addWeighted(frame, 0.8, arrows, 0.8, 0)

        cv2.imshow("Optical Flow", overlay)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        grey_frame = grey
finally:
    # Always clean up, even if the loop raised.
    # prev_frame holds frame 0, so leave the reader positioned on frame 1;
    # re-running a demo cell then pairs frame 0 with frame 1, not with itself.
    cap.set(cv2.CAP_PROP_POS_FRAMES, 1)
    cv2.destroyAllWindows()

In [8]:
# Close the video capture opened at the top of the notebook now that all demos are done.
cap.release()