# Class for Windowing Procedure and Peak Detection

## Windowing the test data

The windowing follows a certain procedure (parameters are variable):

>•	Taking a 1 s block of the data

>•	Varying the block length from 1 s to 5 s with an increment of 200 ms (starting point remains the same for all blocks)

>•	Sectioning and feature generation for all blocks

>•	For each block class probabilities are calculated (ML classifier) 

>•	Sliding the starting point with an increment of 200 ms and starting again with a 1 s block varying to 5 s

In [3]:
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib.widgets import Slider, Button
import pandas as pd
import numpy as np
import time
import os
import itertools
import skinematics as skin
from scipy.signal import butter, lfilter
from IPython.display import clear_output
from scipy.ndimage.filters import maximum_filter

In [4]:
def print_progress_func(current_num, max_num, prev_prog, add_info=None):
    '''
    Function to print progress [%] in a loop.
    
    
    Parameters
    ----------
    current_num : int
        Number of the current run in a loop.
        
    max_num : int
        Maximum number of runs in a loop.
        
    prev_prog : int
        Previous progress, to print only if necessary.
        
    add_info : str
        Additional information to print instead of "Progress".
    
    
    Returns
    -------
    int
        Previous progress, important for next run.
    '''
    new_prog = int(current_num/max_num*100)
    
    if new_prog > prev_prog:
        clear_output(wait=True)
        
        if isinstance(add_info, str):
            print(add_info + ' {:3d}%'.format(new_prog))
        else:
            print('Progress: {:3d}%'.format(new_prog))
        
    return new_prog

In [5]:
def rotate_signal(signal_data, axis=0, rot_angle=90, signals=['Acc','Gyr']):
    '''
    Function to rotate signals around x, y or z-axis.
    
    Parameters
    ----------
    signal_data : dict
        Dictionary with the signals in the 'signals' argument as keys.
        The signal arrays must have three columns (x, y, z).
        
    axis : int
        Axis for rotation:
        0, 1 or 2 --> x, y or z
        
    rot_angle : int or float
        Rotation angle in degree.
        
    signals : list of strings
        Names of the signals, which shall be considered for rotation (e.g. ['Acc','Gyr']).


    Returns
    -------
    Dictionary with rotated selected signals.
    (Same structure as input signal dictionary.)

    '''
    # if no signals are given as keys, select all keys of the input dictionary
    if signals is None:
        signals = [*signal_data]
    
    # create rotation matrix
    R = skin.rotmat.R(axis=axis, angle=rot_angle)
    
    # dictionary for rotated data
    rot_signal_data = {}
    
    # rotate the signals
    for sig in signals: 
        rot_signal_data[sig] = (R @ signal_data[sig].T).T
        
    return rot_signal_data

In [6]:
def add_noise_to_signal(signal_data, target_snr_db=20, signals=['Acc','Gyr'], signal_orientations=['x','y','z']):
    '''
    Function to add Additive White Gaussian Noise (AWGN) to all signals with a defined SNR.
    
    Used formulas:
    SNR = P_signal / P_noise
    SNR_db = 10 * log10(P_signal / P_noise)
    SNR_db = P_signal_db - P_noise_db
    
    Parameters
    ----------
    signal_data : dict
        Dictionary with the signals in the 'signals' argument as keys.
        
    target_snr_db : int or float
        Target signal to noise ration in db.
        
    signals : list of strings
        Names of the signals, which shall be considered for rotation (e.g. ['Acc','Gyr']).
        
    signal_orientations : list of strings
        Orientations of the signals (e.g. ['x','y','z']).
    
    
    Returns
    -------
    Dictionary with noisy signals.
    (Same structure as input signal dictionary.)

    '''
    # if no signals are given as keys, select all keys of the input dictionary
    if signals is None:
        signals = [*signal_data]
    
    # dictionary for noisy data
    noisy_signal_data = {}
    
    # adding noise using target SNR
    for sig in signals:
        
        # fill in old values
        noisy_signal_data[sig] = np.zeros(np.shape(signal_data[sig]))
        
        for ii in range(len(signal_orientations)):
            
            # get power of the signal [watts] (with removed offset)
            P_signal_watts = (signal_data[sig][:,ii]-np.mean(signal_data[sig][:,ii])) ** 2
            P_signal_mean_watts = np.mean(P_signal_watts) # get mean
            P_signal_mean_db = 10 * np.log10(P_signal_mean_watts) # convert to db
            
            P_noise_mean_db = P_signal_mean_db - target_snr_db # get corresponding noise power
            P_noise_mean_watts = 10 ** (P_noise_mean_db/10) # convert from db to watts
            noise_mean_std = np.sqrt(P_noise_mean_watts) # std of noise (P_noise_mean_watts is variance)
            
            # generate sample of white noise (power = variance = P_noise_mean_watts)
            noise = np.random.normal(0, noise_mean_std, len(signal_data[sig][:,ii]))
            
            # add noise to original signal
            noisy_signal_data[sig][:,ii] = signal_data[sig][:,ii] + noise

    return noisy_signal_data

In [7]:
def butter_lowpass(cutoff, fs, order=5):
    '''
    Function to get filter coefficients for butterworth filter.
    
    Parameters
    ----------
    cutoff : int or float
        Cutoff-frequency of the applied filter.
    
    fs : int or float
        Sampling rate in Hz.
    
    order : int
        Order of the applied filter.
    
    
    Returns
    -------
    Filter coefficients for butterworth filter (a, b).
    '''
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a


def butter_lowpass_filter(data, cutoff, fs, order=5):
    '''
    Filter data with butterworth filter.
    
    Parameters
    ----------
    data : array or matrix like
        N-dimensional input array (if matrix --> one signal per column).
    
    cutoff : int or float
        Cutoff-frequency of the applied filter.
    
    fs : int or float
        Sampling rate in Hz.
    
    order : int
        Order of the applied filter.
    
    
    Returns
    -------
    Filtered data (matrix or array-like)
    '''
    
    b, a = butter_lowpass(cutoff, fs, order=order) # from scipy.signal
    
    # filter data along one-dimension
    y = lfilter(b, a, data, axis=0) # from scipy.signal
    return y

In [8]:
def get_sensor_data(in_file, 
                    signals=['Acc','Gyr','Mag'], 
                    sampling_rate=256, 
                    start_time=None, 
                    stop_time=None, 
                    skip_rows=0, 
                    sep=',',
                    return_time_array=True,
                    add_info='no info'):
    '''
    Function to read sensor data from a file, in order to return data from selected sensors and time range.
    
    Parameters
    ----------
    in_file: string
        Directory and file name of data (e.g. 'Subject_01/subject01.csv').
    
    signals: list of stings
        Sensor signal abbreviations (have to be equal to the first letters of the data column names!).
    
    sampling_rate: int or float
        Sampling rate of the measured signals in Hz.
    
    start_time : int or float
        Start time for selecting data in sec (if None --> start from beginning).
    
    stop_time : int or float
        Stop time for selecting data in sec (if None --> until end of data).
    
    csv_skiprows : int
        Number of rows to skip for pandas read_csv() function.
    
    csv_separator : char
        Seperator for pandas read_csv() function.
    
    return_time_array : boolean
        If True: out dict has an item (np.array) containing the time (key: "time").
    
    add_info: string
        Additional info to plot if error occurs.
    
    
    Returns
    -------
    Dictionary with selected data and time array [s]
    '''
    
    data = pd.read_csv(in_file, skiprows=skip_rows, sep=sep)
    
    num_steps = np.shape(data.values)[0] # total number of data points
    
    if start_time is None:
        start_index = 0
    else:
        start_index = round(start_time * sampling_rate)
        
    if stop_time is None:
        stop_index = num_steps
    else:
        stop_index = round(stop_time * sampling_rate)
        
    if start_index < 0 or stop_index > num_steps or start_index >= stop_index:
        print('Error at selecting data from given time range. (' + add_info + ')')
        return {}
        
    data_dict = {}
    for signal in signals:
        data_dict[signal] = data.filter(regex=signal+'*').values[start_index:stop_index]
    
    if return_time_array:
        data_dict['time'] = np.arange(num_steps)[start_index:stop_index] / sampling_rate
    
    return data_dict

In [9]:
def signal_windowing_via_indices(test_subject_path,
                                 number_sections=10,
                                 sig_names=['Acc','Gyr'],
                                 signal_orientations=['x','y','z'],
                                 sampling_rate=256,
                                 cutoff=10,
                                 order=6,
                                 win_start_inc=0.2,
                                 win_stretch_inc=0.2,
                                 win_min_len=1,
                                 win_max_len=5,
                                 win_start=0,
                                 win_last_start=None,
                                 print_progress=True,
                                 progress_info='Generate feature map...',
                                 rot_axis=0,
                                 rot_angle=0,
                                 add_noise=False,
                                 target_snr_db=20,
                                 csv_skiprows=0,
                                 csv_separator=','):
    '''
    This function applies a defined windowing procedure in order to split a signal 
    into different sections, which can be then taken as features for machine learning.
    The different section values are determined by taking the index in the middle of
    the corresponding section.
    In order to avoid extreme outliers a butterworth filter is used before sectioning.
    
    Parameters
    ----------
    test_subject_path : str
        Path to the csv-file of the test subject data.
        
    number_sections: int
        Number of sections to split each window.
        
    sig_names : list of strings
        Signal names, used as keys for signal dictionaries.
        
    signal_orientations : list of strings
        Orientations of the signals (e.g. ['x','y','z']).
        
    sampling_rate : int or float
        Sampling rate of the signals.
        
    cutoff : int or float
        Cutoff frequency of the butterworh filter.
        
    order : int
        Order of the butterworth filter.
        
    win_start_inc : int or float
        Start increment for the window [s].
        
    win_stretch_inc : int or float
        Stretch increment for the window [s].
    
    win_min_len : int or float
        Minimum window length [s].
    
    win_max_len : int or float
        Maximum window length [s].
    
    win_start : int or float
        Start time of the window [s].
    
    win_last_start : int or float or None
        Last start time of the window [s].
        If None, set to time where the minimum window length just fits into the sensor data.
    
    print_progress : boolean
        If True --> print progress at feature generation.
    
    progress_info : str
        Information to print with progress.
        
    rot_axis : int or list of int
        Axis for rotation:
        0, 1 or 2 --> x, y or z
        --> if list: sequence of rotations
        (Length of list has to match with the length of rot_angle,
        otherwise the shorter list of the two is taken and all other values are omitted.)
        
    rot_angle : int or float or list of int or float
        Rotation angle in degree.
        --> if list: sequence of rotations
        (Length of list has to match with the length of rot_axis,
        otherwise the shorter list of the two is taken and all other values are omitted.)
        
    add_noise : boolean
        If True --> noise is added to signals.
        
    target_snr_db : int or float
        Signal to noise ratio in db for the generated noisy signals.
    
    csv_skiprows : int
        Number of rows to skip for pandas read_csv() function.
    
    csv_separator : char
        Seperator for pandas read_csv() function.
    
    
    Returns
    -------
    list
        list[0] : numpy.ndarray
            Matrix with sectioned signal data.
                (Number of columns = number of features)
                (Number of rows = number of data points)
        
        list[1] : numpy.ndarray
            Array with all possible window start points for the choosen parameters [s].
        
        list[2] : numpy.ndarray
            Array with all possible window lengths for the choosen parameters [s].
        
        list[3] : int
            Length of the original signals (number of indices).
    '''


    # get data from selected file
    sensor_data = get_sensor_data(in_file=test_subject_path,
                                  signals=sig_names,
                                  sampling_rate=sampling_rate,
                                  skip_rows=csv_skiprows,
                                  sep=csv_separator)
    
    # rotate the signals
    if not isinstance(rot_axis, list): # if not list --> make list
        rot_axis = [rot_axis]
    if not isinstance(rot_angle, list): # if not list --> make list
        rot_angle = [rot_angle]
    # going through all rotation axes and rotation angles
    for current_rot_axis, current_rot_angle in zip(rot_axis, rot_angle):
        # apply rotation only if rotation angle is not zero
        if current_rot_angle != 0:
            sensor_data = rotate_signal(sensor_data, 
                                        axis=current_rot_axis, 
                                        rot_angle=current_rot_angle, 
                                        signals=sig_names)

    # add noise to signal if corresponding parameter is True
    if add_noise is True:
        sensor_data = add_noise_to_signal(sensor_data,
                                          target_snr_db=target_snr_db, 
                                          signals=sig_names, 
                                          signal_orientations=signal_orientations)

    # filter data with butterworth filter and save to new dictionary
    sensor_data_filt = {}
    for signal in sig_names:
        sensor_data_filt[signal] = butter_lowpass_filter(sensor_data[signal], 
                                                         cutoff=cutoff, 
                                                         fs=sampling_rate, 
                                                         order=order)

    # signal length: all sensor data must have same length --> Acc, Gyr, ...
    # --> but to ensure that indices are not out of range in case of wrong input data
    # let's take the smallest stop index of the different signals
    signal_len = float('inf')
    for sig in sig_names:
        if np.shape(sensor_data_filt[sig])[0] < signal_len:
            signal_len = np.shape(sensor_data_filt[sig])[0]

    # last window start is None--> set to time where the minimum window length just fits into the sensor data
    if win_last_start is None:
        win_last_start = signal_len/sampling_rate - win_min_len
    
    # array with all possible window start points
    all_window_start_points = np.arange(win_start, win_last_start, win_start_inc)
    # include win_last_start if the last start point plus the increment is equal to that value (adding end point)
    # (round due to small discrepancy)
    if round(all_window_start_points[-1] + win_start_inc, 5) == win_last_start:
        all_window_start_points = np.append(all_window_start_points, win_last_start)
    
    # array with all possible window lengths
    all_window_lengths = np.arange(win_min_len, win_max_len, win_stretch_inc)
    # include win_max_len if the max window length plus the increment is equal to that value (adding end point)
    # (round due to small discrepancy)
    if round(all_window_lengths[-1] + win_stretch_inc, 5) == win_max_len:
        all_window_lengths = np.append(all_window_lengths, win_max_len)
    
    # number of different window start points
    num_start_points = len(all_window_start_points)
    
    # number of different window sizes
    num_win_sizes = len(all_window_lengths)

    # matrix with all generated features (number_sections*6 = number of features --> Acc, Gyr (x,y,z))
    feature_map = np.zeros([num_start_points * num_win_sizes, number_sections*6])
    
    # counter for current position (row) in the feature map
    count = 0
    
    # variables for progress printing
    max_count = len(feature_map)
    prev_progress = 0 # previous progress

    # going through all window start points
    for ii, win_pos in enumerate(all_window_start_points):

        # going through all window lengths
        for jj, win_len in enumerate(all_window_lengths):

            # calculate start and stop index (type: float --> conversion to int happens afterwards)
            start_index = win_pos * sampling_rate
            stop_index = start_index + (win_len * sampling_rate)

            # check if stop index is out of range
            if stop_index >= signal_len:
                stop_index = signal_len-1 # set equal to last index

            # get indices of the sections
            section_indices, step = np.linspace(start_index, stop_index, number_sections, endpoint=False, retstep=True)

            #  + step/2 in order to get the indices in the middle of the sections
            section_indices = (section_indices + step/2).round().astype(int)

            # putting the feature map together
            #feature_map[count,:] = np.concatenate((sensor_data_filt[sig_names[0]][section_indices,:].transpose(), 
            #                                       sensor_data_filt[sig_names[1]][section_indices,:].transpose())).flatten().reshape(1, -1)
            feature_map[count,:] = np.concatenate([sensor_data_filt[sig][section_indices,:].transpose().flatten() for sig in sig_names])

            count += 1
        
        # print progress of feauture map generation
        if print_progress:
            prev_progress = print_progress_func(count, max_count, prev_progress, add_info=progress_info)
    
    return [feature_map, all_window_start_points, all_window_lengths, signal_len]

In [10]:
def detect_prob_map_peaks(prob_matrix,
                          win_start_inc,
                          num_win_sizes,
                          threshold_prob=0.5, 
                          footprint_length=1.5):
    '''
    Function to detect the local peaks of a probability map.
    
    Parameters
    ----------
    prob_matrix : 2d-array
        Matrix with predicted probabilities.
        
    threshold_prob : int or float
        Find only peaks with a minimum probability (threshold).
        
    footprint_length : int or float
        Length of the footprint for the maximum_filter in order to find peaks [s].
        
    win_start_inc : int or float
        Window start increment [s].
        
    num_win_sizes : int
        Number of different window sizes.

    Returns
    -------
    array 
        array[0] ... peak time indices
        array[1] ... peak window length indices
        e.g. ([[ 390, 723, 1331, ...], [4, 4, 10, ...]], dtype=int64)
    '''
    
    # length and height of the footprint for the maximum_filter (see below)
    footprint_length_indices = int(footprint_length / win_start_inc)
    footprint_height = num_win_sizes * 2  # take twice the number of all window sizes for footprint height
    
    footprint=np.ones((footprint_length_indices,footprint_height))
    
    # applying a maximum filter and generating a boolean map for local maxima
    local_max = maximum_filter(prob_matrix, footprint=footprint)==prob_matrix
    
    # removing all maxima below the threshold
    local_max = (prob_matrix>=threshold_prob) & local_max
    
    # check if there are several points with the same probability at one local maxima (within footprint length)
    #   --> remove them, otherwise we get more than one local maxima
    peak_indices_check = np.argwhere(local_max)
    if len(peak_indices_check) > 1:
        for ii in range(len(peak_indices_check)-1):
            row_ind, col_ind = peak_indices_check[ii]
            row_ind_next, col_ind_next = peak_indices_check[ii+1]
            if row_ind_next-row_ind <= footprint_length_indices/2:
                local_max[row_ind,col_ind] = False
    
    # get the maxima indices of the probability map
    peak_indices = np.argwhere(local_max).transpose()
    
    return peak_indices

In [11]:
def evaluate_peaks(peak_ind,
                   prob_matrix,
                   win_start_inc,
                   exercise_abbrs_peak_eval,
                   max_time_between_peaks=10,
                   min_peaks_per_block=3):
    '''
    Function to evaluate the detected peaks.
    (see function detect_prob_map_peaks(prob_matrix))
    
    --> assign peaks to repetition blocks with min two repetitions
    --> if blocks are overlapping, keep only the block with the highest predicted probabilities (sum)
    
    Parameters
    ----------
    peak_ind : dict
        Exercise-abbreviations as keys (e.g. 'RF', 'RO', ...)
        --> values: 2d-array 
        array[0] ... peak time indices
        array[1] ... peak window length indices
        e.g. ([[ 390, 723, 1331, ...], [4, 4, 10, ...]], dtype=int64)
        
    prob_matrix : dict
        Exercise-abbreviations as keys (e.g. 'RF', 'RO', ...)
        --> values: 2d-array 
        Matrices with predicted probabilities.
        
    win_start_inc : int or float
        Window start increment.
        
    exercise_abbrs_peak_eval : list of strings
        Exercises considered for peak evaluation.
        
    max_time_between_peaks : int or float
        Maximum time between two peaks in the same block [s].
        
    min_peaks_per_block : int
        Minimum number of peaks per block.

    Returns
    -------
    dict
        Dictionary with exercise abbreviations as keys --> repetition blocks
        
        Example: rep_blocks['RF'][0] (np.narray)  (first block of exercise 'RF'):
        [[4121, 9],
         [4135, 11],
         [4150, 11],
         [4166, 9],
         [4179, 10],
         [4193, 10],
         [4207, 10],
         [4221, 12],
         [4236, 13],
         [4251, 13]]
           --> 1st column: indices corresponding to horizontal axis (window start position)
           --> 2nd column: indices corresponding to vertical axis (window stretching)
           --> 10 rows --> 10 repetitions in this block
    '''
    
    # define the maximum time between two peaks in a block
    max_ind_between_peaks = int(max_time_between_peaks / win_start_inc)
    
    exercise_abbrs_peak_eval = [*peak_ind]
    
    # assign peaks to repetition blocks
    rep_blocks = {}
    for ex in exercise_abbrs_peak_eval:
        rep_blocks[ex] = []
        new_block = True # remember if current peak belongs to a new block
        
        # going through all time indices of the peaks of the current exercise
        for current_peak_time_ind, current_peak_win_ind in zip(peak_ind[ex][0], peak_ind[ex][1]):
            
            # if the current time index belongs to a new block --> append new block
            if new_block is True:
                rep_blocks[ex].append(np.array([[current_peak_time_ind, current_peak_win_ind]]))
                new_block = False
            
            # check if previous peak is within acceptable temporal distance in order to belong to the same block
            elif current_peak_time_ind - rep_blocks[ex][-1][-1,0] <= max_ind_between_peaks:
                # append (stack) the current peak to the last block
                rep_blocks[ex][-1] = np.vstack((rep_blocks[ex][-1], 
                                               np.array([[current_peak_time_ind, current_peak_win_ind]])))
                
            # append a new block
            else:
                rep_blocks[ex].append(np.array([[current_peak_time_ind, current_peak_win_ind]]))
    
    
    # check if the repetition blocks have a minimum number of peaks (min_peaks_per_block)
    valid_rep_blocks = {}
    for ex in exercise_abbrs_peak_eval:
        valid_rep_blocks[ex] = []
        # going through all blocks of the current exercise
        for rep_block in rep_blocks[ex]:
            # retain the block only if there is a minimum number of peaks
            if np.shape(rep_block)[0] >= min_peaks_per_block:
                valid_rep_blocks[ex].append(rep_block)
    
    
    # if blocks are overlapping --> retain only the block with the highest predicted probabilities (sum)
    #    --> the more peaks in the block, the higher the sum of probabilities (in general)
    blocks_to_remove = []
    # check all combinations of two exercises
    for ex1, ex2 in itertools.combinations(exercise_abbrs_peak_eval, 2):
        for ii in range(len(valid_rep_blocks[ex1])):
            for jj in range(len(valid_rep_blocks[ex2])):
                start_1 = valid_rep_blocks[ex1][ii][0,0] # time index of the first peak in the current block 1
                stop_1 = valid_rep_blocks[ex1][ii][-1,0] # time index of the last peak in the current block 1
                start_2 = valid_rep_blocks[ex2][jj][0,0] # time index of the first peak in the current block 2
                stop_2 = valid_rep_blocks[ex2][jj][-1,0] # time index of the last peak in the current block 2

                # check if the two blocks overlap
                if (start_1 >= start_2 and start_1 <= stop_2) or (stop_1 >= start_2 and stop_1 <= stop_2) \
                or (start_2 >= start_1 and start_2 <= stop_1) or (stop_2 >= start_1 and stop_2 <= stop_1):

                    # selecet the corresponding probability values of prob_matrix and sum them up
                    sum_prob_block_1 = prob_matrix[ex1][rep_blocks[ex1][ii][:,0], 
                                                        rep_blocks[ex1][ii][:,1]].sum()

                    sum_prob_block_2 = prob_matrix[ex2][rep_blocks[ex2][jj][:,0], 
                                                        rep_blocks[ex2][jj][:,1]].sum()

                    # compare the sum of the probabilities of the two blocks
                    if sum_prob_block_1 < sum_prob_block_2:
                        blocks_to_remove.append([ex1, ii])
                    else:
                        blocks_to_remove.append([ex2, jj])
    
    # ensure that there are no duplicates in the nested list
    blocks_to_remove_unique = []
    for sublist in blocks_to_remove:
        if sublist not in blocks_to_remove_unique:
            blocks_to_remove_unique.append(sublist)
    
    # by removing the blocks take the reversed sorted order of the block index
    #    --> so it is possible to remove all blocks without "refreshing" the indices
    #        (if one block is removed, higher indices of all other blocks are changing)
    for ex, block_ind in sorted(blocks_to_remove_unique, key=lambda x: x[1])[::-1]:
        valid_rep_blocks[ex].pop(block_ind)
        
    return valid_rep_blocks

In [12]:
def convert_time_format(min_sec, sampling_rate=None, time_offset=0, max_index=None, convert_to_s=False):
    '''
    Function converts a string with the time format 'min:sec' (e.g. 5:17.2)
    to a corresponding index, considering the sampling rate.
    If index would be negative, 0 is returned.
    If convert_to_s is True --> convert to seconds instead.
    
    Parameters
    ----------
    min_sec : string
        Time data, defined format: 'min:sec'
    
    sampling_rate : float or int
        Sampling rate for the index calculation. [Hz]
        
    time_offset : float of int
        Time offset, considered at the index calculation. [s]
        
    max_index : int
        Maximum valid index.
        If provided and calculated index is out of range,
        max_index is returned instead.
        
    convert_to_s : boolean
        If True --> convert to seconds.
    
    Returns
    -------
    int or float
        Corresponding index or value [s] to parameter 'min_max'.
    '''
    
    # split time string and convert to float
    minutes = float(min_sec.split(':')[0])
    seconds = float(min_sec.split(':')[1])
    
    # start and stop time in seconds
    time_s = minutes*60 + seconds + time_offset
    
    if convert_to_s is True:
        return time_s
    
    # get corresponding index
    index = round(time_s * sampling_rate)
    
    # ensure that index is not below 0
    if index < 0:
        index = 0
    
    # ensure that index is in valid range if max index is given
    if max_index is not None and index > max_index:
        index = max_index
            
    return index

In [13]:
def indices_to_time(start_index, stop_index, win_start_inc):
    '''
    Function convert indices to time string.
    
    Parameters
    ----------
    start_index : int
        
    stop_index : int
    
    win_start_inc : int or float
    
    Returns
    -------
    str
        String with start and stop time (e.g. '14:39.6 - 15:19.4').
    '''
    
    start_time_text = '{0:02}:{1:04.1f}'.format(int(start_index*win_start_inc/60), 
                                               (start_index*win_start_inc)%60)
    stop_time_text = '{0:02}:{1:04.1f}'.format(int(stop_index*win_start_inc/60), 
                                               (stop_index*win_start_inc)%60)
    return start_time_text + ' - ' + stop_time_text

In [14]:
def fill_prediction_matrix(pred_probs,
                           exercise_abbrs,
                           num_start_points,
                           num_win_sizes,
                           all_window_start_points,
                           all_window_lengths):
    
    '''
    Function to write the predicted probabilities into a dictionary 
    with prediction matrices as elements for each exercise (key).
    
    Parameters
    ----------
    pred_probs : numpy.ndarray
        Matrix with all predicted probabilities.
        (Number of rows: number of data points)
        (Number of columns: number of labels)
        
    exercise_abbrs : list of strings
        Abbreviations of exercises.
        
    num_start_points : int
        Number of window start points.
        
    num_win_sizes : int
        Number of different window sizes.
        
    all_window_start_points : array
        Array with all window start points.
        
    all_window_lengths : array
        Array with all different window lengths.
    
    Returns
    -------
    str
        String with start and stop time (e.g. '14:39.6 - 15:19.4').
    '''
    
    count = 0 # counter for the current row of the matrix with the predicted probabilities

    # dictionary with matrices to save predicted values for all classes
    prob_matrix_dict = {}
    for ex in exercise_abbrs:
        prob_matrix_dict[ex] = np.zeros([num_start_points, num_win_sizes])

    # going through all window start points
    for ii, win_pos in enumerate(all_window_start_points):

        # going through all window lengths 
        for jj, win_len in enumerate(all_window_lengths):

            for kk, ex in enumerate(exercise_abbrs):
                prob_matrix_dict[ex][ii,jj] = pred_probs[count,kk]
            
            count += 1

    return prob_matrix_dict

In [15]:
class PhysioData_WindowingProcedure():
    '''
    Class for feature generation according to a certain windowing procedure.
    There are various selectable options --> see Parameters. 
    
    Parameters
    ----------
    test_subject_dir : string
        Directory to the csv-file of the test subject data.
    
    test_subject_file : string
        Name of the csv-file.
        
    number_sections: int
        Number of sections to split each window.
        
    sig_names : list of strings
        Signal names, used as keys for signal dictionaries.
        
    signal_orientations : list of strings
        Orientations of the signals (e.g. ['x','y','z']).
        
    sampling_rate : int or float
        Sampling rate of the signals.
        
    cutoff : int or float
        Cutoff frequency of the butterworh filter.
        
    order : int
        Order of the butterworth filter.
        
    win_start_inc : int or float
        Start increment for the window [s].
        
    win_stretch_inc : int or float
        Stretch increment for the window [s].
    
    win_min_len : int or float
        Minimum window length [s].
    
    win_max_len : int or float
        Maximum window length [s].
    
    win_start_min_sec : string
        Start time of the window ['min:sec'] (e.g. '05:30.0').
    
    win_last_start_min_sec : string or None
        Last start time of the window ['min:sec'] (e.g. '10:30.0').
        If None, set to time where the minimum window length just fits into the sensor data.
    
    print_progress : boolean
        If True --> print progress at feature generation.
    
    progress_info : str
        Information to print with progress.
        
    rot_axis : int or list of int
        Axis for rotation:
        0, 1 or 2 --> x, y or z
        --> if list: sequence of rotations
        (Length of list has to match with the length of rot_angle,
        otherwise the shorter list of the two is taken and all other values are omitted.)
        
    rot_angle : int or float or list of int or float
        Rotation angle in degree.
        --> if list: sequence of rotations
        (Length of list has to match with the length of rot_axis,
        otherwise the shorter list of the two is taken and all other values are omitted.)
        
    add_noise : boolean
        If True --> noise is added to signals.
        
    target_snr_db : int or float
        Signal to noise ratio in db for the generated noisy signals.
    
    csv_skiprows : int
        Number of rows to skip for pandas read_csv() function.
    
    csv_separator : char
        Seperator for pandas read_csv() function.
        
    exercise_abbrs : list of strings
        Exercise abbreviations (sequence matters).
        
    exercise_abbrs_peak_eval : list of strings
        Exercises to consider for peak evaluation (e.g. omit non-exercise).
        

    Methods
    -------
    get_feature_map()
        Returns the feature map.
        
    evaluate_probability_matrix()
        Method to evaluate a probability matrix.
        --> Parameters: See docstring of method.
        
    print_rep_blocks()
        (!) Call this method only after evaluate_probability_matrix().
        Method to print the found repetition blocks of each exercise 
        with time range and number of repetitons.
        --> Parameters: See docstring of method.
        
    plot_probability_matrices_and_peaks()
        (!) Call this method only after evaluate_probability_matrix()
        Method to plot the probability matrix as well as
        the evaluated peaks (repetitions).
        --> Parameters: See docstring of method.
        
    '''
    def __init__(self,
                 test_subject_dir  = r'E:\Physio_Data\Subject_01',
                 test_subject_file = 'subject01.csv',
                 number_sections=10,
                 signal_abbrs=['Acc','Gyr'],
                 signal_orientations=['x','y','z'],
                 sampling_rate=256,
                 cutoff=10,
                 order=6,
                 win_start_inc=0.2,
                 win_stretch_inc=0.2,
                 win_min_len=1,
                 win_max_len=5,
                 win_start_min_sec='00:00.0',
                 win_last_start_min_sec=None,
                 print_progress=True,
                 progress_info='Generate feature map...',
                 rot_axis=0,
                 rot_angle=0,
                 add_noise=False,
                 target_snr_db=20,
                 csv_skiprows=0,
                 csv_separator=',',
                 exercise_abbrs=['RF','RO','RS','LR','BC','TC','MP','SA','P1','P2','NE'],
                 exercise_abbrs_peak_eval = ['RF','RO','RS','LR','BC','TC','MP','SA','P1','P2']):
        """
        Parameters
        ----------
        --> See class docstring.
        """
        
        # convert window start position and last window start position to value in seconds
        self.win_start = convert_time_format(win_start_min_sec, convert_to_s=True)
        if win_last_start_min_sec:
            self.win_last_start = convert_time_format(win_last_start_min_sec, convert_to_s=True)
        else:
            self.win_last_start = None
        
        # parameters for the windowing procedure
        self.win_start_inc = win_start_inc
        self.win_stretch_inc = win_stretch_inc
        self.win_min_len = win_min_len
        self.win_max_len = win_max_len
        self.sampling_rate = sampling_rate
        self.exercise_abbrs = exercise_abbrs
        self.exercise_abbrs_peak_eval = exercise_abbrs_peak_eval
        self.win_start_min_sec = win_start_min_sec
        self.win_last_start_min_sec = win_last_start_min_sec
        
        # file (csv) of selected test subject
        self.test_subject_path = os.path.join(test_subject_dir, test_subject_file)
        
        # get the feature map, start points, window lengths and the signal length of the selected data
        self.feature_map, self.all_window_start_points, \
        self.all_window_lengths, self.signal_len = signal_windowing_via_indices(
                                                                         self.test_subject_path,
                                                                         number_sections=number_sections,
                                                                         sig_names=signal_abbrs,
                                                                         signal_orientations=signal_orientations,
                                                                         sampling_rate=sampling_rate,
                                                                         cutoff=cutoff,
                                                                         order=order,
                                                                         win_start_inc=win_start_inc,
                                                                         win_stretch_inc=win_stretch_inc,
                                                                         win_min_len=win_min_len,
                                                                         win_max_len=win_max_len,
                                                                         win_start=self.win_start,
                                                                         win_last_start=self.win_last_start,
                                                                         print_progress=print_progress,
                                                                         progress_info=progress_info,
                                                                         rot_axis=rot_axis,
                                                                         rot_angle=rot_angle,
                                                                         add_noise=add_noise,
                                                                         target_snr_db=target_snr_db,
                                                                         csv_skiprows=csv_skiprows,
                                                                         csv_separator=csv_separator)
        
        # last window start time --> time where the minimum window length just fits into the sensor data
        self.win_last_start = self.signal_len/self.sampling_rate - self.win_min_len
        
        # number of different window start points
        self.num_start_points = len(self.all_window_start_points)
        
        # number of different window sizes
        self.num_win_sizes = len(self.all_window_lengths)

    
    # method to get the feature map
    def get_feature_map(self):
        return self.feature_map
    
    
    def print_rep_blocks(self, print_rep_len_prob=True):
        '''
        Method to print the found repetition blocks of each exercise 
        with time range and number of repetitons.

        Parameters
        ----------
        print_rep_len_prob : boolean
            If Ture --> print individual repetition lengths and predicted probabilities.

        Returns
        -------
        no returns
        '''
        
        # going through all exercises
        for ex in self.exercise_abbrs_peak_eval:
            print('\nExercise: ' + ex)
            print('Number of blocks: {}\n'.format(len(self.rep_blocks[ex])))

            # going through all repetition blocks of the current exercise
            for block_num in range(len(self.rep_blocks[ex])):
                print('\tBlock #{}:'.format(block_num+1))
                print('\t\tRepetitions: {}'.format(np.shape(np.array(self.rep_blocks[ex][block_num]))[0]))
                
                # for both indices we have to consider the start position of the first window (win_start)
                start_index = self.rep_blocks[ex][block_num][0,0] + \
                    convert_time_format(self.win_start_min_sec, sampling_rate=1/self.win_start_inc)
                stop_index = self.rep_blocks[ex][block_num][-1,0] + \
                    convert_time_format(self.win_start_min_sec, sampling_rate=1/self.win_start_inc)
                
                # for the stop index we have to consider the length of the last repetition
                stop_index += int((self.rep_blocks[ex][block_num][-1,1]*self.win_stretch_inc \
                                   + self.win_min_len) / self.win_start_inc)
                
                print('\t\tTime range: ' + indices_to_time(start_index, stop_index, self.win_start_inc))
                
                if print_rep_len_prob is True:
                    print('\t\tRepetition lengths [s] and predicted prob.: ')
                    for kk, rep_length_index in enumerate(self.rep_blocks[ex][block_num][:,1]):
                        win_pos_index = self.rep_blocks[ex][block_num][kk,0]
                        print('\t\t\t{0:3d}\t{1:.2f}\t({2:.3f})'.format(kk+1,
                                                  rep_length_index*self.win_stretch_inc + self.win_min_len,
                                                  self.prob_matrix_dict[ex][win_pos_index,rep_length_index])) 
    
    
    def evaluate_probability_matrix(self,
                                    pred_probabilities,
                                    max_time_between_peaks=10,
                                    min_peaks_per_block=3,
                                    threshold_prob=0.5,
                                    footprint_length=1.5,
                                    print_rep_len_prob=True):
        '''
        Evaluate a probability matrix in order to find repetition blocks.
        There are various selectable options --> see Parameters.
        After the evaluation the method print_rep_blocks() is called
        to print the found repetition blocks.

        Parameters
        ----------
        pred_probabilities :  np.narray
            Matrix with probabilities to evaluate.
            
        max_time_between_peaks : int or float
            Maximum time between two peaks of the same block [s].
        
        min_peaks_per_block : int
            Minimum number of peaks per block.
            
        threshold_prob : int or float
            Find only peaks with a minimum probability (threshold).
            (Value from 0 ... 1)
        
        footprint_length : int or float
            Length of the footprint for the maximum_filter in order to find peaks [s].
        
        print_rep_len_prob : boolean
            If Ture --> print individual repetition lengths and predicted probabilities.

        Returns
        -------
        no returns
        '''
        
        self.prob_matrix_dict = fill_prediction_matrix(pred_probabilities,
                                                       exercise_abbrs=self.exercise_abbrs,
                                                       num_start_points=self.num_start_points,
                                                       num_win_sizes=self.num_win_sizes,
                                                       all_window_start_points=self.all_window_start_points,
                                                       all_window_lengths=self.all_window_lengths)

        self.peak_ind_dict = {}
        for ex in self.exercise_abbrs_peak_eval:
            self.peak_ind_dict[ex] = detect_prob_map_peaks(prob_matrix=self.prob_matrix_dict[ex],
                                                           win_start_inc=self.win_start_inc,
                                                           num_win_sizes=self.num_win_sizes,
                                                           threshold_prob=threshold_prob, 
                                                           footprint_length=footprint_length)

        self.rep_blocks = evaluate_peaks(peak_ind=self.peak_ind_dict,
                                         prob_matrix=self.prob_matrix_dict,
                                         win_start_inc=self.win_start_inc,
                                         exercise_abbrs_peak_eval=self.exercise_abbrs_peak_eval,
                                         max_time_between_peaks=max_time_between_peaks,
                                         min_peaks_per_block=min_peaks_per_block)
        
        self.print_rep_blocks(print_rep_len_prob)
        
        
    def plot_probability_matrices_and_peaks(self,
                                            test_subject_id=None,
                                            figsize=(18,9),
                                            cross_size=10,
                                            plot_actual_classes=True,
                                            timetable_file_dir = r'E:\Physio_Data\Exercise_time_tables',
                                            timetable_file_name = 'Timetable_subject01.txt',
                                            exercise_timetable_names = {'Raises Front':'RF',
                                                                        'Raises Oblique':'RO',
                                                                        'Raises Side':'RS',
                                                                        'Rotation Wrist':'LR',
                                                                        'Biceps Curls':'BC',
                                                                        'Triceps Curls':'TC',
                                                                        'Military Press':'MP',
                                                                        'Shoulder Adduct.':'SA',
                                                                        'PNF Diagonal 1':'P1',
                                                                        'PNF Diagonal 2':'P2'}
                                           ):
        '''
        Print the probability matrix as well as the found repetitions
        by means of green crosses.

        Parameters
        ----------
        test_subject_id : int or None
            Just for the title of the plot, not necessary.
        
        figsize : tuple
            Figure size of the plot (e.g. (18,9)).
            
        cross_size : int
            Size of the green crosses, indicating the individual repetitions.
            
        plot_actual_classes : boolean
            If True --> show a separate axis with the actual classes from a timetable.
        
        timetable_file_dir : string
            Directory to the timetable file.
            (Only necessary if plot_actual_classes is True.)
            
        timetable_file_name : string
            Name of the txt-file containing the timetable with the actual classes.
            (Only necessary if plot_actual_classes is True.)

        Returns
        -------
        no returns
        '''
        
        # text for current subject
        if isinstance(test_subject_id, int):
            self.sub_text = 'Subject {}'.format(test_subject_id)
        else:
            self.sub_text = ''

        yticks = np.arange(0, self.win_max_len-self.win_min_len+self.win_stretch_inc, 2) / self.win_stretch_inc
        ylabels = ['{}'.format(yticks[ii] * self.win_stretch_inc + self.win_min_len) for ii in range(len(yticks))]

        # plot one axis less if plot_actual_classes is False      
        if plot_actual_classes is False:
            self.fig, self.axis = plt.subplots(len(self.exercise_abbrs),1,figsize=figsize, sharex=True)
        else:
            self.fig, self.axis = plt.subplots(len(self.exercise_abbrs)+1,1,figsize=figsize, sharex=True)


        # image color settings for RFC probabilities
        cmap = plt.cm.seismic
        vmin=0
        vmax=1

        for ax, ex in zip(self.axis, self.exercise_abbrs):
            s = ax.imshow(self.prob_matrix_dict[ex].transpose(), interpolation='nearest', 
                          aspect='auto', cmap=cmap, vmin=vmin, vmax=vmax)
            ax.invert_yaxis()
            ax.set_yticks(yticks)
            ax.set_yticklabels(ylabels, fontsize=7)
            ax.set_ylabel(ex, rotation=0, fontsize=13)
            ax.yaxis.labelpad = 32
            ax.xaxis.set_ticklabels([])

        # dictionary for cross plots (in order to toggle visibility)
        self.cross_plot = {}

        # plot crosses for image peaks
        for ax, ex in zip(self.axis, self.exercise_abbrs_peak_eval):
            #ax.plot(peak_ind[ex][0], peak_ind[ex][1], '+g', markersize=8, markeredgewidth=1.5)
            self.cross_plot[ex] = []
            for ii in range(len(self.rep_blocks[ex])):
                x_peak = np.array(self.rep_blocks[ex][ii])[:,0]
                y_peak = np.array(self.rep_blocks[ex][ii])[:,1]
                self.cross_plot[ex].append(ax.plot(x_peak, y_peak, '+g', markersize=cross_size, markeredgewidth=1.5))

        self.Button_showCross_ax = plt.axes([0.78, 0.12, 0.05, 0.03])
        self.Button_showCross = Button(self.Button_showCross_ax, 'Show rep.')
        self.Button_showCross.on_clicked(self.toggle_cross)

        self.fig.text(0.1, 0.6, r'window length $[s]$', fontsize=10, rotation=90)
        # plt.gcf().text(0.078, 0.6, r'window length $[s]$', fontsize=10, rotation=90) # for half the window size

        formatter = FuncFormatter(lambda i, x: time.strftime('%M:%S', time.gmtime(i*self.win_start_inc+self.win_start)))
        self.axis[-1].xaxis.set_major_formatter(formatter)
        self.axis[-1].set_xlabel(r'time $[min:sec]$', fontsize=13)

        self.fig.subplots_adjust(bottom=0.2, right=0.9) # make space for buttons and color bar
        self.cbar_ax = self.fig.add_axes([0.93, 0.255, 0.01, 0.625])
        self.fig.colorbar(s, cax=self.cbar_ax)

        # add slider for selections on the x axis
        self.Slider_shiftX_ax = plt.axes([0.125, 0.07, 0.775, 0.025])
        self.Slider_zoomX_ax = plt.axes([0.125, 0.035, 0.775, 0.025])

        axcolor = 'cornflowerblue'
        self.Slider_shiftX = Slider(self.Slider_shiftX_ax, 'time shift [%]', 0.0, 100.0, valinit=0, facecolor=axcolor)
        self.Slider_zoomX = Slider(self.Slider_zoomX_ax, 'time scale [%]', 0.1, 100.0, valinit=100, facecolor=axcolor)
        self.Slider_zoomX_ax.xaxis.set_visible(True)
        self.Slider_zoomX_ax.set_xticks(np.arange(0,105,5)) 

        self.Slider_shiftX.on_changed(self.updateX)
        self.Slider_zoomX.on_changed(self.updateX)

        # add button to reset view
        self.Button_resetX_ax = plt.axes([0.85, 0.12, 0.05, 0.03])
        self.Button_resetX = Button(self.Button_resetX_ax, 'Reset view')
        self.Button_resetX.on_clicked(self.resetX)

        self.start_index = 0
        self.stop_index = self.num_start_points

        self.fig.suptitle('Predicted Probabilities ' + self.sub_text + '\n' + indices_to_time(
                self.start_index + round(self.win_start/self.win_start_inc),  
                self.stop_index + round(self.win_start/self.win_start_inc), 
                self.win_start_inc), fontsize=20)

        self.axis[-1].set_xlim(0, self.num_start_points)


        # Plotting the actual classes (exercises) on the last axis:
        if plot_actual_classes is True:

            # file with timetable (csv) of the test subject
            timetable_data_path = os.path.join(timetable_file_dir, timetable_file_name)

            # read in time table
            timetable_data = pd.read_csv(timetable_data_path, skiprows=0, sep='\t', header=None)
            num_exercises = timetable_data.shape[0] # number of exercises

            self.axis[-1].set_yticks([])
            self.axis[-1].set_ylim([0,1])

            # going through all exercises in the timetable
            for ii, ex_name in enumerate(timetable_data.values[:,0]):

                # going through all repetition blocks in the timetable (5, 10 and 15 rep. blocks)
                for rep_col, start_col, stop_col in zip([1,2,3],[4,6,8],[5,7,9]): # corresponding columns
                    rep_num = timetable_data.values[ii,rep_col]
                    
                    # consider win_start for border calculation
                    left_border = convert_time_format(timetable_data.values[ii,start_col], 
                                                        sampling_rate=1/self.win_start_inc) - \
                                                        self.win_start/self.win_start_inc 
                    right_border = convert_time_format(timetable_data.values[ii,stop_col], 
                                                        sampling_rate=1/self.win_start_inc) - \
                                                        self.win_start/self.win_start_inc 
                    # mark the corresponding area
                    self.axis[-1].axvspan(left_border, right_border, color='y', alpha=0.3, lw=0)
                    # write text to the corresponding area
                    
                    # x center of marked area
                    x_center = left_border + (right_border-left_border)/2
                    self.axis[-1].text(x_center, 0.5, str(rep_num) + '\n' + exercise_timetable_names[ex_name], 
                                  horizontalalignment='center', verticalalignment='center', fontsize=10, clip_on=True)

            self.axis[-1].set_ylabel('Actual classes', rotation=0, fontsize=11)
            self.axis[-1].yaxis.labelpad = 50

        plt.show()
    
    
    # Auxiliary methods for the interactive plot:
    
    def updateX(self,val):
        self.start_index = int(self.Slider_shiftX.val / 100 * self.num_start_points)
        self.stop_index = self.start_index + self.Slider_zoomX.val / 100 * self.num_start_points
        self.axis[-1].set_xlim((self.start_index, self.stop_index))
        self.fig.suptitle('Predicted Probabilities ' + self.sub_text + '\n' + indices_to_time(
            self.start_index + round(self.win_start/self.win_start_inc),  
            self.stop_index + round(self.win_start/self.win_start_inc), 
            self.win_start_inc), fontsize=20)
        plt.draw()
        
    def toggle_cross(self,val):
        # This function is called by a button to hide/show the crosses
        for ex in self.exercise_abbrs_peak_eval:
            for ii in range(len(self.rep_blocks[ex])):
                self.cross_plot[ex][ii][0].set_visible(not self.cross_plot[ex][ii][0].get_visible())
        plt.draw()
        
    def resetX(self,val):
        self.start_index = 0
        self.stop_index = self.num_start_points
        self.axis[-1].set_xlim((self.start_index, self.stop_index))
        self.Slider_shiftX.reset()
        self.Slider_zoomX.reset()
        self.fig.suptitle('Predicted Probabilities ' + self.sub_text + '\n' + indices_to_time(
            self.start_index + round(self.win_start/self.win_start_inc),  
            self.stop_index + round(self.win_start/self.win_start_inc), 
            self.win_start_inc), fontsize=20)
        plt.draw()
        

In [16]:
PD_wp = PhysioData_WindowingProcedure(test_subject_dir  = r'E:\Physio_Data\Subject_01',
                                      test_subject_file = 'subject01.csv',
                                      number_sections=10,
                                      signal_abbrs=['Acc','Gyr'],
                                      signal_orientations=['x','y','z'],
                                      sampling_rate=256,
                                      cutoff=10,
                                      order=6,
                                      win_start_inc=0.2,
                                      win_stretch_inc=0.2,
                                      win_min_len=1,
                                      win_max_len=5,
                                      win_start_min_sec='00:00.0',
                                      win_last_start_min_sec=None,
                                      print_progress=True,
                                      progress_info='Generate feature map...',
                                      rot_axis=0,
                                      rot_angle=0,
                                      add_noise=False,
                                      target_snr_db=20,
                                      csv_skiprows=0,
                                      csv_separator=',')

np.shape(PD_wp.get_feature_map())

Generate feature map... 100%


(210945, 60)

## Test the created class

In [16]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
from pivottablejs import pivot_ui
import sys
sys.path.append('..')  # in order to import modules from my own package

# my package
from packageMeinhart import PhysioDataHandler as PDH
from packageMeinhart.functionsMasterProjectMeinhart import print_precision_recall_accuracy
from packageMeinhart.functionsMasterProjectMeinhart import print_misclassified_data_points

### First create and train a ML model

In [17]:
PD1 = PDH.PhysioData_SectionFeatures(num_sections=10,
                                     test_subject_ids=1,
                                     train_subject_ids=-1,
                                     test_rep_nums=-1,
                                     train_rep_nums=-1,
                                     test_ex_abbrs=-1,
                                     train_ex_abbrs=-1,
                                     with_non_Ex=True,
                                     rot_axis_test_data=0,
                                     rot_angle_test_data=0,
                                     add_noise_test_data=False,
                                     add_noise_train_data=False,
                                     snr_db=20)

# create ML model
ML_model = RandomForestClassifier(n_estimators=500, max_leaf_nodes=40, n_jobs=-1, random_state=42)
#ML_model = make_pipeline(StandardScaler(), SVC(random_state=42)) # Support Vector Classifier with input scaling

# train the model
ML_model.fit(PD1.X_train(), PD1.y_train())

# predict labels
y_pred = ML_model.predict(PD1.X_test())

# show results
print('Model: ' + type(ML_model).__name__ + '\n')

print('Total Accuracy: {:.2f}%\n'.format((accuracy_score(PD1.y_test(), y_pred))*100))
print_precision_recall_accuracy(y_pred, PD1.y_test())

report = classification_report(PD1.y_test(), y_pred, 
                               labels=np.arange(0,11),
                               target_names=['RF','RO','RS','LR','BC','TC','MP','SA','P1','P2','NE'],
                               sample_weight=None, output_dict=True)

report_df = pd.DataFrame.from_dict(report, orient='index')

print('')
print_misclassified_data_points(y_pred, PD1.y_test())

Model: RandomForestClassifier

Total Accuracy: 99.01%

Exercise	Precision [%]	Recall [%]	Accuracy [%]
  RF		  100.00	   96.67	   99.86
  RO		   96.77	  100.00	   99.86
  RS		  100.00	  100.00	  100.00
  LR		  100.00	  100.00	  100.00
  BC		  100.00	  100.00	  100.00
  TC		  100.00	  100.00	  100.00
  MP		   85.71	  100.00	   99.29
  SA		  100.00	  100.00	  100.00
  P1		  100.00	   96.67	   99.86
  P2		  100.00	  100.00	  100.00
  NE		   99.75	   98.77	   99.15

7 misclassified (709 test data points):
RF classified as RO
P1 classified as NE
NE classified as MP
NE classified as MP
NE classified as MP
NE classified as MP
NE classified as MP


### Test the new class 

In [18]:
PD_wp = PhysioData_WindowingProcedure(test_subject_dir  = r'E:\Physio_Data\Subject_01',
                                      test_subject_file = 'subject01.csv',
                                      number_sections=10,
                                      signal_abbrs=['Acc','Gyr'],
                                      signal_orientations=['x','y','z'],
                                      sampling_rate=256,
                                      cutoff=10,
                                      order=6,
                                      win_start_inc=0.2,
                                      win_stretch_inc=0.2,
                                      win_min_len=1,
                                      win_max_len=5,
                                      win_start_min_sec='13:00.0',
                                      win_last_start_min_sec='19:30.0',
                                      print_progress=True,
                                      progress_info='Generate feature map...',
                                      rot_axis=0,
                                      rot_angle=0,
                                      add_noise=False,
                                      target_snr_db=20,
                                      csv_skiprows=0,
                                      csv_separator=',')

np.shape(PD_wp.get_feature_map())

Generate feature map... 100%


(40971, 60)

In [19]:
pred_probs = ML_model.predict_proba(PD_wp.get_feature_map())
np.shape(pred_probs)

(40971, 11)

In [20]:
PD_wp.evaluate_probability_matrix(pred_probabilities=pred_probs,
                                  max_time_between_peaks=10,
                                  min_peaks_per_block=3,
                                  threshold_prob=0.5,
                                  footprint_length=1.5,
                                  print_rep_len_prob=True)


Exercise: RF
Number of blocks: 3

	Block #1:
		Repetitions: 10
		Time range: 13:44.2 - 14:13.8
		Repetition lengths [s] and predicted prob.: 
			  1	2.80	(0.716)
			  2	3.20	(0.813)
			  3	3.20	(0.837)
			  4	2.80	(0.673)
			  5	2.80	(0.822)
			  6	3.00	(0.656)
			  7	3.00	(0.619)
			  8	3.40	(0.577)
			  9	3.60	(0.517)
			 10	3.60	(0.621)
	Block #2:
		Repetitions: 15
		Time range: 14:39.6 - 15:19.2
		Repetition lengths [s] and predicted prob.: 
			  1	2.80	(0.703)
			  2	3.20	(0.784)
			  3	2.80	(0.864)
			  4	3.00	(0.858)
			  5	2.80	(0.812)
			  6	2.60	(0.725)
			  7	3.00	(0.759)
			  8	3.00	(0.823)
			  9	2.80	(0.788)
			 10	2.80	(0.717)
			 11	2.60	(0.802)
			 12	2.80	(0.828)
			 13	3.00	(0.750)
			 14	3.00	(0.723)
			 15	2.80	(0.784)
	Block #3:
		Repetitions: 5
		Time range: 16:08.4 - 16:22.6
		Repetition lengths [s] and predicted prob.: 
			  1	3.00	(0.729)
			  2	3.20	(0.782)
			  3	3.00	(0.794)
			  4	2.80	(0.649)
			  5	3.20	(0.838)

Exercise: RO
Number of blocks: 0


Exerci

In [21]:
%matplotlib auto
PD_wp.plot_probability_matrices_and_peaks(test_subject_id=1,
                                          figsize=(18,9),
                                          cross_size=10,
                                          plot_actual_classes=True,
                                          timetable_file_dir = r'E:\Physio_Data\Exercise_time_tables',
                                          timetable_file_name = 'Timetable_subject01.txt')

Using matplotlib backend: TkAgg


## Test once again

In [22]:
num_sections=10


PD2 = PDH.PhysioData_SectionFeatures(num_sections=num_sections,
                                     test_subject_ids=1,
                                     train_subject_ids=-1,
                                     test_rep_nums=-1,
                                     train_rep_nums=-1,
                                     test_ex_abbrs=['RF','RO','NE'],
                                     train_ex_abbrs=['RF','RO','NE'],
                                     with_non_Ex=True,
                                     rot_axis_test_data=0,
                                     rot_angle_test_data=0,
                                     add_noise_test_data=False,
                                     add_noise_train_data=False,
                                     snr_db=20)

# create ML model
ML_model = RandomForestClassifier(n_estimators=500, max_leaf_nodes=40, n_jobs=-1, random_state=42)
#ML_model = make_pipeline(StandardScaler(), SVC(random_state=42)) # Support Vector Classifier with input scaling

# train the model
ML_model.fit(PD2.X_train(), PD2.y_train())

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=None, max_features='auto', max_leaf_nodes=40,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=2,
            min_weight_fraction_leaf=0.0, n_estimators=500, n_jobs=-1,
            oob_score=False, random_state=42, verbose=0, warm_start=False)

In [25]:
PD_wp = PhysioData_WindowingProcedure(test_subject_dir  = r'E:\Physio_Data\Subject_01',
                                      test_subject_file = 'subject01.csv',
                                      number_sections=num_sections,
                                      signal_abbrs=['Acc','Gyr'],
                                      signal_orientations=['x','y','z'],
                                      sampling_rate=256,
                                      cutoff=10,
                                      order=6,
                                      win_start_inc=0.5,
                                      win_stretch_inc=0.5,
                                      win_min_len=1,
                                      win_max_len=4,
                                      win_start_min_sec='13:30.0',
                                      win_last_start_min_sec='14:30.0',
                                      print_progress=True,
                                      progress_info='Generate feature map...',
                                      rot_axis=[0,2],
                                      rot_angle=[10,10],
                                      add_noise=True,
                                      target_snr_db=10,
                                      csv_skiprows=0,
                                      csv_separator=',',
                                      exercise_abbrs=['RF','RO','NE'],
                                      exercise_abbrs_peak_eval = ['RF','RO'])

pred_probs = ML_model.predict_proba(PD_wp.get_feature_map())

PD_wp.evaluate_probability_matrix(pred_probabilities=pred_probs,
                                  max_time_between_peaks=10,
                                  min_peaks_per_block=3,
                                  threshold_prob=0.5,
                                  footprint_length=1.5,
                                  print_rep_len_prob=True)

%matplotlib auto
PD_wp.plot_probability_matrices_and_peaks(test_subject_id=1,
                                          figsize=(18,9),
                                          cross_size=10,
                                          plot_actual_classes=True,
                                          timetable_file_dir = r'E:\Physio_Data\Exercise_time_tables',
                                          timetable_file_name = 'Timetable_subject01.txt')

Generate feature map... 100%

Exercise: RF
Number of blocks: 1

	Block #1:
		Repetitions: 10
		Time range: 13:44.0 - 14:13.5
		Repetition lengths [s] and predicted prob.: 
			  1	3.00	(0.542)
			  2	3.00	(0.587)
			  3	3.00	(0.792)
			  4	3.00	(0.694)
			  5	2.50	(0.690)
			  6	3.00	(0.754)
			  7	3.00	(0.860)
			  8	3.50	(0.622)
			  9	3.00	(0.727)
			 10	3.00	(0.893)

Exercise: RO
Number of blocks: 0

Using matplotlib backend: TkAgg
