In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import sys

sys.path.append("../..")
from src.utils.constants import OPENPOSE_ANGLES

In [None]:
import warnings

warnings.filterwarnings("ignore", category=FutureWarning, message=".*")

In [None]:
joints_squat_test_df = pd.read_csv("../../data/test/squat/joints.csv")
joints_squat_train_df = pd.read_csv("../../data/train/squat/joints.csv")

joints_df = pd.concat([joints_squat_train_df, joints_squat_test_df])

In [None]:
for _, rep in joints_df.groupby(["label", "rep"]):
    rep = rep.reset_index()
    if rep["label"].loc[0] == "correct":
        correct = rep
    else:
        incorrect = rep

In [None]:
from src.utils.data import joints_rep_df_to_numpy, get_angles_from_joints


correct_joints = joints_rep_df_to_numpy(correct)
correct_angles = get_angles_from_joints(correct_joints, OPENPOSE_ANGLES)

incorrect_joints = joints_rep_df_to_numpy(incorrect)
incorrect_angles = get_angles_from_joints(incorrect_joints, OPENPOSE_ANGLES)

In [None]:
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(14, 5))
axes[0].plot(correct_angles.values)
axes[1].plot(incorrect_angles.values);

In [None]:
rep_signal = correct_angles.mean(axis=1)
zero_point = np.mean(rep_signal)

In [None]:
def segment_signal(
    x: pd.DataFrame, important_angles: list[str], sliding_window_size: int = 5
) -> pd.DataFrame:
    x = x[important_angles]
    rep_signal = x.mean(axis=1)
    zero_point = np.mean(rep_signal)

    variances = []
    for idx in range(0, len(rep_signal) - sliding_window_size + 1, 1):
        window = rep_signal[idx : idx + sliding_window_size]
        variances.append(np.std(window))
    variances = np.array(variances)

    below_mean_indexes = np.where(rep_signal < zero_point)[0]
    above_mean_indexes = np.where(rep_signal > zero_point)[0]

    mid_phase_idx = (
        below_mean_indexes[np.argmin(variances[below_mean_indexes])]
        + sliding_window_size // 2
    )
    above_mean_indexes_left = above_mean_indexes[above_mean_indexes < mid_phase_idx]
    start_phase_idx = (
        above_mean_indexes_left[np.argmin(variances[above_mean_indexes_left])]
        + sliding_window_size // 2
    )

    above_mean_indexes_right = above_mean_indexes[above_mean_indexes > mid_phase_idx]
    above_mean_indexes_right = above_mean_indexes_right[
        above_mean_indexes_right < len(variances)
    ]
    finish_phase_idx = (
        above_mean_indexes_right[np.argmin(variances[above_mean_indexes_right])]
        + sliding_window_size // 2
    )
    return x.iloc[[start_phase_idx, mid_phase_idx, finish_phase_idx]]

In [None]:
import yaml

with open(f"../../configs/squat.yaml", "r") as file:
    file_data = yaml.safe_load(file)
    important_angles = file_data["important_angles"]
    reference_table = pd.DataFrame(file_data["reference_table"]).transpose()

In [None]:
phases_names = reference_table.index.values

In [None]:
phases = segment_signal(correct_angles, important_angles)

In [None]:
phases

In [None]:
reference_table = reference_table.reset_index(drop=True)
reference_table

In [None]:
phases = phases.reset_index(drop=True)
phases

In [None]:
results = reference_table - phases

In [None]:
results

In [None]:
results["phase"] = phases_names
results = results.set_index("phase")

In [None]:
results

In [None]:
from tslearn.metrics import dtw_path

reference = correct_angles["left_hip"]
query = incorrect_angles["left_hip"]

path, score = dtw_path(reference, query)
path = np.array(path)

In [None]:
reference_to_query = path[:, 0]
query_to_reference = path[:, 1]

In [None]:
def filter_repetable_reference_indexes(
    referene_to_query: np.ndarray, query_to_refernce: np.ndarray
) -> np.ndarray:
    query_to_refernce_cp = query_to_refernce.copy()

    for idx in range(len(referene_to_query) - 1, -1, -1):
        if idx > 0 and referene_to_query[idx] == referene_to_query[idx - 1]:
            query_to_refernce_cp = np.delete(query_to_refernce_cp, idx)

    return query_to_refernce_cp

In [None]:
query_to_reference_idx = filter_repetable_reference_indexes(
    reference_to_query, query_to_reference
)
query_to_reference_warped = np.array([query[idx] for idx in query_to_reference_idx])

In [None]:
plt.plot(reference - query_to_reference_warped)
score

## statistical classification

In [None]:
EXERCISES = ("squat", "lunges", "plank")

In [None]:
train_datasets = {}
test_datasets = {}

for exercise in EXERCISES:
    train_datasets[exercise] = pd.read_csv(f"../../data/train/{exercise}/angles.csv")
    test_datasets[exercise] = pd.read_csv(f"../../data/test/{exercise}/angles.csv")

In [None]:
reference_tables = {}
important_angles = {}

for exercise in EXERCISES:
    with open(f"../../configs/{exercise}.yaml", "r") as file:
        file_data = yaml.safe_load(file)
        important_angles[exercise] = file_data["important_angles"]
        reference_tables[exercise] = pd.DataFrame(
            file_data["reference_table"]
        ).transpose()

In [None]:
from src.utils.data import segment_signal


def statistical_classification(
    query_angles: pd.DataFrame,
    reference_angles: pd.DataFrame,
    important_angles: dict,
    threshold: float = 20.0,
) -> bool:
    phases_names = reference_angles.index.values
    query_angles = query_angles.reset_index(drop=True)

    if len(phases_names) > 1:
        phases = segment_signal(query_angles, important_angles)
    else:
        phases = pd.DataFrame(query_angles.loc[len(query_angles) // 2]).transpose()
    phases.index = phases_names
    results = reference_angles - phases
    results["phase"] = phases_names
    results = results.set_index("phase")

    for _, result in results.iterrows():
        wrong_angles = result.loc[result.abs() > threshold]
        if not wrong_angles.empty:
            return False

    return True

In [None]:
classification_results = {exercise: [] for exercise in EXERCISES}
y = {exercise: [] for exercise in EXERCISES}

for exercise in EXERCISES:
    phases_names = reference_tables[exercise].index.values
    reference_angles = reference_tables[exercise].reset_index(drop=True)

    for (label, rep), df in test_datasets[exercise].groupby(["label", "rep"]):
        y[exercise].append(label)
        result = statistical_classification(
            df, reference_tables[exercise], important_angles[exercise]
        )
        classification_results[exercise].append(result)

In [None]:
for exercise in EXERCISES:
    y[exercise] = [1 if label == "correct" else 0 for label in y[exercise]]
    classification_results[exercise] = [
        1 if label else 0 for label in classification_results[exercise]
    ]

In [None]:
_, class_counts = np.unique(y, return_counts=True)
class_counts = class_counts / len(y)
class_counts

In [None]:
from sklearn.metrics import classification_report, roc_auc_score

for exercise in EXERCISES:
    print(
        f"{exercise}: {classification_report(y[exercise], classification_results[exercise])}"
    )