### Data Loading and Preparation

In [None]:
# Load data from training data directory, convert from NPZ to tensor, and save to a dict
# containing lift name and tensor data

import os
import torch
import numpy as np
import functions
import matplotlib.pyplot as plt

DEADLIFT_TENSORS = {}
BENCH_TENSORS = {}
SQUAT_TENSORS = {}

# Setup paths
script_dir = os.path.dirname(os.path.abspath("model_train.ipynb"))
data_dir = os.path.normpath(os.path.join(script_dir, '..', 'lift data'))

# Step 1: Get all subfolder names in data_dir, A.K.A lift_data
subfolders = []
for f in os.listdir(data_dir):
    if os.path.isdir(os.path.join(data_dir, f)):
        subfolders.append(f)


#  Squat Files
#  Bench Files
#  Deadlift Files
for subfolder in subfolders:
    subfolder_path = os.path.join(data_dir, subfolder)
    files = []

    # Isolate individual lift files in each of the subfolders (bench files, squat files, deadlift files) and add them to files list
    for f in os.listdir(subfolder_path):
        if os.path.isfile(os.path.join(subfolder_path, f)):
            files.append(f)

    # Iterate through files in each subfolder
    for file in files:
        data = np.load(os.path.join(subfolder_path, file))

        # convert each landmark/angle to a tensor and add it to the tensor dict (which is
        # specific to the current lift only
        tensor_dict = {}
        for key in data.files:
            lift_name_without_video_type = " ".join(key.split('.')[:1] + key.split("_")[-2:])
            tensor_dict[lift_name_without_video_type] = torch.tensor(data[key])
        if file.split('.')[0].startswith('bench'):
            BENCH_TENSORS[file.split('.')[0]] = tensor_dict
        if file.split('.')[0].startswith('deadlift'):
            DEADLIFT_TENSORS[file.split('.')[0]] = tensor_dict
        if file.split('.')[0].startswith('squat'):
            SQUAT_TENSORS[file.split('.')[0]] = tensor_dict

In [None]:
BENCH_TENSORS['bench 3 good lift data'].keys()

In [None]:
BENCH_TENSORS['bench 3 good lift data']['bench 3 good left elbow']

In [None]:
plt.figure()
plt.plot(BENCH_TENSORS['bench 3 good lift data']['bench 3 good right elbow'])

In [None]:
for k,v in BENCH_TENSORS.items():
    print(k)
    for key in v.keys():

        try:
            print(f"---{key} contains a {type(v[key])} containing Z coords for {functions.landmark_indeces_to_labels[int(key.split(' ')[-1])]} that contains ({len(v[key])} points)")
        except:
            print(f"---{key} contains a {type(v[key])} containing joint angles for {' '.join(key.split('_')[-2:])} that contains ({len(v[key])} angles)")


In [None]:
for key, value in DEADLIFT_TENSORS.items():
    print(key)
    # for landmark, coordinate in BENCH_TENSORS[key].items():
    #     return (landmark, coordinate[2])

In [None]:
# Get the keys within an individual lift

DEADLIFT_TENSORS['deadlift 6 good lift data'].keys()

In [None]:
all_deadlift_joint_angles = functions.joint_angles_to_list(DEADLIFT_TENSORS)
all_bench_joint_angles = functions.joint_angles_to_list(BENCH_TENSORS)
all_squat_joint_angles = functions.joint_angles_to_list(SQUAT_TENSORS)

# Example of one output for one lift (in this case, deadlift 1 good is the first to be
# processed, so it is at index 0 of our array. We can now iterate over this array with our
# model to learn patterns
all_squat_joint_angles

### Split Tensors into individual reps to then be batched

In [None]:
import random

def split_data(data_list, train_ratio, seed):
    random.seed(seed)
    random.shuffle(data_list)
    split_index = int(len(data_list) * train_ratio)
    train_set = data_list[:split_index]
    test_set = data_list[split_index:]
    return train_set, test_set

train_ratio = 1
seed=45

deadlift_train, deadlift_test = split_data(all_deadlift_joint_angles, train_ratio, seed)
bench_train, bench_test = split_data(all_bench_joint_angles, train_ratio, seed)
squat_train, squat_test = split_data(all_squat_joint_angles, train_ratio, seed)

In [None]:
# EXAMPLE OF SPLITTING LIFT DATA INTO INDIVIDUAL REPS
# via ChatGPT
from scipy.signal import find_peaks, savgol_filter
import torch
import numpy as np
import matplotlib.pyplot as plt

def detect_reps_by_threshold(lift_name: str, joint_tensor: torch.Tensor, depth_thresh, rise_thresh, min_frames=4, plot=False):

    signal = joint_tensor
    rep_ranges = []
    in_rep = False
    start = 1

    peaks, valleys = [], []

    if lift_name != "deadlift":
        for i in range(1, len(signal)):
            if signal[i] < rise_thresh and not in_rep:
                in_rep = True
                start = i
            elif signal[i] > rise_thresh and in_rep:
                end = i
                if end - start >= min_frames:
                    rep_ranges.append((start, end))
                start = None
                in_rep = False
                continue
    else:
        start = 1
        ascending = True
        highest_valley = float('inf')
        for i in range(1, len(signal)):

            if signal[i - 1] < signal[i] and not ascending:
                print(i, ' valley')
                valleys.append(i - 1)
                highest_valley = min(highest_valley, int(signal[i]))
                end = i - 2
                if end - start >= min_frames:
                    rep_ranges.append((start, end))
                start = i - 1
                ascending =  True

            elif signal[i - 1] > signal[i] and ascending:
                print(i, 'peak')
                peaks.append(i - 1)
                ascending = False

            elif signal[i - 1] < signal[i] and not ascending:
                print(i, ' hill')
                ascending = True

        print(highest_valley)
        j = 0
        if len(rep_ranges) > 0:

            while j < len(signal) and signal[j] < highest_valley:
                j += 1
            valleys.append(j)
            rep_ranges[0] = (j, rep_ranges[0][1])

        elif len(rep_ranges) == 0:
            rep_ranges = [(0,len(signal)-1)]
        print(rep_ranges)


    if plot:
        plt.figure(figsize=(12, 5))
        plt.plot(signal, label='Joint Angle', linewidth=2)
        for s, e in rep_ranges:
            plt.axvspan(s, e, color='green', alpha=0.2)

        plt.scatter(valleys, [signal[i] for i in valleys], color='red', zorder=5)
        plt.scatter(peaks, [signal[i] for i in peaks], color='green', zorder=5)

        plt.axhline(y=depth_thresh, color='red', linestyle='--', label=f'Depth Threshold ({depth_thresh}°)')
        plt.axhline(y=rise_thresh, color='blue', linestyle='--', label=f'Rise Threshold ({rise_thresh}°)')

        plt.title("Rep Detection: Threshold-Based")
        plt.xlabel("Frame")
        plt.ylabel("Joint Angle")
        plt.legend()
        plt.grid(True)
        plt.show()

    return rep_ranges




right_knee = DEADLIFT_TENSORS['deadlift 1 good lift data']['deadlift 1 good right hip']
#right_hip = DEADLIFT_TENSORS['deadlift 6 good lift data']['deadlift 6 good right hip']

rep_ranges = detect_reps_by_threshold(lift_name='deadlift',
                                      joint_tensor=right_knee,
                                      plot=True,
                                      depth_thresh=90,
                                      rise_thresh=140)
rep_ranges

In [None]:
import importlib
import functions
importlib.reload(functions)

deadlift_train_split_reps = functions.split_lifts_into_reps(lift_name = 'deadlift', lift_list = deadlift_train, joint='hip')
squat_train_split_reps = functions.split_lifts_into_reps(lift_name = 'squat', lift_list = squat_train, joint='knee')
bench_train_split_reps = functions.split_lifts_into_reps(lift_name = 'bench', lift_list = bench_train, joint='elbow')

In [None]:
print(f"Split {len(deadlift_train)} videos into {len(deadlift_train_split_reps)} repititions")

In [None]:
plt.figure(figsize=[16,9])
for rep in squat_train_split_reps:
    plt.title(f'Squat knee joint data')
    plt.plot(rep['angles'][f'{rep["viewing from"]} knee'], label=f'{rep["viewing from"]}')
    plt.xlabel('frame')
    ''
    plt.ylabel('angle')
    #plt.legend()
plt.show()

In [None]:

for key in squat_train_split_reps[1]['angles']:
    print(key, len(squat_train_split_reps[1]['angles'][key]))

for key in squat_train_split_reps[1]['points']:
    for dimension in range(len(squat_train_split_reps[1]['points'][key])):
        print(squat_train_split_reps[1]['points'][key][dimension])

plt.plot(squat_train_split_reps[1]['angles']['left knee'])

In [None]:
import torch
import torch.nn.functional as F

def resample(series: torch.Tensor, target_len: int = 60) -> torch.Tensor:
    """
    Linearly resamples a 1D tensor to target_len.
    Input:  shape (T,)
    Output: shape (target_len,)
    """
    series = series.unsqueeze(0).unsqueeze(0).to(torch.float32)  # (1,1,T)
    out = F.interpolate(series, size=target_len, mode='linear', align_corners=True)
    return out.squeeze()

for key in squat_train_split_reps[1]['angles']:
    squat_train_split_reps[1]['angles'][key] = resample(squat_train_split_reps[1]['angles'][key], target_len = 60)

for key in squat_train_split_reps[1]['angles']:
    print(key, len(squat_train_split_reps[1]['angles'][key]))

plt.plot(squat_train_split_reps[1]['angles']['left knee'])

In [None]:
for landmark in squat_train_split_reps[1]['points']:
    for dimension in range(len(squat_train_split_reps[1]['points'][landmark])):
        print(squat_train_split_reps[1]['points'][landmark])

In [None]:
# Normalize squat joints
for repetition in range(len(squat_train_split_reps)):
    for key in squat_train_split_reps[repetition]['angles']:
        squat_train_split_reps[repetition]['angles'][key] = resample(squat_train_split_reps[repetition]['angles'][key], target_len = 60)

for repetition in squat_train_split_reps:
    for key in repetition['points']:
        for dim in repetition['points'][key]:
            dim = resample(dim, target_len = 60)

In [None]:
for repetition in range(len(deadlift_train_split_reps)):
    for key in deadlift_train_split_reps[repetition]['angles']:
        deadlift_train_split_reps[repetition]['angles'][key] = resample(deadlift_train_split_reps[repetition]['angles'][key], target_len = 60)

for repetition in deadlift_train_split_reps:
    for key in repetition['points']:
        for dim in repetition['points'][key]:
            dim = resample(dim, target_len = 60)

In [None]:
for repetition in range(len(bench_train_split_reps)):
    for key in bench_train_split_reps[repetition]['angles']:
        bench_train_split_reps[repetition]['angles'][key] = resample(bench_train_split_reps[repetition]['angles'][key], target_len = 60)

for repetition in bench_train_split_reps:
    for key in repetition['points']:
        for dim in repetition['points'][key]:
            dim = resample(dim, target_len = 60)

In [None]:
# Plot squat point data
#
# plt.figure(figsize=[8,6])
# for repetition in squat_train_split_reps:
#
#     for key in repetition['points']:
#         for dim in repetition['points'][key]:
#
#             plt.title(key)
#             plt.plot(dim)
#         plt.show()

In [None]:
plt.figure(figsize=[8,6])
for rep in squat_train_split_reps:
    plt.title(f'Squat knee joint data')
    plt.plot(rep['angles'][f'{rep["viewing from"]} knee'], label=f'{rep["viewing from"]}')
    plt.xlabel('frame')
    ''
    plt.ylabel('angle')
    plt.ylim(0,180)
    #plt.legend()
plt.show()

# i = 1
# for rep in squat_train_split_reps:
#     plt.figure(figsize=[8,6])
#     plt.title(f'Squat knee joint data ( rep {i})')
#     plt.plot(rep['angles'][f'{rep["viewing from"]} knee'], label=f'{rep["viewing from"]}')
#     plt.xlabel('frame')
#     ''
#     plt.ylabel('angle')
#     #plt.legend()
#     plt.show()
#     i += 1

In [None]:
import torch

# KNEE ---------------------------------------------------
squat_knee_all_reps = []
for rep in squat_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} knee']  # this is already a tensor
    squat_knee_all_reps.append(series)

# ---------------------------------------------------------

# HIP ---------------------------------------------------
squat_hip_all_reps = []
for rep in squat_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} hip']  # this is already a tensor
    squat_hip_all_reps.append(series)

# ---------------------------------------------------------

# ELBOW ---------------------------------------------------
squat_elbow_all_reps = []
for rep in squat_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} elbow']  # this is already a tensor
    squat_elbow_all_reps.append(series)

# ---------------------------------------------------------

# SHOULDER ---------------------------------------------------
squat_shoulder_all_reps = []
for rep in squat_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} shoulder']  # this is already a tensor
    squat_shoulder_all_reps.append(series)
# ---------------------------------------------------------

squat_knee_angles_matrix = torch.stack(squat_knee_all_reps)  # shape (num_reps, num_frames)
squat_hip_angles_matrix = torch.stack(squat_hip_all_reps)  # shape (num_reps, num_frames)
squat_elbow_angles_matrix = torch.stack(squat_elbow_all_reps)  # shape (num_reps, num_frames)
squat_shoulder_angles_matrix = torch.stack(squat_shoulder_all_reps)  # shape (num_reps, num_frames)

mean, lower, upper = functions.compute_normal_credible_interval(squat_knee_angles_matrix)

In [None]:
# KNEE ---------------------------------------------------
deadlift_knee_all_reps = []
for rep in deadlift_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} knee']  # this is already a tensor
    deadlift_knee_all_reps.append(series)

# ---------------------------------------------------------

# HIP ---------------------------------------------------
deadlift_hip_all_reps = []
for rep in deadlift_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} hip']  # this is already a tensor
    deadlift_hip_all_reps.append(series)

# ---------------------------------------------------------

# ELBOW ---------------------------------------------------
deadlift_elbow_all_reps = []
for rep in deadlift_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} elbow']  # this is already a tensor
    deadlift_elbow_all_reps.append(series)

# ---------------------------------------------------------

# SHOULDER ---------------------------------------------------
deadlift_shoulder_all_reps = []
for rep in deadlift_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} shoulder']  # this is already a tensor
    deadlift_shoulder_all_reps.append(series)
# ---------------------------------------------------------

deadlift_knee_angles_matrix = torch.stack(deadlift_knee_all_reps)  # shape (num_reps, num_frames)
deadlift_hip_angles_matrix = torch.stack(deadlift_hip_all_reps)  # shape (num_reps, num_frames)
deadlift_elbow_angles_matrix = torch.stack(deadlift_elbow_all_reps)  # shape (num_reps, num_frames)
deadlift_shoulder_angles_matrix = torch.stack(deadlift_shoulder_all_reps)  # shape (num_reps, num_frames)

In [None]:
# KNEE ---------------------------------------------------
bench_knee_all_reps = []
for rep in bench_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} knee']  # this is already a tensor
    bench_knee_all_reps.append(series)

# ---------------------------------------------------------

# HIP ---------------------------------------------------
bench_hip_all_reps = []
for rep in bench_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} hip']  # this is already a tensor
    bench_hip_all_reps.append(series)

# ---------------------------------------------------------

# ELBOW ---------------------------------------------------
bench_elbow_all_reps = []
for rep in bench_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} elbow']  # this is already a tensor
    bench_elbow_all_reps.append(series)

# ---------------------------------------------------------

# SHOULDER ---------------------------------------------------
bench_shoulder_all_reps = []
for rep in bench_train_split_reps:
    side = rep["viewing from"]
    series = rep['angles'][f'{side} shoulder']  # this is already a tensor
    bench_shoulder_all_reps.append(series)
# ---------------------------------------------------------

bench_knee_angles_matrix = torch.stack(bench_knee_all_reps)  # shape (num_reps, num_frames)
bench_hip_angles_matrix = torch.stack(bench_hip_all_reps)  # shape (num_reps, num_frames)
bench_elbow_angles_matrix = torch.stack(bench_elbow_all_reps)  # shape (num_reps, num_frames)
bench_shoulder_angles_matrix = torch.stack(bench_shoulder_all_reps)  # shape (num_reps, num_frames)

In [None]:
squat_knee_samples = functions.run_mcmc(squat_knee_angles_matrix)
squat_hip_samples = functions.run_mcmc(squat_hip_angles_matrix)
squat_elbow_samples = functions.run_mcmc(squat_elbow_angles_matrix)
squat_shoulder_samples = functions.run_mcmc(squat_shoulder_angles_matrix)
deadlift_knee_samples = functions.run_mcmc(deadlift_knee_angles_matrix)
deadlift_hip_samples = functions.run_mcmc(deadlift_hip_angles_matrix)
deadlift_elbow_samples = functions.run_mcmc(deadlift_elbow_angles_matrix)
deadlift_shoulder_samples = functions.run_mcmc(deadlift_shoulder_angles_matrix)
bench_knee_samples = functions.run_mcmc(bench_knee_angles_matrix)
bench_hip_samples = functions.run_mcmc(bench_hip_angles_matrix)
bench_elbow_samples = functions.run_mcmc(bench_elbow_angles_matrix)
bench_shoulder_samples = functions.run_mcmc(bench_shoulder_angles_matrix)

In [None]:
functions.summarize_and_plot(squat_shoulder_samples, data = squat_knee_angles_matrix, title="squat_shoulder_samples")
functions.summarize_and_plot(squat_elbow_samples, data = squat_knee_angles_matrix, title="squat_elbow_samples")
functions.summarize_and_plot(squat_hip_samples, data = squat_knee_angles_matrix, title="squat_hip_samples")
functions.summarize_and_plot(squat_knee_samples, data = squat_knee_angles_matrix, title="squat_knee_samples")
functions.summarize_and_plot(deadlift_shoulder_samples, data = squat_knee_angles_matrix, title="deadlift_shoulder_samples")
functions.summarize_and_plot(deadlift_elbow_samples, data = squat_knee_angles_matrix, title="deadlift_elbow_samples")
functions.summarize_and_plot(deadlift_hip_samples, data = squat_knee_angles_matrix, title="deadlift_hip_samples")
functions.summarize_and_plot(deadlift_knee_samples, data = squat_knee_angles_matrix, title="deadlift_knee_samples")
functions.summarize_and_plot(bench_shoulder_samples, data = squat_knee_angles_matrix, title="bench_shoulder_samples")
functions.summarize_and_plot(bench_elbow_samples, data = squat_knee_angles_matrix, title="bench_elbow_samples")
functions.summarize_and_plot(bench_hip_samples, data = squat_knee_angles_matrix, title="bench_hip_samples")
functions.summarize_and_plot(bench_knee_samples, data = squat_knee_angles_matrix, title="bench_knee_samples")

In [None]:
np.savez(
    "lift_samples.npz",
    squat_knee=squat_knee_samples,
    squat_hip=squat_hip_samples,
    squat_elbow=squat_elbow_samples,
    squat_shoulder=squat_shoulder_samples,
    deadlift_knee=deadlift_knee_samples,
    deadlift_hip=deadlift_hip_samples,
    deadlift_elbow=deadlift_elbow_samples,
    deadlift_shoulder=deadlift_shoulder_samples,
    bench_knee=bench_knee_samples,
    bench_hip=bench_hip_samples,
    bench_elbow=bench_elbow_samples,
    bench_shoulder=bench_shoulder_samples
)