In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# from pathlib import Path
# from typing import cast

# import cv2 as cv
# import matplotlib.pyplot as plt
# import mediapipe as mp
# from mediapipe import solutions
# from mediapipe.framework.formats import landmark_pb2
# import mediapipe.python.solutions.drawing_styles as mp_drawing_styles
# import mediapipe.python.solutions.drawing_utils as mp_drawing_utils
# import mediapipe.python.solutions.hands as mp_hands
# from mediapipe.tasks import python
# from mediapipe.tasks.python import vision
# from mediapipe.tasks.python.components.containers.category import Category
# from mediapipe.tasks.python.components.containers.landmark import (
#     Landmark,
#     NormalizedLandmark,
# )
# from mediapipe.tasks.python.vision.core.vision_task_running_mode import (
#     VisionTaskRunningMode as VisionRunningMode,
# )
# import numpy as np

# from holo_table.landmark.compute import HandLandmarkerFrame
# from holo_table.landmark.compute import get_landmarks_from_result
# from holo_table.utils.cv import cv_imshow
# from holo_table.utils.data import get_resource
# from holo_table.utils.mediapipe import (
#     HAND_LANDMARK_MAP,
#     HAND_LANDMARK_NAMES,
#     get_default_hand_connections,
# )
# from holo_table.utils.plt import show_frame
# from holo_table.video.frame import Frame
# from holo_table.video.load import list_video_frames, iterate_video_frames

from pathlib import Path
from typing import cast, Any

import cv2 as cv
import matplotlib.pyplot as plt
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import mediapipe.python.solutions.drawing_styles as mp_drawing_styles
import mediapipe.python.solutions.drawing_utils as mp_drawing_utils
import mediapipe.python.solutions.hands as mp_hands
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.tasks.python.components.containers.category import Category
from mediapipe.tasks.python.components.containers.landmark import (
    Landmark,
    NormalizedLandmark,
)
from mediapipe.tasks.python.core.base_options import BaseOptions
from mediapipe.tasks.python.vision.core.vision_task_running_mode import (
    VisionTaskRunningMode as VisionRunningMode,
)
from mediapipe.tasks.python.vision.hand_landmarker import (
    HandLandmarker,
    HandLandmarkerOptions,
    HandLandmarkerResult,
)
import numpy as np

from holo_table.landmark.compute import HandLandmarkerFrame
from holo_table.utils.cv import cv_imshow
from holo_table.utils.data import get_resource
from holo_table.utils.mediapipe import (
    HAND_LANDMARK_MAP,
    HAND_LANDMARK_NAMES,
    get_default_hand_connections,
    get_landmarks_from_result,
)
from holo_table.utils.plt import show_frame
from holo_table.video.frame import Frame
from holo_table.video.load import iterate_video_frames, list_video_frames


In [None]:
# load the landmark recognition model
hand_landmark_model_path = get_resource("hand_landmarker.task")
hlf = HandLandmarkerFrame(
    hand_landmark_model_path=hand_landmark_model_path,
    hand_landmarker_kwargs={
        # "running_mode": VisionRunningMode.IMAGE,
        "running_mode": VisionRunningMode.VIDEO,
        "num_hands": 2,
    },
)


In [None]:
data_fol = get_resource("hand_fol")
video_name = "pinch_02.mp4"
video_path = data_fol / video_name
video_path


In [None]:
vfs = list_video_frames(
    video_path,
    keep_every_nth_frame=1,
    # max_frame_count=4,
)
print(f"{len(vfs)=}")
# show_frame(vfs[2])


In [None]:
# detection_result = hlf.detect(vfs[2])
# one_hand_world_landmarks = get_landmarks_from_result(detection_result, "world")
# one_hand_world_landmarks[HAND_LANDMARK_MAP["WRIST"]]


In [None]:
from holo_table.landmark.dist import compute_landmark_dist

# def compute_landmark_dist(
#     one_hand_world_landmarks: list[Landmark],
#     landmark_name1: str,
#     landmark_name2: str,
# ) -> float:
#     """Compute the distance between two landmarks."""
#     landmark1 = one_hand_world_landmarks[HAND_LANDMARK_MAP[landmark_name1]]
#     landmark2 = one_hand_world_landmarks[HAND_LANDMARK_MAP[landmark_name2]]
#     return np.linalg.norm(
#         np.array([landmark1.x, landmark1.y, landmark1.z])
#         - np.array([landmark2.x, landmark2.y, landmark2.z])
#     ).astype(float)


# compute distance between thumb and index tips
# compute_landmark_dist(one_hand_world_landmarks, "THUMB_TIP", "INDEX_FINGER_TIP")


In [None]:
from holo_table.landmark.dist import compute_pinch_level

# def compute_pinch_level(
#     one_hand_world_landmarks: list[Landmark],
# ) -> float:
#     """Compute the pinch size, normalized."""
#     dist_thumb_index = compute_landmark_dist(
#         one_hand_world_landmarks, "THUMB_TIP", "INDEX_FINGER_TIP"
#     )
#     dist_wrist_index = compute_landmark_dist(
#         one_hand_world_landmarks, "WRIST", "INDEX_FINGER_MCP"
#     )
#     return dist_thumb_index / dist_wrist_index


# compute_pinch_level(one_hand_world_landmarks)


In [None]:
from IPython.display import display, clear_output


def plot_frame(
    fig,
    ax,
    frame: Frame,
    dist: float,
):
    """Plot a frame."""
    ax.cla()
    show_frame(
        frame,
        ax=ax,
        do_show=False,
        do_resize=True,
        title_suffix=f": {dist:.3f}",
    )
    display(fig)
    clear_output(wait=True)


In [None]:
# process the video and compute all the pinch sizes

# fig, ax = plt.subplots(figsize=(4, 4))

from tqdm import tqdm


all_dist_ls = []
all_msec_ls = []

for frame in tqdm(vfs):
    all_msec_ls.append(frame.msec)
    detection_result = hlf.detect(frame)
    one_hand_world_landmarks = get_landmarks_from_result(detection_result, "world")
    if one_hand_world_landmarks is None:
        continue
    dist = compute_pinch_level(one_hand_world_landmarks)
    all_dist_ls.append(dist)
    # print(f"{dist=:.5f}")
    # show_frame(frame, title_suffix=f"{dist=:.5f}")
    # plot_frame(fig, ax, frame, dist)
    # break

all_msec = np.array(all_msec_ls)
all_dist = np.array(all_dist_ls)


In [None]:
from holo_table.utils.np import diff_pad

# # def diff_pad(x):
# def diff_pad(x: np.ndarray) -> np.ndarray:
#     """Compute the difference between adjacent elements, padding the first element with itself."""
#     return np.diff(x, prepend=x[0])


In [None]:
# raw data first derivative

# all_dist_d = np.diff(all_dist, prepend=all_dist[0])
all_dist_d = diff_pad(all_dist)
plt.plot(all_msec, all_dist)
plt.plot(all_msec, all_dist_d)
plt.grid()


In [None]:
# create a moving average filter
# left_triangle = np.arange(1, 5+1) 
# left_triangle = left_triangle / left_triangle.sum()
# plt.plot(left_triangle)

from holo_table.utils.np import create_left_triangle_filter

# def create_left_triangle_filter(
#     window_size: int,
# ) -> np.ndarray:
#     """Create a left triangle filter."""
#     triangle = np.arange(1, window_size+1) 
#     triangle = triangle / triangle.sum()
#     return triangle

left_triangle = create_left_triangle_filter(5)
print(f"{left_triangle=}")
# plt.plot(left_triangle)


In [None]:
# apply the filter
def convolve_pad(x, kernel):
    return np.convolve(x, kernel, mode="same")

In [None]:
# all_dist_pad = np.pad(all_dist, (4, 0), mode="edge")
# all_dist_pad = np.pad(all_dist, (len(left_triangle)-1, 0), mode="edge")
# all_dist_smooth = np.convolve(all_dist_pad, left_triangle, mode="valid")

# smooth the pinch data
all_dist_smooth = convolve_pad(all_dist, left_triangle)
print(all_dist.shape)
# print(all_dist_pad.shape)
print(all_dist_smooth.shape)

# compute and smooth the first derivative
all_dist_smooth_d = diff_pad(all_dist_smooth)
all_dist_smooth_d_smooth = convolve_pad(all_dist_smooth_d, left_triangle)

# plotz
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(all_msec, all_dist_smooth)
axt = ax.twinx()
axt.plot(all_msec, all_dist_smooth_d_smooth, color="C1")
axt.grid()


In [None]:
from holo_table.utils.np import roll_append

# def roll_append(arr: np.ndarray, val: Any) -> np.ndarray:
#     """Roll the array to the left, and append val at the end of arr."""
#     arr = np.roll(arr, -1)
#     arr[-1] = val
#     return arr


In [None]:
from holo_table.utils.np import roll_append_smooth

# def roll_append_smooth(
#     hist,
#     hist_smooth,
#     value,
#     filt,
# ):
#     # roll append and smooth
#     # update the history of the original data
#     hist = roll_append(hist, value)
#     # compute the moving average using the filter
#     value_smooth = np.dot(hist, filt)
#     # update the history of the smoothed data
#     hist_smooth = roll_append(hist_smooth, value_smooth)
#     return hist, hist_smooth


In [None]:
# cool but we work in an online fashion

# # create a moving average filter
# filter_size = 5
# left_triangle = create_left_triangle_filter(filter_size)
# # save previous values of the raw data
# hist_dist = np.zeros(filter_size, dtype=float)
# # save previous values of the moving average
# hist_dist_smooth = np.zeros(filter_size, dtype=float)

# # # save previous values of the first derivative of the moving average
# # hist_dist_smooth_d = np.zeros(filter_size, dtype=float)

# all_hd_ls = []
# all_hds_ls = []
# all_dist_sd_ls = []
# all_dist_sds_ls = []

# i = 0
# for dist in all_dist:
#     # update the history
#     # hist_dist = np.roll(hist_dist, -1)
#     # hist_dist[-1] = dist
#     hist_dist = roll_append(hist_dist, dist)
#     # print(hist_dist)

#     # compute the moving average
#     dist_smooth = np.dot(hist_dist, left_triangle)

#     # update the history for the smooth data
#     hist_dist_smooth = roll_append(hist_dist_smooth, dist_smooth)

#     # compute the derivative of the smooth data
#     # which is an array
#     # here is what was shady
#     # we should compute the delta of the last two elements
#     # then append it to the history of the derivative
#     # then smooth the derivative
#     dist_smooth_d = diff_pad(hist_dist_smooth)

#     # compute the moving average of the first derivative
#     dist_smooth_d_smooth = np.dot(dist_smooth_d, left_triangle)

#     # save them all to plot later
#     all_hd_ls.append(dist)
#     all_hds_ls.append(dist_smooth)
#     all_dist_sd_ls.append(dist_smooth_d[-1])
#     all_dist_sds_ls.append(dist_smooth_d_smooth)

#     # print(); i += 1; if i > 3: break


# all_hd = np.array(all_hd_ls) * 100
# all_hds = np.array(all_hds_ls) * 100
# all_dist_sd = np.array(all_dist_sd_ls) * 100
# all_dist_sds = np.array(all_dist_sds_ls) * 100

# # second derivative, not smoothed
# all_dist_sdsd = diff_pad(all_dist_sds)
# all_dist_sdsds = convolve_pad(all_dist_sdsd, left_triangle)

# fig, axes = plt.subplots(3, 1, figsize=(8, 11))

# # pinch data
# ax = axes[0]
# ax.scatter(all_msec, all_hd, s=1)
# ax.plot(all_msec, all_hds, color="C1")
# ax.grid()

# # first derivative
# ax = axes[1]
# ax.scatter(all_msec, all_dist_sd, s=1)
# ax.plot(all_msec, all_dist_sds, color="C1")
# ax.grid()

# # second derivative
# ax = axes[2]
# ax.scatter(all_msec, all_dist_sdsd, s=1)
# ax.plot(all_msec, all_dist_sdsds, color="C1")
# ax.grid()


In [None]:
# do it more cleanly

# create a moving average filter
filter_size = 5
left_triangle = create_left_triangle_filter(filter_size)

# pinch data
hdist = np.zeros(filter_size, dtype=float)
# pinch data smoothed
hdist_s = np.zeros(filter_size, dtype=float)

# first derivative of the pinch data smoothed
hdist_sd = np.zeros(filter_size, dtype=float)
# first derivative of the pinch data smoothed, smoothed
hdist_sds = np.zeros(filter_size, dtype=float)

# second derivative of the pinch data
# (computed on the first derivative smoothed)
hdist_sdsd = np.zeros(filter_size, dtype=float)
# second derivative of the pinch data, smoothed
hdist_sdsds = np.zeros(filter_size, dtype=float)

# track them all to plot later
all_dist_s_ls = []
all_dist_sd_ls = []
all_dist_sds_ls = []
all_dist_sdss_ls = []
all_dist_sdsd_ls = []
all_dist_sdsds_ls = []
all_ispinch_ls = []
all_ispinch_sds_ls = []
all_ispinch_sdsds_ls = []

# ranges for the derivative
sd_max = 0.04
sd_min = 0.006
sdsd_max = 0.005

for dist in all_dist:
    # update the history of the raw data
    hdist, hdist_s = roll_append_smooth(hdist, hdist_s, dist, left_triangle)

    # compute the first derivative of the smoothed data
    dist_sd = hdist_s[-1] - hdist_s[-2]
    # update the history of the first derivative
    hdist_sd, hdist_sds = roll_append_smooth(
        hdist_sd, hdist_sds, dist_sd, left_triangle
    )

    # compute the second derivative of the smoothed data
    dist_sdsd = hdist_sds[-1] - hdist_sds[-2]
    # update the history of the second derivative
    hdist_sdsd, hdist_sdsds = roll_append_smooth(
        hdist_sdsd, hdist_sdsds, dist_sdsd, left_triangle
    )

    # absolute values
    adist_sds = np.abs(hdist_sds)
    adist_sdsds = np.abs(hdist_sdsds)

    # check if the first derivative is in a pinching range
    ispinch_sds = np.all(adist_sds > sd_min) and np.all(adist_sds < sd_max)
    # check if the second derivative is in a pinching range
    ispinch_sdsds = np.all(adist_sdsds < sdsd_max)
    # if both are in a pinching range
    ispinch = ispinch_sds and ispinch_sdsds

    # smooth the first derivative again
    dist_sdss = np.dot(hdist_sds, left_triangle)
    if ispinch:
        all_dist_sdss_ls.append(dist_sdss)
    else:
        all_dist_sdss_ls.append(0)

    # save them all to plot later
    # all_dist_ls.append(dist)
    all_dist_s_ls.append(hdist_s[-1])
    all_dist_sd_ls.append(dist_sd)
    all_dist_sds_ls.append(hdist_sds[-1])
    all_dist_sdsd_ls.append(dist_sdsd)
    all_dist_sdsds_ls.append(hdist_sdsds[-1])
    all_ispinch_ls.append(ispinch)
    all_ispinch_sds_ls.append(ispinch_sds)
    all_ispinch_sdsds_ls.append(ispinch_sdsds)

# convert to numpy arrays
all_dist_s = np.array(all_dist_s_ls)
all_dist_sd = np.array(all_dist_sd_ls)
all_dist_sds = np.array(all_dist_sds_ls)
all_dist_sdss = np.array(all_dist_sdss_ls)
all_dist_sdsd = np.array(all_dist_sdsd_ls)
all_dist_sdsds = np.array(all_dist_sdsds_ls)
all_ispinch = np.array(all_ispinch_ls)
all_ispinch_sds = np.array(all_ispinch_sds_ls)
all_ispinch_sdsds = np.array(all_ispinch_sdsds_ls)


In [None]:
# again with class

from holo_table.pinch.tracker import PinchTracker

tracker = PinchTracker(sd_max, sd_min, sdsd_max)

for dist, msec in zip(all_dist, all_msec):
    tracker.update(dist, msec)

# convert to numpy arrays
all_dist_s = np.array(tracker.all_dist_s_ls)
all_dist_sd = np.array(tracker.all_dist_sd_ls)
all_dist_sds = np.array(tracker.all_dist_sds_ls)
all_dist_sdss = np.array(tracker.all_dist_sdss_ls)
all_dist_sdsd = np.array(tracker.all_dist_sdsd_ls)
all_dist_sdsds = np.array(tracker.all_dist_sdsds_ls)
all_ispinch = np.array(tracker.all_ispinch_ls)
all_ispinch_sds = np.array(tracker.all_ispinch_sds_ls)
all_ispinch_sdsds = np.array(tracker.all_ispinch_sdsds_ls)

# plot
fig, axes = plt.subplots(4, 1, figsize=(8, 9))

# pinch data
ax: plt.Axes = axes[0]
ax.scatter(all_msec, all_dist, s=1)
ax.plot(all_msec, all_dist_s, color="C1")
ymin, ymax = ax.get_ylim()
ax.fill_between(all_msec, ymin, ymax, where=all_ispinch, color="C0", alpha=0.2)
ax.grid()

# pinch data
ax = axes[1]
ax.scatter(all_msec, all_dist_sd, s=1)
ax.plot(all_msec, all_dist_sds, color="C1")
ymin, ymax = ax.get_ylim()
ax.fill_between(all_msec, ymin, ymax, where=all_ispinch_sds, color="C0", alpha=0.2)
xmin, xmax = all_msec[0], all_msec[-1]
ax.fill_betweenx([sd_min, sd_max], xmin, xmax, color="C1", alpha=0.2)
ax.fill_betweenx([-sd_min, -sd_max], xmin, xmax, color="C1", alpha=0.2)
ax.grid()

# pinch data
ax = axes[2]
ax.scatter(all_msec, all_dist_sdsd, s=1)
ax.plot(all_msec, all_dist_sdsds, color="C1")
ymin, ymax = ax.get_ylim()
ax.fill_between(all_msec, ymin, ymax, where=all_ispinch_sdsds, color="C0", alpha=0.2)
xmin, xmax = all_msec[0], all_msec[-1]
ax.fill_betweenx([-sdsd_max, sdsd_max], xmin, xmax, color="C1", alpha=0.2)
ax.grid()

# # is pinch
# ax = axes[3]
# ax.fill_between(all_msec, 0, 1, where=all_ispinch, alpha=0.2, label="is pinch")
# ax.fill_between(all_msec, 1, 2, where=all_ispinch_sds, alpha=0.2, label="is pinch sds")
# ax.fill_between(all_msec, 2, 3, where=all_ispinch_sdsds, alpha=0.2, label="is pinch sdsds")
# # ax.scatter(all_msec, all_ispinch, s=1, label="is pinch")
# # ax.scatter(all_msec, all_ispinch_sds + 0.05, s=1, label="is pinch sds")
# # ax.scatter(all_msec, all_ispinch_sdsds + 0.1, s=1, label="is pinch sdsds")
# ax.legend()
# ax.grid()

# pinch derivative value
ax = axes[3]
ax.plot(all_msec, all_dist_sds)
ax.plot(all_msec, all_dist_sdss)
ax.grid()


In [None]:
import plotly.subplots as sp
import plotly.graph_objects as go

# Create subplot grid
fig = sp.make_subplots(
    rows=4,
    cols=1,
    subplot_titles=(
        "Pinch Data",
        "Pinch Data SD",
        "Pinch Data SDSD",
        "Pinch Data to send",
    ),
)

################
# pinch data
fig.add_trace(
    go.Scatter(
        x=all_msec, y=all_dist, mode="markers", marker=dict(size=1), showlegend=False
    ),
    row=1,
    col=1,
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_dist_s,
        mode="lines",
        line=dict(color="coral"),
        showlegend=False,
    ),
    row=1,
    col=1,
)
fig.update_yaxes(range=[all_dist.min(), all_dist.max()], row=1, col=1)
fig.add_shape(
    type="rect",
    x0=all_msec[0],
    y0=all_dist.min(),
    x1=all_msec[-1],
    y1=all_dist.max(),
    fillcolor="chartreuse",
    opacity=0.2,
    line=dict(width=0),
    row=1,
    col=1,
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_ispinch.astype(int),
        fill="tozeroy",
        mode="none",
        # fillcolor="coral", # name="Pinch Detection",
        opacity=0.2,
        showlegend=False,
    ),
    row=1,
    col=1,
)

# ################
# first derivative
fig.add_trace(
    go.Scatter(
        x=all_msec, y=all_dist_sd, mode="markers", marker=dict(size=1), showlegend=False
    ),
    row=2,
    col=1,
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_dist_sds,
        mode="lines",
        line=dict(color="coral"),
        showlegend=False,
    ),
    row=2,
    col=1,
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_ispinch_sds.astype(int),
        fill="tozeroy",
        mode="none",
        # fillcolor="coral", # name="Pinch Detection",
        opacity=0.2,
        showlegend=False,
    ),
    row=2,
    col=1,
)
fig.update_yaxes(range=[all_dist_sd.min(), all_dist_sd.max()], row=2, col=1)

# ################
# second derivative
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_dist_sdsd,
        mode="markers",
        marker=dict(size=1),
        showlegend=False,
    ),
    row=3,
    col=1,
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_dist_sdsds,
        mode="lines",
        line=dict(color="coral"),
        showlegend=False,
    ),
    row=3,
    col=1,
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_ispinch_sdsds.astype(int),
        fill="tozeroy",
        mode="none",
        # fillcolor="coral", # name="Pinch Detection",
        opacity=0.2,
        showlegend=False,
    ),
    row=3,
    col=1,
)
fig.update_yaxes(range=[all_dist_sdsd.min(), all_dist_sdsd.max()], row=3, col=1)

# ################
# pinch data to send
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_dist_sds,
        mode="lines",
        line=dict(color="coral"),
        showlegend=False,
    ),
    row=4,
    col=1,
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_dist_sdss,
        mode="lines",
        line=dict(color="blue"),
        showlegend=False,
    ),
    row=4,
    col=1,
)
fig.update_yaxes(range=[all_dist_sds.min(), all_dist_sds.max()], row=4, col=1)

#################
# layout
fig.update_layout(
    height=900,
    width=800,
    # title="Pinch Data title",
    xaxis=dict(title="msec"),
    yaxis=dict(title="dist"),
    xaxis2=dict(title="msec"),
    yaxis2=dict(title="dist sds"),
    xaxis3=dict(title="msec"),
    yaxis3=dict(title="dist sdsds"),
    xaxis4=dict(title="msec"),
    yaxis4=dict(title="dist sdss"),
)

# fig.update_xaxes(showgrid=True, row=1, col=1)
# fig.update_yaxes(showgrid=True, row=1, col=1)


In [None]:
import plotly.graph_objects as go
import numpy as np

fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x=all_msec, y=all_dist, mode="markers", marker=dict(size=1), showlegend=False
    )
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_dist_s,
        mode="lines",
        line=dict(color="coral"),
        showlegend=False,
    )
)
fig.add_trace(
    go.Scatter(
        x=all_msec,
        y=all_ispinch.astype(int),
        fill="tozeroy",
        mode="none",
        # fillcolor="coral",
        name="Pinch Detection",
        opacity=0.2,
    )
)

fig.update_layout(
    title="Pinch Detection", xaxis_title="Time (msec)", yaxis_title="Force (N)"
)

fig.show()
