# Tutorial: Data Analysis with Pre-Processed Motion Data  

Austin MBaye

Northeastern University

## **Overview**
In this tutorial, we will walk through the **core data analysis** process for our research using a `.pkl` file. This file contains de-identified motion data from all videos and studies, serving as the foundation for our analysis. 

###  **What’s in the `.pkl` File?**
The `.pkl` file includes key motion tracking data extracted from MediaPipe’s **BlazePose model**, along with additional metadata:

- **`keypoints`** – The **(x, y, visibility)** coordinates for each joint detected in the BlazePose model.
- **`annotations`** – Frame-wise annotations for each video, indicating labeled behaviors.
- **`fps`** – Frames per second (fps) of the video.
- **`frame_count`** – Total number of frames in each video.
- **`duration`** – Video duration in seconds.
- **`name`** – Unique identifier for each entry, structured as (child-date_study).

This `.pkl` file contains all necessary pre-processing data for the analysis, while still allowing room for further modifications or new ideas using the raw MediaPipe keypoint data.

---

## **AQSM-SW1PerS + MediaPipe Algorithm Walkthrough**
The **Automated Quantification of Stereotypical Motor Movements via Sliding Windows and 1-Persistence Scoring (AQSM-SW1PerS)** pipeline follows these key steps:

1.  **Data Colletion** - Optional walkthrough that shows how videos were created and MediaPipe pose inference is run
2.  **Preprocessing** – Cleaning and structuring motion data for analysis.  
3.  **Period Estimation** – Identifying periodic patterns in movement.  
4.  **Optimizing Parameters $d$ and $\tau$** – Finding the best delay embedding parameters for persistence analysis.  
5.  **Computing Sliding Window Embedding** – Transforming motion data into time-delayed embeddings.  
6.  **Computing Persistent Homology & Extracting Features** – Calculating topological persistence and selecting the 10 most persistent points as motion descriptors.



# Data Collection (Optional Walkthrough)

This part of the walkthrough shows how videos were created from publically available data from *Goodwin et. al.* and how to run MediaPipe inference on the videos. As we give all the critical information on the videos and provide all MediaPipe pose landmarks in our `.pkl` file, this section is not critical to any data analysis we do in other sections.

The following functions are located in **`AQSM_SW1PerS/utils/video_tools.py`** and **`AQSM_SW1PerS/mediapipe_pose.py`**.  


###  **Loading annotations**

The functions below are to extract annotation information from the publically available data and are essential for writing the video files

In [None]:
import os
import cv2
import math
import numpy as np
from datetime import datetime 
import xml.etree.ElementTree as ET
import glob
from collections import Counter


In [None]:
'''
Definitions used to convert time representations and load annotations
'''

def getTime(s):
    """
    Convert time from YYYY-MM-DD HH:MM:SS.mmm into seconds 
    """
    if isinstance(s, datetime):  
        return s.timestamp()
    else:
        t = datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f")
        return t.timestamp()

def getUnix(dt_str):
    """
    Convert to Unix timestamp
    """
    dt_obj = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S.%f")
    return dt_obj.timestamp()
   
def parse_datetime(datetime_str):
    """
    Parse the datetime string
    """
    format_str = '%Y-%m-%d-%H-%M-%S-%f'
    return datetime.strptime(datetime_str, format_str)

def loadAnnotations(filename):
    """
    Load annotations into a dictionary format and extract UNIX times for each Good Data interval.
    Each interval's annotations are stored in a distinct list.
    """
    tree = ET.parse(filename)
    root = tree.getroot()
    annotations = []
    good_data = []

    # Identify Good Data intervals
    for m in root: #This line iterates over the top-level elements (or "child" elements) directly under the root element of the XML file.
        for c in m: #This line iterates over the sub-elements (or "children") of each element m
            if c.tag == "LABEL" and c.text == "Good Data": #This line checks if the tag name of the sub-element c is "LABEL" and if its text content (i.e., the text between <LABEL> and </LABEL> in the XML file) is "Good Data"
                good_data_start_unix = getUnix(m.find("START_DT").text)
                good_data_end_unix = getUnix(m.find("STOP_DT").text)
                good_data.append((good_data_start_unix, good_data_end_unix))

    # Process annotations for each Good Data interval
    
    for start_unix, stop_unix in good_data:
        anno = []
        for m in root:
            start = -1
            stop = -1
            label = ""
            for c in m:
                if c.tag == "START_DT":
                    start = getTime(c.text) - start_unix
                elif c.tag == "STOP_DT":
                    stop = getTime(c.text) - start_unix
                elif c.tag == "LABEL":
                    label = c.text

            # Only append valid annotations within the Good Data interval
            if 0 <= start < (stop_unix - start_unix) and stop > 0:
                start = max(start, 0)
                stop = min(stop, stop_unix - start_unix)
                anno.append({"start": start, "stop": stop, "label": label})
        annotations.append(anno)
    return annotations, good_data

    

In [None]:
'''
If you have access to the data folder of .xml files, then change the path below to the correct directory and person ID you wish to analyze. We will be using 001-01-18-08 for the entire tutorial
'''

from AQSM_SW1PerS.utils.paths import get_data_path

data_folder = 'Study2/001-2010-05-25/Annotator1Stereotypy.annotation.xml'
data_path = get_data_path("data", data_folder)

annotations, good_data = loadAnnotations(data_path)

#Look at annotations
for anno in annotations:
    print(np.vstack(anno))
    print('')

In [None]:
'''
Definitions used to write videos from other data file that is publically available.  
'''

def get_current_label(annotations, current_time):
    """
    Get the annotation label for the current frame based on the time within the interval.
    """
    for annotation in annotations:
        if annotation["start"] <= current_time <= annotation["stop"]:
            if  annotation["label"] == 'Rock' or annotation["label"] == 'Flap' or annotation["label"] == 'Flap-Rock':
                return annotation["label"]
    return "Normal"


def findFrames(folder_path, start_unix, end_unix):
    '''
    This is necessary to get the total number of frames needed to copute the fps of the video
    '''
    total_frames=0
     # Use glob to get all image paths in nested folders
    image_paths = sorted(glob.glob(f"{folder_path}/**/*.jpg", recursive=True))

    for img_path in image_paths:
        img_name = os.path.basename(img_path).split('.')[0]

        timestamp = parse_datetime(img_name)
        frame_time = getTime(timestamp)

        # Check if frame is within the "Good Data" interval
        if start_unix <= frame_time <= end_unix:
            total_frames += 1
    return total_frames


def write_video(folder_path, output_name, good_data, annotations):
    """
    This takes in the files from autism data and converts them into videos, one for each 'Good Data' interval.
    Each frame is labeled with its annotation, and the annotations for each video part are saved in a list.
    
    :param folder_path: Root folder containing image files in a structured directory.
    :param output_name: Base name for output video files.
    :param good_data: List of tuples with start and end UNIX times for each interval.
    :param annotations: List of dictionaries with 'start', 'stop', and 'label' keys for each annotation.
    :return: List of lists, where each sublist contains annotations for each video part.
    """
    video_annotations = []  # List to store annotations for each video part

    for i, (start_unix, end_unix) in enumerate(good_data):
        part_annotations = []  # List to store annotations for the current part
        
        total_frames = findFrames(folder_path, start_unix, end_unix)
        fps = total_frames/(end_unix - start_unix)
        # Set up video writer for each part
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(f'{output_name}_part_{i+1}.mp4', fourcc, fps, (352, 288))  # Adjust dimensions as needed

        # Use glob to get all image paths in nested folders
        image_paths = sorted(glob.glob(f"{folder_path}/**/*.jpg", recursive=True))

        for img_path in image_paths:
            img_name = os.path.basename(img_path).split('.')[0]
            timestamp = parse_datetime(img_name)
            frame_time = getTime(timestamp)

            # Check if frame is within the "Good Data" interval
            if start_unix <= frame_time <= end_unix:
                img = cv2.imread(img_path)
                if img is None:
                    continue  # Skip if image could not be read

                # Get the annotation label for the current frame
                annotation_label = get_current_label(annotations[i], frame_time - start_unix)
                num_label = 0
                if annotation_label:
                    if annotation_label == 'Rock':
                        num_label = 1
                    elif annotation_label == 'Flap':
                        num_label = 2
                    elif annotation_label == 'Flap-Rock':
                        num_label = 3  
                        
                    part_annotations.append(num_label)  # Store annotation for this frame
                    # Overlay the annotation label onto the frame
                    cv2.putText(img, annotation_label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
                else:
                    part_annotations.append(num_label)
                # Write the frame to the video file
                out.write(img)
                cv2.imshow('Frame',img)
        # Finalize the video for this part
        out.release()
        cv2.destroyAllWindows()

        # Add the collected annotations for this part to the overall list
        video_annotations.append(part_annotations)

    return video_annotations
    

In [None]:

video_path = 'Study1/001/URI-001-01-18-08/2008-01-18' #Replace with the path of the folder with images

folder_path = get_data_path("autismvideos", video_path)

output_video = '001-01-18-08' 

frame_annotations = write_video(folder_path, output_video, good_data, annotations)


###  **MediaPipe & YOLOv5 Pose Estimation**: The following functions can be located in `**AQSM_SW1PerS/mediapipe_pose.py**`

MediaPipe is an open-source framework developed by Google that provides efficient, cross-platform solutions for real-time machine learning pipelines. In this project, we leverage MediaPipe's pose landmarker to capture body movements at a low computational cost compared to other models, such as OpenPose. This task uses a machine learning model on a continuous stream of images. The task outputs 33 landmarks.


In [None]:
from filterpy.kalman import KalmanFilter
import torch
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
mp_drawing=mp.solutions.drawing_utils
mp_pose=mp.solutions.pose


In [None]:
# Section: Video Tools
# -------------------------------------------------

def get_video_info(video_path):
    """
    Tool to get the number of frames in the video, the fps, and the total duration of the video in seconds. Not Necessary since 
    we already have this information above. But if starting with video then this is needed.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise Exception("Error opening video file")
    fps = cap.get(cv2.CAP_PROP_FPS)
    _, image = cap.read()
    count = 0
    success = True
    while success: 
        success,image = cap.read()
        count += 1
    total_frames = count
    duration_seconds = total_frames / fps
    cap.release()
    frame_times = [1 / fps * i for i in range(int(fps * duration_seconds))]
    return fps, total_frames, duration_seconds,frame_times

# Section: Keypoint Handling
# -------------------------------------------------
    
def getNormalizedFrameKeypoint(keypoint, dims, xmin, xmax, ymin, ymax):
    '''
    Normalized keypoint with respect to frame, instead of YOLOv5 bounding box
    :Param keypoint: Keypoint coordinate for given frame
    :Param dims: Dimension of the video frame
    :Params xmin,xmax,ymin,ymax: YOLOv5 bounding box min/max x,y values
    :Returns x_frame,y_frame: New normalized keypoint coordinate with respect to frame
    '''
    frame_width, frame_height = dims
    x, y = keypoint[0], keypoint[1]
    w, h = np.abs(xmax - xmin), np.abs(ymax - ymin)
    x_unnormalized = x * w + xmin
    y_unnormalized = y * h + ymin
    x_frame = np.abs((x_unnormalized - frame_width) / frame_width)
    y_frame = np.abs((y_unnormalized - frame_height) / frame_height)
    return x_frame, y_frame

# Section: YOLOv5 Box Tracking
# -------------------------------------------------

def getClass(video_name):
    '''
    Returns YOLOv5 class that will be used on video for each subject
    '''
    if '001' in video_name:
        return 0
    elif '002' in video_name:
        return 1
    elif '003' in video_name:
        return 2
    elif '004' in video_name:
        return 3
    elif '005' in video_name:
        return 4
    elif '006' in video_name:
        return 5
    else: 
        return 6

def create_kalman_filter(fps):
    dt = 1 / fps  # Time step
    
    kf = KalmanFilter(dim_x=8, dim_z=4)
    
    # State Transition Matrix (F)
    kf.F = np.array([
        [1, 0, 0, 0, dt, 0,  0,  0],
        [0, 1, 0, 0, 0,  dt, 0,  0],
        [0, 0, 1, 0, 0,  0,  dt, 0],
        [0, 0, 0, 1, 0,  0,  0,  dt],
        [0, 0, 0, 0, 1,  0,  0,  0],
        [0, 0, 0, 0, 0,  1,  0,  0],
        [0, 0, 0, 0, 0,  0,  1,  0],
        [0, 0, 0, 0, 0,  0,  0,  1]
    ])
    
    # Measurement Matrix (H)
    kf.H = np.array([
        [1, 0, 0, 0, 0, 0, 0, 0],
        [0, 1, 0, 0, 0, 0, 0, 0],
        [0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0]
    ])
    
    # Process Noise Covariance (Q)
    kf.Q *= 0.01
    
    # Measurement Noise Covariance (R)
    kf.R *= 1.0
    
    # State Covariance Matrix (P)
    kf.P *= 10.0
    
    return kf

def update_kalman_filter(kf, measurement):
    # Predict the next state
    kf.predict()
    
    # Update with the current measurement (YOLOv5 bounding box)
    kf.update(measurement)
    
    corrected_state = kf.x
    return corrected_state[:4]  # Return only position and size
    

In [None]:

def create_mediapipe_file(filepath, times, fps, model, create_video = True):
    """
    Using MediaPipe and YOLOv5, we get all (x,y) keypoint coordinates that will automatically be saved to a .csv file
    :Param filepath: Video file
    :Param times: Array of time values at each frame in video
    :Param fps: fps of video
    :Param model: YOLOv5 model used
    """
    
    basename = os.path.basename(filepath)  
    # Remove the .avi suffix
    cleaned_name = os.path.splitext(basename)[0] 
    output_video = f'{cleaned_name}_mp.avi'
    cap = cv2.VideoCapture(filepath)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    dims = (frame_width,frame_height)
    if create_video:
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(output_video, fourcc, fps, (frame_width,frame_height))

    #Decide what child we need to track for the YOLOv5 model
    cls = getClass(filepath)
    
    #For tracking centroids of current frame and previous frame
    kf = create_kalman_filter(fps)

    data = []
    frame_count = 0

    # Total number of joints in Mediapipe Pose
    total_joints = len(mp_pose.PoseLandmark)

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5,model_complexity=2) as pose:
        for timestamp in times:
            cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000)
            ret, frame = cap.read()
            frame_data = {'frame': frame_count}
            
            if not ret:
                for idx in range(total_joints):
                    frame_data[f'joint_{idx}_x'] = np.nan
                    frame_data[f'joint_{idx}_y'] = np.nan
                    frame_data[f'joint_{idx}_visibility'] = np.nan
                data.append(frame_data)
                continue

            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False
            
            #Run YOLOv5 on the frame
            results = model(image)
            annotated_frame = results.render()[0]
            
            current_bbox = None

            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            
            #Obtain the index of the most confident bounding box for the child
            detection_index = 0
            max_confidence = 0
            for idx, det in enumerate(results.xyxy[0].tolist()):
                xmin, ymin, xmax, ymax, confidence, clas = det
                if clas == cls and confidence >= max_confidence:
                    max_confidence = confidence
                    detection_index = idx
                    
            MARGIN = 15
            detected = False
            for idx2, det in enumerate(results.xyxy[0].tolist()):
                if idx2 == detection_index:
                    xmin, ymin, xmax, ymax, confidence, clas = det
                    if clas == cls:
                        current_bbox = [xmin, ymin, xmax, ymax]
            
                        # Convert to [x_center, y_center, width, height] for Kalman Filter
                        x_center = (xmin + xmax) / 2
                        y_center = (ymin + ymax) / 2
                        width = xmax - xmin
                        height = ymax - ymin
                        yolo_bbox = [x_center, y_center, width, height]
                        
                        # Smooth bounding box with Kalman Filter
                        smoothed_bbox = update_kalman_filter(kf, np.array(yolo_bbox))
                        xmin = smoothed_bbox[0].item() - smoothed_bbox[2].item() / 2
                        ymin = smoothed_bbox[1].item() - smoothed_bbox[3].item() / 2
                        xmax = smoothed_bbox[0].item() + smoothed_bbox[2].item() / 2
                        ymax = smoothed_bbox[1].item() + smoothed_bbox[3].item() / 2
                        
                        x_min = int(max(0, xmin - MARGIN))
                        y_min = int(max(0, ymin - MARGIN))
                        x_max = int(min(image.shape[1], xmax + MARGIN))
                        y_max = int(min(image.shape[0], ymax + MARGIN))
                       
                        cropped_frame = frame[int(y_min):int(y_max), int(x_min):int(x_max)]
                        cropped_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
                        #Run MediaPipe within the bounding box
                        results = pose.process(cropped_rgb)

                        if results.pose_landmarks:
                            # Pose detected, extract landmark data
                            for idx, landmark in enumerate(results.pose_landmarks.landmark):
                                x, y = landmark.x, landmark.y
                                #Re-normalize with respect to frame
                                x_new, y_new = getNormalizedFrameKeypoint([x,y],dims,x_min,x_max,y_min,y_max)
                                frame_data[f'joint_{idx}_x'] = x_new
                                frame_data[f'joint_{idx}_y'] = y_new
                                frame_data[f'joint_{idx}_visibility'] = landmark.visibility
                                
                            mp_drawing.draw_landmarks(cropped_frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                                      mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
                                                      mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2))
                        else:
                            # Pose not detected, fill with NaN
                            for idx in range(total_joints):
                                frame_data[f'joint_{idx}_x'] = np.nan
                                frame_data[f'joint_{idx}_y'] = np.nan
                                frame_data[f'joint_{idx}_visibility'] = np.nan
                    
                        data.append(frame_data)
            
                        detected = True
                        break

            if not detected:
                for idx in range(total_joints):
                    frame_data[f'joint_{idx}_x'] = np.nan
                    frame_data[f'joint_{idx}_y'] = np.nan
                    frame_data[f'joint_{idx}_visibility'] = np.nan
                data.append(frame_data)
            if create_video:
                out.write(frame)
            
            if cv2.waitKey(29) & 0xFF == ord('q'):
                break
                
            frame_count += 1
    if create_video:
        out.release()
    cap.release()
    cv2.destroyAllWindows()
    df = pd.DataFrame(data)
    df.to_csv(f'MPdata/{cleaned_name}.csv', index=False) #Ensure you have made directory called MPdata
    

In [None]:
'''
This is necessary if you are using a windows system to not get any errors
'''

import pathlib

# Force the use of WindowsPath, this is for use on Windows
pathlib.PosixPath = pathlib.WindowsPath


In [None]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

#Example Usage 
model_path = get_data_path("YOLOv5l/yolov5/runs/train/exp/weights", "YOLOv5l_transfer.pt")

model = torch.hub.load('ultralytics/yolov5', 'custom', path=model_path, force_reload=True)

video_path = '001-01-18-08.avi'

fps, total_frames, duration_seconds, frame_times = get_video_info(video_path)

create_mediapipe_file(video_path, frame_times, fps, model, True)


This marks the end of the processing of the publically available data from Goodwin et. al. For the remainder of the tutorial, we use the `*.pkl*` file which contains all of the data we collected above into one easy-to-use dataset.

# Preprocessing

The following functions are located in **`AQSM_SW1PerS/utils/data_processing.py`**.  


###  **Load the `.pkl` file**


In [None]:
import pickle
from ATSM_SW1PerS.utils.paths import get_data_path

pkl_file = get_data_path("dataset.pkl")

def open_pickle(pkl_file):
    '''
    Loads the .pkl file
    '''
    with open(pkl_file, 'rb') as f:
        loaded_data = pickle.load(f)
    return loaded_data

data = open_pickle(pkl_file = pkl_file)

index = 0
for entry in data:  # First video entry as example
   
    print(f"Video Name: {entry['name']}, Index: {index}")
    index += 1
    print('')

###  **Segmenting Videos into Overlapping Time Blocks**
The function below is responsible for segmenting videos into overlapping time blocks, ensuring that each segment retains its associated annotations.  

These time blocks will be individually analyzed in later stages of the pipeline to extract movement patterns and compute persistence-based features.

In [None]:
import numpy as np
from collections import Counter
    
def segment_video(entry, segment_size):
    '''    
    Tool for segmenting video into a fixed window size and annotating the segments
    
    :Param filepath: entry of .pkl file
    :Param segment_size: Desired segmentation size
    :Return segments: Array of time intervals based on segment size
    :Return fps, frame times, segments, and their annotations
    '''
    fps, frame_count, duration = entry['fps'], entry['frame_count'], entry['duration']
    frame_times = [1 / fps * i for i in range(int(fps * duration))]
  
    step_size = 1/fps #In seconds
  
    segments = []
    annotated_segments = []
    
    start_time = frame_times[0]
    i = start_time
    while i <= np.max(frame_times) - segment_size:
        lower_bound = i
        upper_bound = i + segment_size
        x = np.array([lower_bound,upper_bound])
        segments.append(x)
        i += step_size
    segments = np.vstack(segments)
        
    frame_annotations = entry['annotations']
    
    for segment in segments:
        segment_start = np.min(segment)
        segment_end = np.max(segment)
        frame_start, frame_end = int(fps* segment_start), int(fps * segment_end)
        interval_annos = np.array(frame_annotations[frame_start:frame_end])
        interval_annos_flat = np.ravel(interval_annos)  # Flatten to 1D array
        # Count occurrences
        counts = Counter(interval_annos_flat.tolist())
        # Access counts
        num_normals = int(counts[0])
        num_rocks = int(counts[1])
        num_flaps = int(counts[2])
        num_flap_rock = int(counts[3])
        total_counts = num_normals + num_rocks + num_flaps + num_flap_rock
        # Default annotation
        annotation = 0
        
        # Ensure 100% Overlap
        if num_rocks / total_counts == 1:
            annotation = 1
        elif num_flaps / total_counts == 1:
            annotation = 2
        elif num_flap_rock / total_counts == 1:
            annotation = 3
        elif (0 < num_rocks / total_counts < 1) or \
             (0 < num_flaps / total_counts < 1) or \
             (0 < num_flap_rock / total_counts < 1):
            annotation = -1  

        # Append the determined annotation
        annotated_segments.append(annotation)
        
    annotated_segments = np.vstack(annotated_segments)
    
    return fps, frame_times, segments, annotated_segments


In [None]:

entry = data[13] 
fps, frame_times, segments, annotated_segments = segment_video(entry, 4)


##  MediaPipe & YOLOv5 Pose Estimation  

The function for running MediaPipe on a video can be found in **`AQSM_SW1PerS/mediapipe_pose.py`**. We use the extracted keypoints from the .pkl file.

###  **Extracting Keypoints from Segmented Video**
Now that we have the segmented video, we will extract the same keypoints used throughout the repository. These keypoints correspond to specific body landmarks and are essential for motion analysis.  

###  **Overview of MediaPipe Pose Estimation**
[MediaPipe](https://developers.google.com/mediapipe) is an open-source framework developed by Google that provides efficient, cross-platform solutions for real-time machine learning pipelines.  

In this project, we use MediaPipe's Pose Landmarker to capture body movements efficiently, offering a lower computational cost compared to models like OpenPose. The pose estimation model processes a continuous stream of images and outputs 33 key body landmarks.  


| Index | 0  | 1                | 2          | 3                | 4                | 5          | 6                | 7         | 8         | 9         | 10        |
|-------|----|-----------------|------------|-----------------|-----------------|------------|-----------------|----------|----------|----------|----------|
| **Landmark** | Nose | Left eye (inner) | Left eye | Left eye (outer) | Right eye (inner) | Right eye | Right eye (outer) | Left ear | Right ear | Mouth (left) | Mouth (right) |

| Index | 11  | 12            | 13          | 14          | 15         | 16         | 17         | 18         | 19         | 20         | 21         | 22         |
|-------|----|--------------|------------|------------|------------|------------|------------|------------|------------|------------|------------|------------|
| **Landmark** | Left shoulder | Right shoulder | Left elbow | Right elbow | Left wrist | Right wrist | Left pinky | Right pinky | Left index | Right index | Left thumb | Right thumb |

| Index | 23  | 24       | 25      | 26      | 27        | 28        | 29        | 30        | 31            | 32            |
|-------|----|---------|--------|--------|----------|----------|----------|----------|--------------|--------------|
| **Landmark** | Left hip | Right hip | Left knee | Right knee | Left ankle | Right ankle | Left heel | Right heel | Left foot  | Right foot  |


### Keypoint Processing: Smoothing & Occlusion Handling  

The following functions are designed to further process extracted keypoints, reducing sudden jumps and handling occlusions for improved motion tracking.


These refinements ensure higher accuracy in movement detection and thus better feature extraction for further analysis.


In [None]:
import numpy as np
import pandas as pd
import scipy.ndimage
from scipy.interpolate import CubicSpline

def getChest(p1, p2, p3):
    """
    Computes the chest position as the average of three keypoints.
    
    Parameters:
      p1, p2, p3 (np.ndarray): Arrays of shape (n_frames, 3) containing x, y, z values.
    
    Returns:
      np.ndarray: Array of shape (n_frames, 2) with the (x, y) chest positions.
    """
    x1, y1 = p1[:, 0], p1[:, 1]
    x2, y2 = p2[:, 0], p2[:, 1]
    x3, y3 = p3[:, 0], p3[:, 1]
    midpoint = [(x1 + x2 + x3) / 3, (y1 + y2 + y3) / 3]
    return np.column_stack(midpoint)

def compute_joint_angle(x1, y1, x2, y2, x3, y3):
    """
    Computes the angle between two vectors: (x1,y1)->(x2,y2) and (x2,y2)->(x3,y3).
    
    Returns:
      float: Angle in degrees. Returns NaN if any segment has zero length.
    """
    v1 = np.array([x1 - x2, y1 - y2])
    v2 = np.array([x3 - x2, y3 - y2])
    norm_v1 = np.linalg.norm(v1)
    norm_v2 = np.linalg.norm(v2)
    if norm_v1 == 0 or norm_v2 == 0:
        return np.nan
    angle = np.arccos(np.clip(np.dot(v1, v2) / (norm_v1 * norm_v2), -1.0, 1.0))
    return np.degrees(angle)

def filter_visibility(keypoints, frame_times, keypoint_index, do_wrists=False,
                      wrist_index=None, elbow_index=None, visibility_threshold=0.3):
    """
    Filters out frames with low visibility keypoints.
    
    Parameters:
      keypoints (list or np.array): List of per-frame keypoints; each frame is an iterable
                                    of keypoint entries, where each entry is [x, y, visibility].
      frame_times (list): Timestamps for each frame.
      keypoint_index (int): Index of the keypoint to filter when do_wrists is False.
      do_wrists (bool): If True, filters both wrist and elbow keypoints.
      wrist_index (int, optional): Index of the wrist keypoint (required if do_wrists is True).
      elbow_index (int, optional): Index of the elbow keypoint (required if do_wrists is True).
      visibility_threshold (float): Threshold below which a keypoint is considered low visibility.
      
    Returns:
      If do_wrists is False:
        list: Frame times where the keypoint at keypoint_index is below the threshold.
      If do_wrists is True:
        tuple: (wrist_bad_frames, elbow_bad_frames)
    """
    if do_wrists:
        if wrist_index is None or elbow_index is None:
            raise ValueError("wrist_index and elbow_index must be provided when do_wrists=True.")
        wrist_bad_frames = []
        elbow_bad_frames = []
        for idx, frame in enumerate(keypoints):
            if frame[wrist_index][2] < visibility_threshold:
                wrist_bad_frames.append(frame_times[idx])
            if frame[elbow_index][2] < visibility_threshold:
                elbow_bad_frames.append(frame_times[idx])
        return wrist_bad_frames, elbow_bad_frames
    else:
        bad_frames = []
        for idx, frame in enumerate(keypoints):
            if frame[keypoint_index][2] < visibility_threshold:
                bad_frames.append(frame_times[idx])
        return bad_frames

def getAccel(values, fps, n_dim = 2):
    """
    Computes the second-order central derivative of the time series,
    with forward and backward differences applied at the boundaries.
    """
    h = 1 / fps
    if n_dim == 2:
        f_x = values[:, 0]
        f_y = values[:, 1]
        
        f_x_second_deriv = (f_x[2:] - 2 * f_x[1:-1] + f_x[:-2]) / h**2
        f_x_second_deriv_lower_bound = (f_x[1] - 2 * f_x[0] + f_x[2]) / h**2  
        f_x_second_deriv_upper_bound = (f_x[-3] - 2 * f_x[-2] + f_x[-1]) / h**2  
    
        f_y_second_deriv = (f_y[2:] - 2 * f_y[1:-1] + f_y[:-2]) / h**2
        f_y_second_deriv_lower_bound = (f_y[1] - 2 * f_y[0] + f_y[2]) / h**2  
        f_y_second_deriv_upper_bound = (f_y[-3] - 2 * f_y[-2] + f_y[-1]) / h**2
      
        f_x_second_derivative = np.concatenate(([f_x_second_deriv_lower_bound], f_x_second_deriv, [f_x_second_deriv_upper_bound]))
        f_y_second_derivative = np.concatenate(([f_y_second_deriv_lower_bound], f_y_second_deriv, [f_y_second_deriv_upper_bound]))

        return np.column_stack((f_x_second_derivative, f_y_second_derivative))
    else:
        f_x = values
        
        f_x_second_deriv = (f_x[2:] - 2 * f_x[1:-1] + f_x[:-2]) / h**2
        f_x_second_deriv_lower_bound = (f_x[1] - 2 * f_x[0] + f_x[2]) / h**2  
        f_x_second_deriv_upper_bound = (f_x[-3] - 2 * f_x[-2] + f_x[-1]) / h**2  

        return np.concatenate(([f_x_second_deriv_lower_bound], f_x_second_deriv, [f_x_second_deriv_upper_bound]))


def interprolate_missing_frames(df, col1, col2, method="cubic"):
    """
    Interpolates the two columns in the DataFrame using the specified method,
    then fills any leading NaNs with the first valid value (backfill) and 
    trailing NaNs with the last valid value (forward fill).
    
    Parameters:
        df (pd.DataFrame): The DataFrame containing the data.
        col1 (str): The name of the first column.
        col2 (str): The name of the second column.
        method (str): Interpolation method (default "cubic").
        
    Returns:
        pd.DataFrame: The modified DataFrame.
    """
    # Interpolate the entire series without a limit.
    df[col1] = df[col1].interpolate(method=method)
    df[col2] = df[col2].interpolate(method=method)

    # --- Handle leading NaNs: backfill from the first valid value ---
    # For col1:
    if pd.isna(df[col1].iloc[0]):
        first_valid_idx = df[col1].first_valid_index()
        if first_valid_idx is not None:
            # Fill all rows up to and including the first valid index
            df.loc[:first_valid_idx, col1] = df.loc[first_valid_idx, col1]
            
    # For col2:
    if pd.isna(df[col2].iloc[0]):
        first_valid_idx = df[col2].first_valid_index()
        if first_valid_idx is not None:
            df.loc[:first_valid_idx, col2] = df.loc[first_valid_idx, col2]
    
    # --- Handle trailing NaNs: forward fill from the last valid value ---
    # For col1:
    if pd.isna(df[col1].iloc[-1]):
        last_valid_idx = df[col1].last_valid_index()
        if last_valid_idx is not None:
            df.loc[last_valid_idx:, col1] = df.loc[last_valid_idx, col1]
            
    # For col2:
    if pd.isna(df[col2].iloc[-1]):
        last_valid_idx = df[col2].last_valid_index()
        if last_valid_idx is not None:
            df.loc[last_valid_idx:, col2] = df.loc[last_valid_idx, col2]

            
    return df


def extract_keypoints(entry, keypoint_index, frame_times, fps, do_wrists=False,
                      elbow_index=None, shoulder_index=None):
    """
    Extracts and cleans keypoint (x, y) data.
    
    If do_wrists is False, the function returns cleaned (x, y) positions for the specified keypoint.
    If do_wrists is True, it also extracts elbow and shoulder points to compute elbow flexion angles,
    angular velocity, and acceleration for wrist-specific processing.
    
    Parameters:
      entry (dict): Contains 'keypoints' (list of per-frame keypoint data).
      keypoint_index (int): Index for the target keypoint (or wrist if do_wrists is True).
      frame_times (list): Timestamps for each frame.
      fps (float): Frames per second.
      do_wrists (bool): Flag for wrist-specific processing.
      elbow_index (int, optional): Required if do_wrists is True.
      shoulder_index (int, optional): Required if do_wrists is True.
      
    Returns:
      np.ndarray: Cleaned (x, y) positions. If do_wrists is True, returns wrist positions.
    """
    keypoints = entry['keypoints']
    
    if do_wrists:
        
        # Get bad visibility frames for wrist and elbow
        wrist_bad_frames, elbow_bad_frames = filter_visibility(keypoints, frame_times, keypoint_index, do_wrists=True, wrist_index=keypoint_index, elbow_index=elbow_index, visibility_threshold=0.2)
        # Extract (x,y) data for wrist, elbow, and shoulder from each frame.
        keypoint_array = [[[x, y] for x, y, _ in frame] for frame in keypoints]
        wrist_points, elbow_points, shoulder_points = [], [], []
        for frame in keypoint_array:
            if (keypoint_index < len(frame) and elbow_index < len(frame) and shoulder_index < len(frame)):
                wrist_points.append(frame[keypoint_index])
                elbow_points.append(frame[elbow_index])
                shoulder_points.append(frame[shoulder_index])
            else:
                # In production code you might want to handle this differently.
                continue
        
        wrist_points = np.vstack(wrist_points)
        elbow_points = np.vstack(elbow_points)
        shoulder_points = np.vstack(shoulder_points)
        
        # Build a DataFrame for wrist processing.
        df = pd.DataFrame({
            "Time": frame_times[:len(wrist_points)],
            "X_Wrist": wrist_points[:, 0],
            "Y_Wrist": wrist_points[:, 1],
            "X_Elbow": elbow_points[:, 0],
            "Y_Elbow": elbow_points[:, 1],
            "X_Shoulder": shoulder_points[:, 0],
            "Y_Shoulder": shoulder_points[:, 1],
        })
        
        # Mark low visibility elbow frames as NaN.
        df.loc[df["Time"].isin(elbow_bad_frames), ["X_Elbow", "Y_Elbow"]] = np.nan
        
        df = interprolate_missing_frames(df, "X_Elbow", "Y_Elbow")

        # Compute elbow flexion angle.
        df["Elbow_Flexion_Angle"] = df.apply(lambda row: compute_joint_angle(
            row["X_Shoulder"], row["Y_Shoulder"],
            row["X_Elbow"], row["Y_Elbow"],
            row["X_Wrist"], row["Y_Wrist"]
        ), axis=1)
        
        angles = df["Elbow_Flexion_Angle"].values
        angular_acceleration = getAccel(angles, fps, n_dim = 1)
        df["Angular_Acceleration"] = angular_acceleration
        
        # Flag frames with extreme angular acceleration.
        df.loc[df["Time"].isin(wrist_bad_frames), "Angular_Acceleration"] = 10000
        
        bad_frames = df[np.abs(df["Angular_Acceleration"]) > 500].index #500, and 5000, 150
        df["Valid"] = True
        df.loc[bad_frames, "Valid"] = False
        df.loc[bad_frames, ["X_Wrist", "Y_Wrist"]] = np.nan

        # Interpolate and smooth wrist coordinates.
        df = interprolate_missing_frames(df, "X_Wrist", "Y_Wrist")
        
        sigma = 0.5
        df["X_Wrist"] = scipy.ndimage.gaussian_filter1d(df["X_Wrist"], sigma=sigma)
        df["Y_Wrist"] = scipy.ndimage.gaussian_filter1d(df["Y_Wrist"], sigma=sigma)
        
        return df[["X_Wrist", "Y_Wrist"]].to_numpy()
    
    else:
        # For non-wrist processing, filter visibility and extract the desired keypoint.
        bad_visibility_frames = filter_visibility(keypoints, frame_times, keypoint_index)
        keypoint_array = [[[x, y] for x, y, _ in frame] for frame in keypoints]
        extracted = []
        for frame in keypoint_array:
            if keypoint_index < len(frame):
                extracted.append(frame[keypoint_index])
            else:
                continue
        extracted = np.vstack(extracted)
        
        # Compute acceleration using external function getAccel.
        acceleration = getAccel(extracted, fps)
        
        df = pd.DataFrame({
            "Time": frame_times[:len(extracted)],
            "X": extracted[:, 0],
            "Y": extracted[:, 1],
        })
        df["Acceleration X"] = acceleration[:, 0]
        df["Acceleration Y"] = acceleration[:, 1]
        
        # Set acceleration of keypoints with low visibility to a high value.
        df.loc[df["Time"].isin(bad_visibility_frames), ["Acceleration X", "Acceleration Y"]] = 10000
        
        # Flag bad frames based on acceleration thresholds.
        bad_frames = df[(np.abs(df["Acceleration X"]) > 4) | (np.abs(df["Acceleration Y"]) > 10)].index
        df["Valid"] = True
        df.loc[bad_frames, "Valid"] = False
        df.loc[bad_frames, ["X", "Y"]] = np.nan
        
        # Interpolate and smooth the data.
        df = interprolate_missing_frames(df, "X", "Y")
        
        sigma = 0.5
        df["X"] = scipy.ndimage.gaussian_filter1d(df["X"], sigma=sigma)
        df["Y"] = scipy.ndimage.gaussian_filter1d(df["Y"], sigma=sigma)
        
        return df[["X", "Y"]].to_numpy()


## Keypoint Analysis: Raw Positions & Acceleration  

In our pipeline, we will analyze both:  
1. Raw keypoint positions – The direct (x, y) coordinates extracted from MediaPipe.  
1. Acceleration representations – Primarily estimated from the second-order foward difference. The foward and backwards differences are computed on the boundaries to maintain the same size. 

This dual-analysis approach ensures a comprehensive understanding of movement patterns in each segment.


In [None]:

head = extract_keypoints(entry, 0, frame_times, fps)
lshoulder = extract_keypoints(entry, 11, frame_times, fps)
rshoulder = extract_keypoints(entry, 12, frame_times, fps)
rwrist = extract_keypoints(entry, 16, frame_times, fps, do_wrists=True, elbow_index=14, shoulder_index=12)
lwrist = extract_keypoints(entry, 15, frame_times, fps, do_wrists=True, elbow_index=13, shoulder_index=11)

chest = getChest(head, lshoulder, rshoulder)

lshoulder_accel = getAccel(lshoulder,fps)
rshoulder_accel = getAccel(rshoulder,fps)

rwrist_accel = getAccel(rwrist,fps)
lwrist_accel = getAccel(lwrist,fps)

head_accel = getAccel(head,fps)
chest_accel = getAccel(chest,fps)


In [None]:
'''
Turn Keypoints into Spline Representation for the Main Algorithm to Interpolate Values
'''

#Raw keypoint positions

h_x, h_y = CubicSpline(frame_times,head[:,0]), CubicSpline(frame_times,head[:,1])

r_x, r_y = CubicSpline(frame_times,rwrist[:,0]), CubicSpline(frame_times,rwrist[:,1])

l_x, l_y = CubicSpline(frame_times,lwrist[:,0]), CubicSpline(frame_times,lwrist[:,1])

rs_x, rs_y = CubicSpline(frame_times,rshoulder[:,0]), CubicSpline(frame_times,rshoulder[:,1])

ls_x, ls_y = CubicSpline(frame_times,lshoulder[:,0]), CubicSpline(frame_times,lshoulder[:,1])

c_x, c_y = CubicSpline(frame_times,chest[:,0]), CubicSpline(frame_times,chest[:,1])

#Acceleration representation of keypoint movement

h_x_a, h_y_a = CubicSpline(frame_times,head_accel[:,0]), CubicSpline(frame_times,head_accel[:,1])

r_x_a, r_y_a = CubicSpline(frame_times,rwrist_accel[:,0]), CubicSpline(frame_times,rwrist_accel[:,1])

l_x_a, l_y_a = CubicSpline(frame_times,lwrist_accel[:,0]), CubicSpline(frame_times,lwrist_accel[:,1])

rs_x_a, rs_y_a = CubicSpline(frame_times,rshoulder_accel[:,0]), CubicSpline(frame_times,rshoulder_accel[:,1])

ls_x_a, ls_y_a = CubicSpline(frame_times,lshoulder_accel[:,0]), CubicSpline(frame_times,lshoulder_accel[:,1])

c_x_a, c_y_a = CubicSpline(frame_times,chest_accel[:,0]), CubicSpline(frame_times,chest_accel[:,1])



#  SW1PerS Algorithm  

The following definitions can be found in: **`AQSM_SW1PerS/SW1PerS.py`**  

###  **Overview**
The **Sliding Windows and 1-Persistence Scoring  (SW1PerS)** algorithm is designed to analyze motion periodicity using a combination of sliding windows and persistent homology.  

Below is the pipeline for obtaining periodicity scores for each video segment.

---

## **SW1PerS Algorithm: Computing Periodicity Scores**
1. **Preprocessing** – Extract keypoints and clean motion data.  
2. **Period Estimation** – Compute dominant movement frequency.  
3. **Optimizing Embedding Parameters** – Find the best $d$ and $\tau$ for sliding window embedding.
4.  **Sliding Window Embedding** – Convert motion data into high-dimensional point cloud.  
5. **Persistent Homology Computation** – Extract topological features from embedded motion data.  
6. **Scoring & Interpretation** – Extract the 10 most persistent points and compute periodicity scores.  

This pipeline enables automated movement tracking and provides a quantification of recurrent signals.


In [None]:
from ripser import ripser
from persim import plot_diagrams
from scipy.interpolate import CubicSpline
import pandas as pd
from scipy.signal import find_peaks
from scipy import signal
from scipy import interpolate
from numpy.linalg import norm
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
import multiprocessing as multi

%matplotlib inline


In [None]:
# Example: choose either a time value OR an annotation
time_value = None     # set to a numeric value if you want to choose by time
annotation = 1   # set to a string if you want to choose by annotation

# Specify which occurrence you want (0 for first, 1 for second, etc.)
occurrence = 1  # for example, choose the second occurrence

if (time_value is None and annotation is None) or (time_value is not None and annotation is not None):
    raise ValueError("Please provide either a time_value OR an annotation (but not both).")

if time_value is not None:
    # Gather all matching segment indices based on time_value
    matching_indices = [
        i for i in range(len(segments) - 1)
        if np.min(segments[i]) >= time_value and np.min(segments[i + 1]) != time_value
    ]
else:
    # Gather all matching segment indices based on annotation
    matching_indices = [i for i, ann in enumerate(annotated_segments) if ann == annotation]

if not matching_indices:
    raise ValueError("No segment found for the given criteria.")

if occurrence >= len(matching_indices):
    raise ValueError("Occurrence number is out of range. There are only {} matches.".format(len(matching_indices)))

idx = matching_indices[occurrence]

segment = segments[idx]
annotated_segment = annotated_segments[idx]

print("Chosen segment:", segment)
print("Chosen annotated segment:", annotated_segment)


# --- Choose a Sensor ---
def choose_sensor(sensor='Chest'):
    # Map sensor names to their corresponding (position, acceleration) tuples.
    sensor_map = {
        'Chest': ((c_x, c_y), (c_x_a, c_y_a)),
        'LW':    ((l_x, l_y), (l_x_a, l_y_a)),
        'RW':    ((r_x, r_y), (r_x_a, r_y_a)),
        'LS':    ((ls_x, ls_y), (ls_x_a, ls_y_a)),
        'RS':    ((rs_x, rs_y), (rs_x_a, rs_y_a)),
        'Head':  ((h_x, h_y), (h_x_a, h_y_a))
    }
    
    if sensor not in sensor_map:
        raise ValueError("Not a valid sensor. Valid options are: " + ", ".join(sensor_map.keys()))
    
    (t_x, t_y), (t_x_a, t_y_a) = sensor_map[sensor]
    return t_x, t_y, t_x_a, t_y_a

# Example usage:
t_x, t_y, t_x_a, t_y_a = choose_sensor(sensor='LS')


In [None]:
'''
Setup
'''

start = np.min(segment)
end = np.max(segment)

#Interprolate more points for time window
num_points = 1000
t_vals = np.linspace(start,end,num_points)

keypoint_x = t_x(t_vals)
keypoint_y = t_y(t_vals)


keypoint_x_accel = t_x_a(t_vals)
keypoint_y_accel = t_y_a(t_vals)


#Remove linear trends in the time series
keypoint_x = signal.detrend(keypoint_x)
keypoint_y = signal.detrend(keypoint_y)


keypoint_x_accel = signal.detrend(keypoint_x_accel)
keypoint_y_accel = signal.detrend(keypoint_y_accel)


#Store Cubic Spline representations of accelerometers into an array. Needed for Sliding Window Computation
f_x = CubicSpline(t_vals, keypoint_x)
f_y = CubicSpline(t_vals, keypoint_y) 


f_x_a = CubicSpline(t_vals, keypoint_x_accel)
f_y_a = CubicSpline(t_vals, keypoint_y_accel) 


cs1 = []
cs1.append(f_x)
cs1.append(f_y)


cs2 = []
cs2.append(f_x_a)
cs2.append(f_y_a)


fig, axes = plt.subplots(1, 2, figsize=(15, 4))  

axes[0].plot(t_vals,keypoint_x,color='r',label = 'X')
axes[0].plot(t_vals,keypoint_y,color='g',label = 'Y')

axes[0].set_title("Keypoint Positions")
axes[0].set_xlabel("Time")
axes[0].set_ylabel("Normalized Position")
axes[0].legend()

# Second subplot
axes[1].plot(t_vals,keypoint_x_accel,color='r',label = 'X')
axes[1].plot(t_vals,keypoint_y_accel,color='g',label = 'Y')
axes[1].set_title("Acceleration Keypoint")
axes[1].set_xlabel("Time")
axes[1].set_ylabel("Acceleration")
axes[1].legend()

plt.tight_layout()
plt.show()


### Period Estimation

It has been observed that when the window size $d\tau$ approaches the length of the period of a signal $f: \mathbb{R} \longrightarrow \mathbb{R}$, the roundedness of its sliding window point cloud is maximized. 
This, in turn,
maximizes the persistence of the point fartherst from the diagonal in $dgm_1$, and hence the strength of the periodicity score computed via persistence diagrams. 
In particular, if $f$ satisfies the identity 
$$
f(t + L) = f(t) \quad \text{for } L >0 \text{ minimal and all } t,
$$
i.e., $f$ is $L$-periodic, then its $L$-periodicity is best captured by the persistence diagrams of its sliding window point cloud when:

$$    d\tau= \left( \frac{d}{d+1} \right)L
$$

We use the Complex Discrete Fourier Transform as discussed in the *Methods* section of the manuscript to estimate $L$.

In [None]:
'''
Obtaining Optimal d and tau values for Sliding Window Embedding
'''

def find_prominent_peaks(magnitude_spectrum, threshold, prominence):
    '''
    Tool to find the optimal d value for Sliding Windows by sorting the peaks of the DFT 
    :Param magnitude_spectrum: Array of magnitude spectrum from DFT data
    :Param threshold: Minimum value that a peak must exceed to be considered prominent
    :Param promincence: Measure of how much a peak stands out from the surrounding data
    :Return sorted_peaks[:n_peaks]: Sorted array of most prominent peaks
    '''
    peaks, _ = find_peaks(magnitude_spectrum, height=threshold, prominence=prominence)
    sorted_peaks = sorted(peaks, key=lambda p: -magnitude_spectrum[p])
    cumulative_sum = np.cumsum(magnitude_spectrum[sorted_peaks])
    total_sum = np.sum(magnitude_spectrum[sorted_peaks])
    n_peaks = np.searchsorted(cumulative_sum, 0.7 * total_sum, side='right')
    return sorted_peaks[:n_peaks]



In [None]:
window_size = 4 #Time block size

F1 = keypoint_x + 1j * keypoint_y
F2 = keypoint_x_accel + 1j * keypoint_y_accel

Fs = num_points/(end-start)

try:
    dft1 = np.fft.fft(F1)
    magnitude_spectrum1 = np.abs(dft1)
    frequencies1 = np.fft.fftfreq(len(F1), 1/Fs)
    
    pk1 = find_prominent_peaks(magnitude_spectrum1, 0, 0) 
    
    peak_center1 = frequencies1[pk1][0]
    
    period1 = np.abs((1/peak_center1))

    if period1 > window_size / 2: #Cutoff Period
        epsilon = 1e-10  
    
        
        frequencies1 = np.where(frequencies1 == 0, epsilon, frequencies1)
    
        cutoff_period1 = window_size / 2
    
        periods1 = np.abs(1/frequencies1)
        # Apply cutoff to magnitude spectrum
        idx = np.where(periods1 < cutoff_period1)  # Use > to zero out values above cutoff
        magnitude_spectrum1 = magnitude_spectrum1[idx]
        frequencies1 = frequencies1[idx]
        pk1 = find_prominent_peaks(magnitude_spectrum1, 0, 0) 
        
        peak_center1 = frequencies1[pk1][0]
        period1 = np.abs((1/peak_center1))
except:
    period1 = 0
    frequencies1, magnitude_spectrum1 = 0, 0

try:
    dft2 = np.fft.fft(F2)
    magnitude_spectrum2 = np.abs(dft2)
    frequencies2 = np.fft.fftfreq(len(F2), 1/Fs)
    
    pk2 = find_prominent_peaks(magnitude_spectrum2, 0, 0) 
    
    peak_center2 = frequencies2[pk2][0]
    
    period2 = np.abs((1/peak_center2))

    if period2 > window_size / 2: #Cutoff Period
        epsilon = 1e-10  
        
        frequencies2 = np.where(frequencies2 == 0, epsilon, frequencies2)
    
        cutoff_period2 = window_size / 2
    
        periods2 = np.abs(1/frequencies2)
        # Apply cutoff to magnitude spectrum
        idx = np.where(periods2 < cutoff_period2)  # Use > to zero out values above cutoff
        magnitude_spectrum2 = magnitude_spectrum2[idx]
        frequencies2 = frequencies2[idx]
        pk2 = find_prominent_peaks(magnitude_spectrum2, 0, 0) 
    
        peak_center2 = frequencies2[pk2][0]
        period2 = np.abs((1/peak_center2))
except:
    period2 = 0
    frequencies2, magnitude_spectrum2 = 0, 0

fig, axes = plt.subplots(1, 2, figsize=(10, 4))  # 1 row, 2 columns

axes[0].plot(frequencies1,magnitude_spectrum1)
try:
    axes[0].plot(frequencies1[pk1],magnitude_spectrum1[pk1],'x')
    axes[0].vlines(peak_center1, ymin=0, ymax=np.max(magnitude_spectrum1[pk1]), colors='r', linestyles='dashed',label = 'Fundamental Freq.')
except:
    print('Raw frequency detection failed')
axes[0].set_xlabel(fr'$\omega$')
axes[0].set_ylabel(fr'$|F(\omega)|$')
axes[0].set_title("Raw Keypoint Magnitude Spectrum")
axes[0].legend()

axes[1].plot(frequencies2,magnitude_spectrum2)
try:
    axes[1].plot(frequencies2[pk2],magnitude_spectrum2[pk2],'x')
    axes[1].vlines(peak_center2, ymin=0, ymax=np.max(magnitude_spectrum2[pk2]), colors='r', linestyles='dashed',label = 'Fundamental Freq.')
except:
    print('Raw frequency detection failed')
axes[1].set_xlabel(fr'$\omega$')
axes[1].set_ylabel(fr'$|F(\omega)|$')
axes[1].set_title("Acceleration Magnitude Spectrum")
axes[1].legend()

plt.tight_layout()
plt.show()

d = 23

t1 = period1 / (d + 1)
t2 = period2 / (d + 1)

print(fr'Window Size for Raw Keypoints: {d*t1}')
print(fr'Window Size for Acceleration: {d*t2}')




### Sliding Window Embedding

To recognize periodicity (e.g., in the movement of estimated keypoint locations), one can transform the problem of recurrence detection in time series data into that of shape analysis, where tools from TDA can be applied. Specifically, we use the Sliding Window Embedding, which transforms a time series into a discretized point cloud in a high-dimensional space.

Let $F : \mathbb{R} \longrightarrow \mathbb{R}^m$ be an $m$-dimensional time series, $m\geq 1$, and write $F(t)=(f_1(t),...,f_m(t))$. Given parameters $d\in \mathbb{N}$ and $\tau>0$, the sliding window embedding of the signal $F$ at $t \in \mathbb{R}$ is the matrix:

$$
SW_{d, \tau}F(t) =
\begin{bmatrix}
    f_1(t) & \dots & f_m(t)\\
    f_1(t + \tau) & \dots & f_m(t + \tau) \\
    \vdots & \ddots & \vdots\\
    f_1(t + d\tau) & \dots & f_m(t + d\tau)
\end{bmatrix} \in \mathbb{R}^{(d+1)\times m}
$$

Moreover, given a set of time values $I\subset \mathbb{R}$, the sliding window point cloud of $F$ 
is the set $X = \{SW_{d,\tau} F(t) \; : \; t\in I\} \subset \mathbb{R}^{(d+1)\times m} $.
We will use the Euclidean (i.e., Frobenius) norm $\|\cdot\|$ in $\mathbb{R}^{(d+1)\times m}$
to measure pairwise distances $\|p - p'\|$
between points  $p,p' \in X$. 

When the signal $F$ is only observed at discrete 
time points $t_0 < t_1 < \cdots < t_n$  
we  extend it to   $t\in \mathbb{R}$, before computing its  sliding window point cloud, as follows:
$$
F(t) = 
\left\{
  \begin{array}{ll}
    F(t_0)  & \hbox{if }\;\; t\leq t_0  \\ \\
    F(t_n) & \hbox{if } \;\; t\geq t_n \\ \\
    F_{spline}(t) & \hbox{if }\;\; t_i \leq t \leq t_{i+1}, \; i = 0,\ldots, n-1
  \end{array}
\right.
$$
where $F_{spline} : \mathbb{R} \longrightarrow \mathbb{R}^{m}$ is the function whose $j$-th coordinate, $j=1,\ldots, m$, is the cubic spline interpolating the points $(t_i, f_j(t_i))$ for $i=0,\ldots, n$.


Dense regions in the sliding window point cloud $X\subset \mathbb{R}^{(d+1)\times m}$ correspond to areas capturing the underlying shape and geometry of the object. 
More sparse regions, on the other hand,  can contribute noisy data degrading topological inference. 
To filter out this sparse data, we use a $k$-nearest neighbors density approach. 
That is, for $k\in \mathbb{N}$ and each $p\in X$, let $N_k(p) \subset X$ be the set comprised of the $k$ nearest neighbors of $p$ in $X$ using the Euclidean distance. 
To filter based on density, we estimate the density of a point  as the reciprocal of the  median distance to its  $k$-nearest neighbors: 
$$    \rho_k(p) = \frac{1}{\text{median}
    \left(
    \left\{
    \|p - p'\| \; : \; p' \in N_k(p)\,
    \right\}
    \right)}
$$ Given the values $\rho_k(p)$ for $p\in X$, we let the subsample of densest points be
$$X_s = 
\left\{
p \in X \, : \, \rho_k(p) \mbox{ is among the top $80\%$ of values}
\right\}
$$

``knn_density`` below is the function that acheives this filtering of the sliding window point cloud.

After subsampling the point cloud, we mean-center each point individually and normalize it to enable comparisons between the periodic motions of different videos and varying ranges or intensities. 
That is: 
$$\widetilde{X_s} = 
\left\{
\frac{p - \mathsf{mean}(p)}{\|p - \mathsf{mean}(p)\|} \; : \; p \in X_s
\right\}
$$where $\mathsf{mean}(p)\in \mathbb{R}^{(d+1)\times m}$
is the matrix with all entries equal to the average of the entries of the $(d+1)\times m$ matrix  $p \in X_s$. 
We remark that density thresholding in the sliding window point cloud ameliorates the impact of noisy recurrent behavior in $F$, and that mean centering helps remove linear trends from the original signal.

In [None]:

from sklearn.neighbors import NearestNeighbors

def knn_density(point_cloud, k=10): 
    '''
    Tool to filter out sparse data in point cloud to only collect the important information
    '''
    nbrs = NearestNeighbors(n_neighbors=k, algorithm='auto').fit(point_cloud)
    distances, indices = nbrs.kneighbors(point_cloud)
    densities = 1 / np.nanmedian(distances, axis=1)
    
    sample_percentage = 0.8
    filtered_points = int(sample_percentage * len(point_cloud))
    
    dense_indices = np.argsort(densities)[-filtered_points:]
    
    subsampled_point_cloud = point_cloud[dense_indices]
    
    return subsampled_point_cloud

import numpy.matlib

def SW_cloud_nD(cs, x_vals, tau, d, n_data, n_dims):
    '''
    Computes the sliding window embedding with n columns.
    :param cs: List containing the cubic splines for f1(x),f2(x), . . . ,fn(x)
    :param x_vals: Time values of frames segment
    :param tau: Optimal tau value (delay)
    :param d: Optimal embedding dimension
    :param n_data: Number of points desired in the point cloud
    :return SW_normalized: Point cloud from sliding window embedding, normalized
    '''
    t_vals = np.linspace(np.min(x_vals), np.max(x_vals) - (d * tau), n_data)
    SW = np.zeros((n_data, (d + 1) * n_dims))
    
    for i, t in enumerate(t_vals):
        for j in range(n_dims):
            SW_f_t = cs[j](t + np.arange(0, d + 1) * tau)
            SW[i, j * (d + 1):(j + 1) * (d + 1)] = SW_f_t
    
    SW = knn_density(SW, k=20)

    # Subtract the mean from each row (mean centering)
    SW_mean = np.mean(SW, axis=1)
    SW_mean = numpy.matlib.repmat(SW_mean,  np.shape(SW)[1],1).T
    SW_centered = SW - SW_mean
    
    # Perform L2 normalization (row-wise)
    SW_norm = np.linalg.norm(SW_centered, axis=1, keepdims=True)
    SW_normalized = SW_centered / SW_norm

    return SW_normalized
    

In [None]:
from sklearn.decomposition import PCA

#Compute Sliding Window Point Cloud

SW1 = SW_cloud_nD(cs1,t_vals,t1,d,300,2)
SW2 = SW_cloud_nD(cs2,t_vals,t2,d,300,2)

pca = PCA(n_components=2) 
proj_2D_1 = pca.fit_transform(SW1)
proj_2D_2 = pca.fit_transform(SW2)

fig, axes = plt.subplots(1, 2, figsize=(10, 4))  

axes[0].scatter(proj_2D_1[:,0], proj_2D_1[:,1], s=10, alpha=0.7, color='deepskyblue')
axes[0].set_title(fr'Raw Keypoint SW Point Cloud')

axes[1].scatter(proj_2D_2[:,0], proj_2D_2[:,1], s=10, alpha=0.7, color='darkcyan')
axes[1].set_title(fr'Acceleration SW Point Cloud')

plt.tight_layout()
plt.show()


### Peridoicity Score

To measure the "roundness" of the high dimensional point cloud, we use persistent homology to derive a periodicity score:

$$PS_1 = w_1\frac{mp_1(dgm_1)}{\sqrt3} - w_2\frac{mp_2(dgm_1)}{\sqrt3}$$

This approach directly measures the offset between the first and second most persistent $H_1$ features. The bigger this offset is, the more circular we expect our $S\mathbb{W}$ point cloud to be. We also offer the option to use an alternative periodicity score that is more of a summary of the 10 most persistent $H_1$ features"

$$PS_{10} = \begin{bmatrix} w_1\frac{mp_1(dgm_1)}{\sqrt3}, w_2\frac{mp_2(dgm_1)}{\sqrt3}, \dots ,  w_n\frac{mp_n(dgm_1)}{\sqrt3} \end{bmatrix} $$

While this provides a more discriptive measurement of the persistence diagram, the overall accuracy increase is negligible compared to $PS_1$ and is much less interpretable.

In [None]:
import sympy
from ripser import ripser
from persim import plot_diagrams

def next_prime(n):
    '''
    A general rule of thumb is for the coeff parameter in Ripser to be set to the next prime after 2*d
    '''
    return sympy.nextprime(n)

def compute_PS(dgm1, method = 'PS1'):
    try:
        #Sort the points to get the 10 most persistent points
        sorted_points = sorted(dgm1, key=lambda x: x[1] - x[0], reverse=True)
        n_most_persistent_points = np.array(sorted_points[:10])
        zeros_array = np.zeros((10 - len(n_most_persistent_points), 2))
        filled_points = np.vstack([n_most_persistent_points, zeros_array])
        persistences = np.abs(filled_points[:, 0] - filled_points[:, 1]) 
        weights =  np.exp(-0.5 * filled_points[:, 0])
    except:
        persistences = np.zeros(10)
        weights =  np.zeros(10)
        
    if method == 'PS10':
        return (persistences * weights) / np.sqrt(3)
    elif method == 'PS1':
        return (persistences[0] * weights[0]) / np.sqrt(3) - (persistences[1] * weights[1]) / np.sqrt(3)
    else:
        print('Not a valid mathod. Choose either 10MPS or 1PS')
        
        

In [None]:

'''
Compute Homology from Rips Filtration
'''

#Rips filtration for homology for both point clouds
result1 = ripser(SW1, coeff=next_prime(2*d), maxdim = 1) 
result2 = ripser(SW2, coeff=next_prime(2*d), maxdim = 1) 

#Extract diagrams
diagrams1 = result1['dgms']
diagrams2 = result2['dgms']

dgm1_1 = diagrams1[1]
dgm1_1 = np.array(dgm1_1)

dgm1_2 = diagrams2[1]
dgm1_2 = np.array(dgm1_2)

method = 'PS1'
scores1 = compute_PS(dgm1_1, method = method)
scores2 = compute_PS(dgm1_2, method = method)

try: 
    num = len(scores1)
    x_lim = 9.5
except:
    num = 1
    x_lim = 0.5
    
fig, axes = plt.subplots(2, 2, figsize=(10, 10)) 

plot_diagrams(diagrams1, plot_only=[1], xy_range=[0, 2, 0, 2], ax = axes[0,0])
axes[0,0].set_title(fr'Raw Position Persistence Diagram')

axes[0,1].bar(range(num), scores1, alpha=0.5)
axes[0,1].set_title(fr'Raw Position PS')
axes[0,1].set_xlim(-0.5, x_lim)
axes[0,1].set_ylim(0, 1)
axes[0,1].set_xticks([])

plot_diagrams(diagrams2, plot_only=[1], xy_range=[0, 2, 0, 2], ax = axes[1,0])
axes[1,0].set_title(fr'Acceleration Persistence Diagram')

axes[1,1].bar(range(num), scores2, alpha=0.5)
axes[1,1].set_title(fr'Acceleration PS')
axes[1,1].set_xlim(-0.5, x_lim)
axes[1,1].set_ylim(0, 1)
axes[1,1].set_xticks([])

plt.tight_layout()
plt.show()


In [None]:

fig, axes = plt.subplots(2, 5, figsize=(20, 7)) 

axes[0,0].plot(t_vals,keypoint_x,color='r',label = 'X')
axes[0,0].plot(t_vals,keypoint_y,color='g',label = 'Y')

axes[0,0].set_title("Chest Spatial Positions")
axes[0,0].set_xlabel("Time")
axes[0,0].set_ylabel("Normalized Position")
axes[0,0].set_yticks([])
axes[0,0].set_xticks([])
axes[0,0].legend()

# Second subplot
axes[1,0].plot(t_vals,keypoint_x_accel,color='r',label = 'X')
axes[1,0].plot(t_vals,keypoint_y_accel,color='g',label = 'Y')
axes[1,0].set_title("Chest Acceleration")
axes[1,0].set_xlabel("Time")
axes[1,0].set_ylabel("Acceleration")
axes[1,0].set_yticks([])
axes[1,0].set_xticks([])
axes[1,0].legend()


axes[0,1].plot(frequencies1,magnitude_spectrum1)
try:
    axes[0,1].plot(frequencies1[pk1],magnitude_spectrum1[pk1],'x')
    axes[0,1].vlines(peak_center1, ymin=0, ymax=np.max(magnitude_spectrum1[pk1]), colors='r', linestyles='dashed',label = 'F-Freq.')
except:
    print('Raw frequency detection failed')
axes[0,1].set_xlabel(fr'$\omega$')
axes[0,1].set_ylabel(fr'$|F(\omega)|$')
axes[0,1].set_xticks([])
axes[0,1].set_yticks([])
axes[0,1].set_title("Magnitude Spectrum")
axes[0,1].legend()

axes[1,1].plot(frequencies2,magnitude_spectrum2)
try:
    axes[1,1].plot(frequencies2[pk2],magnitude_spectrum2[pk2],'x')
    axes[1,1].vlines(peak_center2, ymin=0, ymax=np.max(magnitude_spectrum2[pk2]), colors='r', linestyles='dashed',label = 'F-Freq.')
except:
    print('Raw frequency detection failed')
axes[1,1].set_xlabel(fr'$\omega$')
axes[1,1].set_ylabel(fr'$|F(\omega)|$')
axes[1,1].set_xticks([])
axes[1,1].set_yticks([])
axes[1,1].set_title("Magnitude Spectrum")
axes[1,1].legend()

axes[0,2].scatter(proj_2D_1[:,0], proj_2D_1[:,1], s=10, alpha=0.7, color='deepskyblue')
axes[0,2].set_xticks([])
axes[0,2].set_yticks([])
axes[0,2].set_title(fr'PCA SW Point Cloud')

axes[1,2].scatter(proj_2D_2[:,0], proj_2D_2[:,1], s=10, alpha=0.7, color='darkcyan')
axes[1,2].set_xticks([])
axes[1,2].set_yticks([])
axes[1,2].set_title(fr'PCA SW Point Cloud')


plot_diagrams(diagrams1, plot_only=[1], xy_range=[0, 2, 0, 2], ax = axes[0,3])
axes[0,3].set_xticks([])
axes[0,3].set_yticks([])

axes[0,3].set_title(fr'Persistence Diagram')

axes[0,4].bar(range(num), scores1, alpha=0.5)
axes[0,4].set_title(fr'$PS_1$')
axes[0,4].set_xlim(-0.5, x_lim)
axes[0,4].set_ylim(0, 1)
axes[0,4].set_xticks([])

plot_diagrams(diagrams2, plot_only=[1], xy_range=[0, 2, 0, 2], ax = axes[1,3])
axes[1,3].set_xticks([])
axes[1,3].set_yticks([])

axes[1,3].set_title(fr'Persistence Diagram')

axes[1,4].bar(range(num), scores2, alpha=0.5)
axes[1,4].set_title(fr'$PS_1$')
axes[1,4].set_xlim(-0.5, x_lim)
axes[1,4].set_ylim(0, 1)
axes[1,4].set_xticks([])

plt.tight_layout()
plt.show()

