In [9]:
# Import public packages and functions
import os
import pandas as pd
import numpy as np
import pickle
import sys
import json
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.signal import find_peaks

# add the local lib and utils folders to the module search path
sys.path.insert(0, './lib')
sys.path.insert(0, './utils/')

import warnings
warnings.filterwarnings("ignore")

from lib_event import EVENTS
from lib_accelerometer import ACCELEROMETER
from lib_data import DATA_IO

import utils_plotting_tapping, utils_misc, utils_accelerometer

In [11]:
# Project directories, resolved relative to the current working directory.
# os.path.join is used instead of hard-coded "\\" separators so the same
# notebook also resolves its folders on non-Windows machines.
PATH_CURR   = os.path.abspath(os.curdir)          # current code directory
PATH        = str(Path(PATH_CURR).parent)         # data repository: parent directory where the datasets are located
PATH_DATA   = os.path.join(PATH, "data")          # the recordings data directory
PATH_EVENTS = os.path.join(PATH, "events")        # the events directory
SUB_LIST    = utils_misc.get_SUB_list(PATH_DATA)  # ids of the subjects for which a recording is available (per project helper)

# 1. Get Tapping Indices

In [14]:
def custom_tap_finding_p017(acc_class, move_type='tap'):
    """Detect tapping events in the accelerometer data provided by ``acc_class``.

    Samples whose task column equals ``'tap'`` are searched for peaks with
    ``scipy.signal.find_peaks``. Around every peak a window is marked in a
    mask that spans the whole recording: from 2 s before the peak up to 1 s
    after it, or only up to the peak when the 1 s tail is not usable.

    Parameters
    ----------
    acc_class : object
        Loader exposing ``get_data()``. The object it returns must provide
        ``fs``, ``times`` and a 2-D ``data`` array in which column 1 holds
        the signal (ACC_L_X according to the original comment) and column 4
        the task label.
    move_type : str, default 'tap'
        Kind of movement to detect. Only ``'tap'`` is implemented.

    Returns
    -------
    peak_i : numpy.ndarray
        Indices of the detected peaks in the full recording.
    peak_t : numpy.ndarray
        Entries of ``times`` at the detected peaks.
    tap_bool : numpy.ndarray
        Float array with one entry per sample: 1 inside a tap window, else 0.

    Raises
    ------
    ValueError
        If ``move_type`` is not ``'tap'``.
    """
    # Previously an unsupported move_type fell through to the return statement
    # and failed with an UnboundLocalError; fail early with a clear message.
    if move_type != 'tap':
        raise ValueError(
            "unsupported move_type " + repr(move_type) + ": only 'tap' is implemented"
        )

    acc       = acc_class.get_data()
    fs        = int(acc.fs)
    n_samples = acc.data.shape[0]

    sel = acc.data[:, 4] == 'tap'      # 4 is task column
    d   = acc.data[:, 1][sel]          # 1 because of ACC_L_X
    t   = acc.times[sel]
    idx = np.arange(n_samples)[sel]    # positions of the selected samples in the full recording

    THR         = .5e-7                # minimum peak height
    peak_idx, _ = find_peaks(d, height=THR, distance=int(fs*5))  # peaks at least 5 s apart

    peak_t = t[peak_idx]
    peak_i = idx[peak_idx]

    # create boolean positive during tap
    tap_bool = np.zeros(n_samples)

    for i in peak_i:
        # Clamp at 0: a negative start would wrap around to the end of the
        # array and the resulting slice would silently mark nothing.
        start = max(i - (2*fs), 0)
        stop  = i + fs

        # Mark only the part before the peak when the tail runs past the end
        # of the recording, or when the window spans more than 3 units of
        # `times` (presumably a gap in the timestamps - TODO confirm).
        if stop >= n_samples or acc.times[stop] - acc.times[start] > 3:
            tap_bool[start:i] = 1
        else:
            tap_bool[start:stop] = 1

    return peak_i, peak_t, tap_bool
    
def filter_events_by_time_threshold(event_indices, fs, time_threshold):
    """Drop events that start too soon after the previously kept event.

    The first event is always kept. Every following event is kept only when
    its start lies at least ``time_threshold`` seconds after the end of the
    most recently kept event; otherwise it is collected as deleted.

    Parameters
    ----------
    event_indices : sequence of (start, end)
        Events as sample-index pairs, in the order they should be checked.
    fs : int or float
        Sampling frequency, used to convert seconds to samples.
    time_threshold : int or float
        Minimum gap in seconds between the end of a kept event and the
        start of the next one.

    Returns
    -------
    filtered_events : list
        The kept events.
    deleted_events : list
        The rejected events.
    """
    # No events: nothing to keep or delete (previously raised IndexError).
    if len(event_indices) == 0:
        return [], []

    # Convert minimum time from seconds to samples
    min_gap_samples = fs * time_threshold
    filtered_events = [event_indices[0]]  # the first event is always kept
    deleted_events  = []

    for event in event_indices[1:]:
        start_current, _ = event
        _, end_previous  = filtered_events[-1]

        # The gap is measured against the last *kept* event, not the last seen one
        if (start_current - end_previous) >= min_gap_samples:
            filtered_events.append(event)
        else:
            deleted_events.append(event)

    return filtered_events, deleted_events

def plot_single_tap(tap_indices, index, fs, hand, alignment):
    """Plot a 10-second accelerometer trace around one tap.

    The trace is read from the notebook-level ``SUB_ACCELEROMETER`` object
    (``ACC_R`` when ``hand == "right"``, ``ACC_L`` otherwise) and drawn on a
    time axis from -5 s to +5 s whose zero is the alignment point.

    * ``alignment == "onset"``: the trace runs from 5 s before the tap onset
      up to the tap offset (at most 10 s); the remainder is NaN.
    * any other value (used as "offset"): the trace runs from the tap onset
      (at most 5 s before the offset) up to 5 s after the tap offset; the
      part before it is NaN.

    Samples requested outside the recording are filled with NaN, so the
    alignment point always stays at t = 0.

    Parameters
    ----------
    tap_indices : sequence of (onset, offset)
        Tap events as sample-index pairs.
    index : int
        Position in ``tap_indices`` of the tap to plot.
    fs : int
        Sampling frequency in samples per second.
    hand : str
        ``"right"`` plots ``ACC_R`` in red, anything else ``ACC_L`` in blue.
    alignment : str
        ``"onset"`` or ``"offset"`` (see above).
    """
    half_window = 5 * fs
    window      = 10 * fs
    onset       = tap_indices[index][0]
    offset      = tap_indices[index][1]

    is_right  = hand == "right"
    signal    = SUB_ACCELEROMETER.ACC_R if is_right else SUB_ACCELEROMETER.ACC_L  # assumes a 1-D sample array - TODO confirm
    n_samples = len(signal)

    # Requested sample range [first, last); it may extend beyond the recording.
    onset_aligned = alignment == "onset"
    if onset_aligned:
        first = onset - half_window
        last  = min(offset, first + window)
    else:
        last  = offset + half_window
        first = max(onset, last - window)

    # Clamp to the recording. A negative start index would otherwise wrap
    # around to the end of the array and return the wrong samples.
    lo = max(first, 0)
    hi = min(last, n_samples)

    # Re-insert the samples that fell outside the recording as NaN so that the
    # alignment point keeps its position inside the window.
    tap = np.pad(signal[lo:hi], (max(lo - first, 0), max(last - hi, 0)), constant_values=np.nan)

    # Fill the side facing away from the alignment point up to the full window.
    shortfall = max(window - len(tap), 0)
    if onset_aligned:
        tap = np.pad(tap, (0, shortfall), constant_values=np.nan)[:window]
    else:
        tap = np.pad(tap, (shortfall, 0), constant_values=np.nan)[-window:]  # pad the front

    t = np.linspace(-5, 5, window)
    plt.plot(t, tap, c="r" if is_right else "b", linewidth=1)
    plt.axvline(x=0, color='k', label=alignment)
    plt.ylim([0,1e-6])
    plt.title(hand + " hand:  index " + str(index))
    plt.show()

In [16]:
# Load the accelerometer recordings of subject 017.
SUB_ACCELEROMETER = ACCELEROMETER(PATH, "017")
acc_class_L       = DATA_IO(PATH, "017", 'acc_left')

# The third value returned by the detector is the per-sample tap mask.
tapping_L_boolean = custom_tap_finding_p017(acc_class_L, move_type='tap')[2]

# Turn the mask into events through the project helper
# (presumably (start, end) sample-index pairs - verify against utils_accelerometer).
tapping_L_indices_filtered = utils_accelerometer.get_task_period_times_indices(tapping_L_boolean)

# Remove taps that start less than 4 seconds after the previously kept tap.
(tapping_L_indices_filtered,
 tapping_L_indices_removed) = filter_events_by_time_threshold(tapping_L_indices_filtered, fs=512, time_threshold=4)

print(f"left hemisphere removed overlapping taps: {len(tapping_L_indices_removed)}")

ACCELEROMETER: SUB-017
... loading started
... pickle loading: C:\Users\a.kaymak\Desktop\Papers\2025 Parkinson STN-DBS Dyskinesia LFP-ECG\files\data\sub-017\017_mergedData_v4.0_acc_right.P
... pickle loading: C:\Users\a.kaymak\Desktop\Papers\2025 Parkinson STN-DBS Dyskinesia LFP-ECG\files\data\sub-017\017_mergedData_v4.0_acc_left.P
... pickle loading: C:\Users\a.kaymak\Desktop\Papers\2025 Parkinson STN-DBS Dyskinesia LFP-ECG\files\data\sub-017\017_mergedData_v4.0_acc_left.P
left hemisphere removed overlapping taps: 8


# 2. Refine Taps

In [None]:
# Visual inspection: draw every detected left-hand tap twice, once aligned to
# its onset and once to its offset, followed by a three-line separator.
for index in range(len(tapping_L_indices_filtered)):
    for alignment in ("onset", "offset"):
        plot_single_tap(tapping_L_indices_filtered, index=index, fs=512, hand="left", alignment=alignment)
    print(*["--------------------------------------------------------------------------"] * 3, sep="\n")

In [None]:
# Inspect one tap: print its duration and plot it aligned to onset and offset.
fs    = 512      # sampling frequency (samples per second)
index = 91       # position of the tap in the event list
hand  = "left"   # "right" or "left"

# Pick the event list that belongs to the requested hand. The right-hand
# branch used to plot the left-hand events and showed the onset alignment
# twice instead of onset and offset.
# NOTE(review): tapping_R_indices_filtered is not created in the cells shown
# here; it must exist before hand = "right" can be used.
if hand == "right":
    selected_tap_indices = tapping_R_indices_filtered
else:
    selected_tap_indices = tapping_L_indices_filtered

print("length : " + str((selected_tap_indices[index][1] - selected_tap_indices[index][0]) / fs) + " seconds")

plot_single_tap(selected_tap_indices, index=index, fs=fs, hand=hand, alignment="onset")
plot_single_tap(selected_tap_indices, index=index, fs=fs, hand=hand, alignment="offset")

In [None]:
# Preview a candidate onset correction for the tap selected by `index`/`hand`:
# the onset is shifted by `correction` seconds, the offset is left unchanged.
correction = -0.5

onset_preview_source = tapping_R_indices_filtered if hand == "right" else tapping_L_indices_filtered
onset_preview_event  = (
    int(onset_preview_source[index][0] + correction*fs),
    onset_preview_source[index][1],
)
plot_single_tap([onset_preview_event], index=0, fs=512, hand=hand, alignment="onset")

In [None]:
# Preview a candidate offset correction for the tap selected by `index`/`hand`:
# the offset is shifted by `correction` seconds, the onset is left unchanged.
correction = 1

offset_preview_source = tapping_R_indices_filtered if hand == "right" else tapping_L_indices_filtered
offset_preview_event  = (
    int(offset_preview_source[index][0]),
    int(offset_preview_source[index][1] + correction*fs),
)
plot_single_tap([offset_preview_event], index=0, fs=512, hand=hand, alignment="offset")

In [22]:
# Manual corrections found by visual inspection of the left-hand taps.
#
# event index -> (anchor, onset shift [s], offset shift [s])
#   anchor "onset" : new onset = detected onset  + onset shift
#   anchor "offset": new onset = detected offset + onset shift
#   new offset is always detected offset + offset shift
#
# All shifts are applied to the *detected* events and written to a new list,
# so tapping_L_indices_filtered is no longer modified and re-running this cell
# cannot compound the shifts.
#
# NOTE(review): event 63 previously took its onset from event 61 (after 61 had
# already been shifted), which looks like a copy-paste slip; it now uses its
# own onset. Confirm the -0.5 s shift against the plots.
# NOTE(review): event 8 is corrected here but is also listed in
# remove_L_indices below, so its correction has no effect on the saved taps.
L_TAP_CORRECTIONS = {
    0:  ("onset",   1.4,   0.0),
    1:  ("onset",   0.8,   0.5),
    2:  ("onset",   0.5,   0.5),
    3:  ("onset",   0.25,  0.5),
    5:  ("onset",   1.0,   1.2),
    6:  ("onset",   0.6,   0.0),
    8:  ("onset",   0.0,   0.6),
    14: ("onset",   0.0,   1.0),
    15: ("onset",  -0.3,  -0.3),
    29: ("offset", -0.3,   0.8),
    30: ("onset",   1.2,   0.0),
    31: ("onset",   1.2,   0.5),
    60: ("offset", -0.85,  0.6),
    61: ("onset",   1.0,   0.0),
    63: ("onset",  -0.5,   0.0),
}

tapping_L_indices_adjusted = list(tapping_L_indices_filtered)
for event_idx, (anchor, onset_shift, offset_shift) in L_TAP_CORRECTIONS.items():
    detected_onset  = tapping_L_indices_filtered[event_idx][0]
    detected_offset = tapping_L_indices_filtered[event_idx][1]
    onset_reference = detected_offset if anchor == "offset" else detected_onset
    tapping_L_indices_adjusted[event_idx] = (
        int(onset_reference + onset_shift*fs),
        int(detected_offset + offset_shift*fs),
    )

# Events rejected during visual inspection.
remove_L_indices               = [7,8,9,10,11,12,13,16,17,18,19,20,21,22,23,24,25,26,27,28,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,
                                  51,52,53,54,55,56,57,58,59,62,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91]
rejected_L_indices             = set(remove_L_indices)
tapping_L_indices_corrected    = [value for i, value in enumerate(tapping_L_indices_adjusted) if i not in rejected_L_indices]

In [26]:
# Store the curated taps of subject 017; no right-hand taps are saved here.
sub_taps = {"right": [], "left": tapping_L_indices_corrected}

updated_taps_file = DATA_IO.path_events + "017_updated_taps"
with open(updated_taps_file, 'wb') as handle:
    pickle.dump(sub_taps, handle, protocol=pickle.HIGHEST_PROTOCOL)