Exercise 1

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import sobel

def load_frames():
    """
    Simulate loading two consecutive grayscale frames from the Middlebury dataset.
    In practice, replace this with actual image loading (e.g., plt.imread).
    
    Returns:
        tuple: Two frames (numpy arrays) of shape (FRAME_SIZE, FRAME_SIZE).
    """
    frame_t = np.random.rand(FRAME_SIZE, FRAME_SIZE)  # Frame at time t
    frame_t_plus_1 = np.random.rand(FRAME_SIZE, FRAME_SIZE)  # Frame at time t+1
    return frame_t, frame_t_plus_1

def compute_spatial_gradient(frame, axis):
    """
    Compute spatial gradient using Sobel filter along the specified axis.
    
    Args:
        frame (numpy array): Input grayscale frame.
        axis (int): Axis along which to compute the gradient (0 for y, 1 for x).
    
    Returns:
        numpy array: Gradient of the frame along the specified axis.
    """
    return sobel(frame, axis=axis)

def compute_temporal_gradient(frame_t, frame_t_plus_1):
    """
    Compute temporal gradient by differencing two consecutive frames.
    
    Args:
        frame_t (numpy array): Frame at time t.
        frame_t_plus_1 (numpy array): Frame at time t+1.
    
    Returns:
        numpy array: Temporal gradient (frame_t_plus_1 - frame_t).
    """
    return frame_t_plus_1 - frame_t

def visualize_gradients(grad_x, grad_y, grad_t):
    """
    Visualize the spatial and temporal gradients as heatmaps using Matplotlib.
    
    Args:
        grad_x (numpy array): Gradient in x-direction (I_x).
        grad_y (numpy array): Gradient in y-direction (I_y).
        grad_t (numpy array): Temporal gradient (I_t).
    """
    fig, axes = plt.subplots(1, NUM_GRADIENTS, figsize=(FIGURE_WIDTH, FIGURE_HEIGHT))

    # Visualize I_x
    im_x = axes[0].imshow(grad_x, cmap='jet')
    axes[0].set_title('X-Gradient (I_x)')
    plt.colorbar(im_x, ax=axes[0])

    # Visualize I_y
    im_y = axes[1].imshow(grad_y, cmap='jet')
    axes[1].set_title('Y-Gradient (I_y)')
    plt.colorbar(im_y, ax=axes[1])

    # Visualize I_t
    im_t = axes[2].imshow(grad_t, cmap='jet')
    axes[2].set_title('Temporal Gradient (I_t)')
    plt.colorbar(im_t, ax=axes[2])

    plt.tight_layout()
    plt.show()

def main():
    """Main function to compute and visualize image gradients."""
    # Load frames
    frame_t, frame_t_plus_1 = load_frames()

    # Compute gradients
    grad_x = compute_spatial_gradient(frame_t, axis=1)  # I_x
    grad_y = compute_spatial_gradient(frame_t, axis=0)  # I_y
    grad_t = compute_temporal_gradient(frame_t, frame_t_plus_1)  # I_t

    # Visualize results
    visualize_gradients(grad_x, grad_y, grad_t)

if __name__ == "__main__":
    main()


Exercise 2

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Constants
FRAME_SIZE = 256
GRID_SIZE = 10
PATCH_SIZE = 5
PATCH_HALF_SIZE = PATCH_SIZE // 2
FIGURE_SIZE = (6, 6)

def generate_gradients():
    """
    Simulate gradients I_x, I_y, I_t for two frames (256x256).
    In practice, use the gradients computed from Exercise 1.
    
    Returns:
        tuple: (I_x, I_y, I_t), each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    # For demonstration, use random gradients; replace with actual gradients
    grad_x = np.random.randn(FRAME_SIZE, FRAME_SIZE)  # I_x
    grad_y = np.random.randn(FRAME_SIZE, FRAME_SIZE)  # I_y
    grad_t = np.random.randn(FRAME_SIZE, FRAME_SIZE)  # I_t
    return grad_x, grad_y, grad_t

def get_patch_coordinates(grid_x, grid_y):
    """
    Compute the pixel coordinates for a 5x5 patch centered at (grid_x, grid_y).
    
    Args:
        grid_x (int): X-coordinate of the grid point.
        grid_y (int): Y-coordinate of the grid point.
    
    Returns:
        tuple: Arrays of x and y coordinates within the patch.
    """
    start_x = max(grid_x - PATCH_HALF_SIZE, 0)
    end_x = min(grid_x + PATCH_HALF_SIZE + 1, FRAME_SIZE)
    start_y = max(grid_y - PATCH_HALF_SIZE, 0)
    end_y = min(grid_y + PATCH_HALF_SIZE + 1, FRAME_SIZE)
    
    patch_x, patch_y = np.meshgrid(range(start_x, end_x), range(start_y, end_y))
    return patch_x.flatten(), patch_y.flatten()

def solve_optical_flow_patch(grad_x, grad_y, grad_t, patch_x, patch_y):
    """
    Solve for optical flow (u, v) in a patch using least squares.
    
    Args:
        grad_x (numpy array): I_x gradient (256x256).
        grad_y (numpy array): I_y gradient (256x256).
        grad_t (numpy array): I_t gradient (256x256).
        patch_x (numpy array): X-coordinates of the patch.
        patch_y (numpy array): Y-coordinates of the patch.
    
    Returns:
        tuple: Optical flow components (u, v).
    """
    # Extract gradients for the patch
    I_x = grad_x[patch_y, patch_x]
    I_y = grad_y[patch_y, patch_x]
    I_t = grad_t[patch_y, patch_x]
    
    # Form the system A * [u, v] = b
    A = np.vstack((I_x, I_y)).T  # Shape: (num_pixels, 2)
    b = -I_t  # Shape: (num_pixels,)
    
    # Solve using least squares: (A^T A) [u, v] = A^T b
    try:
        solution, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
        u, v = solution
    except np.linalg.LinAlgError:
        u, v = 0.0, 0.0  # Fallback if matrix is singular
    
    return u, v

def compute_optical_flow(grad_x, grad_y, grad_t):
    """
    Compute optical flow (u, v) on a 10x10 grid using 5x5 patches.
    
    Args:
        grad_x (numpy array): I_x gradient (256x256).
        grad_y (numpy array): I_y gradient (256x256).
        grad_t (numpy array): I_t gradient (256x256).
    
    Returns:
        tuple: (grid_positions_x, grid_positions_y, flow_u, flow_v) for the 10x10 grid.
    """
    # Define the 10x10 grid (sample every 25.6 pixels, rounded)
    step = FRAME_SIZE // GRID_SIZE  # Approx 25 pixels
    grid_positions = np.arange(0, FRAME_SIZE, step)
    
    # Adjust to ensure we have exactly 10 points
    grid_positions = grid_positions[:GRID_SIZE]
    
    flow_u = np.zeros((GRID_SIZE, GRID_SIZE))
    flow_v = np.zeros((GRID_SIZE, GRID_SIZE))
    
    for i, gy in enumerate(grid_positions):
        for j, gx in enumerate(grid_positions):
            # Get the 5x5 patch coordinates
            patch_x, patch_y = get_patch_coordinates(gx, gy)
            # Solve for (u, v) in the patch
            u, v = solve_optical_flow_patch(grad_x, grad_y, grad_t, patch_x, patch_y)
            flow_u[i, j] = u
            flow_v[i, j] = v
    
    grid_positions_x, grid_positions_y = np.meshgrid(grid_positions, grid_positions)
    return grid_positions_x, grid_positions_y, flow_u, flow_v

def visualize_optical_flow(grid_x, grid_y, flow_u, flow_v):
    """
    Visualize the optical flow field as a quiver plot.
    
    Args:
        grid_x (numpy array): X-coordinates of the grid (10x10).
        grid_y (numpy array): Y-coordinates of the grid (10x10).
        flow_u (numpy array): U-components of the flow (10x10).
        flow_v (numpy array): V-components of the flow (10x10).
    """
    plt.figure(figsize=FIGURE_SIZE)
    plt.quiver(grid_x, grid_y, flow_u, flow_v, angles='xy', scale_units='xy', scale=1, color='b')
    plt.title('Optical Flow Field (10x10 Grid)')
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.gca().invert_yaxis()  # Match image coordinate system
    plt.grid(True)
    plt.show()

def main():
    """Main function to compute and visualize optical flow."""
    # Generate or load gradients (from Exercise 1)
    grad_x, grad_y, grad_t = generate_gradients()
    
    # Compute optical flow on a 10x10 grid
    grid_x, grid_y, flow_u, flow_v = compute_optical_flow(grad_x, grad_y, grad_t)
    
    # Visualize the flow field
    visualize_optical_flow(grid_x, grid_y, flow_u, flow_v)

if __name__ == "__main__":
    main()

Exercise 3

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import sobel, convolve

# Constants
FRAME_SIZE = 256
LAMBDA = 0.1
LAMBDA_INV = 1 / LAMBDA
MAX_ITERATIONS = 100
CONVERGENCE_THRESHOLD = 1e-3
FIGURE_SIZE = (6, 6)

def load_frames():
    """
    Simulate loading two consecutive grayscale frames (256x256).
    Replace with actual Middlebury dataset loading.
    
    Returns:
        tuple: Two frames (numpy arrays) of shape (FRAME_SIZE, FRAME_SIZE).
    """
    frame_t = np.random.rand(FRAME_SIZE, FRAME_SIZE)  # I(x, y, t)
    frame_t_plus_1 = np.random.rand(FRAME_SIZE, FRAME_SIZE)  # I(x, y, t+1)
    return frame_t, frame_t_plus_1

def compute_gradients(frame_t, frame_t_plus_1):
    """
    Compute spatial and temporal gradients using Sobel filter and frame differencing.
    
    Args:
        frame_t (numpy array): Frame at time t.
        frame_t_plus_1 (numpy array): Frame at time t+1.
    
    Returns:
        tuple: (grad_x, grad_y, grad_t), each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    grad_x = sobel(frame_t, axis=1)  # I_x
    grad_y = sobel(frame_t, axis=0)  # I_y
    grad_t = frame_t_plus_1 - frame_t  # I_t
    return grad_x, grad_y, grad_t

def compute_local_average(field):
    """
    Compute the local average of a field using a 3x3 neighborhood.
    
    Args:
        field (numpy array): Input field (u or v) of shape (FRAME_SIZE, FRAME_SIZE).
    
    Returns:
        numpy array: Local average of the field.
    """
    # Use a 3x3 averaging kernel (excluding the center for proper averaging)
    kernel = np.ones((3, 3)) / 8  # 9 neighbors, but exclude center
    kernel[1, 1] = 0
    return convolve(field, kernel, mode='nearest')

def horn_schunck_update(grad_x, grad_y, grad_t, u, v):
    """
    Perform one iteration of the Horn-Schunck update.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
        u (numpy array): Current u field.
        v (numpy array): Current v field.
    
    Returns:
        tuple: Updated (u, v) fields.
    """
    # Compute local averages
    u_avg = compute_local_average(u)
    v_avg = compute_local_average(v)
    
    # Compute numerator: I_x * u_avg + I_y * v_avg + I_t
    numerator = grad_x * u_avg + grad_y * v_avg + grad_t
    
    # Compute denominator: lambda^-1 + I_x^2 + I_y^2
    denominator = LAMBDA_INV + grad_x**2 + grad_y**2
    
    # Avoid division by zero
    denominator = np.where(denominator == 0, 1e-10, denominator)
    
    # Update u and v
    u_new = u_avg - (numerator / denominator) * grad_x
    v_new = v_avg - (numerator / denominator) * grad_y
    
    return u_new, v_new

def compute_flow_magnitude_and_angle(u, v):
    """
    Compute the magnitude and angle of the flow field for visualization.
    
    Args:
        u (numpy array): U-component of the flow.
        v (numpy array): V-component of the flow.
    
    Returns:
        tuple: (magnitude, angle) of the flow field.
    """
    magnitude = np.sqrt(u**2 + v**2)
    angle = np.arctan2(v, u)  # Angle in radians
    return magnitude, angle

def visualize_flow_field(u, v):
    """
    Visualize the optical flow field as a color-coded map using HSV.
    
    Args:
        u (numpy array): U-component of the flow.
        v (numpy array): V-component of the flow.
    """
    # Compute magnitude and angle
    magnitude, angle = compute_flow_magnitude_and_angle(u, v)
    
    # Normalize magnitude for visualization
    magnitude = np.clip(magnitude, 0, np.percentile(magnitude, 99))  # Clip outliers
    magnitude = magnitude / (magnitude.max() + 1e-10)  # Normalize to [0, 1]
    
    # Convert angle to hue (0 to 2π -> 0 to 1)
    hue = (angle + np.pi) / (2 * np.pi)  # Map to [0, 1]
    
    # Create HSV image
    hsv = np.zeros((FRAME_SIZE, FRAME_SIZE, 3))
    hsv[..., 0] = hue  # Hue
    hsv[..., 1] = 1.0  # Saturation
    hsv[..., 2] = magnitude  # Value (brightness)
    
    # Convert HSV to RGB for display
    import matplotlib.colors as mcolors
    rgb = mcolors.hsv_to_rgb(hsv)
    
    # Plot the flow field
    plt.figure(figsize=FIGURE_SIZE)
    plt.imshow(rgb)
    plt.title('Optical Flow Field (Color-Coded)')
    plt.axis('off')
    plt.show()

def compute_horn_schunck_flow(grad_x, grad_y, grad_t):
    """
    Compute dense optical flow using the Horn-Schunck algorithm.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
    
    Returns:
        tuple: Final (u, v) flow fields.
    """
    # Initialize u and v to zero
    u = np.zeros((FRAME_SIZE, FRAME_SIZE))
    v = np.zeros((FRAME_SIZE, FRAME_SIZE))
    
    # Iterate until convergence or max iterations
    for iteration in range(MAX_ITERATIONS):
        u_old, v_old = u.copy(), v.copy()
        
        # Perform one update step
        u, v = horn_schunck_update(grad_x, grad_y, grad_t, u, v)
        
        # Check for convergence
        u_diff = np.mean(np.abs(u - u_old))
        v_diff = np.mean(np.abs(v - v_old))
        if u_diff < CONVERGENCE_THRESHOLD and v_diff < CONVERGENCE_THRESHOLD:
            print(f"Converged after {iteration + 1} iterations.")
            break
    
    return u, v

def main():
    """Main function to compute and visualize Horn-Schunck optical flow."""
    # Load frames
    frame_t, frame_t_plus_1 = load_frames()
    
    # Compute gradients
    grad_x, grad_y, grad_t = compute_gradients(frame_t, frame_t_plus_1)
    
    # Compute optical flow
    u, v = compute_horn_schunck_flow(grad_x, grad_y, grad_t)
    
    # Visualize the flow field
    visualize_flow_field(u, v)

if __name__ == "__main__":
    main()

Exercise 4

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import sobel, convolve

# Constants (reused from Exercise 3)
FRAME_SIZE = 256
LAMBDA = 0.1
LAMBDA_INV = 1 / LAMBDA
MAX_ITERATIONS = 100
CONVERGENCE_THRESHOLD = 1e-3
FIGURE_SIZE = (12, 6)  # Adjusted for side-by-side plots

# Reused functions from Exercise 3
def load_frames():
    """
    Simulate loading two consecutive grayscale frames (256x256).
    Replace with actual Middlebury dataset loading.
    
    Returns:
        tuple: Two frames (numpy arrays) of shape (FRAME_SIZE, FRAME_SIZE).
    """
    frame_t = np.random.rand(FRAME_SIZE, FRAME_SIZE)
    frame_t_plus_1 = np.random.rand(FRAME_SIZE, FRAME_SIZE)
    return frame_t, frame_t_plus_1

def compute_gradients(frame_t, frame_t_plus_1):
    """
    Compute spatial and temporal gradients using Sobel filter and frame differencing.
    
    Args:
        frame_t (numpy array): Frame at time t.
        frame_t_plus_1 (numpy array): Frame at time t+1.
    
    Returns:
        tuple: (grad_x, grad_y, grad_t), each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    grad_x = sobel(frame_t, axis=1)
    grad_y = sobel(frame_t, axis=0)
    grad_t = frame_t_plus_1 - frame_t
    return grad_x, grad_y, grad_t

def compute_local_average(field):
    """
    Compute the local average of a field using a 3x3 neighborhood.
    
    Args:
        field (numpy array): Input field (u or v) of shape (FRAME_SIZE, FRAME_SIZE).
    
    Returns:
        numpy array: Local average of the field.
    """
    kernel = np.ones((3, 3)) / 8
    kernel[1, 1] = 0
    return convolve(field, kernel, mode='nearest')

def horn_schunck_update(grad_x, grad_y, grad_t, u, v):
    """
    Perform one iteration of the Horn-Schunck update.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
        u (numpy array): Current u field.
        v (numpy array): Current v field.
    
    Returns:
        tuple: Updated (u, v) fields.
    """
    u_avg = compute_local_average(u)
    v_avg = compute_local_average(v)
    numerator = grad_x * u_avg + grad_y * v_avg + grad_t
    denominator = LAMBDA_INV + grad_x**2 + grad_y**2
    denominator = np.where(denominator == 0, 1e-10, denominator)
    u_new = u_avg - (numerator / denominator) * grad_x
    v_new = v_avg - (numerator / denominator) * grad_y
    return u_new, v_new

def compute_horn_schunck_flow(grad_x, grad_y, grad_t):
    """
    Compute dense optical flow using the Horn-Schunck algorithm.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
    
    Returns:
        tuple: Final (u, v) flow fields.
    """
    u = np.zeros((FRAME_SIZE, FRAME_SIZE))
    v = np.zeros((FRAME_SIZE, FRAME_SIZE))
    for iteration in range(MAX_ITERATIONS):
        u_old, v_old = u.copy(), v.copy()
        u, v = horn_schunck_update(grad_x, grad_y, grad_t, u, v)
        u_diff = np.mean(np.abs(u - u_old))
        v_diff = np.mean(np.abs(v - v_old))
        if u_diff < CONVERGENCE_THRESHOLD and v_diff < CONVERGENCE_THRESHOLD:
            print(f"Converged after {iteration + 1} iterations.")
            break
    return u, v

def compute_flow_magnitude_and_angle(u, v):
    """
    Compute the magnitude and angle of the flow field for visualization.
    
    Args:
        u (numpy array): U-component of the flow.
        v (numpy array): V-component of the flow.
    
    Returns:
        tuple: (magnitude, angle) of the flow field.
    """
    magnitude = np.sqrt(u**2 + v**2)
    angle = np.arctan2(v, u)
    return magnitude, angle

# New functions for Exercise 4
def load_ground_truth_flow():
    """
    Simulate loading the ground truth flow field (u_gt, v_gt) from the Middlebury dataset.
    Replace with actual dataset loading.
    
    Returns:
        tuple: Ground truth (u_gt, v_gt), each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    # Simulate ground truth (random flow for demonstration)
    u_gt = np.random.randn(FRAME_SIZE, FRAME_SIZE)
    v_gt = np.random.randn(FRAME_SIZE, FRAME_SIZE)
    return u_gt, v_gt

def compute_endpoint_error(u, v, u_gt, v_gt):
    """
    Compute the Endpoint Error (EPE) between estimated and ground truth flow.
    
    Args:
        u (numpy array): Estimated u field.
        v (numpy array): Estimated v field.
        u_gt (numpy array): Ground truth u field.
        v_gt (numpy array): Ground truth v field.
    
    Returns:
        tuple: (epe_field, avg_epe), where epe_field is the per-pixel EPE and avg_epe is the mean.
    """
    # EPE = sqrt((u - u_gt)^2 + (v - v_gt)^2)
    epe_field = np.sqrt((u - u_gt)**2 + (v - v_gt)**2)
    avg_epe = np.mean(epe_field)
    return epe_field, avg_epe

def visualize_flow_fields(u_est, v_est, u_gt, v_gt):
    """
    Visualize the estimated and ground truth flow fields side-by-side as color-coded maps.
    
    Args:
        u_est (numpy array): Estimated u field.
        v_est (numpy array): Estimated v field.
        u_gt (numpy array): Ground truth u field.
        v_gt (numpy array): Ground truth v field.
    """
    # Compute magnitude and angle for both flow fields
    mag_est, angle_est = compute_flow_magnitude_and_angle(u_est, v_est)
    mag_gt, angle_gt = compute_flow_magnitude_and_angle(u_gt, v_gt)
    
    # Normalize magnitudes for visualization
    mag_est = np.clip(mag_est, 0, np.percentile(mag_est, 99))
    mag_est = mag_est / (mag_est.max() + 1e-10)
    mag_gt = np.clip(mag_gt, 0, np.percentile(mag_gt, 99))
    mag_gt = mag_gt / (mag_gt.max() + 1e-10)
    
    # Convert angles to hue
    hue_est = (angle_est + np.pi) / (2 * np.pi)
    hue_gt = (angle_gt + np.pi) / (2 * np.pi)
    
    # Create HSV images
    hsv_est = np.zeros((FRAME_SIZE, FRAME_SIZE, 3))
    hsv_gt = np.zeros((FRAME_SIZE, FRAME_SIZE, 3))
    
    hsv_est[..., 0], hsv_est[..., 1], hsv_est[..., 2] = hue_est, 1.0, mag_est
    hsv_gt[..., 0], hsv_gt[..., 1], hsv_gt[..., 2] = hue_gt, 1.0, mag_gt
    
    # Convert HSV to RGB
    import matplotlib.colors as mcolors
    rgb_est = mcolors.hsv_to_rgb(hsv_est)
    rgb_gt = mcolors.hsv_to_rgb(hsv_gt)
    
    # Plot side-by-side
    fig, axes = plt.subplots(1, 2, figsize=FIGURE_SIZE)
    
    axes[0].imshow(rgb_est)
    axes[0].set_title('Estimated Flow')
    axes[0].axis('off')
    
    axes[1].imshow(rgb_gt)
    axes[1].set_title('Ground Truth Flow')
    axes[1].axis('off')
    
    plt.tight_layout()
    plt.show()

def main():
    """Main function to evaluate Horn-Schunck optical flow accuracy."""
    # Load frames and ground truth
    frame_t, frame_t_plus_1 = load_frames()
    u_gt, v_gt = load_ground_truth_flow()
    
    # Compute gradients
    grad_x, grad_y, grad_t = compute_gradients(frame_t, frame_t_plus_1)
    
    # Compute estimated flow using Horn-Schunck
    u_est, v_est = compute_horn_schunck_flow(grad_x, grad_y, grad_t)
    
    # Compute EPE
    epe_field, avg_epe = compute_endpoint_error(u_est, v_est, u_gt, v_gt)
    print(f"Average Endpoint Error (EPE): {avg_epe:.4f}")
    
    # Visualize both flow fields
    visualize_flow_fields(u_est, v_est, u_gt, v_gt)

if __name__ == "__main__":
    main()

Exercise 5

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import sobel, convolve
from scipy.interpolate import RegularGridInterpolator

# Constants (reused from Exercise 3)
FRAME_SIZE = 256
LAMBDA = 0.1
LAMBDA_INV = 1 / LAMBDA
MAX_ITERATIONS = 100
CONVERGENCE_THRESHOLD = 1e-3
FIGURE_SIZE = (6, 6)
NUM_FRAMES = 10
NUM_FEATURES = 50

# Reused functions from Exercise 3
def load_frames(num_frames=NUM_FRAMES):
    """
    Simulate loading a sequence of grayscale frames (256x256).
    Replace with actual dataset loading.
    
    Args:
        num_frames (int): Number of frames to load.
    
    Returns:
        list: List of frames, each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    frames = [np.random.rand(FRAME_SIZE, FRAME_SIZE) for _ in range(num_frames)]
    return frames

def compute_gradients(frame_t, frame_t_plus_1):
    """
    Compute spatial and temporal gradients using Sobel filter and frame differencing.
    
    Args:
        frame_t (numpy array): Frame at time t.
        frame_t_plus_1 (numpy array): Frame at time t+1.
    
    Returns:
        tuple: (grad_x, grad_y, grad_t), each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    grad_x = sobel(frame_t, axis=1)
    grad_y = sobel(frame_t, axis=0)
    grad_t = frame_t_plus_1 - frame_t
    return grad_x, grad_y, grad_t

def compute_local_average(field):
    """
    Compute the local average of a field using a 3x3 neighborhood.
    
    Args:
        field (numpy array): Input field (u or v) of shape (FRAME_SIZE, FRAME_SIZE).
    
    Returns:
        numpy array: Local average of the field.
    """
    kernel = np.ones((3, 3)) / 8
    kernel[1, 1] = 0
    return convolve(field, kernel, mode='nearest')

def horn_schunck_update(grad_x, grad_y, grad_t, u, v):
    """
    Perform one iteration of the Horn-Schunck update.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
        u (numpy array): Current u field.
        v (numpy array): Current v field.
    
    Returns:
        tuple: Updated (u, v) fields.
    """
    u_avg = compute_local_average(u)
    v_avg = compute_local_average(v)
    numerator = grad_x * u_avg + grad_y * v_avg + grad_t
    denominator = LAMBDA_INV + grad_x**2 + grad_y**2
    denominator = np.where(denominator == 0, 1e-10, denominator)
    u_new = u_avg - (numerator / denominator) * grad_x
    v_new = v_avg - (numerator / denominator) * grad_y
    return u_new, v_new

def compute_horn_schunck_flow(grad_x, grad_y, grad_t):
    """
    Compute dense optical flow using the Horn-Schunck algorithm.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
    
    Returns:
        tuple: Final (u, v) flow fields.
    """
    u = np.zeros((FRAME_SIZE, FRAME_SIZE))
    v = np.zeros((FRAME_SIZE, FRAME_SIZE))
    for iteration in range(MAX_ITERATIONS):
        u_old, v_old = u.copy(), v.copy()
        u, v = horn_schunck_update(grad_x, grad_y, grad_t, u, v)
        u_diff = np.mean(np.abs(u - u_old))
        v_diff = np.mean(np.abs(v - v_old))
        if u_diff < CONVERGENCE_THRESHOLD and v_diff < CONVERGENCE_THRESHOLD:
            print(f"Converged after {iteration + 1} iterations.")
            break
    return u, v

# New functions for Exercise 5
def initialize_feature_points(num_points=NUM_FEATURES):
    """
    Initialize a set of feature points with random (x, y) coordinates.
    
    Args:
        num_points (int): Number of feature points to initialize.
    
    Returns:
        numpy array: Array of shape (num_points, 2) with (x, y) coordinates.
    """
    points = np.random.uniform(0, FRAME_SIZE, size=(num_points, 2))
    return points

def interpolate_flow(u, v, points):
    """
    Interpolate the optical flow (u, v) at given points (x, y).
    
    Args:
        u (numpy array): U-component of the flow field.
        v (numpy array): V-component of the flow field.
        points (numpy array): Array of shape (num_points, 2) with (x, y) coordinates.
    
    Returns:
        tuple: Interpolated (u, v) values at the given points.
    """
    x = np.arange(FRAME_SIZE)
    y = np.arange(FRAME_SIZE)
    u_interpolator = RegularGridInterpolator((y, x), u, bounds_error=False, fill_value=0)
    v_interpolator = RegularGridInterpolator((y, x), v, bounds_error=False, fill_value=0)
    
    u_vals = u_interpolator(points)
    v_vals = v_interpolator(points)
    return u_vals, v_vals

def track_features(frames, initial_points):
    """
    Track feature points across frames using optical flow.
    
    Args:
        frames (list): List of frames.
        initial_points (numpy array): Initial feature points of shape (num_points, 2).
    
    Returns:
        list: Trajectories of shape (num_frames, num_points, 2).
    """
    trajectories = [initial_points.copy()]  # List to store positions at each frame
    current_points = initial_points.copy()
    
    for t in range(len(frames) - 1):
        # Compute optical flow between consecutive frames
        grad_x, grad_y, grad_t = compute_gradients(frames[t], frames[t + 1])
        u, v = compute_horn_schunck_flow(grad_x, grad_y, grad_t)
        
        # Interpolate flow at current points
        u_vals, v_vals = interpolate_flow(u, v, current_points)
        
        # Update positions: x_{t+1} = x_t + u, y_{t+1} = y_t + v
        current_points[:, 0] += u_vals  # Update x
        current_points[:, 1] += v_vals  # Update y
        
        # Clamp points to stay within frame bounds
        current_points = np.clip(current_points, 0, FRAME_SIZE - 1)
        
        trajectories.append(current_points.copy())
    
    return np.array(trajectories)

def visualize_trajectories(frames, trajectories):
    """
    Visualize feature point trajectories on the last frame.
    
    Args:
        frames (list): List of frames.
        trajectories (numpy array): Trajectories of shape (num_frames, num_points, 2).
    """
    plt.figure(figsize=FIGURE_SIZE)
    plt.imshow(frames[-1], cmap='gray')
    
    # Plot trajectories for each point
    num_points = trajectories.shape[1]
    colors = plt.cm.get_cmap('tab20')(np.linspace(0, 1, num_points))  # Different colors for each trajectory
    
    for i in range(num_points):
        x_traj = trajectories[:, i, 0]
        y_traj = trajectories[:, i, 1]
        plt.plot(x_traj, y_traj, color=colors[i], linewidth=1, alpha=0.8)
        # Mark the final position
        plt.scatter(x_traj[-1], y_traj[-1], color=colors[i], s=10)
    
    plt.title('Feature Point Trajectories on Last Frame')
    plt.axis('off')
    plt.show()

def main():
    """Main function to track features and visualize trajectories."""
    # Load frames
    frames = load_frames()
    
    # Initialize feature points
    initial_points = initialize_feature_points()
    
    # Track features across frames
    trajectories = track_features(frames, initial_points)
    
    # Visualize trajectories on the last frame
    visualize_trajectories(frames, trajectories)

if __name__ == "__main__":
    main()

Exercise 6

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import sobel, convolve
import pandas as pd

# Constants (reused from previous exercises)
FRAME_SIZE = 256
MAX_ITERATIONS = 100
CONVERGENCE_THRESHOLD = 1e-3
FIGURE_SIZE = (18, 6)  # Adjusted for three side-by-side plots
LAMBDA_VALUES = [0.01, 0.1, 1.0]

# Reused functions from Exercises 3 and 4
def load_frames():
    """
    Simulate loading two consecutive grayscale frames (256x256).
    Replace with actual Middlebury dataset loading.
    
    Returns:
        tuple: Two frames (numpy arrays) of shape (FRAME_SIZE, FRAME_SIZE).
    """
    frame_t = np.random.rand(FRAME_SIZE, FRAME_SIZE)
    frame_t_plus_1 = np.random.rand(FRAME_SIZE, FRAME_SIZE)
    return frame_t, frame_t_plus_1

def load_ground_truth_flow():
    """
    Simulate loading the ground truth flow field (u_gt, v_gt).
    Replace with actual dataset loading.
    
    Returns:
        tuple: Ground truth (u_gt, v_gt), each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    u_gt = np.random.randn(FRAME_SIZE, FRAME_SIZE)
    v_gt = np.random.randn(FRAME_SIZE, FRAME_SIZE)
    return u_gt, v_gt

def compute_gradients(frame_t, frame_t_plus_1):
    """
    Compute spatial and temporal gradients using Sobel filter and frame differencing.
    
    Args:
        frame_t (numpy array): Frame at time t.
        frame_t_plus_1 (numpy array): Frame at time t+1.
    
    Returns:
        tuple: (grad_x, grad_y, grad_t), each of shape (FRAME_SIZE, FRAME_SIZE).
    """
    grad_x = sobel(frame_t, axis=1)
    grad_y = sobel(frame_t, axis=0)
    grad_t = frame_t_plus_1 - frame_t
    return grad_x, grad_y, grad_t

def compute_local_average(field):
    """
    Compute the local average of a field using a 3x3 neighborhood.
    
    Args:
        field (numpy array): Input field (u or v) of shape (FRAME_SIZE, FRAME_SIZE).
    
    Returns:
        numpy array: Local average of the field.
    """
    kernel = np.ones((3, 3)) / 8
    kernel[1, 1] = 0
    return convolve(field, kernel, mode='nearest')

def horn_schunck_update(grad_x, grad_y, grad_t, u, v, lambda_inv):
    """
    Perform one iteration of the Horn-Schunck update.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
        u (numpy array): Current u field.
        v (numpy array): Current v field.
        lambda_inv (float): 1 / lambda (smoothness parameter inverse).
    
    Returns:
        tuple: Updated (u, v) fields.
    """
    u_avg = compute_local_average(u)
    v_avg = compute_local_average(v)
    numerator = grad_x * u_avg + grad_y * v_avg + grad_t
    denominator = lambda_inv + grad_x**2 + grad_y**2
    denominator = np.where(denominator == 0, 1e-10, denominator)
    u_new = u_avg - (numerator / denominator) * grad_x
    v_new = v_avg - (numerator / denominator) * grad_y
    return u_new, v_new

def compute_horn_schunck_flow(grad_x, grad_y, grad_t, lambda_val):
    """
    Compute dense optical flow using the Horn-Schunck algorithm.
    
    Args:
        grad_x (numpy array): I_x gradient.
        grad_y (numpy array): I_y gradient.
        grad_t (numpy array): I_t gradient.
        lambda_val (float): Smoothness parameter lambda.
    
    Returns:
        tuple: Final (u, v) flow fields.
    """
    lambda_inv = 1 / lambda_val
    u = np.zeros((FRAME_SIZE, FRAME_SIZE))
    v = np.zeros((FRAME_SIZE, FRAME_SIZE))
    for iteration in range(MAX_ITERATIONS):
        u_old, v_old = u.copy(), v.copy()
        u, v = horn_schunck_update(grad_x, grad_y, grad_t, u, v, lambda_inv)
        u_diff = np.mean(np.abs(u - u_old))
        v_diff = np.mean(np.abs(v - v_old))
        if u_diff < CONVERGENCE_THRESHOLD and v_diff < CONVERGENCE_THRESHOLD:
            print(f"Lambda {lambda_val}: Converged after {iteration + 1} iterations.")
            break
    return u, v

def compute_flow_magnitude_and_angle(u, v):
    """
    Compute the magnitude and angle of the flow field for visualization.
    
    Args:
        u (numpy array): U-component of the flow.
        v (numpy array): V-component of the flow.
    
    Returns:
        tuple: (magnitude, angle) of the flow field.
    """
    magnitude = np.sqrt(u**2 + v**2)
    angle = np.arctan2(v, u)
    return magnitude, angle

def compute_endpoint_error(u, v, u_gt, v_gt):
    """
    Compute the Endpoint Error (EPE) between estimated and ground truth flow.
    
    Args:
        u (numpy array): Estimated u field.
        v (numpy array): Estimated v field.
        u_gt (numpy array): Ground truth u field.
        v_gt (numpy array): Ground truth v field.
    
    Returns:
        tuple: (epe_field, avg_epe), where epe_field is the per-pixel EPE and avg_epe is the mean.
    """
    epe_field = np.sqrt((u - u_gt)**2 + (v - v_gt)**2)
    avg_epe = np.mean(epe_field)
    return epe_field, avg_epe

def visualize_flow_fields(flow_fields, lambda_values):
    """
    Visualize multiple flow fields side-by-side as color-coded maps.
    
    Args:
        flow_fields (list): List of (u, v) tuples for each lambda.
        lambda_values (list): List of lambda values.
    """
    fig, axes = plt.subplots(1, len(lambda_values), figsize=FIGURE_SIZE)
    
    for idx, (u, v) in enumerate(flow_fields):
        # Compute magnitude and angle
        mag, angle = compute_flow_magnitude_and_angle(u, v)
        
        # Normalize magnitude for visualization
        mag = np.clip(mag, 0, np.percentile(mag, 99))
        mag = mag / (mag.max() + 1e-10)
        
        # Convert angle to hue
        hue = (angle + np.pi) / (2 * np.pi)
        
        # Create HSV image
        hsv = np.zeros((FRAME_SIZE, FRAME_SIZE, 3))
        hsv[..., 0], hsv[..., 1], hsv[..., 2] = hue, 1.0, mag
        
        # Convert HSV to RGB
        import matplotlib.colors as mcolors
        rgb = mcolors.hsv_to_rgb(hsv)
        
        # Plot
        axes[idx].imshow(rgb)
        axes[idx].set_title(f'Flow (λ = {lambda_values[idx]})')
        axes[idx].axis('off')
    
    plt.tight_layout()
    plt.show()

def create_epe_table(lambda_values, epe_scores):
    """
    Create and display a table of lambda values and their corresponding EPE scores.
    
    Args:
        lambda_values (list): List of lambda values.
        epe_scores (list): List of corresponding EPE scores.
    """
    data = {'λ': lambda_values, 'Average EPE': [f"{score:.4f}" for score in epe_scores]}
    df = pd.DataFrame(data)
    print("\nEPE Scores for Different λ Values:")
    print(df.to_string(index=False))

def main():
    """Main function to tune the smoothness parameter λ and evaluate its effect."""
    # Load frames and ground truth
    frame_t, frame_t_plus_1 = load_frames()
    u_gt, v_gt = load_ground_truth_flow()
    
    # Compute gradients
    grad_x, grad_y, grad_t = compute_gradients(frame_t, frame_t_plus_1)
    
    # Store results
    flow_fields = []
    epe_scores = []
    
    # Run Horn-Schunck for each lambda value
    for lambda_val in LAMBDA_VALUES:
        print(f"\nComputing optical flow for λ = {lambda_val}...")
        u, v = compute_horn_schunck_flow(grad_x, grad_y, grad_t, lambda_val)
        flow_fields.append((u, v))
        
        # Compute EPE
        _, avg_epe = compute_endpoint_error(u, v, u_gt, v_gt)
        epe_scores.append(avg_epe)
    
    # Visualize flow fields
    visualize_flow_fields(flow_fields, LAMBDA_VALUES)
    
    # Create and display EPE table
    create_epe_table(LAMBDA_VALUES, epe_scores)

if __name__ == "__main__":
    main()

Exercise 7

In [None]:
import cv2 as cv
import numpy as np

# Constants
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
MAX_CORNERS = 100
QUALITY_LEVEL = 0.3
MIN_DISTANCE = 7
BLOCK_SIZE = 7
ESC_KEY = 27
WINDOW_NAME = "Feature Tracking with Lucas-Kanade"

# Lucas-Kanade optical flow parameters
LK_PARAMS = dict(
    winSize=(15, 15),
    maxLevel=2,
    criteria=(cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03)
)

def initialize_webcam():
    """
    Initialize the webcam feed and set frame dimensions.
    
    Returns:
        cv.VideoCapture: Webcam capture object.
    """
    cap = cv.VideoCapture(0)  # 0 for default webcam
    if not cap.isOpened():
        raise RuntimeError("Error: Could not open webcam.")
    
    cap.set(cv.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
    cap.set(cv.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
    return cap

def detect_initial_features(frame_gray):
    """
    Detect initial feature points using goodFeaturesToTrack.
    
    Args:
        frame_gray (numpy array): Grayscale frame.
    
    Returns:
        numpy array: Initial feature points of shape (num_points, 1, 2).
    """
    corners = cv.goodFeaturesToTrack(
        frame_gray,
        maxCorners=MAX_CORNERS,
        qualityLevel=QUALITY_LEVEL,
        minDistance=MIN_DISTANCE,
        blockSize=BLOCK_SIZE
    )
    return corners

def compute_optical_flow(old_gray, frame_gray, points):
    """
    Compute Lucas-Kanade optical flow to track feature points.
    
    Args:
        old_gray (numpy array): Previous grayscale frame.
        frame_gray (numpy array): Current grayscale frame.
        points (numpy array): Feature points to track.
    
    Returns:
        tuple: (new_points, status, good_new, good_old)
            - new_points: Updated points.
            - status: Status of tracked points.
            - good_new: Successfully tracked new points.
            - good_old: Corresponding old points.
    """
    new_points, status, _ = cv.calcOpticalFlowPyrLK(
        old_gray, frame_gray, points, None, **LK_PARAMS
    )
    
    # Select good points (status == 1)
    good_new = new_points[status == 1]
    good_old = points[status == 1]
    
    return new_points, status, good_new, good_old

def draw_trajectories(frame, old_points, new_points, status, color):
    """
    Draw trajectories as lines and circles for current positions.
    
    Args:
        frame (numpy array): Frame to draw on.
        old_points (numpy array): Previous feature points.
        new_points (numpy array): Current feature points.
        status (numpy array): Status of tracked points.
        color (tuple): Color for drawing.
    
    Returns:
        numpy array: Frame with drawn trajectories.
    """
    for i, (new, old) in enumerate(zip(new_points, old_points)):
        if status[i] != 1:
            continue
        a, b = new.ravel().astype(int)
        c, d = old.ravel().astype(int)
        # Draw line between old and new positions
        frame = cv.line(frame, (a, b), (c, d), color, 2)
        # Draw circle at current position
        frame = cv.circle(frame, (a, b), 5, color, -1)
    return frame

def main():
    """Main function to perform real-time feature tracking with Lucas-Kanade."""
    # Initialize webcam
    cap = initialize_webcam()
    
    # Read the first frame and detect initial features
    ret, old_frame = cap.read()
    if not ret:
        raise RuntimeError("Error: Could not read frame from webcam.")
    
    old_gray = cv.cvtColor(old_frame, cv.COLOR_BGR2GRAY)
    points = detect_initial_features(old_gray)
    
    if points is None or len(points) == 0:
        raise RuntimeError("Error: No feature points detected in the first frame.")
    
    # Create a mask for drawing trajectories
    mask = np.zeros_like(old_frame)
    
    # Generate random colors for each point
    colors = np.random.randint(0, 255, (MAX_CORNERS, 3)).tolist()
    
    while True:
        # Read the current frame
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame. Exiting...")
            break
        
        frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        
        # Compute optical flow
        new_points, status, good_new, good_old = compute_optical_flow(old_gray, frame_gray, points)
        
        # Draw trajectories on the mask
        for i, (new, old) in enumerate(zip(good_new, good_old)):
            mask = cv.line(mask, tuple(new.ravel().astype(int)), tuple(old.ravel().astype(int)), colors[i], 2)
            frame = cv.circle(frame, tuple(new.ravel().astype(int)), 5, colors[i], -1)
        
        # Overlay the mask on the frame
        frame = cv.add(frame, mask)
        
        # Display the frame
        cv.imshow(WINDOW_NAME, frame)
        
        # Update points and previous frame
        old_gray = frame_gray.copy()
        points = good_new.reshape(-1, 1, 2)
        
        # Exit on ESC key
        key = cv.waitKey(30) & 0xFF
        if key == ESC_KEY:
            break
        
        # If too few points remain, redetect features
        if len(points) < 10:
            points = detect_initial_features(old_gray)
            mask = np.zeros_like(frame)  # Reset mask
    
    # Clean up
    cap.release()
    cv.destroyAllWindows()

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"An error occurred: {e}")

Exercise 8

In [None]:
import cv2 as cv
import numpy as np

# Constants
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
ESC_KEY = 27
WINDOW_NAME = "Object Tracking with Farneback Optical Flow"
BOX_COLOR = (0, 255, 0)  # Green bounding box
BOX_THICKNESS = 2

# Farneback optical flow parameters
FARNEBACK_PARAMS = dict(
    pyr_scale=0.5,
    levels=3,
    winsize=15,
    iterations=3,
    poly_n=5,
    poly_sigma=1.2,
    flags=0
)

def initialize_webcam():
    """
    Initialize the webcam feed and set frame dimensions.
    
    Returns:
        cv.VideoCapture: Webcam capture object.
    """
    cap = cv.VideoCapture(0)  # 0 for default webcam
    if not cap.isOpened():
        raise RuntimeError("Error: Could not open webcam.")
    
    cap.set(cv.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
    cap.set(cv.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
    return cap

def select_initial_roi(frame):
    """
    Prompt the user to select an ROI for tracking.
    
    Args:
        frame (numpy array): First frame to display for ROI selection.
    
    Returns:
        tuple: (x, y, w, h) of the selected ROI.
    """
    roi = cv.selectROI(WINDOW_NAME, frame, fromCenter=False, showCrosshair=True)
    if roi == (0, 0, 0, 0):
        raise RuntimeError("Error: No ROI selected.")
    return roi

def compute_dense_flow(old_gray, frame_gray):
    """
    Compute dense optical flow using Farneback method.
    
    Args:
        old_gray (numpy array): Previous grayscale frame.
        frame_gray (numpy array): Current grayscale frame.
    
    Returns:
        numpy array: Dense flow field of shape (height, width, 2).
    """
    flow = cv.calcOpticalFlowFarneback(
        old_gray, frame_gray, None, **FARNEBACK_PARAMS
    )
    return flow

def update_roi_position(flow, roi):
    """
    Update the ROI position by averaging flow vectors within the ROI.
    
    Args:
        flow (numpy array): Dense flow field.
        roi (tuple): Current ROI as (x, y, w, h).
    
    Returns:
        tuple: Updated ROI as (x, y, w, h).
    """
    x, y, w, h = roi
    
    # Extract flow vectors within the ROI
    flow_roi = flow[y:y+h, x:x+w]
    
    # Compute average flow (dx, dy)
    if flow_roi.size == 0:
        return roi  # No update if ROI is invalid
    avg_flow = np.mean(flow_roi, axis=(0, 1))
    dx, dy = avg_flow
    
    # Update ROI position
    new_x = int(x + dx)
    new_y = int(y + dy)
    
    # Ensure ROI stays within frame bounds
    new_x = max(0, min(new_x, FRAME_WIDTH - w))
    new_y = max(0, min(new_y, FRAME_HEIGHT - h))
    
    return (new_x, new_y, w, h)

def draw_bounding_box(frame, roi):
    """
    Draw the bounding box on the frame.
    
    Args:
        frame (numpy array): Frame to draw on.
        roi (tuple): ROI as (x, y, w, h).
    
    Returns:
        numpy array: Frame with bounding box drawn.
    """
    x, y, w, h = roi
    frame = cv.rectangle(frame, (x, y), (x + w, y + h), BOX_COLOR, BOX_THICKNESS)
    return frame

def main():
    """Main function to perform real-time object tracking with Farneback optical flow."""
    # Initialize webcam
    cap = initialize_webcam()
    
    # Read the first frame and select ROI
    ret, frame = cap.read()
    if not ret:
        raise RuntimeError("Error: Could not read frame from webcam.")
    
    old_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    roi = select_initial_roi(frame)
    
    while True:
        # Read the current frame
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame. Exiting...")
            break
        
        frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        
        # Compute dense optical flow
        flow = compute_dense_flow(old_gray, frame_gray)
        
        # Update ROI position
        roi = update_roi_position(flow, roi)
        
        # Draw bounding box
        frame = draw_bounding_box(frame, roi)
        
        # Display the frame
        cv.imshow(WINDOW_NAME, frame)
        
        # Update previous frame
        old_gray = frame_gray.copy()
        
        # Exit on ESC key
        key = cv.waitKey(30) & 0xFF
        if key == ESC_KEY:
            break
    
    # Clean up
    cap.release()
    cv.destroyAllWindows()

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"An error occurred: {e}")

Exercise 9

In [None]:
import cv2 as cv
import numpy as np

# Constants
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
ESC_KEY = 27
WINDOW_NAME = "SIFT Keypoint Tracking with Lucas-Kanade"
LINE_COLOR = (0, 255, 0)  # Green for trajectories
CIRCLE_COLOR = (0, 0, 255)  # Red for current positions
CIRCLE_RADIUS = 5
CIRCLE_THICKNESS = -1  # Filled circle

# Lucas-Kanade optical flow parameters
LK_PARAMS = dict(
    winSize=(15, 15),
    maxLevel=2,
    criteria=(cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03)
)

def initialize_webcam():
    """
    Initialize the webcam feed and set frame dimensions.
    
    Returns:
        cv.VideoCapture: Webcam capture object.
    """
    cap = cv.VideoCapture(0)  # 0 for default webcam
    if not cap.isOpened():
        raise RuntimeError("Error: Could not open webcam.")
    
    cap.set(cv.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
    cap.set(cv.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
    return cap

def select_initial_roi(frame):
    """
    Prompt the user to select an ROI for tracking.
    
    Args:
        frame (numpy array): First frame to display for ROI selection.
    
    Returns:
        tuple: (x, y, w, h) of the selected ROI.
    """
    roi = cv.selectROI(WINDOW_NAME, frame, fromCenter=False, showCrosshair=True)
    if roi == (0, 0, 0, 0):
        raise RuntimeError("Error: No ROI selected.")
    return roi

def detect_sift_keypoints(frame_gray, roi):
    """
    Detect SIFT keypoints within the ROI and adjust coordinates to the global frame.
    
    Args:
        frame_gray (numpy array): Grayscale frame.
        roi (tuple): ROI as (x, y, w, h).
    
    Returns:
        numpy array: Keypoints in the format (num_points, 1, 2) for Lucas-Kanade.
    """
    x, y, w, h = roi
    
    # Extract the ROI region
    roi_gray = frame_gray[y:y+h, x:x+w]
    if roi_gray.size == 0:
        raise RuntimeError("Error: Invalid ROI region.")
    
    # Initialize SIFT detector
    sift = cv.SIFT_create()
    
    # Detect keypoints in the ROI
    keypoints = sift.detect(roi_gray, None)
    
    if not keypoints:
        raise RuntimeError("Error: No SIFT keypoints detected in the ROI.")
    
    # Convert keypoints to numpy array and adjust coordinates
    points = np.array([kp.pt for kp in keypoints], dtype=np.float32)
    points[:, 0] += x  # Add ROI x offset
    points[:, 1] += y  # Add ROI y offset
    
    # Reshape for Lucas-Kanade
    points = points.reshape(-1, 1, 2)
    return points

def compute_optical_flow(old_gray, frame_gray, points):
    """
    Compute Lucas-Kanade optical flow to track keypoints.
    
    Args:
        old_gray (numpy array): Previous grayscale frame.
        frame_gray (numpy array): Current grayscale frame.
        points (numpy array): Keypoints to track.
    
    Returns:
        tuple: (new_points, status, good_new, good_old)
            - new_points: Updated points.
            - status: Status of tracked points.
            - good_new: Successfully tracked new points.
            - good_old: Corresponding old points.
    """
    new_points, status, _ = cv.calcOpticalFlowPyrLK(
        old_gray, frame_gray, points, None, **LK_PARAMS
    )
    
    # Select good points (status == 1)
    good_new = new_points[status == 1]
    good_old = points[status == 1]
    
    return new_points, status, good_new, good_old

def draw_trajectories(frame, old_points, new_points, status):
    """
    Draw trajectories as green lines and red circles for current positions.
    
    Args:
        frame (numpy array): Frame to draw on.
        old_points (numpy array): Previous keypoints.
        new_points (numpy array): Current keypoints.
        status (numpy array): Status of tracked points.
    
    Returns:
        numpy array: Frame with drawn trajectories.
    """
    for i, (new, old) in enumerate(zip(new_points, old_points)):
        if status[i] != 1:
            continue
        a, b = new.ravel().astype(int)
        c, d = old.ravel().astype(int)
        # Draw green line between old and new positions
        frame = cv.line(frame, (a, b), (c, d), LINE_COLOR, 2)
        # Draw red circle at current position
        frame = cv.circle(frame, (a, b), CIRCLE_RADIUS, CIRCLE_COLOR, CIRCLE_THICKNESS)
    return frame

def main():
    """Main function to perform real-time SIFT keypoint tracking with Lucas-Kanade."""
    # Initialize webcam
    cap = initialize_webcam()
    
    # Read the first frame and select ROI
    ret, frame = cap.read()
    if not ret:
        raise RuntimeError("Error: Could not read frame from webcam.")
    
    old_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    roi = select_initial_roi(frame)
    
    # Detect initial SIFT keypoints
    points = detect_sift_keypoints(old_gray, roi)
    
    # Create a mask for drawing trajectories
    mask = np.zeros_like(frame)
    
    while True:
        # Read the current frame
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame. Exiting...")
            break
        
        frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        
        # Compute optical flow
        new_points, status, good_new, good_old = compute_optical_flow(old_gray, frame_gray, points)
        
        # Draw trajectories on the mask
        mask = draw_trajectories(mask, good_old, good_new, status.ravel())
        
        # Overlay the mask on the frame
        frame = cv.add(frame, mask)
        
        # Display the frame
        cv.imshow(WINDOW_NAME, frame)
        
        # Update points and previous frame
        old_gray = frame_gray.copy()
        points = good_new.reshape(-1, 1, 2)
        
        # Exit on ESC key
        key = cv.waitKey(30) & 0xFF
        if key == ESC_KEY:
            break
        
        # If too few points remain, redetect keypoints
        if len(points) < 5:
            points = detect_sift_keypoints(old_gray, roi)
            mask = np.zeros_like(frame)  # Reset mask
    
    # Clean up
    cap.release()
    cv.destroyAllWindows()

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"An error occurred: {e}")

Exercise 10

In [None]:
import cv2 as cv
import numpy as np
import time

# Constants
FRAME_WIDTH = 640
FRAME_HEIGHT = 480
ESC_KEY = 27
WINDOW_NAME = "Feature Tracking and Motion Prediction with Lucas-Kanade"
ACTUAL_LINE_COLOR = (0, 255, 0)  # Green for actual trajectories
PREDICTED_LINE_COLOR = (255, 0, 0)  # Blue for predicted trajectories
CURRENT_CIRCLE_COLOR = (0, 0, 255)  # Red for current positions
PREDICTED_CIRCLE_COLOR = (255, 255, 0)  # Light blue for predicted positions
CIRCLE_RADIUS = 3
CIRCLE_THICKNESS = -1
MAX_FEATURES = 50
REDETECTION_THRESHOLD = 10
REDETECTION_COOLDOWN = 1.0  # Seconds
FRAME_SCALE_FACTOR = 0.5  # Resize frames to 50% for faster processing

# Lucas-Kanade optical flow parameters
LK_PARAMS = dict(
    winSize=(10, 10),
    maxLevel=1,
    criteria=(cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03)
)

def initialize_webcam():
    """Initialize the webcam feed and set frame dimensions."""
    cap = cv.VideoCapture(0)
    if not cap.isOpened():
        raise RuntimeError("Error: Could not open webcam.")
    cap.set(cv.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
    cap.set(cv.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
    return cap

def resize_frame(frame, scale_factor):
    """Resize the frame to reduce processing time."""
    new_width = int(frame.shape[1] * scale_factor)
    new_height = int(frame.shape[0] * scale_factor)
    resized_frame = cv.resize(frame, (new_width, new_height), interpolation=cv.INTER_LINEAR)
    return resized_frame, new_width, new_height

def select_and_scale_roi(frame, scale_factor):
    """Prompt the user to select an ROI and scale it."""
    roi = cv.selectROI(WINDOW_NAME, frame, fromCenter=False, showCrosshair=True)
    if roi == (0, 0, 0, 0):
        raise RuntimeError("Error: No ROI selected.")
    x, y, w, h = roi
    return (
        int(x * scale_factor),
        int(y * scale_factor),
        int(w * scale_factor),
        int(h * scale_factor)
    )

def detect_features(frame_gray, roi):
    """Detect feature points within the ROI."""
    x, y, w, h = roi
    roi_gray = frame_gray[y:y+h, x:x+w]
    if roi_gray.size == 0:
        raise RuntimeError("Error: Invalid ROI region.")
    corners = cv.goodFeaturesToTrack(
        roi_gray,
        maxCorners=MAX_FEATURES,
        qualityLevel=0.3,
        minDistance=7,
        blockSize=7
    )
    if corners is None or len(corners) == 0:
        raise RuntimeError("Error: No feature points detected in the ROI.")
    corners[:, :, 0] += x
    corners[:, :, 1] += y
    return corners

def compute_optical_flow(old_gray, frame_gray, points):
    """Compute Lucas-Kanade optical flow to track feature points."""
    if len(points) == 0:
        return None, None, np.array([]), np.array([])
    new_points, status, _ = cv.calcOpticalFlowPyrLK(
        old_gray, frame_gray, points, None, **LK_PARAMS
    )
    good_new = new_points[status == 1]
    good_old = points[status == 1]
    return new_points, status, good_new, good_old

def calculate_velocity_and_predict(old_points, new_points, status):
    """Calculate velocity and predict the next position for each point."""
    velocities = []
    predicted_points = []
    for i, (new, old) in enumerate(zip(new_points, old_points)):
        if status[i] != 1:
            continue
        velocity = new - old
        velocities.append(velocity)
        predicted = new + velocity
        predicted_points.append(predicted)
    return np.array(velocities), np.array(predicted_points)

def draw_trajectories(frame, mask, old_points, new_points, predicted_points, status):
    """Draw actual and predicted trajectories and positions."""
    pred_idx = 0
    for i, (new, old) in enumerate(zip(new_points, old_points)):
        if status[i] != 1:
            continue
        a, b = new.ravel().astype(int)
        c, d = old.ravel().astype(int)
        mask = cv.line(mask, (a, b), (c, d), ACTUAL_LINE_COLOR, 2)
        frame = cv.circle(frame, (a, b), CIRCLE_RADIUS, CURRENT_CIRCLE_COLOR, CIRCLE_THICKNESS)
        pred = predicted_points[pred_idx]
        e, f = pred.ravel().astype(int)
        mask = cv.line(mask, (a, b), (e, f), PREDICTED_LINE_COLOR, 1)
        frame = cv.circle(frame, (e, f), CIRCLE_RADIUS, PREDICTED_CIRCLE_COLOR, CIRCLE_THICKNESS)
        pred_idx += 1
    return frame, mask

def redetect_features_if_needed(frame_gray, roi, points, mask, last_redetection_time):
    """Redetect features if the number of points is below the threshold."""
    current_time = time.time()
    if len(points) < REDETECTION_THRESHOLD and current_time - last_redetection_time > REDETECTION_COOLDOWN:
        points = detect_features(frame_gray, roi)
        mask = np.zeros_like(mask)
        last_redetection_time = current_time
    return points, mask, last_redetection_time

def main():
    """Main function to perform real-time feature tracking and motion prediction."""
    cap = initialize_webcam()
    ret, frame = cap.read()
    if not ret:
        raise RuntimeError("Error: Could not read frame from webcam.")
    frame, _, _ = resize_frame(frame, FRAME_SCALE_FACTOR)
    roi = select_and_scale_roi(frame, 1.0 / FRAME_SCALE_FACTOR)
    old_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    points = detect_features(old_gray, roi)
    mask = np.zeros_like(frame)
    last_redetection_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame. Exiting...")
            break
        frame, _, _ = resize_frame(frame, FRAME_SCALE_FACTOR)
        frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        new_points, status, good_new, good_old = compute_optical_flow(old_gray, frame_gray, points)
        velocities, predicted_points = calculate_velocity_and_predict(good_old, good_new, status.ravel() if status is not None else [])
        frame, mask = draw_trajectories(frame, mask, good_old, good_new, predicted_points, status.ravel() if status is not None else [])
        frame = cv.add(frame, mask)
        cv.imshow(WINDOW_NAME, frame)
        old_gray = frame_gray.copy()
        points = good_new.reshape(-1, 1, 2)
        key = cv.waitKey(1) & 0xFF
        if key == ESC_KEY:
            break
        points, mask, last_redetection_time = redetect_features_if_needed(old_gray, roi, points, mask, last_redetection_time)

    cap.release()
    cv.destroyAllWindows()

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"An error occurred: {e}")
