Import Libraries

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from scipy.signal import welch, find_peaks
from scipy.stats import entropy, iqr, mode

import h5py

Import 5 Seconds Sensor Data Sequences

In [2]:
with h5py.File(r'D:\Ali_Thesis\synthetic_data_generation\Data\Process_canada_data\P13_5_sec_30hz_sequences_sensor_data.h5', 'r') as f:
    X = f['data'][:]
    print(f"Loaded data shape: {X.shape}")

Loaded data shape: (417840, 6, 150)


In [3]:
num_sequences, num_channels, num_samples = X.shape
print(X.shape)

(417840, 6, 150)


Get Features From The Data

In [4]:
class SensorFeatureExtractor:
    def __init__(self, window_size=150, frequency=30):
        self.window_size = window_size
        self.frequency = frequency
    
    def teager_energy(self, signal):
        return signal[:-2]**2 - signal[1:-1] * signal[2:]
    
    def compute_acc_features(self, signal):
        signal = np.linalg.norm(signal, axis=0)
        te = self.teager_energy(signal)
        freqs, psd = welch(signal, fs=self.frequency, nperseg=len(signal))

        # Avoid division by zero (Normalize PSD with small offset)
        psd_sum = np.sum(psd)
        psd_norm = psd / (psd_sum + 1e-12)  # Add small value to prevent division by zero

        # Compute entropy and handle NaN values
        spectral_entropy = entropy(psd_norm)

        fft_values = np.fft.fft(signal)
        power_spectrum = np.abs(fft_values) ** 2

        # Normalize the power spectrum
        power_spectrum_norm = power_spectrum / np.sum(power_spectrum)

        # Extract DC power (first component)
        dc_power_value = power_spectrum_norm[0]

        return np.array([
            np.max(te),
            np.min(te),
            np.mean(te),
            np.sum(te),
            np.mean(signal),
            np.min(signal),
            np.std(signal),
            iqr(signal),
            spectral_entropy,
            dc_power_value
        ])
    
    def compute_eda_features(self, signal):
        peaks, _ = find_peaks(signal)
        freqs, psd = welch(signal, fs=self.frequency, nperseg=len(signal))

        # Avoid division by zero (Normalize PSD with small offset)
        psd_sum = np.sum(psd)
        psd_norm = psd / (psd_sum + 1e-12)  # Add small value to prevent division by zero

        # Compute FFT power spectrum
        fft_values = np.fft.fft(signal)
        power_spectrum = np.abs(fft_values) ** 2

        # Avoid division by zero (Normalize FFT power spectrum with small offset)
        power_sum = np.sum(power_spectrum)
        power_norm = power_spectrum / (power_sum + 1e-12)  # Add small value

        # Compute entropy and handle NaN values
        spectral_entropy = entropy(psd_norm)
        fft_entropy = entropy(power_norm)

        # Ensure no NaN values
        spectral_entropy = np.nan_to_num(spectral_entropy, nan=0.0)
        fft_entropy = np.nan_to_num(fft_entropy, nan=0.0)

        return np.array([
            np.max(np.gradient(signal)),
            mode(signal, keepdims=True).mode[0] if signal.size > 0 else np.nan,
            np.sum(np.abs(signal)),
            len(peaks),
            np.std(signal),
            spectral_entropy,
            fft_entropy
        ])
    
    def compute_bvp_features(self, signal):
        peaks, _ = find_peaks(signal)
        ibi = np.diff(peaks) if len(peaks) > 1 else [0]
        freqs, psd = welch(signal, fs=self.frequency, nperseg=len(signal))
        
        lf_band = (freqs >= 0.01) & (freqs < 0.08)
        mf_band = (freqs >= 0.08) & (freqs < 0.15)
        hf_band = (freqs >= 0.15) & (freqs < 0.4)
        
        lf_power = np.log(np.sum(psd[lf_band])) if np.any(lf_band) else 0
        mf_power = np.log(np.sum(psd[mf_band])) if np.any(mf_band) else 0
        hf_power = np.log(np.sum(psd[hf_band])) if np.any(hf_band) else 0
        energy_ratio = mf_power / (lf_power + hf_power) if (lf_power + hf_power) > 0 else 0
        
        return np.array([
            np.std(ibi),
            np.mean(ibi) if len(ibi) > 0 else 0,
            np.log(np.sum(psd[(freqs >= 0.1) & (freqs <= 0.2)])),
            np.sum(psd[(freqs >= 0.01) & (freqs < 0.08)]) / np.sum(psd[(freqs >= 0.15) & (freqs < 0.4)]),
            lf_power,
            mf_power,
            hf_power,
            energy_ratio,
            np.mean(signal),
            np.max(signal),
            np.min(signal),
            np.std(signal),
            iqr(signal),
            np.mean(psd),
            entropy(psd),
            np.linalg.norm(np.abs(np.fft.fft(signal))[1:]),
            np.abs(np.fft.fft(signal))[0]
        ])
    
    def compute_temp_features(self, signal):
        return np.array([
            np.arctan(np.gradient(signal)).mean(),
            np.mean(signal),
            np.std(signal)
        ])
    
    def extract_features(self, sensor_type, signal):
        if sensor_type == 'ACC':
            return self.compute_acc_features(signal)
        elif sensor_type == 'EDA':
            return self.compute_eda_features(signal)
        elif sensor_type == 'BVP':
            return self.compute_bvp_features(signal)
        elif sensor_type == 'TEMP':
            return self.compute_temp_features(signal)
        else:
            raise ValueError("Unknown sensor type")

In [5]:
# Initialize the feature extractor
extractor = SensorFeatureExtractor()

# Storage for extracted features
eda_features_list = []
acc_features_list = []
bvp_features_list = []
temp_features_list = []


for i in range(num_sequences):
    eda_data = X[i, 0, :]  # EDA
    acc_data = X[i, 1:4, :]  # ACC (x, y, z)
    bvp_data = X[i, 4, :]  # BVP
    temp_data = X[i, 5, :]  # TEMP

    # Extract features
    eda_features_list.append(extractor.extract_features('EDA', eda_data))
    acc_features_list.append(extractor.extract_features('ACC', acc_data))
    bvp_features_list.append(extractor.extract_features('BVP', bvp_data))
    temp_features_list.append(extractor.extract_features('TEMP', temp_data))

# Convert lists to NumPy arrays for easier processing
eda_features_array = np.array(eda_features_list)
acc_features_array = np.array(acc_features_list)
bvp_features_array = np.array(bvp_features_list)
temp_features_array = np.array(temp_features_list)

# Print results
print(f'EDA shape: {eda_features_array.shape}')
print(f'ACC shape: {acc_features_array.shape}')
print(f'BVP shape: {bvp_features_array.shape}')
print(f'TEMP shape: {temp_features_array.shape}')

EDA shape: (417840, 7)
ACC shape: (417840, 10)
BVP shape: (417840, 17)
TEMP shape: (417840, 3)


In [6]:
# Stack all feature arrays horizontally to form (num_sequences, num_all_features)
features_array = np.hstack([eda_features_array, acc_features_array, bvp_features_array, temp_features_array])
print(f'Features array shape: {features_array.shape}')

Features array shape: (417840, 37)


Save Features To Check It With The Other Notebook

In [7]:
np.savez_compressed('P13_5_sec_features_array.npz', features_array)