In [None]:
import numpy as np
import pickle as pkl
import pandas as pd
from scipy.spatial.transform import Rotation as R
import matplotlib.pyplot as plt
import os
import tol_colors as tc
from pathlib import Path
from scipy.signal import butter, filtfilt
from tqdm import tqdm
from scipy.signal import correlate

# Berechnung

Transformation IMU-Orientierung ins OpenSim KS

$$^{IMU}_{OpenSim}T = ^{OpenSim_y}_{OpenSim}T*^{XKF3hm}_{OpenSim_y}T*^{IMU}_{XKF3hm}T$$

Berechnung der Orientierung zwischen IMU und Marker im OpenSim KS
$$^{IMU}_{Marker}T = ^{OpenSim}_{Marker}T* ^{IMU}_{OpenSim}T$$

Berechnung der Abweichung zwischen IMU und Marker
$$ ^{IMU}_{Marker}T -> AxisAngle -> ||AxisAngle||$$





In [None]:
cset = tc.tol_cset('bright')

In [None]:


def lowpass_filter(data, cutoff, fs, order=4):
    """
    Wendet einen Butterworth-Tiefpassfilter auf eine 1D-Datenreihe an.

    Args:
        data (np.ndarray): Die Eingangsdaten (1D).
        cutoff (float): Grenzfrequenz des Filters (Hz).
        fs (float): Abtastrate der Daten (Hz).
        order (int): Ordnung des Filters.

    Returns:
        np.ndarray: Gefilterte Daten.
    """
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    filtered = filtfilt(b, a, data)
    return filtered


def plot_sample(data, subject_name, exercise_name, start_ind=0, end_ind=-1):
    fig, axs = plt.subplots(3,3, figsize=(15,10), sharey=True, constrained_layout = True)
    axs = axs.flatten()
    for i,  (pos, d) in enumerate(data.items()):
        max_ind = len(d['deviations_angles'])
        deviations = d['deviations_angles']
        errors = list(set(d['error_indices']))
        errors = [x for x in errors if x < max_ind]
        deviations[errors] = 0.0
        time = np.arange(0, len(deviations))[start_ind:end_ind]
        axs[i].plot(time, deviations[start_ind:end_ind])
        # axs[i].plot(d['deviations_angles'])
        # axs[i].scatter(d['error_indices'], d['deviations_angles'][d['error_indices']], color='red', marker='x')
        axs[i].set_title(f'{pos} - \n{d["original_lengths"]}')
        axs[i].set_ylabel('Deviation in Degrees')
        axs[i].set_xlabel('Samples')
    fig.suptitle(f'Deviations for measurement {subject_name} - {exercise_name}')
    plt.show()


def plot_sample_w_filter(data, subject_name, exercise_name, start_ind=0, end_ind=-1, cutoff=5, offset=3):
    fig, axs = plt.subplots(3,3, figsize=(15,10), sharey=True, constrained_layout = True)
    axs = axs.flatten()
    for i,  (pos, d) in enumerate(data.items()):
        R_imu_marker = R.from_quat(d['marker_orientations'])
        R_imu = R.from_quat(d['imu_orientations'])

        q_diff = R_imu_marker[:len(R_imu_marker)-offset] * R_imu[offset:].inv()

        deviations = np.linalg.norm(q_diff.as_rotvec(degrees=True), axis=1)
        max_ind = len(deviations)
        errors = list(set(d['error_indices']))
        errors = [x for x in errors if x < max_ind]
        deviations[errors] = 0.0
        deviations = lowpass_filter(deviations, cutoff, 100)
        time = np.arange(0, len(deviations))[start_ind:end_ind]
        axs[i].plot(time, deviations[start_ind:end_ind])
        # axs[i].plot(d['deviations_angles'])
        # axs[i].scatter(d['error_indices'], d['deviations_angles'][d['error_indices']], color='red', marker='x')
        axs[i].set_title(f'{pos} - \n{d["original_lengths"]}')
        axs[i].set_ylabel('Deviation in Degrees')
        axs[i].set_xlabel('Samples')
    fig.suptitle(f'Deviations for measurement {subject_name} - {exercise_name}')
    plt.show()


def plot_all_w_filter(exercise_name, start_ind=0, end_ind=-1, cutoff=5, offset=3, subject_names=['austra', 'darryl', 'elodie', 'erna', 'etsuko', 'hamit', 'julia', 'jung-hee', 'gregers', 'katee', 'latifah', 'laurel', 'marquise', 'neves', 'rehema', 'rushda', 'yaxkin', 'ziri']):
    fig, axs = plt.subplots(9,1, figsize=(15,60), sharey=True, constrained_layout = True)
    axs = axs.flatten()
    for subject_name in subject_names:
        data_path = base_path / subject_name / exercise_name
        try:
            data = pkl.load(open(data_path / 'imu_marker_deviations_second_version.pkl', 'rb'))
        except:
            continue

        for i,  (pos, d) in enumerate(data.items()):
            R_imu_marker = R.from_quat(d['marker_orientations'])
            R_imu = R.from_quat(d['imu_orientations'])

            q_diff = R_imu_marker[:len(R_imu_marker)-offset] * R_imu[offset:].inv()

            deviations = np.linalg.norm(q_diff.as_rotvec(degrees=True), axis=1)
            max_ind = len(deviations)
            errors = list(set(d['error_indices']))
            errors = [x for x in errors if x < max_ind]
            deviations[errors] = 0.0
            deviations = lowpass_filter(deviations, cutoff, 100)
            time = np.arange(0, len(deviations))[start_ind:end_ind]
            axs[i].plot(time, deviations[start_ind:end_ind], label=subject_name)


    for i,  (pos, d) in enumerate(data.items()):

        axs[i].set_ylabel('Deviation in Degrees')
        axs[i].set_xlabel('Samples')
        axs[i].set_ylim(0, 45)
        axs[i].legend(ncols=3, fontsize='xx-small')
        axs[i].set_title(f'{pos}')
    fig.suptitle(f'Deviations for {exercise_name}')
    plt.show()


def plot_sample_w_offset(data, subject_name, exercise_name, offset, start_ind=0, end_ind=-1):
    fig, axs = plt.subplots(3,3, figsize=(15,10), sharey=True, constrained_layout = True)
    axs = axs.flatten()
    for i,  (pos, d) in enumerate(data.items()):
        R_imu_marker = R.from_quat(d['marker_orientations'])
        R_imu = R.from_quat(d['imu_orientations'])

        q_diff = R_imu_marker[:len(R_imu_marker)-offset] * R_imu[offset:].inv()
        # q_diff = R_imu[offset:] *R_imu_marker[:len(R_imu_marker)-offset].inv()

        deviations = np.linalg.norm(q_diff.as_rotvec(degrees=True), axis=1)
        max_ind = len(deviations)
        errors = list(set(d['error_indices']))
        errors = [x for x in errors if x < max_ind]
        deviations[errors] = 0.0
        time = np.arange(0, len(deviations))[start_ind:end_ind]
        axs[i].plot(time, deviations[start_ind:end_ind])
        # axs[i].plot(d['deviations_angles'])
        # axs[i].scatter(d['error_indices'], d['deviations_angles'][d['error_indices']], color='red', marker='x')
        axs[i].set_title(pos)
        axs[i].set_ylabel('Deviation in Degrees')
        axs[i].set_xlabel('Samples')
    fig.suptitle(f'Deviations for measurement {subject_name} - {exercise_name}')
    plt.show()


def plot_euler_angles(data, subject_name, exercise_name, start_ind, end_ind, offset=0):
    fig, axs = plt.subplots(3,3, figsize=(15,10), constrained_layout = True)
    axs = axs.flatten()
    for i,  (pos, d) in enumerate(data.items()):
        # deviations = d['deviations_angles']
        # errors = list(set(d['error_indices']))
        # deviations[errors] = 0.0

        R_imu_marker = R.from_quat(d['marker_orientations'])
        R_imu_marker = R_imu_marker[:len(R_imu_marker)-offset]
        R_imu = R.from_quat(d['imu_orientations'])[offset:]

        q_diff = R_imu_marker * R_imu.inv()

        deviations = np.linalg.norm(q_diff.as_rotvec(degrees=True), axis=1)
        max_ind = len(d['deviations_angles'])
        errors = list(set(d['error_indices']))
        errors = [x for x in errors if x < max_ind]
        deviations[errors] = 0.0


        time = np.arange(0, len(deviations))[start_ind:end_ind]

        axs[i].plot(time, deviations[start_ind:end_ind], color=cset.purple)

        ax1b = axs[i].twinx()
        marker_orientations = R_imu_marker.as_euler('xyz', degrees=True)
        marker_orientations[errors, :] = [0.0, 0.0, 0.0]
        ax1b.plot(time, marker_orientations[start_ind:end_ind, 0], label='marker x', color=cset.red )
        ax1b.plot(time, marker_orientations[start_ind:end_ind, 1], label='marker y', color=cset.green)
        ax1b.plot(time, marker_orientations[start_ind:end_ind, 2], label='marker z', color=cset.blue)
        imu_orientations = R_imu.as_euler('xyz', degrees=True)
        imu_errors = np.array(errors) + offset
        imu_orientations[errors, :] = [0.0, 0.0, 0.0]
        ax1b.plot(time, imu_orientations[start_ind:end_ind, 0], label='imu x', linestyle='--', color=cset.red)
        ax1b.plot(time, imu_orientations[start_ind:end_ind, 1], label='imu y', linestyle='--', color=cset.green)
        ax1b.plot(time, imu_orientations[start_ind:end_ind, 2], label='imu z', linestyle='--', color=cset.blue)
        ax1b.set_ylabel('Euler Angle in Degrees')
        axs[i].set_title(pos)
        axs[i].set_ylabel('Deviation in Degrees')
        axs[i].set_xlabel('Samples')
    legend_elements = [plt.plot(0,0, color=cset.red, label='marker x')[0],
                          plt.plot(0,0, color=cset.green, label='marker y')[0],
                          plt.plot(0,0, color=cset.blue, label='marker z')[0],
                          plt.plot(0,0, linestyle='--', color=cset.red, label='imu x')[0],
                          plt.plot(0,0, linestyle='--', color=cset.green, label='imu y')[0],
                          plt.plot(0,0, linestyle='--', color=cset.blue, label='imu z')[0]]
    fig.legend(handles=legend_elements, loc='center', bbox_to_anchor=(0.5, -0.05))
    fig.suptitle(f'Deviations for measurement {subject_name} - {exercise_name}')
    plt.show()


def plot_orth_lens(data, subject_name, exercise_name, start_ind, end_ind, offset=0):
    fig, axs = plt.subplots(3,3, figsize=(15,10), constrained_layout = True, sharey=True)
    axs = axs.flatten()
    for i,  (pos, d) in enumerate(data.items()):
        # deviations = d['deviations_angles']
        # errors = list(set(d['error_indices']))
        # deviations[errors] = 0.0

        R_imu_marker = R.from_quat(d['marker_orientations'])
        R_imu_marker = R_imu_marker[:len(R_imu_marker)-offset]
        R_imu = R.from_quat(d['imu_orientations'])[offset:]

        q_diff = R_imu_marker * R_imu.inv()

        deviations = np.linalg.norm(q_diff.as_rotvec(degrees=True), axis=1)
        max_ind = len(d['deviations_angles'])
        errors = list(set(d['error_indices']))
        errors = [x for x in errors if x < max_ind]
        deviations[errors] = 0.0


        time = np.arange(0, len(deviations))[start_ind:end_ind]

        orthogon = d['orthogonality']
        orthogon[errors] = 0.0
        axs[i].plot(time, orthogon[start_ind:end_ind], color=cset.purple)
        axs[i].set_ylim(-0.3, 0.3)
        ax1b = axs[i].twinx()


        len_x_axis = d['len_x_axis']
        len_x_axis[errors] = 0.0
        len_y_axis = d['len_y_axis']
        len_y_axis[errors] = 0.0
        ax1b.plot(time, len_x_axis[start_ind:end_ind], label='marker x', color=cset.red, linestyle='--' )
        ax1b.plot(time, len_y_axis[start_ind:end_ind], label='marker x', color=cset.green, linestyle='--' )
        ax1b.set_ylabel('Length in mm')
        ax1b.set_ylim(0, 100)
        axs[i].set_title(pos)
        axs[i].set_ylabel('Dot-Product')
        axs[i].set_xlabel('Samples')
    legend_elements = [plt.plot(0,0, color=cset.purple, label='dot product')[0],
                       plt.plot(0,0, linestyle='--', color=cset.green, label='length marker y')[0],
                       plt.plot(0,0, linestyle='--', color=cset.red, label='length marker x')[0]]
    fig.legend(handles=legend_elements, loc='center', bbox_to_anchor=(0.5, -0.05))
    # legend_elements = [plt.plot(0,0, color=cset.red, label='marker x')[0],
    #                    plt.plot(0,0, color=cset.green, label='marker y')[0],
    #                    plt.plot(0,0, color=cset.blue, label='marker z')[0],
    #                    plt.plot(0,0, linestyle='--', color=cset.red, label='imu x')[0],
    #                    plt.plot(0,0, linestyle='--', color=cset.green, label='imu y')[0],
    #                    plt.plot(0,0, linestyle='--', color=cset.blue, label='imu z')[0]]
    # fig.legend(handles=legend_elements, loc='center', bbox_to_anchor=(0.5, -0.05))
    fig.suptitle(f'Deviations for measurement {subject_name} - {exercise_name}')
    plt.show()


def read_trc_file(filename):

    if not os.path.exists(filename):
        print('file does not exist')

    # read all lines
    file_id = open(filename, 'r')
    all_lines = file_id.readlines()
    file_id.close()

    marker_names = all_lines[3].split('\t')
    marker_names.remove('Frame#')
    marker_names.remove('Time')

    if '\n' in marker_names:
        marker_names.remove('\n')

    marker_names = [x for x in marker_names if x != '']
    col_names = ['Time']
    for m in marker_names:
        col_names.append(f'{m}_x')
        col_names.append(f'{m}_y')
        col_names.append(f'{m}_z')

    data = []
    line_idx = 5
    while line_idx < all_lines.__len__():
        d = all_lines[line_idx].split('\t')[1:]
        if len(d) != len(col_names):
            raise ValueError(f'Number of columns in line {line_idx} does not match number of marker columns')
        data.append(d)
        line_idx += 1

    measured_data = pd.DataFrame(data=data['deviations_angles'], columns=col_names).astype('float', errors='ignore')
    return measured_data

In [None]:
base_path = Path('/Users/andi/Downloads/Data2')

subject_name = 'darryl'
exercise_name = 'ng'

start_ind = 0
end_ind = -1
data_path = base_path / subject_name / exercise_name
data = pkl.load(open(data_path / 'imu_marker_deviations_second_version.pkl', 'rb'))
# plot_sample(data, subject_name, exercise_name, start_ind, end_ind)
# plot_sample_w_offset(data, subject_name, exercise_name, 3, start_ind, end_ind)
# # plot_orth_lens(data, subject_name, exercise_name, start_ind, end_ind, offset=0)
# plot_sample_w_filter(data, subject_name, exercise_name, cutoff=0.5, offset=3)


# other possibilities: darryl, elodie, erna, etsuko
# subject_names = ['austra', 'darryl', 'elodie', 'erna', 'etsuko', 'gregers', 'hamit', 'hans', 'julia', 'jung-hee', 'katee', 'latifah', 'laurel', 'marquise', 'neves', 'rehema', 'rushda', 'yaxkin', 'ziri']
# for subject_name in subject_names:
#    # try:
#     data_path = base_path / subject_name / exercise_name
#     data = pkl.load(open(data_path / 'imu_marker_deviations_second_version.pkl', 'rb'))
#     # plot_sample(data, subject_name, exercise_name, start_ind, end_ind)
#     # plot_sample_w_offset(data, subject_name, exercise_name, 3, start_ind, end_ind)
#     plot_sample_w_filter(data, subject_name, exercise_name, cutoff=0.5, offset=3)
#    # except:
#    #     print('file not found')

plot_all_w_filter(exercise_name, start_ind, end_ind, cutoff=10, offset=0)


  # Analyse Euler-Angles

In [None]:

# example ausreiser wegen zeitlichem offset : toes_r, austra, ce, ausreiser 25050, 25150
# example ausreiser wegen qtm, marker verdeckt durch hand: femur_r, austra, ce, ausreiser  11050, 11150
# example verdreht -> julia, femur_r,
plot_euler_angles(data, subject_name, exercise_name, 10000, 10200, offset=0)

# Berechnung Korrelation

In [None]:
base_path = Path('/Users/andi/Downloads/Data')

subject_name = 'darryl'
exercise_name = 'gwo'

data_path = base_path / subject_name / exercise_name
data = pkl.load(open(data_path / 'imu_marker_deviations_second_version.pkl', 'rb'))

In [None]:
def remove_jumps_from_euler_angle(
        angle_series: np.ndarray,
) -> np.ndarray:
    """
    Remove discontinuities (jumps > 330°) from Euler angle sequences.

    Parameters
    ----------
    angle_series : np.ndarray
        Array of shape (T,) with Euler angle values.

    Returns
    -------
    np.ndarray
        Smoothed angle array with jumps corrected.
    """
    t_diff = np.diff(angle_series, n=1)
    jump_indices = np.argwhere(np.abs(t_diff) > 330).flatten()
    corrected = np.copy(angle_series)

    for i in range(0, len(jump_indices), 2):
        delta = t_diff[jump_indices[i]]
        start = jump_indices[i] + 1
        if (i + 1) > (len(jump_indices) - 1):
            correction = -360 if delta > 0 else 360
            corrected[start:] += correction
        else:
            end = jump_indices[i + 1] + 1
            correction = -360 if delta > 0 else 360
            corrected[start:end] += correction

    return corrected

In [None]:
d = data['toes_r']

R_imu_marker = R.from_quat(d['marker_orientations'])
R_imu_marker = R_imu_marker[:len(R_imu_marker)]
R_imu = R.from_quat(d['imu_orientations'])

q_diff = R_imu_marker * R_imu.inv()

max_ind = len(d['deviations_angles'])
errors = list(set(d['error_indices']))
errors = [x for x in errors if x < max_ind]

marker_orientations = R_imu_marker.as_euler('xyz', degrees=True)
marker_orientations[errors, :] = [0.0, 0.0, 0.0]
marker_orientations[:, 0] = remove_jumps_from_euler_angle(marker_orientations[:, 0])
marker_orientations[:, 1] = remove_jumps_from_euler_angle(marker_orientations[:, 1])
marker_orientations[:, 2] = remove_jumps_from_euler_angle(marker_orientations[:, 2])

imu_orientations = R_imu.as_euler('xyz', degrees=True)
imu_orientations[errors, :] = [0.0, 0.0, 0.0]
imu_orientations[:, 0] = remove_jumps_from_euler_angle(imu_orientations[:, 0])
imu_orientations[:, 1] = remove_jumps_from_euler_angle(imu_orientations[:, 1])
imu_orientations[:, 2] = remove_jumps_from_euler_angle(imu_orientations[:, 2])

# sprünge entfernen

In [None]:

# in 1000er Fenstern über eine Achse
chunk_size = 250
corr_coeff = []
for i in tqdm(range(0, len(marker_orientations), chunk_size)):
    marker_chunk = marker_orientations[i:i + chunk_size, 0]
    imu_chunk = imu_orientations[i:i + chunk_size, 0]
    corr_offsets = {}
    for offset in [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8]:
        if offset < 0:
            marker_chunk = marker_chunk[abs(offset):]
            imu_chunk = imu_chunk[:offset]
        elif offset > 0:
            imu_chunk = imu_chunk[offset:]
            marker_chunk = marker_chunk[:-offset]
        corr_offsets[str(offset)] = np.corrcoef(marker_chunk, imu_chunk)[0, 1]

    corr_coeff.append(corr_offsets)




    # offsets draufrechnen -5 -> +5
    # pearson
    # niedrigster pearson
    # graphische evaluierung

#

In [None]:
chunk_size = 250
corr_coeff = []
for i in tqdm(range(0, len(marker_orientations), chunk_size)):
    marker_chunk = marker_orientations[i:i + chunk_size, 0]
    imu_chunk = imu_orientations[i:i + chunk_size, 0]

    marker_chunk = marker_chunk - np.mean(marker_chunk)
    imu_chunk = imu_chunk - np.mean(imu_chunk)

    corr = correlate(marker_chunk, imu_chunk, mode='full')
    lags = np.arange(-len(marker_chunk) + 1, len(imu_chunk))

    lag = lags[np.argmax(corr)]
    corr_coeff.append(lag)



In [None]:
fig, axs = plt.subplots(1, 1)
offset = 3
chunk_size = 250
start = 48
stop = 49
start = 2*chunk_size
stop = 3*chunk_size
time = np.arange(start, stop)
axs.plot(time, marker_orientations[start:stop, 0], label='marker orientations')
axs.plot(time, imu_orientations[(start+offset):(stop+offset), 0], label='imu orientations')
plt.show()