In [None]:
import numpy as np
import torch

from joblib import Parallel, delayed
from scipy.linalg import eigh
from scipy.signal import butter, filtfilt
from sklearn.feature_selection import mutual_info_regression
from sklearn.model_selection import train_test_split

In [None]:
# Generate Data Inter-Subject

subject = np.load('./datasets/EEG_EMG_Fusion/subject.npy')
label = np.load('./datasets/EEG_EMG_Fusion/label.npy')
new_label = np.load('./datasets/EEG_EMG_Fusion/new_label.npy')
eeg_data = np.load('./datasets/EEG_EMG_Fusion/eeg_data.npy')
emg_data = np.load('./datasets/EEG_EMG_Fusion/emg_data.npy')

subject = torch.from_numpy(subject).long()
label = torch.from_numpy(label).long()
new_label = torch.from_numpy(new_label).long()
eeg_data = torch.from_numpy(eeg_data).float()
emg_data = torch.from_numpy(emg_data).float()

SAMPLE_X = eeg_data
SAMPLE_Y = emg_data
label_tensor = new_label

indices = np.arange(SAMPLE_X.shape[0])

train_X, test_X, train_labels, test_labels, idx_train, idx_test = train_test_split(SAMPLE_X, label_tensor, indices, test_size=0.2, random_state=42)

train_major_label, test_major_label = new_label[idx_train], new_label[idx_test]
train_subtle_label, test_subtle_label = label[idx_train], label[idx_test]
train_subj_label, test_subj_label = subject[idx_train], subject[idx_test]

index_subj = [np.where(train_subj_label==k)[0] for k in range(0, 5)]

eeg_data_extract = eeg_data[index_subj[0]]
emg_data_extract = emg_data[index_subj[0]]

eeg_data = eeg_data_extract
emg_data = emg_data_extract

In [None]:
# Baselines of CMC

fs = 1000  # Sampling frequency
alpha_low, alpha_high = 8, 13
beta_low, beta_high = 13, 30
gamma_low, gamma_high = 30, 100


def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a


def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = filtfilt(b, a, data)
    return y


def create_segments(signal, nperseg, noverlap):
    n_segments = (len(signal) - noverlap) // (nperseg - noverlap)
    return np.asarray([signal[i * (nperseg - noverlap):i * (nperseg - noverlap) + nperseg] for i in range(n_segments)])


def compute_fourier_segments(signal, window, nperseg, noverlap):
    segments = create_segments(signal, nperseg, noverlap)
    windowed_segments = window * segments
    fft_segments = np.fft.rfft(windowed_segments, axis=1)
    return fft_segments


def negative_half_power(RX):
    E1,V1 = torch.linalg.eigh(RX)
    RF_NORM = V1@torch.diag(E1**(-1/2))@V1.T
    return RF_NORM


def compute_gram_matrix(X, Y, sigma):
    """Compute the Gram matrix between datasets X and Y using the Gaussian kernel."""
    # Compute pairwise squared distances between X and Y in a vectorized manner
    pairwise_sq_dists = np.sum(X**2, axis=1).reshape(-1, 1) + np.sum(Y**2, axis=1) - 2 * np.dot(X, Y.T)

    # Compute the Gram matrix
    gram_matrix = np.exp(-pairwise_sq_dists / (2 * sigma**2))
    return gram_matrix


def gaussian_kernel(x, y, sigma):
    return np.exp(-np.linalg.norm(x-y)**2 / (2*sigma**2))


def kernel_matrix(data, sigma):
    return compute_gram_matrix(data[:].reshape(-1, 1), data[:].reshape(-1, 1), sigma)


def KICA_spectrum(data_x, data_y, sigma=.1, alpha=1e-1):
    gram_matrix_x = compute_gram_matrix(data_x, data_x, sigma)
    gram_matrix_u = compute_gram_matrix(data_y, data_y, sigma)

    N0_matrix = (torch.eye(gram_matrix_x.shape[0]) - 1/gram_matrix_x.shape[0]).float()
    gram_matrix_x = torch.from_numpy(gram_matrix_x).float()
    gram_matrix_u = torch.from_numpy(gram_matrix_u).float()

    normalized_x = N0_matrix@gram_matrix_x@N0_matrix
    normalized_u = N0_matrix@gram_matrix_u@N0_matrix

    normalized_x_add = normalized_x + torch.eye((normalized_x.shape[0]))*alpha
    normalized_u_add = normalized_u + torch.eye((normalized_u.shape[0]))*alpha

    num_sample = data_x.shape[0]

    matrix_a = np.zeros((num_sample*2, num_sample*2))

    matrix_a[:num_sample, num_sample:] = normalized_x@normalized_u.T
    matrix_a[num_sample:, :num_sample] = normalized_u@normalized_x.T

    matrix_b = np.zeros((num_sample*2, num_sample*2))

    matrix_b[:num_sample, :num_sample] = normalized_x_add@normalized_x_add.T
    matrix_b[num_sample:, num_sample:] = normalized_u_add@normalized_u_add.T

    eigenvalues, eigenvectors = eigh(matrix_a, matrix_b)

    spectrum_kica = eigenvalues[data_x.shape[0]:][::-1]

    RF = normalized_x_add@normalized_x_add.T
    RG = normalized_u_add@normalized_u_add.T
    P = normalized_x@normalized_u.T

    E1,V1 = torch.linalg.eigh(RF)
    E2,V2 = torch.linalg.eigh(RG)

    RF_NORM = V1@torch.diag(E1**(-1/2))@V1.T
    RG_NORM = V2@torch.diag(E2**(-1/2))@V2.T
    P_STAR = RF_NORM@P@RG_NORM

    measure_kica = (-1/2)*np.log((1 - spectrum_kica[:]**2)).sum()
    measure_hsic = np.trace(P_STAR)

    U = 0
    S = 0
    V = 0

    return spectrum_kica, eigenvectors, U, S, V, measure_kica, measure_hsic


def custom_mutual_information(x, y, fs, window='hann', nperseg=None, noverlap=None):
    if nperseg is None:
        nperseg = 256
    if noverlap is None:
        noverlap = nperseg // 2
    if window == 'hann':
        window = np.hanning(nperseg)

    fft_x_segments = compute_fourier_segments(x, window, nperseg, noverlap)
    fft_y_segments = compute_fourier_segments(y, window, nperseg, noverlap)

    mutual_information = np.zeros(fft_x_segments.shape[1])
    kica = np.zeros(fft_x_segments.shape[1])
    hsic = np.zeros(fft_x_segments.shape[1])

    fft_x_segments = np.abs(fft_x_segments)
    fft_y_segments = np.abs(fft_y_segments)

    for i in range(fft_x_segments.shape[1]):
        mutual_information[i] = mutual_info_regression(fft_x_segments[:, i].reshape(-1, 1), fft_y_segments[:, i].reshape(-1))
        spectrum_kica, eigenvectors, U, S, V, measure_kica, measure_hsic = KICA_spectrum(fft_x_segments[:, i].reshape(-1, 1), fft_y_segments[:, i].reshape(-1, 1), sigma=0.1, alpha=0.1)
        kica[i] = measure_kica
        hsic[i] = measure_hsic

    Sxx = np.mean(np.abs(fft_x_segments) ** 2, axis=0)
    Syy = np.mean(np.abs(fft_y_segments) ** 2, axis=0)
    Sxy = np.mean(np.conj(fft_x_segments) * fft_y_segments, axis=0)

    coherence = np.abs(Sxy) ** 2 / (Sxx * Syy)

    freqs = np.fft.rfftfreq(nperseg, 1 / fs)

    return freqs, mutual_information, kica, hsic, coherence


def compute_measures(i, eeg_data, emg_data, fs):
    measure_alpha_list = np.zeros((4, eeg_data.shape[1], emg_data.shape[1], 129))
    measure_beta_list = np.zeros((4, eeg_data.shape[1], emg_data.shape[1], 129))
    measure_gamma_list = np.zeros((4, eeg_data.shape[1], emg_data.shape[1], 129))

    for j in range(eeg_data.shape[1]):
        for k in range(emg_data.shape[1]):

            eeg_alpha = butter_bandpass_filter(eeg_data[i, j, :], alpha_low, alpha_high, fs)
            eeg_beta = butter_bandpass_filter(eeg_data[i, j, :], beta_low, beta_high, fs)
            eeg_gamma = butter_bandpass_filter(eeg_data[i, j, :], gamma_low, gamma_high, fs)

            emg_alpha = butter_bandpass_filter(emg_data[i, k, :], alpha_low, alpha_high, fs)
            emg_beta = butter_bandpass_filter(emg_data[i, k, :], beta_low, beta_high, fs)
            emg_gamma = butter_bandpass_filter(emg_data[i, k, :], gamma_low, gamma_high, fs)

            freqs, mutual_information, kica, hsic, coherence = custom_mutual_information(eeg_alpha, emg_alpha, fs, nperseg=256, noverlap=None, window='hann')
            measure_alpha_list[:, j, k, :] = np.stack((mutual_information, kica, hsic, coherence))

            freqs, mutual_information, kica, hsic, coherence = custom_mutual_information(eeg_beta, emg_beta, fs, nperseg=256, noverlap=None, window='hann')
            measure_beta_list[:, j, k, :] = np.stack((mutual_information, kica, hsic, coherence))

            freqs, mutual_information, kica, hsic, coherence = custom_mutual_information(eeg_gamma, emg_gamma, fs, nperseg=256, noverlap=None, window='hann')
            measure_gamma_list[:, j, k, :] = np.stack((mutual_information, kica, hsic, coherence))

    return measure_alpha_list, measure_beta_list, measure_gamma_list

In [None]:
# Loop through the indexes with parallel processing

n_jobs = -1  # Change the value of n_jobs to represent the maximum number of cores you want to use
results = Parallel(n_jobs=n_jobs, verbose=10)(
    delayed(compute_measures)(i, eeg_data, emg_data, fs) 
    for i in range(eeg_data.shape[0])
)

# Combine the results
save_measure_alpha_list = np.zeros((4, eeg_data.shape[0], eeg_data.shape[1], emg_data.shape[1], 129))
save_measure_beta_list = np.zeros((4, eeg_data.shape[0], eeg_data.shape[1], emg_data.shape[1], 129))
save_measure_gamma_list = np.zeros((4, eeg_data.shape[0], eeg_data.shape[1], emg_data.shape[1], 129))

for i, (measure_alpha_list, measure_beta_list, measure_gamma_list) in enumerate(results):
    save_measure_alpha_list[:, i, :, :, :] = measure_alpha_list
    save_measure_beta_list[:, i, :, :, :] = measure_beta_list
    save_measure_gamma_list[:, i, :, :, :] = measure_gamma_list

np.save('subj0_measure_alpha_list_cmc', save_measure_alpha_list)
np.save('subj0_measure_beta_list_cmc', save_measure_beta_list)
np.save('subj0_measure_gamma_list_cmc', save_measure_gamma_list)