In [None]:
from dataclasses import dataclass
from typing import Generator, Iterable, List, Optional, Tuple, TypedDict, cast

import cv2
import cv2 as cv
import numpy as np
from cv2 import BackgroundSubtractor, BackgroundSubtractorKNN, BackgroundSubtractorMOG2
from cv2.typing import MatLike, Size
from loguru import logger
from tqdm.notebook import tqdm

In [None]:
@dataclass
class CapProps:
    """Basic properties of a video source, as reported when it is opened."""

    # Frame width in pixels.
    width: int
    # Frame height in pixels.
    height: int
    # Frames per second.
    fps: float
    # Total number of frames; None when it is not provided.
    frame_count: Optional[int] = None


def video_cap(
        src: str | int) -> Tuple[Generator[MatLike, None, None], CapProps]:
    """Open a video source and expose it as a lazy frame generator.

    Args:
        src: Value passed straight to ``cv2.VideoCapture`` (a file path or a
            device index).

    Returns:
        A ``(frames, props)`` pair. ``frames`` yields one frame per
        successful ``cap.read()`` and stops at the first failed read.
        ``props`` holds the width, height, fps and frame count queried from
        the capture before any frame is read.

    The capture is released when the generator finishes, is closed, or is
    interrupted by an exception after iteration has started.
    """
    cap = cv2.VideoCapture(src)
    if not cap.isOpened():
        # Keep the original best-effort behaviour (an empty generator), but
        # do not fail silently.
        logger.warning(f"could not open video source: {src!r}")
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = float(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    props = CapProps(width, height, fps, frame_count)

    def gen():
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                yield frame
        finally:
            # Runs on normal exhaustion as well as on an early break,
            # ``.close()`` or an exception in the consumer, so the capture
            # handle is not leaked.
            cap.release()

    return gen(), props


# Dataset: MOT Challenge, sequence PETS09-S2L1 (the video processed in the cells below).
def fourcc(*args: str) -> int:
    """Typed wrapper around ``cv2.VideoWriter_fourcc``.

    Forwards the given characters unchanged and returns the resulting integer
    codec code, e.g. ``fourcc(*"mp4v")``.
    """
    codec_code = cv2.VideoWriter_fourcc(*args)  # type: ignore
    return codec_code

In [None]:
# Foreground segmentation with MOG2 background subtraction; the foreground
# mask of every frame is written to a greyscale video.
frames, props = video_cap("PETS09-S2L1-raw.mp4")
writer = cv2.VideoWriter("PETS09-S2L1-bgsub.mp4", fourcc(*"mp4v"), props.fps, (props.width, props.height), isColor=False)
subtractor = cv2.createBackgroundSubtractorMOG2(detectShadows=False)
try:
    for frame in tqdm(frames, total=props.frame_count):
        fgmask = subtractor.apply(frame)
        # The writer was opened with isColor=False, so the mask is written
        # as-is; no conversion to a 3-channel image is needed.
        writer.write(fgmask)
finally:
    # Finalise the output file even when the loop raises or is interrupted.
    writer.release()
    # Stop the frame generator if the loop exited early.
    frames.close()

In [None]:
# Sparse optical flow: Shi-Tomasi corners are tracked from frame to frame with
# pyramidal Lucas-Kanade, and each successful track step is drawn on the frame
# as a green segment.
# References:
# https://docs.opencv.org/4.x/d4/d8c/tutorial_py_shi_tomasi.html
# https://github.com/chuanenlin/optical-flow/blob/master/sparse-solution.py
# https://docs.opencv.org/4.x/db/d7f/tutorial_js_lucas_kanade.html
frames, props = video_cap("PETS09-S2L1-raw.mp4")
writer = cv2.VideoWriter("PETS09-S2L1-flow_pyr_lk.mp4", fourcc(*"mp4v"),
                         props.fps, (props.width, props.height))


def detect_corners(grey: MatLike) -> Optional[np.ndarray]:
    """Detect Shi-Tomasi corners in a greyscale image.

    Returns:
        The detected points in the layout produced by
        ``cv2.goodFeaturesToTrack`` ([a 1 2], a being the feature count), or
        None when nothing was detected.
    """
    corners = cv2.goodFeaturesToTrack(grey,
                                      maxCorners=100,
                                      qualityLevel=0.3,
                                      minDistance=7,
                                      blockSize=7)
    if corners is None or len(corners) == 0:
        return None
    return corners


def track_corners(prev_grey: MatLike, grey: MatLike,
                  prev_pts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Track ``prev_pts`` from ``prev_grey`` into ``grey``.

    Returns:
        ``(old, new)``: two arrays of shape (k, 2) holding, row by row, the
        previous and the new position of every point whose flow was found.
        Points whose status is 0 (flow not found, position undefined) are
        dropped from both arrays so the rows stay paired.
    """
    # OpenCV allocates the output points itself when none are supplied; the
    # cast only keeps the type checker happy.
    dummy_pts = cast(np.ndarray, None)
    next_pts, status, _err = cv2.calcOpticalFlowPyrLK(prev_grey, grey,
                                                      prev_pts, dummy_pts)
    if next_pts is None or status is None:
        empty = np.empty((0, 2), dtype=np.float32)
        return empty, empty
    # status: one element per input point, 1 if its flow was found, else 0.
    found = status.reshape(-1) == 1
    return prev_pts.reshape(-1, 2)[found], next_pts.reshape(-1, 2)[found]


prev_img_grey: Optional[MatLike] = None
prev_pts: Optional[np.ndarray] = None

try:
    for frame in tqdm(frames, total=props.frame_count):
        grey_img = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        if prev_img_grey is not None and prev_pts is not None:
            old_pts, new_pts = track_corners(prev_img_grey, grey_img, prev_pts)
            for (x0, y0), (x1, y1) in zip(old_pts, new_pts):
                cv2.line(frame, (int(x0), int(y0)), (int(x1), int(y1)),
                         (0, 255, 0), 2)
            # Carry only the surviving points forward, restored to the
            # [a 1 2] layout used for the tracker input.
            prev_pts = new_pts.reshape(-1, 1, 2) if len(new_pts) else None
        else:
            logger.warning("no points")

        if prev_pts is None:
            # First frame, or every point was lost: (re)seed the tracker from
            # the current frame instead of aborting the run.
            prev_pts = detect_corners(grey_img)

        prev_img_grey = grey_img
        writer.write(frame)
finally:
    # Runs on normal completion, on errors and on KeyboardInterrupt alike.
    writer.release()
    # Stop the frame generator if the loop exited early.
    frames.close()

In [None]:


def resized_video_cap(
    src: str | int,
    scale: float = 0.5,
) -> Tuple[Generator[MatLike, None, None], CapProps]:
    """Open a video source and yield its frames resized by ``scale``.

    Args:
        src: Value passed straight to ``cv2.VideoCapture`` (a file path or a
            device index).
        scale: Factor applied to the source width and height; the results are
            truncated to integers.

    Returns:
        A ``(frames, props)`` pair. ``frames`` yields every successfully read
        frame resized to the scaled size and stops at the first failed read.
        ``props.width`` and ``props.height`` are the scaled dimensions; fps
        and frame count are taken from the capture as reported.

    The capture is released when the generator finishes, is closed, or is
    interrupted by an exception after iteration has started.
    """
    cap = cv2.VideoCapture(src)
    if not cap.isOpened():
        # Keep the original best-effort behaviour (an empty generator), but
        # do not fail silently.
        logger.warning(f"could not open video source: {src!r}")
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) * scale)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) * scale)
    fps = float(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    props = CapProps(width, height, fps, frame_count)

    def gen():
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                yield cv2.resize(frame, (width, height))
        finally:
            # Runs on normal exhaustion as well as on an early break,
            # ``.close()`` or an exception in the consumer, so the capture
            # handle is not leaked.
            cap.release()

    return gen(), props


# Dense optical flow (Farneback) on the half-size video; the flow field is
# colour-coded and blended over each frame.
frames, props = resized_video_cap("PETS09-S2L1-raw.mp4", 0.5)
size = (props.width, props.height)
logger.info(f"size: {size}")
writer = cv2.VideoWriter("PETS09-S2L1-dense_flow.mp4", fourcc(*"mp4v"), props.fps,
                         size)


def farneback_flow(prev_grey: MatLike, grey: MatLike) -> np.ndarray:
    """Compute the dense Farneback optical flow from ``prev_grey`` to ``grey``."""
    # OpenCV allocates the output itself when no flow buffer is supplied; the
    # cast only keeps the type checker happy.
    dummy_flow = cast(np.ndarray, None)
    return cv2.calcOpticalFlowFarneback(
        prev_grey,
        grey,
        dummy_flow,
        pyr_scale=0.5,
        levels=3,
        winsize=15,
        iterations=1,
        poly_n=5,
        poly_sigma=1.2,
        flags=0,
    )


def flow_to_bgr(flow: np.ndarray) -> np.ndarray:
    """Colour-code a 2-channel flow field as a BGR image.

    Hue encodes the flow direction, saturation is fixed at 255 and value is
    the flow magnitude min-max normalised to 0..255.
    """
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
    # The angle is in radians; convert to degrees and halve it for the hue
    # channel.
    hsv[..., 0] = angle * 180 / np.pi / 2
    hsv[..., 1] = 255
    hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)


prev_img_grey: Optional[MatLike] = None

try:
    for frame in tqdm(frames, total=props.frame_count):
        grey_img = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if prev_img_grey is None:
            # The first frame has no predecessor, so no flow and no output.
            logger.warning("no flow")
        else:
            flow = farneback_flow(prev_img_grey, grey_img)
            # The flow has the same height and width as the input frames, so
            # the overlay is blended directly with no resize.
            output = cv2.addWeighted(frame, 0.7, flow_to_bgr(flow), 1, 0)
            writer.write(output)
        prev_img_grey = grey_img
finally:
    # Runs on normal completion, on errors and on KeyboardInterrupt alike
    # (the old handler re-raised an unbound name ``e``).
    writer.release()
    # Stop the frame generator if the loop exited early.
    frames.close()