# OpticFlow

Sparse-OpticFlow -> трекинг опорных точек

In [96]:
import numpy as np
import cv2
import time


# Sparse optical flow demo: detects corner features in frames from camera 1
# and tracks them with the pyramidal Lucas-Kanade method, drawing the
# resulting trajectories on top of the live image.

# Parameters forwarded to cv2.calcOpticalFlowPyrLK.
lk_params = dict(winSize  = (15, 15),
                maxLevel = 2,
                criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Parameters forwarded to cv2.goodFeaturesToTrack.
feature_params = dict(maxCorners = 20,
                    qualityLevel = 0.3,
                    minDistance = 10,
                    blockSize = 7 )


trajectory_len = 40    # maximum number of points kept per trajectory
detect_interval = 5    # new features are detected every `detect_interval` frames
trajectories = []      # each trajectory is a list of (x, y) points, newest last
frame_idx = 0


cap = cv2.VideoCapture(1)

# try/finally guarantees the camera and the windows are released even when
# an exception is raised inside the loop.
try:
    while True:

        # start time to calculate FPS
        start = time.perf_counter()

        suc, frame = cap.read()
        if not suc or frame is None:
            # No frame available (camera missing or stream ended): stop
            # instead of crashing inside cvtColor.
            break

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        img = frame.copy()

        # Calculate optical flow for a sparse feature set using the iterative Lucas-Kanade Method
        if len(trajectories) > 0:
            img0, img1 = prev_gray, frame_gray
            p0 = np.float32([trajectory[-1] for trajectory in trajectories]).reshape(-1, 1, 2)
            # Forward-backward check: track prev -> current, then back again;
            # a point is kept only if it returns to within 1 px of its origin.
            p1, _st, _err = cv2.calcOpticalFlowPyrLK(img0, img1, p0, None, **lk_params)
            p0r, _st, _err = cv2.calcOpticalFlowPyrLK(img1, img0, p1, None, **lk_params)
            d = abs(p0-p0r).reshape(-1, 2).max(-1)
            good = d < 1

            new_trajectories = []

            # Extend every trajectory whose point passed the check
            for trajectory, (x, y), good_flag in zip(trajectories, p1.reshape(-1, 2), good):
                if not good_flag:
                    continue
                trajectory.append((x, y))
                if len(trajectory) > trajectory_len:
                    del trajectory[0]
                new_trajectories.append(trajectory)
                # Newest detected point
                cv2.circle(img, (int(x), int(y)), 2, (0, 0, 255), -1)

            trajectories = new_trajectories

            # Draw all the trajectories
            cv2.polylines(img, [np.int32(trajectory) for trajectory in trajectories], False, (0, 255, 0))
            cv2.putText(img, 'track count: %d' % len(trajectories), (20, 50), cv2.FONT_HERSHEY_PLAIN, 1, (0,255,0), 2)


        # Update interval - When to update and detect new features
        if frame_idx % detect_interval == 0:
            mask = np.zeros_like(frame_gray)
            mask[:] = 255

            # Black out a small disc around the latest point of every
            # trajectory so that already tracked points are not re-detected.
            for x, y in (np.int32(trajectory[-1]) for trajectory in trajectories):
                # Plain Python ints: NumPy scalars are passed through int()
                # before being handed to OpenCV as a point.
                cv2.circle(mask, (int(x), int(y)), 5, 0, -1)

            # Detect the good features to track
            p = cv2.goodFeaturesToTrack(frame_gray, mask = mask, **feature_params)
            if p is not None:
                # If good features can be tracked - add that to the trajectories
                for x, y in np.float32(p).reshape(-1, 2):
                    trajectories.append([(x, y)])


        frame_idx += 1
        prev_gray = frame_gray

        # End time
        end = time.perf_counter()
        # FPS for the current frame; guarded so a zero elapsed time cannot
        # raise ZeroDivisionError.
        elapsed = end - start
        fps = 1 / elapsed if elapsed > 0 else 0.0

        # Show Results
        cv2.putText(img, f"{fps:.2f} FPS", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('Optical Flow', img)
        # `mask` is the one built at the most recent detection step.
        cv2.imshow('Mask', mask)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()

Dense-OpticFlow - классический вариант оценки по всему изображению

In [97]:
import numpy as np
import cv2
import time



def draw_flow(img, flow, step=16, arrow_scale=1, arrow_thickness=2):
    """Draw the flow field as green line segments on a BGR copy of ``img``.

    The flow is sampled on a regular grid with spacing ``step`` (starting at
    ``step/2``). For every grid point a segment is drawn from the point to the
    point displaced by the flow vector multiplied by ``arrow_scale``; the
    displaced end is marked with a filled circle of radius ``arrow_thickness``.

    ``img`` is converted with ``cv2.COLOR_GRAY2BGR`` and the converted image
    is returned.
    """
    height, width = img.shape[:2]
    grid = np.mgrid[step/2:height:step, step/2:width:step]
    rows, cols = grid.reshape(2, -1).astype(int)

    # Fancy indexing yields a copy, so scaling in place leaves `flow` untouched.
    displacement = flow[rows, cols]
    displacement *= arrow_scale
    dx, dy = displacement.T

    # One (start, end) pair per grid point, rounded to integer pixels.
    segments = np.vstack([cols, rows, cols + dx, rows + dy]).T.reshape(-1, 2, 2)
    segments = np.int32(segments + 0.5)

    green = (0, 255, 0)
    canvas = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    cv2.polylines(canvas, segments, False, green)

    for origin, tip in segments:
        tip = tuple(tip)
        cv2.line(canvas, tip, tuple(origin), green, 1, cv2.LINE_AA)
        cv2.circle(canvas, tip, arrow_thickness, green, -1)

    return canvas



def draw_hsv(flow):
    """Render a flow field as a BGR image through an HSV encoding.

    Hue is derived from the flow angle, saturation is fixed at 255 and
    value is four times the flow magnitude, capped at 255.
    """
    rows, cols = flow.shape[:2]
    dx = flow[:, :, 0]
    dy = flow[:, :, 1]

    magnitude = np.sqrt(dx*dx+dy*dy)
    # Shift arctan2's [-pi, pi] output to [0, 2*pi] before scaling to hue.
    angle = np.arctan2(dy, dx) + np.pi

    hsv = np.zeros((rows, cols, 3), np.uint8)
    hsv[..., 1] = 255
    hsv[..., 0] = angle*(180/np.pi/2)
    hsv[..., 2] = np.minimum(magnitude*4, 255)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)




# Dense optical flow demo: computes Farneback flow between consecutive frames
# from camera 1 and shows it as line segments ('flow') and as an HSV colour
# map ('flow HSV').
cap = cv2.VideoCapture(1)

# try/finally guarantees the camera and the windows are released even when
# an exception is raised.
try:
    suc, prev = cap.read()
    if not suc or prev is None:
        raise RuntimeError("Could not read the first frame from camera 1")
    prevgray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)

    while True:

        suc, img = cap.read()
        if not suc or img is None:
            # No frame available: stop instead of crashing inside cvtColor.
            break
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # start time to calculate FPS (only the flow computation is timed)
        start = time.perf_counter()

        flow = cv2.calcOpticalFlowFarneback(prevgray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

        prevgray = gray

        # End time
        end = time.perf_counter()
        # FPS of the flow computation; guarded against a zero elapsed time.
        elapsed = end - start
        fps = 1 / elapsed if elapsed > 0 else 0.0

        # The FPS value was previously computed but never shown; overlay it
        # the same way the sparse demo does.
        flow_vis = draw_flow(gray, flow)
        cv2.putText(flow_vis, f"{fps:.2f} FPS", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        cv2.imshow('flow', flow_vis)
        cv2.imshow('flow HSV', draw_hsv(flow))

        # Mask to the low byte, as in the sparse demo, before comparing.
        key = cv2.waitKey(5) & 0xFF
        if key == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()

# Обработка видео файла:

Вот как обычно определяются оси в оптическом потоке:

Ось x (горизонтальная ось) представляет движение вправо (положительное значение) или влево (отрицательное значение).

Ось y (вертикальная ось) представляет движение вниз (положительное значение) или вверх (отрицательное значение).

Данный код обрабатывает видео, анализируя движение пикселей между последовательными кадрами с использованием оптического потока. Оптический поток вычисляет направление и скорость движения пикселей, предоставляя информацию о том, как объекты двигаются в видео. Код вычисляет и отображает среднюю скорость движения как по горизонтали (влево-вправо), так и по вертикали (вверх-вниз), что помогает понять основное направление движения в сцене. Кроме того, он рассчитывает среднее направление движения, представляющее общий угол движения в видео. Этот угол представлен как в радианах, так и в градусах, что упрощает интерпретацию направления движения. Код наносит эту информацию в виде текста на кадры видео, предоставляя сведения о средних характеристиках потока и направлении движения в анализируемом видео.

Пояснение этих параметров: flow = cv2.calcOpticalFlowFarneback(prevgray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

pyr_scale (0.5): The image scale (less than 1) used to build the pyramid for each image. A value of 0.5 gives a classical pyramid in which every next layer is half the size of the previous one. Values closer to 1 shrink the image more gradually between levels, so more levels are needed to cover the same range of motion.

levels (3): The number of levels in the image pyramid. More levels allow for the computation of flow at different resolutions. Increasing the number of levels may improve accuracy but requires more computation.

winsize (15): The size of the averaging window used for flow computation. A larger window size will smooth the flow and make it more robust to noise but may lose fine details.

iterations (3): The number of iterations performed at each pyramid level. More iterations can lead to more accurate flow estimation but also increase computation time.

poly_n (5): The size of the pixel neighborhood used to find polynomial expansion over each pixel. It affects the polynomial expansion of the pixel values and can control the smoothness of the flow field.

poly_sigma (1.2): Standard deviation of the Gaussian that is used to smooth the derivatives used in the polynomial expansion. It controls the degree of smoothing applied to the derivatives.

flags (0): This parameter is used to specify various flags that control the operation of the function. The value 0 means there are no additional flags applied.

In [10]:
import numpy as np
import cv2
import time
import math  # Import math for degree conversion


def get_output_fourcc(output_file):
    """Return the FourCC code for the extension of ``output_file``.

    ``.avi`` maps to the XVID codec and ``.mp4`` to mp4v (the comparison is
    case-insensitive).

    Raises:
        ValueError: if the extension is neither ``avi`` nor ``mp4``.
    """
    # Extension = text after the last dot, lower-cased.
    extension = output_file.rsplit('.', 1)[-1].lower()

    codec_by_extension = {
        'avi': 'XVID',  # XVID codec for AVI containers
        'mp4': 'mp4v',  # mp4v codec for MP4 containers
    }
    codec = codec_by_extension.get(extension)
    if codec is None:
        raise ValueError("Unsupported video file format. Use .avi or .mp4.")

    return cv2.VideoWriter_fourcc(*codec)


def calculate_average_flow(flow, window, N):
    """Return the mean x and y flow components over a rolling window.

    The two component planes of ``flow`` are appended to ``window`` (a list
    that is modified in place); when the list then holds more than ``N``
    entries its oldest entry is removed. The result is the pair
    ``(mean of all x components, mean of all y components)`` over every
    frame currently stored in the window.
    """
    window.append((flow[:, :, 0], flow[:, :, 1]))

    # Drop the oldest frame once the window exceeds N entries.
    if len(window) > N:
        del window[0]

    x_planes, y_planes = zip(*window)
    return np.mean(x_planes), np.mean(y_planes)


def calculate_average_direction(flow, window, N):
    """Return the mean flow direction over a rolling window.

    The per-pixel angles ``arctan2(fy, fx)`` of ``flow`` are appended to
    ``window`` (a list that is modified in place); when the list then holds
    more than ``N`` entries its oldest entry is removed.

    The average is a circular mean: the angles are averaged as unit vectors
    (mean sine and mean cosine) and converted back with ``arctan2``. A plain
    arithmetic mean is wrong for angles because of the wrap at +/-pi: two
    directions just either side of "left" (close to +pi and -pi) would
    average to roughly 0, i.e. "right".

    Returns:
        ``(average_angle_radians, average_angle_degrees)``; the radian value
        lies in ``[-pi, pi]``.
    """
    fx, fy = flow[:, :, 0], flow[:, :, 1]
    window.append(np.arctan2(fy, fx))

    # Keep the window size limited to N frames
    if len(window) > N:
        window.pop(0)

    # Circular mean of every angle stored in the window
    stacked_angles = np.asarray(window)
    mean_sin = np.mean(np.sin(stacked_angles))
    mean_cos = np.mean(np.cos(stacked_angles))
    average_angle_radians = np.arctan2(mean_sin, mean_cos)

    # Convert radians to degrees
    average_angle_degrees = math.degrees(average_angle_radians)

    return average_angle_radians, average_angle_degrees


def draw_flow_with_wider_arrows(img, flow, step=16, arrow_scale=2, arrow_thickness=2):
    """Draw the flow field as green arrows directly onto ``img``.

    The flow is sampled on a regular grid with spacing ``step`` (starting at
    ``step/2``). Each arrow starts at a grid point and points along the flow
    vector multiplied by ``arrow_scale``, with line thickness
    ``arrow_thickness``. ``img`` is modified in place and also returned.
    """
    height, width = img.shape[:2]
    grid = np.mgrid[step/2:height:step, step/2:width:step]
    rows, cols = grid.reshape(2, -1).astype(int)

    # Fancy indexing yields a copy, so scaling in place leaves `flow` untouched.
    vectors = flow[rows, cols]
    vectors *= arrow_scale

    green = (0, 255, 0)
    for col, row, (dx, dy) in zip(cols, rows, vectors):
        tail = (int(col), int(row))
        # Components are truncated to int before being added to the tail.
        head = (tail[0] + int(dx), tail[1] + int(dy))
        cv2.arrowedLine(img, tail, head, green, arrow_thickness, cv2.LINE_AA)

    return img


def process_video_rgb(input_file, output_file, show=True, save=False, N=10, resize_scale=1):
    """Annotate a video with dense optical flow arrows and rolling averages.

    Every frame is resized by ``resize_scale`` and Farneback optical flow is
    computed against the previous frame. Flow arrows, the average velocity
    and the average direction (both taken over the last ``N`` frames) are
    drawn on the resized frame.

    Args:
        input_file: source passed to ``cv2.VideoCapture``.
        output_file: path of the annotated video; only used when ``save`` is
            true, and must then end in ``.avi`` or ``.mp4``.
        show: display each annotated frame in a window named 'Flow'.
        save: write the annotated frames to ``output_file`` at the size of
            the source frames and half the source frame rate.
        N: length, in frames, of the rolling averaging windows.
        resize_scale: scale factor applied to each frame before processing.

    Raises:
        IOError: if no frame can be read from ``input_file``.
        ValueError: if ``save`` is true and ``output_file`` has an
            unsupported extension.

    Processing stops at the end of the video or when 'q' is pressed.
    """
    cap = cv2.VideoCapture(input_file)
    out = None

    # try/finally guarantees the capture, the writer and the windows are
    # released even when an exception is raised while processing.
    try:
        suc, prev_orig = cap.read()
        if not suc or prev_orig is None:
            raise IOError(f"Could not read a frame from {input_file!r}")

        prev = cv2.resize(prev_orig, None, fx=resize_scale, fy=resize_scale)
        prevgray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)

        original_fps = int(cap.get(cv2.CAP_PROP_FPS))  # Get original FPS
        # (width, height) of the source frames; the writer expects this size.
        frame_size = (prev_orig.shape[1], prev_orig.shape[0])

        if save:
            # The output name is validated only when it is actually used.
            fourcc = get_output_fourcc(output_file)
            out = cv2.VideoWriter(output_file, fourcc, original_fps/2, frame_size, True)

        # Initialize rolling windows to store optical flow and angles data
        flow_window = []
        angle_window = []

        while True:
            suc, img = cap.read()
            if not suc:
                break

            # Resize the frame
            img = cv2.resize(img, None, fx=resize_scale, fy=resize_scale)

            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

            flow = cv2.calcOpticalFlowFarneback(prevgray, gray, None, 0.5, 5, 15, 5, 5, 1.2, 0)
            prevgray = gray

            # The arrows are drawn onto `img` itself; flow_img is that same image.
            flow_img = draw_flow_with_wider_arrows(img, flow, step=30, arrow_scale=2, arrow_thickness=2)
            average_velocity_x, average_velocity_y = calculate_average_flow(flow, flow_window, N)
            average_angle_radians, average_angle_degrees = calculate_average_direction(flow, angle_window, N)

            # Display average velocity and direction as text on the image
            cv2.putText(flow_img, f'Avg Velocity (X, Y): ({average_velocity_x:.2f}, {average_velocity_y:.2f})',
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            cv2.putText(flow_img, f'Avg Angle: {average_angle_radians:.2f} radians ({average_angle_degrees:.2f} degrees)',
                        (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

            if show:
                cv2.imshow('Flow', flow_img)

            if save:
                # Resize to the exact writer size. Scaling back by
                # 1/resize_scale can be off by a pixel after rounding, and
                # the written frame must match the size the writer was
                # opened with.
                out.write(cv2.resize(flow_img, frame_size))

            key = cv2.waitKey(1)
            if key == ord('q'):
                break
    finally:
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()



Вариант со стандартным пониманием направления осей: (из нижнего левого угла)

In [11]:
def calculate_average_flow(flow, window, N):
    """Return the mean flow components over a rolling window, with y pointing up.

    The two component planes of ``flow`` are appended unchanged to ``window``
    (a list that is modified in place); when the list then holds more than
    ``N`` entries its oldest entry is removed. The x average is the mean of
    all stored x components; the y average is the mean of the negated
    y components, which flips the vertical axis.
    """
    window.append((flow[:, :, 0], flow[:, :, 1]))

    # Drop the oldest frame once the window exceeds N entries.
    if len(window) > N:
        del window[0]

    x_planes = [x_plane for x_plane, _ in window]
    # The direction is inverted here: y components are negated before averaging.
    flipped_y_planes = [-y_plane for _, y_plane in window]

    return np.mean(x_planes), np.mean(flipped_y_planes)

def calculate_average_direction(flow, window, N):
    """Return the mean flow direction over a rolling window, with y pointing up.

    The per-pixel angles ``arctan2(-fy, fx)`` of ``flow`` (the y component is
    negated to flip the vertical axis) are appended to ``window`` (a list
    that is modified in place); when the list then holds more than ``N``
    entries its oldest entry is removed.

    The average is a circular mean: the angles are averaged as unit vectors
    (mean sine and mean cosine) and converted back with ``arctan2``. A plain
    arithmetic mean is wrong for angles because of the wrap at +/-pi: two
    directions just either side of "left" (close to +pi and -pi) would
    average to roughly 0, i.e. "right".

    Returns:
        ``(average_angle_radians, average_angle_degrees)``; the radian value
        lies in ``[-pi, pi]``.
    """
    fx, fy = flow[:, :, 0], flow[:, :, 1]
    angles = np.arctan2(-fy, fx)  # invert the vertical direction

    window.append(angles)

    # Keep the window size limited to N frames
    if len(window) > N:
        window.pop(0)

    # Circular mean of every angle stored in the window
    stacked_angles = np.asarray(window)
    mean_sin = np.mean(np.sin(stacked_angles))
    mean_cos = np.mean(np.cos(stacked_angles))
    average_angle_radians = np.arctan2(mean_sin, mean_cos)

    # Convert radians to degrees
    average_angle_degrees = math.degrees(average_angle_radians)

    return average_angle_radians, average_angle_degrees

if __name__ == "__main__":
    # Clip to analyse and the file the annotated video is written to when saving.
    input_file, output_file = "race.mp4", "final.mp4"
    # Show the annotated frames on screen; do not write the output file.
    show_results, save_results = True, False
    # Number of frames in the averaging window for the computed values.
    N = 15
    resize_scale = 0.75

    process_video_rgb(
        input_file,
        output_file,
        show=show_results,
        save=save_results,
        N=N,
        resize_scale=resize_scale,
    )


---

In [12]:
if __name__ == "__main__":
    # Clip to analyse and the file the annotated video is written to when saving.
    input_file, output_file = "football.mp4", "final.mp4"
    # Show the annotated frames on screen; do not write the output file.
    show_results, save_results = True, False
    # Number of frames in the averaging window for the computed values.
    N = 15
    resize_scale = 0.75

    process_video_rgb(
        input_file,
        output_file,
        show=show_results,
        save=save_results,
        N=N,
        resize_scale=resize_scale,
    )