# Evaluation on EC3D dataset for squat exercise

## Data processing and angles extraction

In [None]:
import pandas as pd
import numpy as np
import yaml
import warnings
import matplotlib.pyplot as plt

warnings.filterwarnings("ignore", category=RuntimeWarning)

In [None]:
dataset = pd.read_pickle("../data/EC3D/data_3D.pickle")

with open("../configs/pose_estimators.yaml") as file:
    open_pose_data = yaml.safe_load(file)["openpose"]

LABELS_COLUMNS = ["exercise", "subject", "label", "rep", "frame"]

In [None]:
labels = dataset["labels"]
poses = dataset["poses"]

### Labels processing

In [None]:
labels_df = pd.DataFrame(labels, columns=LABELS_COLUMNS)
labels_df["frame"] = np.arange(len(labels_df))
labels_df = labels_df[labels_df["exercise"] == "SQUAT"]
labels_df.iloc[:, 2:] = labels_df.iloc[:, 2:].astype("int")

### Poses processing

In [None]:
SQUAT_LABELS = {
    1: "correct",
    2: "feet_too_wide",
    3: "knees_inwards",
    4: "not_low_enough",
    5: "front_bend",
}

In [None]:
def get_rep_frames_from_df(labels_df: pd.DataFrame) -> pd.Grouper:
    """Group dataframe by repetition"""
    groups = labels_df.groupby("subject")

    return [
        rep["frame"].values
        for _, subject_group in groups
        for _, rep in subject_group.groupby("rep")
    ]


def get_df_from_frames(
    labels_df: pd.DataFrame, poses: np.ndarray, label: str
) -> pd.DataFrame:
    """Combine joints representations with labels"""
    frames = get_rep_frames_from_df(labels_df)
    final_reps_df = pd.DataFrame()
    for rep_num, frames_rep in enumerate(frames, start=1):
        rep_3d_joints = poses[frames_rep]

        rep_3d_joints_x = rep_3d_joints[:, 0, :15].reshape(-1)
        rep_3d_joints_y = rep_3d_joints[:, 1, :15].reshape(-1)
        rep_3d_joints_z = rep_3d_joints[:, 2, :15].reshape(-1)
        frames_num = len(rep_3d_joints_x) // 15

        rep = np.full_like(rep_3d_joints_x, rep_num, dtype=int)
        frames = np.repeat(np.arange(frames_num, dtype=int), 15)
        joint_names = np.tile(list(open_pose_data["joints"].values()), frames_num)
        labels = np.full_like(rep_3d_joints_x, label, dtype="<U15")
        final_rep = np.array(
            [
                rep,
                frames,
                rep_3d_joints_x,
                rep_3d_joints_y,
                rep_3d_joints_z,
                joint_names,
                labels,
            ]
        ).T
        final_reps_df = pd.concat(
            [
                final_reps_df,
                pd.DataFrame(
                    final_rep,
                    columns=["rep", "frame", "x", "y", "z", "joint_name", "label"],
                ),
            ],
            axis=0,
        )

    final_reps_df["rep"] = final_reps_df["rep"].astype("int")
    final_reps_df["frame"] = final_reps_df["frame"].astype("int")

    return final_reps_df

In [None]:
joints_data = []
for label_number, label_name in SQUAT_LABELS.items():
    joints_data.append(
        get_df_from_frames(
            labels_df[labels_df["label"] == label_number], poses, label_name
        )
    )

joints_df = pd.concat(joints_data)

### Angles extraction

In [None]:
ANGLE_TYPES = {"3D": [0, 1, 2], "roll": [1, 2], "pitch": [0, 1], "yaw": [0, 2]}


def calculate_angle(
    v1: np.ndarray, v2: np.ndarray, v3: np.ndarray, dims: list = [0, 1, 2]
) -> float:
    if not all(arr.shape == (3,) for arr in (v1, v2, v3)):
        raise ValueError("Input arrays must all be of shape (3,).")
    v1 = v1[dims]
    v2 = v2[dims]
    v3 = v3[dims]

    v21 = v1 - v2
    v23 = v3 - v2

    cosine_angle = np.dot(v21, v23) / (np.linalg.norm(v21) * np.linalg.norm(v23))
    angle = np.arccos(cosine_angle)

    return np.degrees(angle)


def extract_angles_from_joints(
    joints_df: pd.DataFrame, angles_formula: dict
) -> pd.DataFrame:
    angles_data = []
    for (label, rep, frame), rep_data in joints_df.groupby(["label", "rep", "frame"]):
        rep_data = rep_data.reset_index()
        angles = {}
        for angle_name, angle_joints in angles_formula.items():
            joints_3d_positions = rep_data.loc[angle_joints][["x", "y", "z"]].astype(
                "float"
            )
            for angle_dims_name, angle_dims in ANGLE_TYPES.items():
                angles[f"{angle_name}_{angle_dims_name}"] = calculate_angle(
                    *joints_3d_positions.values, angle_dims
                )
        angles_data.append(
            pd.Series(
                {
                    "rep": rep,
                    "frame": frame,
                    **angles,
                    "label": label,
                }
            )
        )
    return pd.DataFrame(angles_data)

In [None]:
angles_df = extract_angles_from_joints(joints_df, open_pose_data["angles"])

In [None]:
angles_df.head()

## Angles comparison

Comparison will be presented between **correct** and **not low enough** labels

### Correct and incorrect samples extraction

In [None]:
def get_sample(df: pd.DataFrame, label: str) -> pd.DataFrame:
    for _, sample in df.groupby(["label", "rep"]):
        if sample["label"].iloc[0] == label:
            return sample.reset_index()
    return pd.DataFrame()

In [None]:
correct_sample = get_sample(angles_df, "correct")
incorrect_sample = get_sample(angles_df, "not_low_enough")

In [None]:
fig, axes = plt.subplots(nrows=4, ncols=4, figsize=(20, 16))

considered_angles = ["left_knee", "right_knee", "left_hip", "right_hip"]

for angle_axes, angle in zip(axes, considered_angles):
    for axis, angle_type in zip(angle_axes, ANGLE_TYPES.keys()):
        axis.plot(correct_sample[f"{angle}_{angle_type}"], label="correct")
        axis.plot(incorrect_sample[f"{angle}_{angle_type}"], label="incorrect")
        axis.legend(fontsize=8)
        axis.set_title(f"{angle} {angle_type}", fontsize=8)
        axis.set_xlabel("frame", fontsize=8)
        axis.set_ylabel("angle", fontsize=8)

It is clearly visible that pitch rotation provides best information for this kind of incorrection analysis. 

### DTW alignment

In [None]:
from tslearn.metrics import dtw_path


def get_warped_frame_indexes(query: np.ndarray, reference: np.ndarray) -> list:
    path, _ = dtw_path(query, reference)
    path = np.array(path)
    return path


def filter_repetable_reference_indexes(
    referene_to_query: np.ndarray, query_to_refernce: np.ndarray
) -> np.ndarray:
    query_to_refernce_cp = query_to_refernce.copy()

    for idx in range(len(referene_to_query) - 1, -1, -1):
        if idx > 0 and referene_to_query[idx] == referene_to_query[idx - 1]:
            query_to_refernce_cp = np.delete(query_to_refernce_cp, idx)

    return query_to_refernce_cp

In [None]:
dtw_alignment = get_warped_frame_indexes(
    incorrect_sample["right_knee_pitch"], correct_sample["right_knee_pitch"]
)
query_to_reference_warping = filter_repetable_reference_indexes(
    dtw_alignment[:, 1], dtw_alignment[:, 0]
)

diffs = [
    correct_sample["right_knee_pitch"][reference_idx]
    - incorrect_sample["right_knee_pitch"][query_idx]
    for reference_idx, query_idx in enumerate(query_to_reference_warping)
]

### DTW diffs visualization

In [None]:
fig, axes = plt.subplots(nrows=4, ncols=4, figsize=(20, 16))

considered_angles = ["left_knee", "right_knee", "left_hip", "right_hip"]

for angle_axes, angle in zip(axes, considered_angles):
    for axis, angle_type in zip(angle_axes, ANGLE_TYPES.keys()):
        angle_key = f"{angle}_{angle_type}"
        dtw_alignment = get_warped_frame_indexes(
            incorrect_sample[angle_key], correct_sample[angle_key]
        )
        query_to_reference_warping = filter_repetable_reference_indexes(
            dtw_alignment[:, 1], dtw_alignment[:, 0]
        )

        diffs = [
            correct_sample[angle_key][reference_idx]
            - incorrect_sample[angle_key][query_idx]
            for reference_idx, query_idx in enumerate(query_to_reference_warping)
        ]
        axis.plot(diffs)
        axis.set_title(f"DTW difference {angle} {angle_type}", fontsize=8)
        axis.set_xlabel("frame", fontsize=8)
        axis.set_ylabel("DTW angle diff", fontsize=8)