In [1]:
import cv2
import numpy as np
import math
from scipy.optimize import linear_sum_assignment

def segment_and_label(frame_bgr, threshold=50, min_area=5):
    """
    Threshold + connected components segmentation.

    The frame is converted to grayscale, binarised with a fixed threshold
    (cv2.THRESH_BINARY), and split into connected components. Components
    covering fewer than ``min_area`` pixels are merged into the background.

    Parameters
    ----------
    frame_bgr : image array accepted by ``cv2.cvtColor(..., COLOR_BGR2GRAY)``.
    threshold : gray level passed to ``cv2.threshold``.
    min_area  : minimum pixel count a component needs to be kept.

    Returns
    -------
    2D 'labels' array with one entry per pixel (0=background). Surviving
    components keep the ids assigned by ``cv2.connectedComponents``, so the
    ids are not necessarily contiguous after filtering.
    """
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    _, bin_img = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)
    num_labels, labels = cv2.connectedComponents(bin_img)

    # One pass over the image gives every component's area, instead of
    # rescanning the full label image once per component.
    areas = np.bincount(labels.ravel(), minlength=num_labels)
    too_small = areas < min_area
    too_small[0] = False  # label 0 is background already; nothing to clear

    if too_small.any():
        # too_small works as a per-label lookup table: indexing it with the
        # label image yields a pixel mask of all undersized components.
        labels[too_small[labels]] = 0

    return labels

def compute_centroids(labels):
    """
    For each label>0 present in ``labels``, compute its centroid.

    Parameters
    ----------
    labels : 2D integer array of per-pixel label ids (0=background).

    Returns
    -------
    List of ``(mean_x, mean_y)`` tuples ordered by increasing label id,
    where x is the column index and y is the row index. Label ids that do
    not occur in the array are skipped. An empty or all-background array
    yields an empty list.
    """
    labels = np.asarray(labels)
    if labels.size == 0:
        # labels.max() would raise on an empty array; there is nothing to measure.
        return []

    rows, cols = np.nonzero(labels > 0)
    if rows.size == 0:
        return []

    ids = labels[rows, cols].astype(np.intp)

    # Accumulate pixel counts and coordinate sums for every label in a single
    # pass, instead of scanning the whole image once per label id.
    counts = np.bincount(ids)
    sum_x = np.bincount(ids, weights=cols)
    sum_y = np.bincount(ids, weights=rows)

    # Background pixels were filtered out above, so counts[0] is always 0 and
    # np.nonzero returns only the foreground ids that are actually present.
    present = np.nonzero(counts)[0]
    return [(sum_x[k] / counts[k], sum_y[k] / counts[k]) for k in present]

def hungarian_average_distance(centsA, centsB):
    """
    Hungarian algorithm to find the minimal-sum pairing between
    two centroid sets centsA, centsB.

    Parameters
    ----------
    centsA, centsB : sequences of ``(x, y)`` pairs.

    Returns
    -------
    The average Euclidean distance over the assigned pairs, as a float.
    Returns 0.0 when either set is empty. When the sets differ in size only
    ``min(len(centsA), len(centsB))`` pairs are formed; the surplus points
    are left unmatched and do not contribute to the average.
    """
    N = len(centsA)
    M = len(centsB)
    if N == 0 or M == 0:
        return 0.0

    pts_a = np.asarray(centsA, dtype=np.float64)
    pts_b = np.asarray(centsB, dtype=np.float64)

    # Pairwise distances via broadcasting: (N, 1) against (1, M) -> (N, M).
    # Kept in float64 so the distances are not rounded to float32 precision.
    cost_matrix = np.hypot(
        pts_a[:, 0, None] - pts_b[None, :, 0],
        pts_a[:, 1, None] - pts_b[None, :, 1],
    )

    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    assigned_distances = cost_matrix[row_ind, col_ind]
    return float(assigned_distances.mean())

def measure_migration_speed(video_path, threshold=50, min_area=5):
    """
    Reads all frames, does frame-by-frame Hungarian matching,
    and returns the average step displacement ("speed").

    Frames are processed as they are decoded: each frame is segmented with
    ``segment_and_label``, reduced to centroids with ``compute_centroids``,
    and matched against the previous frame's centroids with
    ``hungarian_average_distance``. Only the previous frame's centroids are
    kept, so memory use does not grow with the length of the video.

    Parameters
    ----------
    video_path : path to the video; converted with ``str()`` before opening.
    threshold  : forwarded to ``segment_and_label``.
    min_area   : forwarded to ``segment_and_label``.

    Returns
    -------
    Mean of the per-step average distances, in pixels/frame, as a float.
    Returns 0.0 if the video cannot be opened or yields fewer than two
    frames. Steps where either frame has no centroids contribute whatever
    ``hungarian_average_distance`` returns for an empty set.
    """
    cap = cv2.VideoCapture(str(video_path))
    step_distances = []
    try:
        if not cap.isOpened():
            return 0.0

        prev_cents = None
        # Read until the decoder reports no more frames instead of relying on
        # CAP_PROP_FRAME_COUNT, which may be missing or inaccurate.
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            labs = segment_and_label(frame, threshold=threshold, min_area=min_area)
            cents = compute_centroids(labs)

            if prev_cents is not None:
                step_distances.append(hungarian_average_distance(prev_cents, cents))
            prev_cents = cents
    finally:
        # Release the capture even if segmentation or matching raises.
        cap.release()

    if len(step_distances) == 0:
        return 0.0

    avg_speed = float(np.mean(step_distances))  # "pixels/frame"
    return avg_speed

def demo_migration_speed(video_path, threshold=50, min_area=5):
    """
    Measure the migration speed for one video and print it.

    Parameters
    ----------
    video_path : path to the video, forwarded to ``measure_migration_speed``.
    threshold  : segmentation threshold, forwarded unchanged (default 50).
    min_area   : minimum component area, forwarded unchanged (default 5).

    Prints a single line with the speed formatted to two decimals.
    Returns None.
    """
    speed = measure_migration_speed(video_path, threshold=threshold, min_area=min_area)
    print(f"Multi-frame average speed (px/frame) for {video_path}: {speed:.2f}")

In [5]:
# Run the demo on a single clip. The path is relative, so it resolves against
# the working directory the notebook kernel was started in.
# NOTE(review): this cell's execution count (5) does not follow the previous
# cell's (1) — re-run with Restart & Run All to confirm the output below.
video_path = "../data/processed/idr0013/LT0001_02/00007_01.mp4"
demo_migration_speed(video_path)

Multi-frame average speed (px/frame) for ../data/processed/idr0013/LT0001_02/00007_01.mp4: 20.57
