In [80]:
import os
import pandas as pd
import numpy as np
from scipy.signal import welch, find_peaks
from scipy.stats import skew, kurtosis
from scipy.fft import fft

In [85]:
# TODO: specify path to folder containing training data
# The feature-extraction loop in this notebook scans this folder for sub-folders whose
# names contain 'individual', and each of those for sub-folders whose names contain
# 'experiment'.
# Example: directory = 'path/Individuals'
directory = '../Data/Participants'

In [82]:
def smoothen(df, window_size_ms=100, sampling_interval_ms=20):
    """
    Smooth every column of `df` with a trailing moving average.

    Parameters
    ----------
    df : pandas.DataFrame
        Signals to smooth, one column per channel, one row per sample.
    window_size_ms : float, default 100
        Length of the averaging window in milliseconds.
    sampling_interval_ms : float, default 20
        Time between two consecutive rows in milliseconds. With the defaults,
        the window spans 100 / 20 = 5 samples.

    Returns
    -------
    pandas.DataFrame
        Same shape and index as `df`. The first rows are averaged over however
        many samples are available so far (`min_periods=1`), so no NaNs are
        introduced at the start.

    Raises
    ------
    ValueError
        If `sampling_interval_ms` is not positive.
    """
    if sampling_interval_ms <= 0:
        raise ValueError("sampling_interval_ms must be positive")
    # A window shorter than one sampling interval would truncate to 0 samples,
    # which pandas rejects together with min_periods=1; fall back to a
    # 1-sample window (i.e. no smoothing) instead.
    window_size_samples = max(1, int(window_size_ms / sampling_interval_ms))
    return df.rolling(window=window_size_samples, min_periods=1).mean()

In [4]:
def get_borg_df(folder_path, timestamps):
    """
    Load a participant's Borg ratings and interpolate them onto `timestamps`.

    Reads `<folder_path>/borg.csv`, whose first column is parsed as datetimes
    and converted to milliseconds elapsed since the earliest rating. The
    ratings are then linearly interpolated with respect to that millisecond
    axis, so a timestamp lying 25% of the way between two ratings receives a
    value 25% of the way between them.

    Parameters
    ----------
    folder_path : str or path-like
        Folder that contains `borg.csv`.
    timestamps : iterable of numbers
        Timestamps to evaluate the ratings at. Presumably milliseconds on the
        same clock as the ratings (offset from the first rating) -- verify
        against the caller.

    Returns
    -------
    pandas.DataFrame
        One row per entry of `timestamps` (sorted ascending), with the columns
        of `borg.csv`. Timestamps before the first rating stay NaN; timestamps
        after the last rating carry the last rating forward (pandas'
        default forward-fill behaviour for `interpolate`).
    """
    borg_df = pd.read_csv(f'{folder_path}/borg.csv', index_col=0, parse_dates=True)
    elapsed = borg_df.index - borg_df.index.min()
    borg_df.index = (elapsed.total_seconds() * 1000).rename('Timestamp (ms)')

    timestamps = list(timestamps)
    # Set lookup instead of rebuilding and scanning a list for every rating.
    requested = set(timestamps)
    # Rating times that are not requested are needed as interpolation anchors
    # but must not appear in the result.
    extra_indices = [t for t in borg_df.index if t not in requested]

    reindexed = borg_df.reindex(sorted(timestamps + extra_indices))
    # method='index' weights by the actual millisecond distance between rows;
    # method='linear' would ignore the index and assume equally spaced rows.
    interpolated = reindexed.interpolate(method='index')
    return interpolated.drop(extra_indices)

In [57]:
def extract_features(data):
    """
    Compute 11 summary features for a single 1-D signal.

    Parameters
    ----------
    data : array-like of numbers
        One channel of a signal segment. It is converted to a float array so
        that squaring cannot overflow when the input has a small integer dtype.

    Returns
    -------
    list
        In order: mean, standard deviation, skewness, kurtosis, range,
        maximum, minimum, root mean square, lag-1 autocorrelation, total
        power (sum of the Welch power spectral density) and dominant
        frequency. Frequencies are in cycles per sample because neither
        `welch` nor `fftfreq` is given a sampling rate.

    Notes
    -----
    A constant signal yields NaN for skewness, kurtosis and the lag-1
    autocorrelation (zero variance); callers that need finite values must
    handle that themselves.
    """
    data = np.asarray(data, dtype=float)

    features = [
        np.mean(data),                               # mean
        np.std(data),                                # standard deviation
        skew(data),                                  # skewness
        kurtosis(data),                              # kurtosis
        np.max(data) - np.min(data),                 # range
        np.max(data),                                # maximum
        np.min(data),                                # minimum
        np.sqrt(np.mean(data ** 2)),                 # root mean square
        np.corrcoef(data[:-1], data[1:])[0, 1],      # lag 1 autocorrelation
    ]

    # power spectral density; the segment length is capped by the data length
    nperseg = min(256, len(data))
    _, Pxx = welch(data, nperseg=nperseg)
    features.append(np.sum(Pxx))                     # total power

    # fast fourier transform
    fft_values = np.abs(fft(data))
    fft_freqs = np.fft.fftfreq(len(fft_values))
    peaks, _ = find_peaks(fft_values)
    # The spectrum of a real signal is mirrored, so only positive frequencies
    # are considered; among them the dominant one is the highest peak, not
    # merely the first local maximum.
    peaks = peaks[fft_freqs[peaks] > 0]
    if peaks.size > 0:
        dominant_frequency = fft_freqs[peaks[np.argmax(fft_values[peaks])]]
    else:
        dominant_frequency = 0.0
    features.append(dominant_frequency)              # dominant frequency
    return features

In [6]:
def extract_feature_matrix(data_segment):
    """
    Build one flat feature vector for a 2-D segment.

    `extract_features` is applied to every column of `data_segment` (indexed
    as `data_segment[:, column]`), and the per-column feature lists are
    concatenated column after column into a single 1-D numpy array.
    """
    n_columns = data_segment.shape[1]
    per_column = [extract_features(data_segment[:, column]) for column in range(n_columns)]
    return np.array(per_column).flatten()

In [83]:
# TODO: specify path to folder where you want to save features data
# The extraction loop writes X.npy and y.npy to
# <target_directory>/<individual folder>/<experiment folder>.
target_directory = '../Data/Features Data'

In [84]:
# TODO: run this to save the features data into the specified target folder
# For every '<individual>/<experiment>' folder under `directory`, one feature vector
# (X) and one mean Borg rating (y) is computed per repetition and saved as X.npy / y.npy
# in the mirrored folder under `target_directory`.
individuals_folders = [folder for folder in sorted(os.listdir(directory)) if 'individual' in folder]
for individual_folder in individuals_folders:
    experiment_folders = [folder for folder in sorted(os.listdir(f'{directory}/{individual_folder}')) if 'experiment' in folder]
    for experiment_folder in experiment_folders:
        folder_path = f'{directory}/{individual_folder}/{experiment_folder}'
        new_folder = f'{target_directory}/{individual_folder}/{experiment_folder}'
        x_path = f'{new_folder}/X.npy'
        y_path = f'{new_folder}/y.npy'
        # Skip only experiments whose outputs were actually written. Checking for the
        # folder alone would permanently skip an experiment whose previous run was
        # interrupted after the folder was created but before the arrays were saved.
        if os.path.exists(x_path) and os.path.exists(y_path):
            continue
        os.makedirs(new_folder, exist_ok=True)

        imu_df = pd.read_csv(f'{folder_path}/imu_data.csv', index_col=0)
        timestamps = list(imu_df.index)
        borg_df = get_borg_df(folder_path, timestamps)
        repetitions_df = pd.read_csv(f'{folder_path}/repetitions.csv')

        experiment_X, experiment_y = [], []
        # Reset per experiment so a baseline can never leak over from the previous one.
        normalization_matrix = None
        # Each row of repetitions.csv holds the positional start/end sample of one repetition.
        for idx, (start, end) in repetitions_df.iterrows():
            start_ms = timestamps[start]
            end_ms = timestamps[end]
            # Label-based slices: both end points are included.
            data_segment = imu_df.loc[start_ms : end_ms].values
            borg = borg_df.loc[start_ms : end_ms].fatigue.mean()
            feature_matrix = extract_feature_matrix(data_segment)
            if normalization_matrix is None:
                # The first repetition is the baseline all repetitions are divided by;
                # exact zeros are replaced by 1e-8 to avoid division by zero.
                normalization_matrix = np.where(feature_matrix == 0, 1e-8, feature_matrix)
            feature_matrix = feature_matrix / normalization_matrix
            experiment_X.append(feature_matrix)
            experiment_y.append(borg)
        np.save(x_path, np.array(experiment_X), allow_pickle=True)
        np.save(y_path, np.array(experiment_y), allow_pickle=True)