In [4]:
# Import packages
import numpy as np
import pandas as pd
from scipy.stats import tvar, skew, kurtosis
from scipy.integrate import simps

from utils.dataloaders import OneSignal
from utils import random_state
# import random

random_state(36)

In [37]:
class FeatureConstructor:
    def __init__(self, data_name=None):
        """
        :param
            --data_name: filename like 'S001_128.mat' or tuple (data_path,label_path,peaks_path)
        """
        # Initialize class arguments
        self.data_name = data_name

        # Get information for patient with data_name
        self.signal = OneSignal(data_name=self.data_name)
        # Filter the PPG signal
        self.signal.filter(fL = 0.5, fH = 4.3, order = 4)
        # Align onsets to determine crops: always 1 peak between 2 onsets
        self.signal.align_onsets()

        # Set attributes of FeatureExtractor
        self.ppg = self.signal.ppg              # get filtered ppg and derivatives
        self.vpg = self.signal.vpg
        self.apg = self.signal.apg
        self.jpg = self.signal.jpg
        self.fs = self.signal.fs                # sampling frequency                        --> int
        self.peaks = self.signal.peaks.flatten()# peaks array                               --> (number_of_peaks,)
        self.labels = self.signal.labels        # labels                                    --> (number_of_peaks,)
        self.onsets = self.signal.on            # determined by self.signal.align_onsets()  --> (number_of_peaks+1,)


    def generate_crops(self):
        self.crops = []
        while self.signal.indx < self.signal.indx_max:
            # (x, y), (x_r, y_r) = self.signal.crop(raw=True)
            crop, _ = self.signal.crop(raw=False)
            self.crops.append(crop)

    def get_intra_crop_features(self):
        ""
        self.ft_intra_crop_names = ['crop_duration','t_peak','mean','median','std','tvar','skew','kurt',
                                    'auc','peak_amplitude','pulse_width','symmetry',]
        self.ft_intra_crop = np.zeros(((self.peaks.shape[0]), len(self.ft_intra_crop_names)))

        # Construct self.crops
        self.generate_crops()

        # Loop over all crops: extract features
        for i, crop in enumerate(self.crops):
            self.ft_intra_crop[i,:] = np.array(
                [crop.shape[0] / self.fs,
                 np.argmax(crop) / self.fs,
                 np.mean(crop),
                 np.median(crop),
                 np.std(crop),
                 tvar(crop), # tune values?
                 skew(crop),
                 kurtosis(crop),
                 simps(np.abs(crop), dx=1/self.fs), # AUC: Simpson's rule for numeral integration
                 np.max(crop)-np.min(crop),
                 self.pulse_width(crop),
                 self.symmetry_index(crop)
                 ])

    def get_inter_crop_features(self):
        ""
        self.ft_inter_crop_names = ['PTP','N_last_X_s']

        self.ft_inter_crop = np.zeros(((self.peaks.shape[0]), len(self.ft_inter_crop_names)))
        
        self.ft_inter_crop[:,0] = self.peak_to_peak_times()
        self.ft_inter_crop[:,1] = self.N_ratio()

    def get_patient_specific_features(self):
        ""
        self.ft_patient_names = []
        self.ft_patient = np.array([[]])

    def construct_dataframe(self):
        # Get features
        self.feature_names = self.ft_intra_crop_names + self.ft_inter_crop_names + self.ft_patient_names
        features = np.concatenate([self.ft_intra_crop, self.ft_inter_crop, self.ft_patient])

        # Create a DataFrame
        data = {'peaks': self.peaks, 'labels': self.labels}
        for i, feature_name in enumerate(self.feature_names):
            data[feature_name] = features[:, i]

        self.df = pd.DataFrame(data)
        self.df.to_csv('your_dataframe.csv', index=False)

    #---------------------------------------------------
    #   Functions used in get_intra_crop_features()
    #---------------------------------------------------
        
    def pulse_width(self, crop):
        if len(crop) == 0:
            return 0  # or some default value, as appropriate
        
        half_peak = max(crop) / 2
        idx_peak = np.argmax(crop)
        
        # Find indices on both sides of the peak
        # Index of half value of peak before peak
        idx_t1 = self.find_nearest(crop[:idx_peak], half_peak)
        # Index of half value of peak after peak
        idx_t2 = self.find_nearest(crop[idx_peak:], half_peak) + idx_peak

        # Calculate the width
        width = (idx_t2 - idx_t1)/self.fs # [s]

        return width

    #From https://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array
    def find_nearest(self, array, value):
        array = np.asarray(array)
        if len(array) == 0:
            idx = 0
        else:
            idx = (np.abs(array - value)).argmin()
        return idx
    
    def symmetry_index(self, crop):
        middle_idx = len(crop) // 2
        
        left_half = crop[:middle_idx]
        right_half = crop[middle_idx:]
        
        mean_left = np.mean(left_half)
        mean_right = np.mean(right_half)

        symmetry_index = mean_right / mean_left
        
        return symmetry_index
    
    #---------------------------------------------------
    #   Functions used in get_inter_crop_features()
    #---------------------------------------------------

    def peak_to_peak_times(self):
        time_between_peaks = np.diff(self.peaks) / self.fs
        mean_PTP = np.mean(time_between_peaks)
        time_between_peaks = mean_PTP
        time_between_peaks = np.insert(time_between_peaks, 0, mean_PTP)

        return time_between_peaks
    
    def N_ratio(self, time_window=20):
        indices_before = int(time_window*self.fs)

        number_of_Ns = np.zeros(len(self.peaks))
        for i, peak_idx in enumerate(self.peaks):
            indices_in_window = np.where((peak_idx - indices_before <= self.peaks) & (self.peaks < peak_idx))[0]
            count = np.count_nonzero(self.labels[indices_in_window] ==  'N')
            number_of_Ns[i] = count

        return number_of_Ns
        
    # def drop_empty_crops(self, crops):

    #     out = [crop for crop in crops if len(crop) > 4]
    #     print("Number of crops eliminated: ", len(crops)-len(out))
    #     return out

    # def get_max_freq(self, crop, fs): #NEED FS OF INDIVIDUAL SIGNAL :(
    #     fft = np.fft.fft(crop)
    #     freqs = np.fft.fftfreq(len(crop), d=1/fs)
    #     dom_freq_idx = np.argmax(np.abs(fft))
    #     dom_freq = np.abs(freqs[dom_freq_idx])
    #     return dom_freq

In [38]:
patient = FeatureConstructor('S001_128.mat')

filtering signal S001_128.mat...


In [39]:
patient.get_intra_crop_features()
patient.get_inter_crop_features()
patient.get_patient_specific_features()

In [40]:
patient.construct_dataframe()

ValueError: all the input array dimensions except for the concatenation axis must match exactly, but along dimension 0, the array at index 0 has size 2603 and the array at index 2 has size 1

In [31]:
import numpy as np

def P2P(peaks_idx, fs):
    distances = np.diff(peaks_idx)
    print("Distances:", distances)

    deltat = 1 / fs
    distances = distances * deltat
    print("Distances with time:", distances)

    mean_distance = np.mean(distances)
    print("Mean distance:", mean_distance)

    distances = np.insert(distances, 0, mean_distance)
    print("Distances with mean at the beginning:", distances)

    return distances

# Test variables
peaks_idx_test = [10, 20, 30, 40, 50]  # Replace with your actual peaks_idx
fs_test = 100  # Replace with your actual sampling frequency

# Call the function with the test variables
result = P2P(peaks_idx_test, fs_test)


Distances: [10 10 10 10]
Distances with time: [0.1 0.1 0.1 0.1]
Mean distance: 0.1
Distances with mean at the beginning: [0.1 0.1 0.1 0.1 0.1]


In [32]:
class YourClass:
    def __init__(self, peaks, fs):
        self.peaks = peaks
        self.fs = fs

    def peak_to_peak_times(self):
        time_between_peaks = np.diff(self.peaks) / self.fs
        mean_PTP = np.mean(time_between_peaks)
        time_between_peaks[0] = mean_PTP

        return time_between_peaks

# Test variables
peaks_test = np.array([10, 20, 30, 40, 50])  # Replace with your actual peaks
fs_test = 100  # Replace with your actual sampling frequency

# Create an instance of YourClass
test_instance = YourClass(peaks_test, fs_test)

# Call the peak_to_peak_times method
result = test_instance.peak_to_peak_times()

# Print the result
print("Result:", result)

Result: [0.1 0.1 0.1 0.1]


In [None]:
def find_nearest(array, value): #From https://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array, used for one of features
    array = np.asarray(array)
    if len(array) == 0:
        idx = 0
    else:
        idx = (np.abs(array - value)).argmin()
    return idx

def get_pulse_width(crop):  #need to verify if this done correctly...

    half_peak = max(crop)/2
    idx_peak = np.argmax(crop[1:])
    idx_t1 = find_nearest(crop[:idx_peak], half_peak) #number of timesteps for amplitude to get to half value of peak before peak
    idx_t1 = len(crop[:idx_peak]) -idx_t1 #time from that point to peak
    idx_t2 = find_nearest(crop[idx_peak:], half_peak) #number of timesteps for amplitude to get to half value of peak after peak

    return idx_t2+idx_t1 #time between half maximum values
    
def drop_empty_crops(crops):

    out = [crop for crop in crops if len(crop) > 4]
    print("Number of crops eliminated: ", len(crops)-len(out))
    return out

def get_max_freq(crop, fs): #NEED FS OF INDIVIDUAL SIGNAL :(
    fft = np.fft.fft(crop)
    freqs = np.fft.fftfreq(len(crop), d=1/fs)
    dom_freq_idx = np.argmax(np.abs(fft))
    dom_freq = np.abs(freqs[dom_freq_idx])
    
    return dom_freq

def get_symmetry_index(crop): #
    middle_idx = len(crop) // 2
    
    left_half = crop[:middle_idx]
    right_half = crop[middle_idx:]
    

    mean_left = np.mean(left_half)
    mean_right = np.mean(right_half)

    symmetry_index = mean_right / mean_left
    
    return symmetry_index


def crops2features(crops):

    list_of_features = ['mean', 'median', 'std', 'var', 'skew', 'kurt','auc','peak_amplitude','pulse_width','max_freq','symmetry', 'peak_position']

    ind = 0
    df = pd.DataFrame(columns = list_of_features)
    for crop in crops:

        #First 6 features taken from Lab 1, rest inspired by ChatGPT
        values = [np.mean(crop),
                  np.median(crop),
                  np.std(crop),
                  stats.tvar(crop),
                  stats.skew(crop),
                  stats.kurtosis(crop),

                    sum(np.array(crop)-min(crop)), #not sure if sound, since min could also be before or after crop...
                    max(crop)-min(crop), #idem
                    get_pulse_width(crop),
                    get_max_freq(crop, 128), #assumed 128 for now....
                    get_symmetry_index(crop),
                    int(np.argmax(crop))
                  ]

        df2 = pd.DataFrame(dict(zip(list_of_features,
                                    values)), index = [ind])
        df = pd.concat([df,df2])
        ind += 1

    return df


In [19]:
patient = FeatureConstructor('S001_128.mat')

filtering signal S001_128.mat...


In [20]:
patient.peaks.shape
# patient.labels.shape
# patient.onsets.shape

(2603,)

In [21]:
patient.fs

128