In [4]:
import glob
import os
import tsdf

import datetime
import dateutil.parser as parser

from paradigma.imu_preprocessing import IMUPreprocessingConfig, resample_data, transform_time_array
from paradigma.constants import TimeUnit, DataColumns
from paradigma.windowing import create_segments
from paradigma.feature_extraction import extract_temporal_domain_features, extract_spectral_domain_features


from paradigma.imu_preprocessing import butterworth_filter
from paradigma.gait_analysis_config import GaitFeatureExtractionConfig
import numpy as np

def meta_dict_single_subject(data_path, meta_file_name):
    """
    Collect the files in ``data_path`` whose name matches ``meta_file_name``.

    Args:
        data_path (str): Directory to search.
        meta_file_name (str): File name or glob pattern, joined onto ``data_path``.

    Returns:
        dict: ``'segments'`` holds the matching paths in ascending sorted order,
        ``'file_names'`` holds their base names in the same order, and
        ``'n_files'`` holds the number of matches. ``'n_files'`` is only
        present when at least one file matched.
    """
    search_pattern = os.path.join(data_path, meta_file_name)
    matched_paths = sorted(glob.glob(search_pattern))

    summary = {
        'segments': matched_paths,
        'file_names': [os.path.basename(path) for path in matched_paths],
    }

    # The count key is intentionally left out when nothing matched, so that a
    # lookup of 'n_files' raises KeyError in that case.
    if matched_paths:
        summary['n_files'] = len(matched_paths)

    return summary

def load_tsdf_data(path_to_data, meta_filename, values_filename, time_filename):
    """
    Load one TSDF recording (time + values binaries) into a dataframe.

    Args:
        path_to_data (str): Directory containing the metadata file.
        meta_filename (str): Name of the TSDF metadata file inside ``path_to_data``.
        values_filename (str): Key of the values binary in the loaded metadata.
        time_filename (str): Key of the time binary in the loaded metadata.

    Returns:
        tuple: ``(df, metadata_values)`` where ``df`` is the result of
        ``tsdf.load_dataframe_from_binaries`` with the time metadata first and
        the values metadata second (column-wise concatenation), and
        ``metadata_values`` is the metadata entry of the values binary.
    """
    meta_path = os.path.join(path_to_data, meta_filename)
    with open(meta_path) as meta_file:
        all_metadata = tsdf.load_metadata_legacy_file(meta_file)

    values_meta = all_metadata[values_filename]
    time_meta = all_metadata[time_filename]

    # Time goes first so its column(s) precede the sensor values in the frame.
    frame = tsdf.load_dataframe_from_binaries(
        [time_meta, values_meta],
        tsdf.constants.ConcatenationType.columns,
    )

    return frame, values_meta

import pandas as pd

def tabulate_windows(config, df, agg_func='first'):
    """
    Slice a dataframe into fixed-length (possibly overlapping) windows.

    Every output row describes one window: its 1-based number, the first and
    last value of the time column inside the window, one aggregated value per
    single-value column and the full list of samples per list-value column.

    Args:
        config: Object providing the windowing settings read by this function:
            ``window_length_s`` and ``window_step_size_s`` (seconds),
            ``sampling_frequency`` (rows per second), ``time_colname``,
            ``single_value_cols`` and ``list_value_cols``. If either column
            list is ``None`` it is replaced by ``[]`` on ``config`` itself.
        df (pd.DataFrame): Input dataframe with one row per sample.
        agg_func (str or callable): Aggregation for the single-value columns.
            ``'mean'`` and ``'first'`` are recognised; any other string falls
            back to ``'mean'``. A callable receives the window's values as a
            numpy array.

    Returns:
        pd.DataFrame: Columns ``window_nr``, ``window_start``, ``window_end``,
        followed by the configured single-value and list-value columns that
        actually exist in ``df``. Configured columns missing from ``df`` are
        skipped.

    Raises:
        ValueError: If the window length or step size is not at least one row,
            or if the window length exceeds the number of rows in ``df``.
    """
    # If single_value_cols or list_value_cols is None, default to an empty list
    if config.single_value_cols is None:
        config.single_value_cols = []
    if config.list_value_cols is None:
        config.list_value_cols = []

    window_length = int(config.window_length_s * config.sampling_frequency)
    window_step_size = int(config.window_step_size_s * config.sampling_frequency)

    # A zero/negative length or step would otherwise surface as an obscure
    # ZeroDivisionError, IndexError or KeyError further down.
    if window_length <= 0 or window_step_size <= 0:
        raise ValueError(
            f"Window size ({window_length}) and step size ({window_step_size}) must both be at least 1 row."
        )

    n_rows = len(df)
    if window_length > n_rows:
        raise ValueError(f"Window size ({window_length}) cannot be greater than the number of rows ({n_rows}) in the dataframe.")

    # Start index of every complete window
    window_starts = np.arange(0, n_rows - window_length + 1, window_step_size)

    agg_func_map = {
        'mean': np.mean,
        'first': lambda x: x[0],
    }

    # Use a custom callable as-is; otherwise look the name up, defaulting to 'mean'
    if callable(agg_func):
        agg_func_np = agg_func
    else:
        agg_func_np = agg_func_map.get(agg_func, agg_func_map['mean'])

    # Resolve once which configured columns exist, and use the same lists for
    # both filling the rows and ordering the output, so a configured-but-absent
    # column is skipped consistently instead of raising KeyError at the end.
    single_cols = [col for col in config.single_value_cols if col in df.columns]
    list_cols = [col for col in config.list_value_cols if col in df.columns]

    result = []
    for window_nr, start in enumerate(window_starts, 1):
        end = start + window_length
        window = df.iloc[start:end]

        agg_data = {
            'window_nr': window_nr,
            'window_start': window[config.time_colname].iloc[0],
            'window_end': window[config.time_colname].iloc[-1],
        }

        # One aggregated value per single-value column
        for col in single_cols:
            agg_data[col] = agg_func_np(window[col].values)

        # All samples of the window per list-value column
        for col in list_cols:
            agg_data[col] = window[col].values.tolist()

        result.append(agg_data)

    windowed_df = pd.DataFrame(result)

    # Column order: window_nr, window_start, window_end, then the value columns
    desired_order = ['window_nr', 'window_start', 'window_end'] + single_cols + list_cols

    return windowed_df[desired_order]

def store_tsdf(df, d_cols_time, d_cols_features, metadata_general, path_output):
    """
    Write the time and feature columns of ``df`` to TSDF binaries plus metadata.

    Args:
        df (pd.DataFrame): Dataframe holding the columns to store.
        d_cols_time (dict): Maps time column names to their units.
        d_cols_features (dict): Maps feature column names to their units.
        metadata_general: Shared metadata; copied via ``.copy()`` or, if that
            attribute is missing, via ``.get_plain_tsdf_dict_copy()``.
        path_output (str): Output directory; created when it does not exist.

    Writes ``values.bin``, ``time.bin`` and ``meta.json`` into ``path_output``.
    """

    def _metadata_for(d_cols, data_type):
        # Fresh copy of the general metadata, specialised for one binary.
        try:
            specific = metadata_general.copy()
        except AttributeError:
            specific = metadata_general.get_plain_tsdf_dict_copy()
        specific['channels'] = list(d_cols.keys())
        specific['units'] = list(d_cols.values())
        specific['data_type'] = data_type
        return specific

    if not os.path.exists(path_output):
        os.makedirs(path_output)

    time_meta = _metadata_for(d_cols_time, int)
    features_meta = _metadata_for(d_cols_features, float)

    # Extract the raw arrays in the column order given by the dicts
    time_array = df[d_cols_time.keys()].to_numpy()
    features_array = df[d_cols_features.keys()].to_numpy()

    # Binaries are written values first, then time; the order is kept in the
    # metadata list handed to write_metadata.
    written_meta = [
        tsdf.write_binary_file(path_output, "values.bin", features_array, features_meta),
        tsdf.write_binary_file(path_output, "time.bin", time_array, time_meta),
    ]

    tsdf.write_metadata(written_meta, os.path.join(path_output, "meta.json"))


In [5]:
# Gait preprocessing for a single subject and week: load every raw IMU segment
# file, resample and filter it, window it per contiguous segment, extract
# temporal and spectral accelerometer features, and concatenate the results
# into one dataframe `df`.
subject = 'POMU0053EE359AA64252'
week_nr = '1'

# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
path_sensor_data = os.path.join(r'C:\Users\erik_\Documents\PhD\data\ppp\raw', week_nr, subject)
# Glob pattern matching the metadata file of every raw segment of this week.
meta_filename_raw = f'WatchData.IMU.Week{week_nr}.raw_segment*_meta.json'

meta_dict = meta_dict_single_subject(data_path=path_sensor_data, meta_file_name=meta_filename_raw)
# 'n_files' is only present in meta_dict when at least one file matched, so a
# KeyError here means "no raw segment files found".
try:
    n_files = meta_dict['n_files']
except KeyError:
    # NOTE(review): the message mentions a "next participant", but this cell handles a single subject.
    print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Could not determine n_files while preprocessing gait. Continuing with next participant.")
else:
    print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Preprocessing gait...")

    # One feature dataframe per raw segment file; concatenated after the loop.
    l_segment_dfs = []

    n_segments_completed = 0
    for segment_idx in range(1, n_files + 1):
        # NOTE(review): meta_fullpath is not used anywhere else in this cell.
        meta_fullpath = os.path.join(path_sensor_data, meta_dict['file_names'][segment_idx - 1])
        # Last four characters of the second '_'-separated token of the file name
        # (the token following 'raw' in '...raw_segmentNNNN_meta.json').
        segment_nr = meta_dict['file_names'][segment_idx - 1].split('_')[1][-4:]

        # Rebuild the concrete file names of this segment from its number.
        meta_filename_raw = f'WatchData.IMU.Week{week_nr}.raw_segment{segment_nr}_meta.json'
        values_filename_raw = meta_filename_raw.replace('_meta.json', '_samples.bin')
        time_filename_raw = meta_filename_raw.replace('_meta.json', '_time.bin')

        df_ts, metadata_ts = load_tsdf_data(path_sensor_data, meta_filename_raw, values_filename_raw, time_filename_raw) 

        # The first file's start time is the reference; for later files the
        # offset to that reference is computed in milliseconds.
        if n_segments_completed == 0:
            start_iso_first_segment = parser.parse(metadata_ts.start_iso8601)
            start_iso_this_segment = parser.parse(metadata_ts.start_iso8601)
        else:
            start_iso_this_segment = parser.parse(metadata_ts.start_iso8601)

            gap_to_first_segment_start_ms = (start_iso_this_segment - start_iso_first_segment).total_seconds() * 1000

        # Overwritten every iteration; after the loop it holds the end of the last file.
        end_iso_this_segment = parser.parse(metadata_ts.end_iso8601)

        # Preprocessing settings; `config` is rebound to the gait feature config further down.
        config = IMUPreprocessingConfig()

        # TODO: Once Paradigma is updated with new axes inversion rules, swap to preprocess_imu_data
        # FROM HERE
        # Rename columns
        df_ts = df_ts.rename(columns={f'rotation_{a}': f'gyroscope_{a}' for a in ['x', 'y', 'z']})
        df_ts = df_ts.rename(columns={f'acceleration_{a}': f'accelerometer_{a}' for a in ['x', 'y', 'z']})

        # Convert the time column from delta milliseconds to a relative time axis.
        # NOTE(review): the requested output unit is RELATIVE_MS with scale_factor=1000,
        # while 'time' is multiplied by 1000 after feature extraction — confirm the
        # unit transform_time_array actually returns here.
        df_ts[config.time_colname] = transform_time_array(
            time_array=df_ts[config.time_colname],
            scale_factor=1000, 
            input_unit_type = TimeUnit.DIFFERENCE_MS,
            output_unit_type = TimeUnit.RELATIVE_MS)
        
        # Decimate and interpolate data
        df_ts = resample_data(
            df=df_ts,
            time_column=config.time_colname,
            time_unit_type=TimeUnit.RELATIVE_MS,
            unscaled_column_names = list(config.d_channels_imu.keys()),
            scale_factors=metadata_ts.scale_factors,
            resampling_frequency=config.sampling_frequency)
        
        # Invert axes to adhere to pdathome standards
        if config.side_watch == 'right':
            df_ts[DataColumns.ACCELEROMETER_X] *= -1
            df_ts[DataColumns.GYROSCOPE_Y] *= -1
            df_ts[DataColumns.GYROSCOPE_Z] *= -1

        # Maximum number of rows handled per chunk ("split segment") below.
        split_segment_len = 3000000

        # Filter accelerometer data
        for col in config.d_channels_accelerometer.keys():

            # Change to correct units [g]
            if config.acceleration_units == 'm/s^2':
                df_ts[col] /= 9.81

            # Same cutoff for both passes: the high-pass output goes to 'filt_<col>',
            # the low-pass output to 'grav_<col>'.
            for result, side_pass in zip(['filt', 'grav'], ['hp', 'lp']):
                df_ts[f'{result}_{col}'] = butterworth_filter(
                    single_sensor_col=np.array(df_ts[col]),
                    order=config.filter_order,
                    cutoff_frequency=config.lower_cutoff_frequency,
                    passband=side_pass,
                    sampling_frequency=config.sampling_frequency,
                    )
                
            # Replace the raw channel by its high-pass filtered version; 'grav_<col>' stays as an extra column.
            df_ts = df_ts.drop(columns=[col])
            df_ts = df_ts.rename(columns={f'filt_{col}': col})

        # From here on `config` is the gait feature extraction configuration.
        config = GaitFeatureExtractionConfig()

        # NOTE(review): when the row count is an exact multiple of split_segment_len,
        # this yields one extra chunk whose slice is empty.
        n_split_segments = (df_ts.shape[0] // split_segment_len) + 1

        l_split_dfs = []
        n_split_segments_completed = 0

        # Columns handed to tabulate_windows: no single-value columns, full sample
        # lists for the accelerometer and gravity columns.
        config.single_value_cols = None
        config.list_value_cols = config.l_accelerometer_cols + config.l_gravity_cols

        
        for split_segment_iter in range(1,n_split_segments+1):
            # Take the next chunk of at most split_segment_len rows (or the whole frame if it fits in one chunk).
            if n_split_segments > 1:
                df_split = df_ts.iloc[(split_segment_iter-1)*split_segment_len:(split_segment_iter)*split_segment_len, :].copy()
            else:
                df_split = df_ts

            # Number contiguous stretches of data; a new segment starts after a gap above the 1.5 s threshold.
            df_split['segment_nr'] = create_segments(df=df_split, time_column_name='time', gap_threshold_s=1.5)

            # Offset the per-chunk numbering: the first chunk of a later file continues
            # from the previous file's maximum, later chunks from the previous chunk's maximum.
            if n_segments_completed>0 and n_split_segments_completed==0:
                df_split['segment_nr'] += max_segment_nr
            elif n_split_segments_completed>0:
                df_split['segment_nr'] += max_split_segment_nr

            max_split_segment_nr = df_split['segment_nr'].max()

            l_dfs_segments = []
            df_split_copy = df_split.copy()

            # Window every contiguous segment separately so no window spans a gap.
            # tabulate_windows raises ValueError for a segment with fewer rows than one window.
            # Note that `segment_nr` (the file's segment number above) is rebound by this loop.
            for segment_nr in df_split_copy['segment_nr'].unique():
                df_split = df_split_copy.loc[df_split_copy['segment_nr']==segment_nr].copy().reset_index(drop=True)
                df_windows_segment = tabulate_windows(
                    df=df_split, config=config,
                )
                df_windows_segment['segment_nr'] = segment_nr

                l_dfs_segments.append(df_windows_segment)

            # NOTE(review): in the "NO" branch df_windows is not assigned in this iteration;
            # the code below then fails with NameError or reuses the previous chunk's value.
            if len(l_dfs_segments) > 1:
                df_windows = pd.concat(l_dfs_segments).reset_index(drop=True)
            elif len(l_dfs_segments) == 1:
                df_windows = l_dfs_segments[0]
            else:
                print("NO")

            # Free the chunk copies before feature extraction.
            del df_split, df_split_copy

            # Diagnostic prints only; execution continues regardless.
            if df_windows is None:
                print("A")
            elif df_windows.shape[0] == 0:
                print("B")

            df_windows = df_windows.reset_index(drop=True)  

            # Temporal features, with mean and std passed as the gravity statistics.
            df_windows = extract_temporal_domain_features(config=config, df_windowed=df_windows, l_gravity_stats=['mean', 'std'])

            # Spectral features of the accelerometer columns.
            df_windows = extract_spectral_domain_features(config=config, df_windowed=df_windows, sensor='accelerometer', l_sensor_colnames=config.l_accelerometer_cols)

            print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Split segment {segment_idx}.{split_segment_iter} Features extracted.")

            # Replace missing values by 0 in every column that contains any.
            for col in df_windows.columns:
                if pd.isna(df_windows[col]).any():
                    df_windows[col] = df_windows[col].fillna(0)

            l_split_dfs.append(df_windows)

            n_split_segments_completed += 1

            print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Split segment {segment_idx}.{split_segment_iter} finished.")

        print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Segment {segment_idx} # split segments to concatenate: {len(l_split_dfs)}.")

        # Combine the chunks of this file.
        # NOTE(review): the "C" branch leaves df unassigned for this file.
        if len(l_split_dfs) > 1:
            df = pd.concat(l_split_dfs)
        elif len(l_split_dfs) == 1:
            df = l_split_dfs[0]
        else:
            print("C")

        # Scale 'time' by 1000 (presumably seconds -> milliseconds; confirm against the time conversion above).
        df['time'] *= 1000

        # Shift later files onto the time axis of the first file.
        if n_segments_completed>0:
            df['time'] += gap_to_first_segment_start_ms

        # Remembered so the next file's segment numbering continues from here.
        max_segment_nr = df['segment_nr'].max()

        l_segment_dfs.append(df)
    
        n_segments_completed += 1

        print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Segment {segment_idx} finished.")

    print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: # segments to concatenate: {len(l_segment_dfs)}")

    # Combine the per-file dataframes.
    # NOTE(review): the "D" branch leaves df unassigned, so the reset_index below would fail on a fresh kernel.
    if len(l_segment_dfs) == 0:
        print("D")
    elif len(l_segment_dfs) > 1:
        df = pd.concat(l_segment_dfs)
    else:
        df = l_segment_dfs[0]

    df = df.reset_index(drop=True)

    print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Storing tsdf.")

    # Metadata for the stored features. Start is taken from the first file, end from
    # the last file processed; window settings come from the gait feature config.
    # NOTE(review): 'side_watch' is hard-coded to 'left' although config.side_watch
    # is consulted for the axis inversion above — confirm this is intended.
    metadata = {
        'subject_id': subject,
        'study_id': 'PPP',
        'device_id': 'Verily',
        'endianness': 'little',
        'metadata_version': '0.1',
        'side_watch': 'left',
        'start_iso8601': str(start_iso_first_segment.isoformat()),
        'end_iso8601': str(end_iso_this_segment.isoformat()),
        'window_size_sec': config.window_length_s,
        'step_size_sec': config.window_step_size_s
    }

    # Storing is currently disabled; d_cols_time, d_cols_features_gait and path_output
    # are not defined in the visible cells and would be needed to re-enable it.
    # store_tsdf(df, d_cols_time, d_cols_features_gait, metadata, path_output)

    print(f"Week {week_nr} - ID {subject} - Datetime {datetime.datetime.now()}: Gait preprocessed.")

Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:54:17.366028: Preprocessing gait...
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:55:38.413769: Split segment 1.1 Features extracted.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:55:38.430137: Split segment 1.1 finished.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:56:28.747732: Split segment 1.2 Features extracted.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:56:28.761026: Split segment 1.2 finished.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:56:28.761026: Segment 1 # split segments to concatenate: 2.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:56:28.778079: Segment 1 finished.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:57:59.931424: Split segment 2.1 Features extracted.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:57:59.962653: Split segment 2.1 finished.
Week 1 - ID POMU0053EE359AA64252 - Datetime 2024-10-17 14:59:19.93

# Modules

In [1]:
%load_ext autoreload
%autoreload 2

from pdathome.constants import global_constants as gc
from pdathome.classification import train_test_filtering_gait, store_filtering_gait

# Process data

In [2]:
# Classifiers passed to the training and storing calls in the next cell.
l_classifiers = [gc.classifiers.LOGISTIC_REGRESSION, gc.classifiers.RANDOM_FOREST]
# Forwarded as the `gsearch` keyword of train_test_filtering_gait;
# presumably toggles a hyperparameter grid search — TODO confirm.
gsearch = False

In [4]:
# Call train_test_filtering_gait once per participant ID in gc.participant_ids.L_PD_IDS.
# Note: the loop variable `subject` remains bound to the last ID afterwards.
for subject in gc.participant_ids.L_PD_IDS:
    train_test_filtering_gait(subject, gsearch=gsearch, l_classifiers=l_classifiers)

# Called once after all participants, with the same classifier list;
# presumably persists the combined results — TODO confirm.
store_filtering_gait(l_classifiers=l_classifiers)