In [1]:
%matplotlib inline
import torch
from tqdm import tqdm
import cv2
import numpy as np
from IPython.display import Video, Image, clear_output
import matplotlib.pyplot as plt

DEVICE = "cuda"

In [2]:
source_path = "./sources/test4.mov"
proc_width, proc_height = 640, 384

In [3]:
def read_video(path, width, height):
    cap = cv2.VideoCapture(path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames = []

    while cap.isOpened():
        succ, frame = cap.read()
        if not succ:
            break

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame = cv2.resize(frame, (width, height))
        frames.append(frame)

    cap.release()
    return np.stack(frames, axis=0), fps


def write_video(path, frames, fps, codec="vp80"):
    if type(frames) != np.array:
        frames = np.stack(frames, axis=0)

    frames = frames.astype(np.uint8)

    _, height, width, _ = frames.shape
    out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(
        *codec), fps, (width, height))
    for frame in frames:
        out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    out.release()


def display_video(frames, fps):
    path = "./outputs/display.webm"
    write_video(path, frames, fps)
    # clear_output(wait=True)
    display(Video(path))

### Load YOLO


In [4]:
from ultralytics.utils.plotting import Annotator, colors
from ultralytics import YOLO
import os

os.environ["YOLO_VERBOSE"] = "False"


yolo_model = YOLO("yolo/yolo11n-seg.pt")

### Load VDA


In [5]:
from vda.video_depth_anything.video_depth import VideoDepthAnything
from vda.utils.dc_utils import save_video

config = {"encoder": "vits", "features": 64,
          "out_channels": [48, 96, 192, 384]}
vda_model = VideoDepthAnything(**config)
vda_model.load_state_dict(torch.load(
    f"./vda/checkpoints/video_depth_anything_vits.pth", map_location="cpu"), strict=True)
vda_model = vda_model.to(DEVICE).eval()

vda_cache = {}

In [6]:
class ExpFilter:
    def __init__(self, gain: float):
        self.state = None
        self.gain = gain

    def push(self, x):
        if self.state is None:
            self.state = x
        else:
            self.state = self.gain * x + (1 - self.gain) * self.state

    def get(self):
        return self.state


def fmt_number(number, digits=3):
    return str(int(number * (10**digits)) / (10**digits))

In [7]:
import math


def normalize(x):
    if type(x) != np.array:
        x = np.array(x)

    return x / np.linalg.norm(x)


def get_pixel_dir_1d(x, rx, fov):
    px = math.sin(fov/2) * (2 * x / rx - 1)
    py = math.cos(fov/2)
    return normalize([px, py])


def get_pixel_dir_2d(x, rx, hfov, y, ry, vfov):
    hx, hy = get_pixel_dir_1d(x, rx, hfov)
    vx, vy = get_pixel_dir_1d(y, ry, vfov)

    return normalize([hx, vy, -vx])


hfov = 73 / 180 * math.pi
vfov = 2 * math.atan(math.tan(hfov/2)*(proc_height/proc_width))
# print(hfov, vfov)

In [8]:
from dataclasses import dataclass
from typing import Iterator
from itertools import tee

VideoStream = Iterator[np.array]


def create_video_stream(path: str, width: int, height: int) -> tuple[VideoStream, int]:
    cap = cv2.VideoCapture(path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)

    def stream():
        while cap.isOpened():
            succ, frame = cap.read()
            if not succ:
                break

            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, (width, height))
            yield frame

        cap.release()

    return stream(), frames, fps


@dataclass
class SegObject:
    tid: int
    x: int
    y: int
    cx: int
    cy: int
    width: int
    height: int
    cls: int
    mask: np.array


SegStream = Iterator[dict[int, SegObject]]


def create_yolo_stream(video_stream: VideoStream) -> SegStream:
    for frame in video_stream:
        res = yolo_model.track(frame, persist=True)

        tids = res[0].boxes.id.int().cpu().tolist()
        boxes = res[0].boxes.data.cpu().tolist()
        classes = res[0].boxes.cls.cpu().tolist()
        masks = np.array(res[0].masks.data.cpu(), dtype=bool)

        objects = {}
        for tid, box, cls, mask in zip(tids, boxes, classes, masks):
            x0, y0, x1, y1, *_ = box
            objects[tid] = SegObject(
                tid=int(tid),
                x=int(x0),
                y=int(y0),
                cx=int((x0 + x1)/2),
                cy=int((y0 + y1)/2),
                width=int(x1 - x0),
                height=int(y1 - y0),
                cls=int(cls),
                mask=mask,
            )

        yield objects


DepthStream = Iterator[np.array]


def create_vda_stream(video_stream: VideoStream, fps: int, *, cache=None) -> DepthStream:
    if cache is None or cache not in vda_cache:
        frames = np.array(list(video_stream))
        depths, _ = vda_model.infer_video_depth(
            frames, fps, input_size=518, device=DEVICE)
        if cache is not None:
            vda_cache[cache] = depths
    else:
        depths = vda_cache[cache]

    for depth in depths:
        depth = depth.max() - depth
        depth = depth / depth.max()
        yield depth


AnchorStream = Iterator[int | None]


def create_anchor_stream(seg_stream: SegStream, *, anchor_class: int, anchor_pos: tuple[int, int]) -> AnchorStream:
    def find_anchor(objs: dict[int, SegObject]):
        for tid, obj in objs.items():
            if obj.cls == anchor_class and obj.mask[anchor_pos[1]][anchor_pos[0]]:
                return tid

    anchor_tid = None

    for objs in seg_stream:
        if anchor_tid is None:
            anchor_tid = find_anchor(objs)

        yield anchor_tid


def create_metric_depth_stream(depth_stream: DepthStream, seg_stream: SegStream,
                               *, anchor_class: int, anchor_pos: tuple[int, int], anchor_dist: float) -> DepthStream:
    seg_stream, seg_stream2 = tee(seg_stream)

    anchor_stream = create_anchor_stream(
        seg_stream2, anchor_class=anchor_class, anchor_pos=anchor_pos)

    coeff = None

    for depth, objs, anchor in zip(depth_stream, seg_stream, anchor_stream):
        if anchor is None:
            yield None

        # update coeff
        if anchor in objs:
            avg_dist = np.mean(depth[objs[anchor].mask])
            coeff = anchor_dist / avg_dist

        yield coeff * depth


def create_speed_stream(metric_depth_stream: DepthStream, seg_stream: SegStream, *, fps: int):
    objs = {}

    for metric_depth, seg_objs in zip(metric_depth_stream, seg_stream):
        if metric_depth is None:
            yield None

        height, width = metric_depth.shape

        speeds = {}

        for tid, seg_obj in seg_objs.items():
            dir = get_pixel_dir_2d(
                seg_obj.cx, width, hfov, seg_obj.cy, height, vfov)
            avg_dist = np.mean(metric_depth[seg_obj.mask])
            pos = avg_dist * dir

            if tid not in objs:
                objs[tid] = ExpFilter(0.2)

            prev_smooth_pos = objs[tid].get()
            objs[tid].push(pos)
            new_smooth_pos = objs[tid].get()

            if prev_smooth_pos is not None:
                speeds[tid] = np.linalg.norm(
                    new_smooth_pos - prev_smooth_pos) * fps

        yield speeds


anchor_class = 0
anchor_pos = (320, 190)
anchor_dist = 5

video_stream, frames, fps = create_video_stream(
    source_path, proc_width, proc_height)
video_streams = tee(video_stream, 3)
seg_streams = tee(create_yolo_stream(video_streams[0]), 3)
depth_stream = create_vda_stream(video_streams[1], fps, cache=source_path)
metric_depth_stream = create_metric_depth_stream(
    depth_stream, seg_streams[0], anchor_class=anchor_class, anchor_pos=anchor_pos, anchor_dist=anchor_dist)
speed_stream = create_speed_stream(
    metric_depth_stream, seg_streams[1], fps=fps)

vis_frames = []
for frame, seg_objs, speeds in tqdm(zip(video_streams[2], seg_streams[2], speed_stream), total=frames):
    overlays = [np.zeros(frame.shape) for _ in range(2)]

    for tid, obj in seg_objs.items():
        if obj.cls == 0:
            overlays[0][obj.mask == 1] = np.array(colors(tid, True))
            if tid in speeds:
                cv2.putText(overlays[1], fmt_number(
                    speeds[tid]), (obj.cx, obj.cy), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

    ws = np.array([2, 1, 1])
    ws = ws / sum(ws)

    vis_frame = ws[0] * frame + ws[1] * overlays[0] + ws[2] * overlays[1]
    vis_frames.append(vis_frame)

display_video(vis_frames, fps)

  0%|                                                                                                                                                                                                             | 0/283.0 [00:00<?, ?it/s]
[A%|                                                                                                                                                                                                                | 0/13 [00:00<?, ?it/s]
[A%|███████████████▍                                                                                                                                                                                        | 1/13 [00:01<00:12,  1.07s/it]
[A%|██████████████████████████████▊                                                                                                                                                                         | 2/13 [00:02<00:11,  1.02s/it]
[A%|██████████████████████████████████████████████▏

In [35]:
from geocalib import GeoCalib
from geocalib.utils import deg2rad, print_calibration
model = GeoCalib(weights="pinhole").to(DEVICE)

video_stream, _, _ = create_video_stream(
    "./sources/test3.mp4", proc_width, proc_height)
frames = []
for i, frame in enumerate(video_stream):
    if i > 10:
        break
    frames.append(np.swapaxes(frame, 0, 2))
    # frames.append()

frames = (torch.tensor(frames).float() / 255).to(DEVICE)

print_calibration(model.calibrate(frames[0]))
# print(vfov / math.pi * 180)


Estimated parameters (Pred):
Roll:  -1.2° (± 1.7)°
Pitch: 1.1° (± 3.3)°
vFoV:  67.2° (± 13.0)°
Focal: 481.5 px (± 85.4 px)
