In [None]:
import numpy as np
import pandas as pd
from scipy.io import loadmat

file_names = ['0_0.mat','7_1.mat','7_2.mat','7_3.mat','14_1.mat','14_2.mat','14_3.mat','21_1.mat','21_2.mat','21_3.mat']

for file in file_names:
    data = loadmat(f'matfiles\\{file}')
    print(list(data.keys()))


In [None]:
data_columns = ['X097_DE_time', 'X105_DE_time', 'X118_DE_time', 'X130_DE_time', 'X169_DE_time',
                'X185_DE_time','X197_DE_time','X209_DE_time','X222_DE_time','X234_DE_time']
columns_name = ['de_normal','de_7_inner','de_7_ball','de_7_outer','de_14_inner','de_14_ball','de_14_outer','de_21_inner','de_21_ball','de_21_outer']
data_12k_10c = pd.DataFrame()
for index in range(10):
    data = loadmat(f'matfiles\\{file_names[index]}')
    dataList = data[data_columns[index]].reshape(-1)
    data_12k_10c[columns_name[index]] = dataList[:119808]  
print(data_12k_10c.shape)
data_12k_10c

In [None]:
data_12k_10c.set_index('de_normal',inplace=True)
data_12k_10c.to_csv('data_12k_10c.csv')
data_12k_10c.shape

In [None]:
import numpy as np
import pandas as pd
import sklearn
from joblib import dump, load
from sklearn.model_selection import KFold  
import torch

def split_data_with_overlap(data, time_steps, label, overlap_ratio=0.5):
    stride = int(time_steps * (1 - overlap_ratio)) 
    samples = (len(data) - time_steps) // stride + 1  
    data_list = []
    for i in range(samples):
        start_idx = i * stride
        end_idx = start_idx + time_steps
        temp_data = data[start_idx:end_idx].tolist()
        temp_data.append(label)  
        data_list.append(temp_data)
    return pd.DataFrame(data_list, columns=[x for x in range(time_steps + 1)])

def normalize(data):
    data = np.array(data)
    s = (data - np.min(data)) / (np.max(data) - np.min(data) + 1e-8) 
    return s

def make_datasets(data_file_csv, test_rate=0.1):
    origin_data = pd.read_csv(data_file_csv)
    time_steps = 1024  
    overlap_ratio = 0.5  
    all_samples = []
    label = 0  

    for column_name, column_data in origin_data.items():
        column_data = normalize(column_data)  
        split_data = split_data_with_overlap(column_data, time_steps, label, overlap_ratio)
        label += 1
        all_samples.append(split_data)

    min_samples = min([len(sample) for sample in all_samples])
    all_samples_align = [sample.iloc[:min_samples, :] for sample in all_samples]

    total_data = pd.concat(all_samples_align, axis=0).reset_index(drop=True)
    total_data = sklearn.utils.shuffle(total_data).reset_index(drop=True)

    test_len = int(len(total_data) * test_rate)
    test_set = total_data.iloc[:test_len, :]
    kfold_data = total_data.iloc[test_len:, :]

    return kfold_data, test_set

def make_data_labels(dataframe):
    x_data = dataframe.iloc[:, 0:-1]
    y_label = dataframe.iloc[:, -1]
    x_data = torch.tensor(x_data.values).float()
    y_label = torch.tensor(y_label.values.astype('int64'))
    x_data = x_data.unsqueeze(1)
    
    return x_data, y_label

if __name__ == '__main__':
    kfold_data, test_set = make_datasets('data_12k_10c.csv', test_rate=0.1)
    test_x, test_y = make_data_labels(test_set)
    dump(test_x, 'testX_1024_10c')
    dump(test_y, 'testY_1024_10c')
    print(f"独立测试集 shape: X={test_x.shape}, Y={test_y.shape}")
    kf = KFold(n_splits=5, shuffle=True, random_state=42)  
    fold_num = 1

    for train_index, val_index in kf.split(kfold_data):
        print(f"\n===== 正在生成第 {fold_num} 折数据集 =====")
        train_fold = kfold_data.iloc[train_index, :]
        val_fold = kfold_data.iloc[val_index, :]
        
        train_x, train_y = make_data_labels(train_fold)
        val_x, val_y = make_data_labels(val_fold)
        
        dump(train_x, f'trainX_fold{fold_num}_1024_10c')
        dump(train_y, f'trainY_fold{fold_num}_1024_10c')
        dump(val_x, f'valX_fold{fold_num}_1024_10c')
        dump(val_y, f'valY_fold{fold_num}_1024_10c')
        
        print(f"第{fold_num}折 训练集 shape: X={train_x.shape}, Y={train_y.shape}")
        print(f"第{fold_num}折 验证集 shape: X={val_x.shape}, Y={val_y.shape}")
        
        fold_num += 1


In [None]:
import logging
import numpy as np
import random
from scipy.signal import hilbert
import torch
from joblib import dump, load
import warnings
from sklearn.svm import SVR

if not hasattr(np, 'find_common_type'):
    def find_common_type(array_types, scalar_types):
        return np.result_type(*array_types, *scalar_types)
    np.find_common_type = find_common_type
if not hasattr(np, 'chararray'):
    class chararray(np.ndarray):
        pass
    np.chararray = chararray

warnings.filterwarnings('ignore')
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning)

try:
    from PyEMD import EMD
except ImportError:
    from PyEMD.EMD import EMD

np.random.seed(42)
random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)

def svr_extension(signal, extend_num=50, C=1.0, gamma=0.1, epsilon=0.1):
    N = len(signal)
    left_train_t = np.arange(N).reshape(-1, 1)
    left_train_x = signal[:N]
    svr_left = SVR(C=C, gamma=gamma, epsilon=epsilon)
    svr_left.fit(left_train_t, left_train_x)
    left_ext = svr_left.predict(np.arange(-extend_num, 0).reshape(-1, 1))

    right_train_t = np.arange(len(signal) - N, len(signal)).reshape(-1, 1)
    right_train_x = signal[-N:]
    svr_right = SVR(C=C, gamma=gamma, epsilon=epsilon)
    svr_right.fit(right_train_t, right_train_x)
    right_ext = svr_right.predict(np.arange(len(signal), len(signal) + extend_num).reshape(-1, 1))

    return np.concatenate([left_ext, signal, right_ext])

def calculate_RMS(data1, data2):
    return np.sqrt(np.mean((data1 - data2)**2))

def calculate_hilbert_spectrum_entropy(imfs):
    total_entropy = 0
    for imf in imfs:
        analytic_signal = hilbert(imf)
        amplitude_envelope = np.abs(analytic_signal)
        power_spectrum = np.abs(np.fft.fft(amplitude_envelope))**2
        power_spectrum_normalized = power_spectrum / np.sum(power_spectrum)
        entropy = -np.sum(power_spectrum_normalized * np.log2(power_spectrum_normalized + 1e-10))
        total_entropy += entropy
    return total_entropy / len(imfs) if len(imfs) > 0 else 0

def evaluate_imfs(imfs, original):
    original_length = len(original)
    end_region_length = int(0.1 * original_length)
    rmse_front = calculate_RMS(imfs[:, :end_region_length], original[:end_region_length])
    rmse_back = calculate_RMS(imfs[:, -end_region_length:], original[-end_region_length:])
    rmse = (rmse_front + rmse_back) / 2
    hse = calculate_hilbert_spectrum_entropy(imfs)
    return {'rmse': rmse, 'hse': hse}

fitness_cache = {}

def fitness_function(params, signal, extend_num=50):
    params_tuple = tuple(params)
    if params_tuple in fitness_cache:
        return fitness_cache[params_tuple]
    C, gamma, epsilon = params
    x_svr = svr_extension(signal, extend_num, C, gamma, epsilon)
    emd = EMD(max_imf=7, max_sifting=200, tol=1e-1, spline_kind='linear')
    imfs_svr = emd.emd(x_svr)
    if len(imfs_svr) == 0:
        fitness = [9999,9999]
        fitness_cache[params_tuple] = fitness
        return fitness
    imfs_svr_cropped = imfs_svr[:, extend_num:extend_num + len(signal)]
    eval_result = evaluate_imfs(imfs_svr_cropped, signal)
    fitness = [eval_result['rmse'], eval_result['hse']]
    fitness_cache[params_tuple] = fitness
    return fitness

def non_dominated_sorting(population, signal, extend_num=50):
    pop_size = len(population)
    if pop_size == 0:
        return [[0]]
    fronts = []
    S = [[] for _ in range(pop_size)]
    n = [0] * pop_size
    rank = [0] * pop_size
    fitness_values = [fitness_function(p, signal, extend_num) for p in population]

    for p in range(pop_size):
        for q in range(pop_size):
            if p != q:
                p_dominates = all(fitness_values[p][i] <= fitness_values[q][i] for i in range(2)) and any(fitness_values[p][i] < fitness_values[q][i] for i in range(2))
                if p_dominates:
                    S[p].append(q)
                else:
                    n[p] += 1
        if n[p] == 0:
            rank[p] = 0
            if not fronts: fronts.append([])
            fronts[0].append(p)
    i = 0
    while i < len(fronts) and fronts[i]:
        Q = []
        for p in fronts[i]:
            for q in S[p]:
                n[q] -= 1
                if n[q] == 0:
                    rank[q] = i+1
                    Q.append(q)
        i +=1
        if Q: fronts.append(Q)
    if not fronts:
        fronts = [[i for i in range(pop_size)]]
    return fronts

def calculate_crowding_distance(front, fitness_values):
    front_len = len(front)
    if front_len <= 1:
        return [0.0] * front_len
    
    crowding_distances = [0.0] * front_len
    for i in range(2):
        if len(fitness_values) == 0:
            continue
        sorted_idx = sorted(front, key=lambda x: fitness_values[x][i])
        crowding_distances[sorted_idx[0]] = float('inf')
        crowding_distances[sorted_idx[-1]] = float('inf')
        
        f_max = fitness_values[sorted_idx[-1]][i]
        f_min = fitness_values[sorted_idx[0]][i]
        if f_max - f_min > 1e-6:
            for j in range(1, front_len-1):
                crowding_distances[sorted_idx[j]] += (fitness_values[sorted_idx[j+1]][i] - fitness_values[sorted_idx[j-1]][i])/(f_max-f_min)
    return crowding_distances

def quantum_mutation(coati, lower_bound, upper_bound):
    num_dimensions = len(coati)
    for i in range(num_dimensions):
        if random.random() < 0.1:
            delta = 0.5 * np.pi * (2*random.random()-1)
            coati[i] = np.clip(coati[i] + delta*(upper_bound[i]-lower_bound[i]), lower_bound[i], upper_bound[i])
    return coati

def logistic_map(x0=0.2, r=3.9, num_points=10):
    seq = [x0]
    for _ in range(num_points-1): seq.append(r*seq[-1]*(1-seq[-1]))
    return np.array(seq)

def MOCOA(num_coatis=10, num_dimensions=3, num_generations=1, lower_bound=[0.1,0.01,0.01], upper_bound=[10,1,1], signal=None, extend_num=50):
    lower_bound = np.array(lower_bound)
    upper_bound = np.array(upper_bound)
    population = np.zeros((num_coatis, num_dimensions))
    for d in range(num_dimensions):
        population[:,d] = lower_bound[d] + logistic_map(num_points=num_coatis)*(upper_bound[d]-lower_bound[d])

    for generation in range(num_generations):
        fronts = non_dominated_sorting(population, signal, extend_num)
        if not fronts:
            return population, []
        fitness_values = [fitness_function(p, signal, extend_num) for p in population]
        crowding_distances = []
        for front in fronts: 
            cd = calculate_crowding_distance(front, fitness_values)
            crowding_distances.extend(cd)
        if len(fronts[0])>0 and len(crowding_distances)>0:
            leader_idx = fronts[0][np.argmax(crowding_distances[:len(fronts[0])])]
            leader = population[leader_idx]
        else:
            leader = population[0]

        if generation < num_generations*0.7:
            for i in range(num_coatis):
                population[i] = np.clip(population[i] + (2*random.random()-1)*(leader-population[i]), lower_bound, upper_bound)
        else:
            for i in range(num_coatis):
                population[i] = np.clip(population[i] + random.random()*0.1*(leader-population[i]), lower_bound, upper_bound)
        
        for i in range(num_coatis):
            if random.random() <0.1: population[i] = quantum_mutation(population[i], lower_bound, upper_bound)

    final_fronts = non_dominated_sorting(population, signal, extend_num)
    if not final_fronts or len(final_fronts[0])==0:
        return population, []
    return [population[i] for i in final_fronts[0]], []

def select_final_params(pareto_front, signal, extend_num=50):
    if len(pareto_front) == 0:
        return [2.0, 0.2, 0.1]
    fitness_values = [fitness_function(p, signal, extend_num) for p in pareto_front]
    return pareto_front[np.argmin([f[0] for f in fitness_values])]

def imf_make_unify(data, labels, imfs_unify=7):
    samples, _, signal_len = data.shape
    emd_result = np.zeros((samples, imfs_unify, signal_len))
    delete_list = []
    for i in range(samples):
        imf = data[i]
        if imf.shape[0] == imfs_unify:
            emd_result[i] = imf
        elif imf.shape[0] > imfs_unify:
            emd_result[i] = np.vstack([imf[:imfs_unify-1], np.sum(imf[imfs_unify-1:], axis=0)])
        else:
            delete_list.append(i)
    for idx in sorted(delete_list, reverse=True):
        emd_result = np.delete(emd_result, idx, axis=0)
        labels = np.delete(labels, idx, axis=0)
    return torch.tensor(emd_result, dtype=torch.float32), torch.tensor(labels, dtype=torch.int64)

def unify_imf_components(imfs, target_num=7):
    if imfs.size == 0 or imfs.ndim < 2:
        return np.zeros((target_num, 1024))
    current_num, sig_len = imfs.shape
    if current_num == target_num: return imfs.copy()
    elif current_num > target_num: return np.vstack([imf[:target_num-1] for imf in [imfs]] + [np.sum(imfs[target_num-1:], axis=0)])
    else: return np.vstack([imfs, np.zeros((target_num-current_num, sig_len))])

def process_emd_with_mocoa(test_xdata, test_ylabel, target_imfs=7, num_coatis=10, num_generations=1, extend_num=50):
    global fitness_cache
    fitness_cache.clear()
    if isinstance(test_xdata, torch.Tensor):
        test_xdata = test_xdata.numpy().squeeze(axis=1) if test_xdata.ndim==3 else test_xdata.numpy()
    if isinstance(test_ylabel, torch.Tensor):
        test_ylabel = test_ylabel.numpy()
    
    emd = EMD(max_imf=7, max_sifting=200, tol=1e-1, spline_kind='linear')
    all_emd_results = []
    len_data = len(test_xdata)
    logging.info(f"开始处理样本，共 {len_data} 个")

    for idx, signal in enumerate(test_xdata):
        if idx % 20 == 0: logging.info(f"进度: {idx}/{len_data}")
        pareto_front, _ = MOCOA(num_coatis=num_coatis, num_generations=num_generations, signal=signal, extend_num=extend_num)
        best_params = select_final_params(pareto_front, signal, extend_num)

        extended_signal = svr_extension(signal, extend_num, *best_params)
        imfs = emd.emd(extended_signal)

        cropped_imfs = imfs[:, extend_num:extend_num+len(signal)] if imfs.ndim>=2 else np.zeros((7, len(signal)))
        unified_imfs = unify_imf_components(cropped_imfs, target_imfs)
        all_emd_results.append(unified_imfs)
    
    all_emd_results = np.array(all_emd_results)
    processed_x, processed_y = imf_make_unify(all_emd_results, test_ylabel, target_imfs)
    logging.info(f"处理完成，输出维度: {processed_x.shape}")
    return processed_x, processed_y

def process_kfold_emd_mocoa(kfold_num=5, target_imfs=7, num_coatis=8, num_generations=1, extend_num=50):
    for fold in range(1, kfold_num+1):
        logging.info("="*50)
        logging.info(f"处理第 {fold} 折数据集")
        trainX = load(f'trainX_fold{fold}_1024_10c')
        trainY = load(f'trainY_fold{fold}_1024_10c')
        valX = load(f'valX_fold{fold}_1024_10c')
        valY = load(f'valY_fold{fold}_1024_10c')
        
        train_x_emd, train_y_emd = process_emd_with_mocoa(trainX, trainY, target_imfs, num_coatis, num_generations, extend_num)
        val_x_emd, val_y_emd = process_emd_with_mocoa(valX, valY, target_imfs, num_coatis, num_generations, extend_num)
        
        dump(train_x_emd, f'emd_trainX_fold{fold}_MOCOA_1024_10c')
        dump(train_y_emd, f'emd_trainY_fold{fold}_MOCOA_1024_10c')
        dump(val_x_emd, f'emd_valX_fold{fold}_MOCOA_1024_10c')
        dump(val_y_emd, f'emd_valY_fold{fold}_MOCOA_1024_10c')
        logging.info(f"第 {fold} 折处理完成！")

    logging.info("="*50)
    logging.info("处理测试集")
    testX = load('testX_1024_10c')
    testY = load('testY_1024_10c')
    test_x_emd, test_y_emd = process_emd_with_mocoa(testX, testY, target_imfs, num_coatis, num_generations, extend_num)
    dump(test_x_emd, 'emd_testX_MOCOA_1024_10c')
    dump(test_y_emd, 'emd_testY_MOCOA_1024_10c')

if __name__ == "__main__":
    process_kfold_emd_mocoa(
        kfold_num=5,
        target_imfs=7,
        num_coatis=8,        
        num_generations=1,  
        extend_num=50       
    )