In [1]:
import cv2
import mediapipe as mp
import numpy as np
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

In [2]:
model_path = "pose_landmarker_full.task"

In [18]:
import numpy as np
from time import time

def smoothing_factor(t_e, cutoff):
    r = 2 * np.pi * cutoff * t_e
    return r / (r + 1)


def exponential_smoothing(a, x, x_prev):
    return a * x + (1 - a) * x_prev


class OneEuroFilter:
    def __init__(self, x0, dx0=0.0, min_cutoff=1.0, beta=0.0,
                 d_cutoff=1.0):
        """Initialize the one euro filter."""
        # The parameters.
        self.data_shape = x0.shape
        self.min_cutoff = np.full(x0.shape, min_cutoff)
        self.beta = np.full(x0.shape, beta)
        self.d_cutoff = np.full(x0.shape, d_cutoff)
        # Previous values.
        self.x_prev = x0.astype(np.float64)
        self.dx_prev = np.full(x0.shape, dx0)
        self.t_prev = time()

    def __call__(self, x):
        """Compute the filtered signal."""
        assert x.shape == self.data_shape

        t = time()
        t_e = t - self.t_prev
        t_e = np.full(x.shape, t_e)

        # The filtered derivative of the signal.
        a_d = smoothing_factor(t_e, self.d_cutoff)
        dx = (x - self.x_prev) / t_e
        dx_hat = exponential_smoothing(a_d, dx, self.dx_prev)

        # The filtered signal.
        cutoff = self.min_cutoff + self.beta * np.abs(dx_hat)
        a = smoothing_factor(t_e, cutoff)
        x_hat = exponential_smoothing(a, x, self.x_prev)

        # Memorize the previous values.
        self.x_prev = x_hat
        self.dx_prev = dx_hat
        self.t_prev = t

        return x_hat

In [3]:
def draw_landmarks_on_image(rgb_image, detection_result):
  pose_landmarks_list = detection_result.pose_landmarks
  annotated_image = np.copy(rgb_image)

  # Loop through the detected poses to visualize.
  for idx in range(len(pose_landmarks_list)):
    pose_landmarks = pose_landmarks_list[idx]
    # Draw the pose landmarks.
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
    ])
    solutions.drawing_utils.draw_landmarks(
      annotated_image,
      pose_landmarks_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())
  return annotated_image

In [30]:
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.IMAGE)

cap = cv2.VideoCapture(0)
with PoseLandmarker.create_from_options(options) as pose:
    init = True
    while cap.isOpened():
        ret, frame = cap.read()
        frame = cv2.flip(frame, 1)
        # Recolor image to RGB
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
      
        # Make detection
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
        results = pose.detect(mp_image)
        
        if len(results.pose_landmarks) != 0:
            coords = np.array([[joint.x, joint.y, joint.z] for joint in results.pose_landmarks[0]])
            if init == True:
                coords_hat = coords.copy()
                
                # The filtered signal
                min_cutoff = 1
                beta = 0
                one_euro_filter = OneEuroFilter(
                    coords,
                    min_cutoff=min_cutoff,
                    beta=beta)
                init = False
            else:
                coords_hat = one_euro_filter(coords)
                
            for i in range(coords.shape[0]):
                results.pose_landmarks[0][i].x = coords_hat[i][0]
                results.pose_landmarks[0][i].y = coords_hat[i][1]
                results.pose_landmarks[0][i].z = coords_hat[i][2]
            
            
        # Recolor back to BGR
        image.flags.writeable = True
    
        
        annotated_image = draw_landmarks_on_image(image, results)
        image = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR)
        
        cv2.imshow('Mediapipe Feed', image)
        

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
    cv2.waitKey(1)

I0000 00:00:1712842586.128710       1 gl_context.cc:344] GL version: 2.1 (2.1 Metal - 83.1), renderer: Apple M2


In [17]:
coords = np.array([[joint.x, joint.y, joint.z] for joint in results.pose_landmarks[0]])


In [25]:
for i in range(coords.shape[0]):
    results.pose_landmarks[0][i].x = coords_hat[i][0]
    results.pose_landmarks[0][i].y = coords_hat[i][1]
    results.pose_landmarks[0][i].z = coords_hat[i][2]
    

In [19]:
coords.shape

(33, 3)

In [22]:
coords_hat = np.zeros((33, 3))

In [None]:
# The filtered signal
min_cutoff = 0.004
beta = 0.7
df_hat = np.zeros(33, 3)
df_hat[0] = df[0]
one_euro_filter = OneEuroFilter(
    df[0],
    min_cutoff=min_cutoff,
    beta=beta
)