# Movement Data – Feature Extraction

## Imports

In [None]:
import json
from pathlib import Path

import pandas as pd
from empkins_macro import feature_extraction
from tqdm.auto import tqdm

from stresspose_analysis.datasets.mainstudy import MainStudyDataset
from stresspose_analysis.feature_extraction.utils import (
    load_generic_feature_dict,
    load_expert_feature_dict,
    remove_na,
)


%matplotlib widget
%load_ext autoreload
%autoreload 2

## Setup Paths

In [None]:
# Key of the deployment entry to read from ``config.json``.
deploy_type = "local"

# Repository root, relative to the folder this notebook is executed from.
base_path = Path("../../..")

# Read the config inside a context manager so the file handle is closed right
# after parsing instead of being left open until garbage collection.
with base_path.joinpath("config.json").open(encoding="utf-8") as config_file:
    config = json.load(config_file)

# Data folder configured for the selected deployment type.
data_path = Path(config[deploy_type]["base_path"])
data_path

In [None]:
# Folder that is handed to the feature dictionary loaders.
feature_dict_path = base_path / "params/feature_dicts_tsst"

# Folder that receives the exported feature tables; create it (including any
# missing parents) and accept that it may already exist.
output_path = base_path / "feature_export/movement_features"
output_path.mkdir(exist_ok=True, parents=True)

In [None]:
# Build the main-study dataset on top of the configured data folder
# (``use_cache=True`` is passed through to the dataset class).
dataset = MainStudyDataset(
    base_path=data_path,
    use_cache=True,
)
dataset

In [None]:
# --- Parameters forwarded to the expert feature dictionary -----------------
# Unit notes are carried over from the original author's annotations.
sampling_rate = 60  # Hz

# Gyroscope-based settings
threshold_gyr = 5  # deg2/s2
window_sec_gyr = 0.5  # sec
# original note gives the unit as "%"; 0.5 presumably means 50 % overlap -- TODO confirm
overlap_percent_gyr = 0.5

# Velocity-based settings
threshold_vel = 5e-5  # m2/s2
window_sec_vel = 0.5  # sec
# original note gives the unit as "%"; 0.5 presumably means 50 % overlap -- TODO confirm
overlap_percent_vel = 0.5

# Distance threshold
distance_thres = 0.2  # m

# --- Feature dictionaries ---------------------------------------------------
# The generic dictionary only needs the folder; the expert dictionary is
# additionally given all of the parameters defined above.
generic_feature_dict = load_generic_feature_dict(feature_dict_path)
expert_feature_dict = load_expert_feature_dict(
    feature_dict_path,
    sampling_rate_hz=sampling_rate,
    threshold_gyr=threshold_gyr,
    window_sec_gyr=window_sec_gyr,
    overlap_percent_gyr=overlap_percent_gyr,
    threshold_vel=threshold_vel,
    window_sec_vel=window_sec_vel,
    overlap_percent_vel=overlap_percent_vel,
    distance_thres=distance_thres,
)

In [None]:
# Per-recording result stores, filled by the extraction loop and keyed by
# (subject, condition). They live in their own cell so that re-running the
# extraction cell does not wipe results that were already computed.
result_dict_expert, result_dict_generic = {}, {}

In [None]:
# Motion-capture system identifier passed to the feature extraction functions.
system = "xsens"
# Dataset index columns used for grouping the dataset and for naming the outer
# index levels of the concatenated results.
index_levels = ["subject", "condition"]

In [None]:
# Compute expert and generic features for every (subject, condition) group of
# the dataset. Groups whose key is already present in ``result_dict_expert`` are
# skipped, so the cell can be re-run without recomputing finished recordings.
for subset in tqdm(list(dataset.groupby(index_levels))):
    subset_index = subset.index
    subject_id, condition = subset_index["subject"][0], subset_index["condition"][0]
    key = (subject_id, condition)

    if key in result_dict_expert:
        continue
    print(f"{subject_id} {condition}")

    mocap_data = subset.mocap_data

    # Run both extractions before storing anything, so a failure in either one
    # leaves no partial entry behind for this recording.
    expert_features = feature_extraction.extract_expert_features(mocap_data, expert_feature_dict, system=system)
    generic_features = feature_extraction.extract_generic_features(mocap_data, generic_feature_dict, system=system)

    result_dict_expert[key] = expert_features
    result_dict_generic[key] = generic_features

# Stack the per-recording results; the dict keys become the outer index levels,
# named after ``index_levels``.
result_data_expert = pd.concat(result_dict_expert, names=index_levels)
result_data_generic = pd.concat(result_dict_generic, names=index_levels)

In [None]:
# Append the expert features below the generic ones (row-wise) and pass the
# combined table through ``remove_na``.
movement_data_total = remove_na(
    pd.concat([result_data_generic, result_data_expert], axis=0)
)
movement_data_total

## Feature Cleaning

In [None]:
# Pivot the "subject" and "condition" index levels into the columns, so that
# each row holds one feature and each column one recording.
movement_data_cleaned = movement_data_total.unstack(["subject", "condition"])

# drop features that are NaN for any subject
movement_data_cleaned = movement_data_cleaned.dropna(how="any", axis=0)
# drop features that are constant (e.g., 0) for all subjects
std_mask = movement_data_cleaned.std(axis=1) != 0
movement_data_cleaned = movement_data_cleaned.loc[std_mask]

# bring dataframe back in original format
movement_data_cleaned = movement_data_cleaned.stack(["subject", "condition"])
# ``stack`` re-adds "subject" and "condition" as the innermost index levels, so
# the target level order must come from the untouched ``movement_data_total``.
# Reordering by the frame's own ``index.names`` would be a no-op and would leave
# the levels in the stacked order.
movement_data_cleaned = movement_data_cleaned.reorder_levels(movement_data_total.index.names).sort_index()

movement_data_cleaned

## Export

In [None]:
# Write the feature tables as CSV files into the export folder.
# Complete feature table:
movement_data_total.to_csv(output_path / "movement_features.csv")
# Cleaned feature table, stored under two file names with identical content:
for file_name in ("movement_features_cleaned.csv", "movement_features_for_classification.csv"):
    movement_data_cleaned.to_csv(output_path / file_name)