In [None]:
import os
import numpy as np
import scipy.io as sio
from joblib import Parallel, delayed
import dill as pickle
from preprocess.preprocessing_library import FFT, Slice, Magnitude, Log10
from utils.pipeline import Pipeline
import warnings
import scipy.signal
import re

def natural_key(string):
    return [int(s) if s.isdigit() else s.lower() for s in re.split(r'(\d+)', string)]

import h5py

from scipy.io import loadmat

In [None]:
# Perform sliding window FFT on the sliced signal sequence for feature extraction
# Generate a four-dimensional feature file with dimensions of (all_stamples, timew_windows, channels, frequency)

def load_mat_segment(file_path):
    mat_data = loadmat(file_path)
    data = mat_data['segment']
    return data
    # transpose

data_dir = r"P:\Pain_EEG\sliced\X1"
save_path = r"P:\Pain_EEG\preprocessed\X1"

sampling_frequency = 200  # Hz
fft_min_freq = 1  # Hz
fft_max_freq_actual = 60
window_length = 4  # second
window_step = 1.5  # second
# Sampling rate and sliding window FFT parameter settings

fft_max_freq = fft_max_freq_actual


pipeline = Pipeline([FFT(), Slice(fft_min_freq, fft_max_freq), Magnitude(), Log10()])

def process_single_file(mat_path):
    data = load_mat_segment(mat_path)
    start, step = 0, int(np.floor(window_step * sampling_frequency))
    stop = start + int(np.floor(window_length * sampling_frequency))
    fft_windows = []

    while stop <= data.shape[1]:
        signal_window = data[:, start:stop]
        fft_window = pipeline.apply(signal_window)  # shape: (21, frequency)
        fft_windows.append(fft_window)
        start += step
        stop += step

    return np.stack(fft_windows, axis=0)  # shape: (time_windows, channels, frequency)



mat_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.mat')]
# Sort files naturally (1.mat, 2.mat, ..., 10.mat, etc.)
mat_files.sort(key=natural_key)

all_data = []
warnings.filterwarnings("ignore")

for path in mat_files:
    fft_data = process_single_file(path)
    print(path)
    all_data.append(fft_data)

final_array = np.stack(all_data, axis=0)  # shape: (all_samples, time_windows, channels, frequency)

np.save(os.path.join(save_path, "X1_Pain_feature_x.npy"), final_array)
print(f"Success! Proprecessed data shape = {final_array.shape}, saved in: {save_path}")

In [None]:
# Not necessary!!!
# If you want to try channel montage/electrode differentiation, you can run this code on the sliced. mat file before generating the feature file

import os
import scipy.io as sio
import numpy as np

# Define electrode differential pairs
diff_pairs = [
    (2, 3), (2, 7), (2, 5), (2, 4), (7, 5), (5, 4), (3, 4), (3, 6), (3, 8),
    (8, 6), (6, 4), (4, 9), (9, 10), (10, 12), (9, 12), (9, 11), (11, 13),
    (9, 13), (9, 14), (14, 15), (15, 17), (14, 17), (17, 20), (15, 20),
    (14, 20), (14, 16), (16, 18), (14, 18), (14, 21), (16, 21), (18, 21),
    (20, 21)
]


input_folder = r'P:\Pain_EEG\sliced\X1'
output_folder = r'P:\Pain_EEG\sliced_montage\X1'

os.makedirs(output_folder, exist_ok=True)

mat_files = [f for f in os.listdir(input_folder) if f.endswith('.mat')]

for mat_file in mat_files:
    input_path = os.path.join(input_folder, mat_file)
    mat_data = sio.loadmat(input_path)
    segment = mat_data['segment']
    n_pairs = len(diff_pairs)
    result = np.zeros((n_pairs, segment.shape[1]))
    
    # Perform differential operation on each electrode pair
    for i, (a, b) in enumerate(diff_pairs):
        result[i, :] = segment[a-1, :] - segment[b-1, :]
    
    output_data = {'segment': result}
    output_path = os.path.join(output_folder, mat_file)
    sio.savemat(output_path, output_data)
    
    print(f'Processed and saved: {mat_file}')

print('All files processed successfully!')

In [None]:
# Test of Feature array dimension
test = np.load(r'P:\Pain_EEG\preprocessed\X1\X1_Pain_feature_x.npy')
print(test.shape)
test = np.mean(test,axis=0)
print(test.shape)
test = np.mean(test,axis=0)
print(test.shape)

# test = np.mean(test,axis=0)
# print(test.shape)


(225, 91, 62, 75)
(91, 62, 75)
(62, 75)


In [None]:
# Generate classification label file: Pain_label_y.npy
# 3 class: generate a numpy array as [0,0 … 1,1 … 2,2]

import pandas as pd
import numpy as np

save_path = r"P:\Pain_EEG\y_label"

df = pd.read_excel('Clinical_Pain_label.xlsx')
column_data = df['Pain_category']

mapping = {
    'Neuropathic Pain': 0,
    'Not-Neuropathic Pain': 1,
    'Control': 2
}

label_array = np.array([mapping[item] for item in column_data], dtype=int)


np.save(os.path.join(save_path, "Pain_label_y.npy"), final_array)
print(f"Success! Classification label data shape = {final_array.shape}, saved in: {save_path}")

In [None]:
# Generate Regression label file: Pain_score_NRS.npy & Pain_score_VAS.npy
# The NRS score is a decimal from 0 to 100, and the VAS score is an integer from 0 to 10
# Healthy individuals do not have pain ratings

import pandas as pd
import numpy as np

save_path = r"P:\Pain_EEG\y_label"

df = pd.read_excel('X1_Clinical_Pain_label.xlsx')

nrs_array = df['NRS'].astype(int).to_numpy()
vas_array = df['VAS'].astype(int).to_numpy()

np.save(os.path.join(save_path, "X1_Pain_score_NRS.npy"), nrs_array)
np.save(os.path.join(save_path, "X1_Pain_score_VAS.npy"), vas_array)

print(f"Success! Regression label data shape = {nrs_array.shape}, saved in: {save_path}")

In [None]:
# For the 3-classification task, it is necessary to concatenate the feature arrays of the three types of samples along the samples

import numpy as np

save_path = r"P:\Pain_EEG\preprocessed"

x1 = np.load('X1_Pain_feature_x.npy')
x2 = np.load('X2_Pain_feature_x.npy')
x3 = np.load('X3_Pain_feature_x.npy')

pain_feature_x = np.concatenate((x1, x2, x3), axis=0)

print("shape of pain_feature_x.npy:", pain_feature_x.shape)

np.save(os.path.join(save_path, "Pain_feature_x.npy"), pain_feature_x)

# Feature: (all_samples, time_windows, channels, frequency)
# Predicted label: Pain_label_y.npy (all_samples,)


In [None]:
# For the Regression task, it is necessary to concatenate the feature arrays of the 2 types of Pain (Neuropathic & Not-neuropathic)
import numpy as np
import os

save_path = r"P:\Pain_EEG\preprocessed"

nrs1 = np.load('X1_Pain_score_NRS.npy')
nrs2 = np.load('X2_Pain_score_NRS.npy')
pain_score_nrs = np.concatenate((nrs1, nrs2), axis=0)
print("shape of Pain_score_NRS.npy:", pain_score_nrs.shape)
np.save(os.path.join(save_path, "Pain_score_NRS.npy"), pain_score_nrs)

vas1 = np.load('X1_Pain_score_VAS.npy')
vas2 = np.load('X2_Pain_score_VAS.npy')
pain_score_vas = np.concatenate((vas1, vas2), axis=0)
print("shape of Pain_score_VAS.npy:", pain_score_vas.shape)
np.save(os.path.join(save_path, "Pain_score_VAS.npy"), pain_score_vas)

# Feature: (all_samples, time_windows, channels, frequency)
# Predicted label: Pain_score_VAS.npy Pain_score_NRS.npy