In [None]:
# Enable IPython's autoreload extension in mode 2, so that edits to imported
# modules (e.g. the climbing_wire package) are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

In [None]:
from IPython import display
from climbing_wire.homography.homography import compute_homography
from climbing_wire.homography.homography import perspective_transform
from climbing_wire.landmark.compute import PoseImg
from climbing_wire.landmark.compute import compute_landmarks
from climbing_wire.landmark.drawing import draw_landmarks
from climbing_wire.landmark.landmark_list import LandmarkListImg
from climbing_wire.utils.cv import cv_imshow
from climbing_wire.utils.data import get_package_fol
from climbing_wire.utils.mediapipe import JOINT_NAMES
from climbing_wire.utils.mediapipe import get_default_pose_connections
from climbing_wire.video.load import iterate_video_frames
from climbing_wire.video.load import iterate_video_frames_with_timestamp
from climbing_wire.video.load import pairwise_video_frames
from loguru import logger as lg
from matplotlib import pyplot as plt
from pathlib import Path
from typing import List, Tuple, cast
import cv2 as cv
import math
import matplotlib.pyplot as plt
import mediapipe as mp
import mediapipe.python.solutions.drawing_styles as mp_drawing_styles
import mediapipe.python.solutions.drawing_utils as mp_drawing
import mediapipe.python.solutions.pose as mp_pose
import numpy as np
import pathlib
import sys


In [None]:
from dataclasses import dataclass


@dataclass
class Frame:
    """One video frame held in memory together with its bookkeeping data."""

    # image data of the frame, as yielded by the video iterator
    frame: np.ndarray
    # timestamp yielded alongside the frame
    # NOTE(review): presumably microseconds, going by the name - confirm
    # against iterate_video_frames_with_timestamp
    usec: int
    # running index assigned by the loading loops: it counts the frames that
    # were kept, so it is the position of the frame in its list
    idx: int


In [None]:
# Folder holding the two clips used in this notebook, and their file names.
in_fol = Path("/mnt/c/Users/nobilip/Videos/dataset/rock")
person_fn = "PXL_20230401_135540281.TS.mp4"
empty_fn = "empty.mp4"

# Alternative dataset (disabled):
# in_fol = Path("~/data/rock")
# in_fn = "nonaka_miho_01.mp4"

# Full paths of the "person" clip and of the "empty" clip.
person_vid_path = in_fol.joinpath(person_fn)
empty_vid_path = in_fol.joinpath(empty_fn)


In [None]:
per_frames: list[Frame] = []

max_num_frames = 30
frame_num = 0

# Collect at most max_num_frames frames from the person video; the iterator is
# asked to keep every 10th frame. idx counts the kept frames only.
for frame, usec in iterate_video_frames_with_timestamp(
    person_vid_path, keep_every_nth_frame=10
):
    f = Frame(idx=frame_num, usec=usec, frame=frame)
    per_frames.append(f)

    # per_frames started empty, so its length is the number of frames kept so far
    frame_num = len(per_frames)
    if frame_num >= max_num_frames:
        break
print(f"{len(per_frames)} frames")


In [None]:
empty_frames: list[Frame] = []
frame_num = 0

# Same collection loop as for the person video, but the iterator is asked to
# keep every 3rd frame. max_num_frames is the limit set in the person-frame cell.
for frame, usec in iterate_video_frames_with_timestamp(
    empty_vid_path, keep_every_nth_frame=3
):
    f = Frame(idx=frame_num, usec=usec, frame=frame)
    empty_frames.append(f)

    # empty_frames started empty, so its length is the number of frames kept so far
    frame_num = len(empty_frames)
    if frame_num >= max_num_frames:
        break

print(len(empty_frames))


In [None]:
def show_frame(
    frame: Frame,
    ax: plt.Axes | None = None,
):
    """Draw a frame on an axes, titled with its index and timestamp.

    Args:
        frame: the frame to display.
        ax: axes to draw on; a new single-axes figure is created when omitted.
    """
    if ax is None:
        _, ax = plt.subplots(1, 1)

    ax.set_title(f"{frame.idx} @ {frame.usec}")

    # hide the tick labels on all four sides (the tick marks themselves stay)
    ax.tick_params(
        which="both",
        labelbottom=False,
        labeltop=False,
        labelleft=False,
        labelright=False,
    )

    cv_imshow(frame.frame, ax=ax)


# Side-by-side preview: person frame 7 on the left, empty frame 11 on the right.
fig, ax = plt.subplots(nrows=1, ncols=2)
show_frame(frame=per_frames[7], ax=ax[0])
show_frame(frame=empty_frames[11], ax=ax[1])


In [None]:
def show_warp(f1: Frame, f2: Frame, M: np.ndarray) -> None:
    """Draw the first image warped on the second.

    The outline of ``f1`` after the transform ``M`` is drawn on a copy of
    ``f2``, then ``f1`` itself is warped with ``M`` and blended 50/50 with
    that annotated copy. The result is shown in a new figure.

    Args:
        f1: frame that gets warped.
        f2: frame used as background; its image is not modified.
        M: perspective transform applied to ``f1``.
            NOTE(review): assumed to map ``f1`` pixel coordinates onto
            ``f2`` - confirm against compute_homography.
    """
    f1img = f1.frame
    f2img = f2.frame

    src_h, src_w = f1img.shape[:2]
    dst_h, dst_w = f2img.shape[:2]

    # outline of the first image, expressed as its four corner points
    corners = np.array(
        [
            [0, 0],
            [src_w, 0],
            [src_w, src_h],
            [0, src_h],
        ],
        dtype=np.float32,
    )
    corners_warp = perspective_transform(corners, M)
    f2_ann = cv.polylines(
        f2img.copy(), [np.int32(corners_warp)], True, 255, 3, cv.LINE_AA
    )
    # Warp onto a canvas of the *second* image's size: addWeighted needs both
    # inputs to have the same size, so using f1's own size (as before) broke
    # the blend whenever the two frames had different resolutions.
    f1_warped = cv.warpPerspective(f1img, M, (dst_w, dst_h))
    f_blend = cv.addWeighted(f1_warped, 0.5, f2_ann, 0.5, 0)

    _, ax = plt.subplots(1, 1)
    # hide the tick labels on all four sides (the tick marks themselves stay)
    ax.tick_params(
        which="both",
        labelbottom=False,
        labeltop=False,
        labelleft=False,
        labelright=False,
    )
    ax.set_title(f"{f1.idx} -> {f2.idx}")
    cv_imshow(f_blend, ax=ax)

    plt.show()


In [None]:
# Hand-picked pair used to try out the distance function.
# (alternative: per_frames[0] and empty_frames[0])
f1, f2 = per_frames[7], empty_frames[11]


def homography_distance(
    f1: Frame,
    f2: Frame,
    do_plot: bool = False,
    fail_dist: float | None = None,
) -> float:
    """Measure how far the homography between two frames moves the image corners.

    Both frames are shrunk to half size, ``compute_homography`` is run on the
    pair, and the four corners of the shrunk ``f1`` are pushed through the
    resulting transform. The distance is the mean Euclidean displacement of
    those corners, measured in pixels of the half-size image.

    Args:
        f1: first frame (the one whose corners are transformed).
        f2: second frame.
        do_plot: when True, also show ``f1`` warped on ``f2`` via ``show_warp``.
        fail_dist: distance returned when no homography is available. Defaults
            to the diagonal of the half-size ``f1``. It is kept finite on
            purpose so the value can still be summed by the DTW accumulation.

    Returns:
        The mean corner displacement as a plain Python float, or ``fail_dist``
        when the homography could not be computed.
    """
    lg.debug(f"{f1.usec=} {f2.usec=}")
    f1img = f1.frame
    f2img = f2.frame

    # shrink images to speed up computation
    f1img = cv.resize(f1img, (0, 0), fx=0.5, fy=0.5)
    f2img = cv.resize(f2img, (0, 0), fx=0.5, fy=0.5)

    height, width = f1img.shape[:2]

    M = compute_homography(f1img, f2img)
    if M is None:
        # NOTE(review): assumes compute_homography reports failure by returning
        # None (as cv.findHomography does) - confirm; if it raises instead,
        # the exception still propagates unchanged.
        if fail_dist is None:
            fail_dist = float(np.hypot(width, height))
        lg.warning(
            f"no homography for {f1.idx=} {f2.idx=}, using distance {fail_dist}"
        )
        return fail_dist

    corners = np.array(
        [
            [0, 0],
            [width, 0],
            [width, height],
            [0, height],
        ],
        dtype=np.float32,
    )
    corners_warp = perspective_transform(corners, M)
    corners_delta = corners_warp - corners
    dist = np.linalg.norm(corners_delta, axis=1).mean()

    # draw the first image warped on the second
    # rather than doing this, turn homography_distance into a class
    # maybe a single class with class methods?
    if do_plot:
        show_warp(
            Frame(f1img, f1.usec, f1.idx),
            Frame(f2img, f2.usec, f2.idx),
            M,
        )

    # convert the NumPy scalar so the annotated return type actually holds
    return float(dist)


# Distance for the hand-picked pair; as the last expression of the cell, the
# value is displayed as the cell output.
homography_distance(f1, f2)


In [None]:
# # draw the warped corners on the image
# f2_ann = cv.polylines(
#     f2.frame.copy(), [np.int32(corners_warp)], True, 255, 3, cv.LINE_AA
# )
# # blend the person frame on the empty frame
# f1_warped = cv.warpPerspective(f1.frame, M, f1.frame.shape[:2][::-1])
# f_blend = cv.addWeighted(f1_warped, 0.5, f2_ann, 0.5, 0)
# cv_imshow(f_blend)


In [None]:
# Number of person frames and of empty frames currently loaded.
len(per_frames), len(empty_frames)

In [None]:
from fastdtw.fastdtw import fastdtw

from fastdtw.fastdtw import __dtw
from fastdtw.fastdtw import __fastdtw

# Align the first few person frames with the first few empty frames, using
# homography_distance as the cost between two frames.
#
# The private exact-DTW routine is called directly because the sequences hold
# Frame objects rather than numbers.
# NOTE(review): in the pure-Python fastdtw module the public fastdtw()/dtw()
# entry points coerce their inputs to float arrays, and __fastdtw() falls back
# to dtw() for sequences shorter than radius + 2 (and otherwise averages
# neighbouring samples) - none of which can work on Frame objects. Confirm
# against the installed fastdtw version.
# A window of None makes __dtw consider every (person, empty) pair.
distance, path = __dtw(
    per_frames[:5],
    empty_frames[:5],
    None,
    dist=homography_distance,
)
print(distance, path)


In [None]:
# One marker per DTW match: x is the person-frame index, y the empty-frame index.
# (line variant: plt.plot with the same two lists)
plt.scatter(
    [per_idx for per_idx, _ in path],
    [empty_idx for _, empty_idx in path],
)

In [None]:
# Raw DTW path: the sequence of (person index, empty index) pairs.
path

In [None]:
import pandas as pd

# Tabulate the DTW path and attach the timestamp of every matched frame.
pathdf = pd.DataFrame(path, columns=["per_idx", "empty_idx"])
pathdf["per_usec"] = pathdf["per_idx"].map(
    lambda per_idx: per_frames[per_idx].usec
)
pathdf["empty_usec"] = pathdf["empty_idx"].map(
    lambda empty_idx: empty_frames[empty_idx].usec
)
pathdf.head(20)


In [None]:
# per_usec_new = np.arange(100_000, 3_000_001, 100_000)
# # use both new and existing per_usec
# per_usec_all = np.concatenate([per_usec_new, pathdf["per_usec"].values])
# per_usec_all.sort()
# per_usec_all


In [None]:
# path_all = (
#     pathdf.set_index("per_usec")
#     .reindex(per_usec_all)
#     .reset_index()
#     # [["per_usec", "empty_usec"]]
# )
# path_all.head()


In [None]:
# # use pandas to interpolate the empty_usec values
# path_all["empty_usec"] = path_all["empty_usec"].interpolate(method="linear")
# path_all.loc[10:20]


In [None]:
def interpolate_empty_usec(
    pathdf: pd.DataFrame,
    new_per_usec: int,
) -> int:
    """Map a person-video timestamp to the matching empty-video timestamp.

    The rows of ``pathdf`` are used as knots of a piecewise-linear function
    from ``per_usec`` to ``empty_usec``, which is evaluated at
    ``new_per_usec``. A ``per_usec`` that appears in several rows (one person
    frame matched to several empty frames) is collapsed into a single knot
    holding the mean of its ``empty_usec`` values.

    Args:
        pathdf: alignment table with at least the columns ``per_usec`` and
            ``empty_usec``.
        new_per_usec: person-video timestamp to look up; it must lie within
            the range of ``per_usec`` values in ``pathdf`` (both ends included).

    Returns:
        The interpolated empty-video timestamp, truncated to an int.

    Raises:
        ValueError: if ``pathdf`` has no rows, or ``new_per_usec`` falls
            outside the range covered by ``pathdf["per_usec"]``.
    """
    if pathdf.empty:
        raise ValueError("pathdf is empty: there is nothing to interpolate from")

    # one knot per distinct per_usec, sorted by per_usec as np.interp requires
    knots = pathdf.groupby("per_usec")["empty_usec"].mean().sort_index()
    per_usec_knots = knots.index.to_numpy(dtype=float)
    empty_usec_knots = knots.to_numpy(dtype=float)

    first_per_usec = per_usec_knots[0]
    last_per_usec = per_usec_knots[-1]
    if not first_per_usec <= new_per_usec <= last_per_usec:
        # refuse to extrapolate: np.interp would silently clamp to the end values
        raise ValueError(
            f"new_per_usec={new_per_usec} is outside the aligned range "
            f"[{first_per_usec}, {last_per_usec}]"
        )

    em_usec_new = np.interp(new_per_usec, per_usec_knots, empty_usec_knots)
    return int(em_usec_new)


# Example lookup: empty-video timestamp matching per_usec = 1_000_000.
new_per_usec = 1_000_000
interpolate_empty_usec(pathdf=pathdf, new_per_usec=new_per_usec)


In [None]:
# # show the path frame warps
# for per_idx, empty_idx in path:
#     f1 = per_frames[per_idx]
#     f2 = empty_frames[empty_idx]
#     homography_distance(f1, f2, do_plot=True)
