# 导入包和函数

In [None]:
import os
import sys
from multiprocessing import Pool, cpu_count

import numpy as np
import pandas as pd
import torch
import rpy2.robjects as robjects
from sklearn.metrics import f1_score

# Seed both torch and NumPy so that stochastic steps start from a fixed state.
torch.manual_seed(756)
np.random.seed(756)

# Put the parent of the current working directory on the import path,
# presumably so that the `mfdnn` and `utils` modules imported below resolve.
# This depends on the directory the kernel was started from.
current_dir = os.getcwd()
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)

# NOTE(review): later cells use MFDNN, MFDNN_predict, integral and smooth_penalty
# without defining them; they presumably come from these star imports - confirm,
# and consider explicit imports so the origin of each name is visible.
from mfdnn import *
from utils import *

# 基础设置

In [None]:

# Reset the torch and NumPy RNGs at the start of the configuration cell.
torch.manual_seed(756)
np.random.seed(756)


# Directory (relative to the working directory) holding the EEG inputs.
data_path = "EEG"

# Pre-computed train/test splits. Later cells take column `run_idx` of each table
# and subtract 1 before indexing, so the stored indices are presumably 1-based.
train_indices_list = pd.read_csv(os.path.join(data_path, "train_indices_list.csv"))
test_indices_list  = pd.read_csv(os.path.join(data_path, "test_indices_list.csv"))

# NOTE(review): the file is called "EEG_y.rds" but its content is used as the
# covariate array EEG_x below - confirm that this is the intended file.
rds_file = os.path.join(data_path, "EEG_y.rds")
r_array = robjects.r['readRDS'](rds_file)

# Later cells index the third axis of EEG_x with subject indices.
EEG_x = np.array(r_array)  # 64*64*61
# Hard-coded labels: the first 39 entries are +1 and the remaining 22 are -1.
# NOTE(review): assumes the subjects in EEG_x are stored in that same order - confirm.
EEG_y = np.array([1]*39 + [-1]*22)  


# Number of subjects (39 + 22); not referenced by the other cells visible here.
n = 61           
# Passed to the training/prediction functions as the number of functional variables.
p = 1              
# Number of train/test repetitions iterated over by the loop cells.
frun = 50           

# Passed unchanged as `domain_range` to the training/prediction functions.
# NOTE(review): presumably one [lower, upper] pair per axis of the 2D surface - confirm.
domain_range=[np.array([0, 1]), np.array([1, 64])]

# Keyword values forwarded to the training function by select_best_hyperparameters.
model_params = {
    'num_basis': [5, 5],        
    'layer_sizes': [64, 64],    
    'epochs': 200,             
    'val_ratio': 0.1,           
    'patience': 10             
}


# Candidate penalty weights; every (lam1, lam2) pair is tried by the grid search.
lam1_values = [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
lam2_values = [0, 0.001, 0.01, 0.1, 1, 2, 5]

# 回归神经网络

## 辅助函数

In [None]:
def select_best_hyperparameters(
    X_train, y_train, p, domain_range,
    lam1_values, lam2_values, model_params,
    train_fn=None
):
    """
    Grid-search (lam1, lam2) and return the best pair with its trained model.

    Every combination is trained once. A combination is scored by its smallest
    validation loss or, when the trainer returns no validation losses, by the
    mean of its last ten training losses. The lowest score wins; ties keep the
    combination that was tried first (lam1 outer loop, lam2 inner loop).

    Parameters:
        X_train: Functional covariates, forwarded as ``func_cov``.
        y_train: Responses, forwarded as ``resp``.
        p (int): Number of functional variables, forwarded as ``p``.
        domain_range: Forwarded unchanged as ``domain_range``.
        lam1_values (sequence): Candidate values for ``lam1``.
        lam2_values (sequence): Candidate values for ``lam2``.
        model_params (dict): Must provide 'num_basis', 'layer_sizes', 'epochs',
            'val_ratio' and 'patience'.
        train_fn (callable, optional): Trainer called with keyword arguments only
            and returning ``(train_losses, val_losses, model, extra)``.
            Defaults to ``MFDNN``, which is the previous hard-coded behaviour.

    Returns:
        tuple: ``(best_lam1, best_lam2, best_model)``. If no combination yields
        a finite score, the first grid point is returned with ``best_model``
        set to ``None`` and a warning is issued.

    Raises:
        ValueError: If either grid is empty.
    """
    import warnings

    if len(lam1_values) == 0 or len(lam2_values) == 0:
        raise ValueError("lam1_values and lam2_values must both be non-empty")

    if train_fn is None:
        train_fn = MFDNN

    # Only the running best is kept, so at most one extra model is alive at a time.
    best_score = np.inf
    best_lam1, best_lam2, best_model = lam1_values[0], lam2_values[0], None

    for lam1 in lam1_values:
        for lam2 in lam2_values:
            try:
                train_losses, val_losses, model, _ = train_fn(
                    p=p, resp=y_train, func_cov=X_train,
                    num_basis=model_params['num_basis'],
                    layer_sizes=model_params['layer_sizes'],
                    domain_range=domain_range,
                    epochs=model_params['epochs'],
                    val_ratio=model_params['val_ratio'],
                    patience=model_params['patience'],
                    lam1=lam1, lam2=lam2,
                    std_resp=True
                )
                if len(val_losses) > 0:
                    score = float(min(val_losses))
                else:
                    score = float(np.mean(train_losses[-10:]))
            except Exception as exc:
                # Best-effort search: a failing grid point is skipped, but no longer silently.
                warnings.warn(
                    f"select_best_hyperparameters: lam1={lam1}, lam2={lam2} failed: {exc!r}",
                    RuntimeWarning,
                )
                continue

            # A NaN score would otherwise win np.argmin, so diverged fits are skipped.
            if not np.isfinite(score):
                warnings.warn(
                    f"select_best_hyperparameters: lam1={lam1}, lam2={lam2} gave a non-finite score",
                    RuntimeWarning,
                )
                continue

            if score < best_score:
                best_score = score
                best_lam1, best_lam2, best_model = lam1, lam2, model

    if best_model is None:
        warnings.warn(
            "select_best_hyperparameters: no hyperparameter combination trained successfully",
            RuntimeWarning,
        )

    return best_lam1, best_lam2, best_model

def evaluate_on_test_set(best_model, X_test, y_test, p, domain_range, model_params):
    """
    Score a trained regression MFDNN on the test split using the F1 metric.

    The model's outputs are thresholded at zero: outputs above zero become
    label +1 and all others label -1. F1 is computed with +1 as the positive
    class.

    Parameters:
        best_model: Trained model as returned by the hyperparameter search,
            or ``None`` if no model could be trained.
        X_test: Test functional covariates, forwarded to ``MFDNN_predict``.
        y_test: True labels for the test split.
        p (int): Number of functional variables.
        domain_range: Forwarded unchanged to ``MFDNN_predict``.
        model_params (dict): Only 'num_basis' is read.

    Returns:
        tuple: ``(f1, test_labels)``. On failure (missing model, or an error
        during prediction or scoring) the function returns ``(0.0, None)`` and
        issues a RuntimeWarning, so the fallback score does not enter the
        aggregate results unnoticed.
    """
    import warnings

    if best_model is None:
        warnings.warn(
            "evaluate_on_test_set: no trained model supplied; returning F1 = 0.0",
            RuntimeWarning,
        )
        return 0.0, None

    try:
        test_predictions = MFDNN_predict(
            p, best_model, X_test, model_params['num_basis'], domain_range
        ).detach().numpy()

        # Sign of the network output decides the class.
        test_labels = np.where(test_predictions.flatten() > 0, 1, -1)
        f1 = f1_score(y_test, test_labels, pos_label=1)
    except Exception as exc:
        warnings.warn(
            f"evaluate_on_test_set: evaluation failed, returning F1 = 0.0: {exc!r}",
            RuntimeWarning,
        )
        return 0.0, None

    return f1, test_labels

## 回归50次循环

In [None]:
# Start the repeated experiment from a fixed RNG state.
torch.manual_seed(756)
np.random.seed(756)

# One entry per run: test F1 and the selected penalty weights.
all_f1, all_best_lam1, all_best_lam2 = [], [], []

for run_idx in range(frun):
    print(f"Run {run_idx+1}/{frun}")

    # Column `run_idx` holds this run's split; shift the stored indices down by one
    # (presumably they were written 1-based).
    train_indices = train_indices_list.iloc[:, run_idx].to_numpy() - 1
    test_indices = test_indices_list.iloc[:, run_idx].to_numpy() - 1

    # Select subjects along the last axis, then move that axis to the front.
    X_train = EEG_x[:, :, train_indices].transpose(2, 0, 1)
    X_test = EEG_x[:, :, test_indices].transpose(2, 0, 1)
    y_train, y_test = EEG_y[train_indices], EEG_y[test_indices]

    best_lam1, best_lam2, best_model = select_best_hyperparameters(
        X_train=X_train,
        y_train=y_train,
        p=p,
        domain_range=domain_range,
        lam1_values=lam1_values,
        lam2_values=lam2_values,
        model_params=model_params,
    )

    f1, _ = evaluate_on_test_set(
        best_model=best_model,
        X_test=X_test,
        y_test=y_test,
        p=p,
        domain_range=domain_range,
        model_params=model_params,
    )

    all_f1 += [f1]
    all_best_lam1 += [best_lam1]
    all_best_lam2 += [best_lam2]



Run 1/50
Run 2/50
Run 3/50
Run 4/50
Run 5/50
Run 6/50
Run 7/50
Run 8/50
Run 9/50
Run 10/50
Run 11/50
Run 12/50
Run 13/50
Run 14/50
Run 15/50
Run 16/50
Run 17/50
Run 18/50
Run 19/50
Run 20/50
Run 21/50
Run 22/50
Run 23/50
Run 24/50
Run 25/50
Run 26/50
Run 27/50
Run 28/50
Run 29/50
Run 30/50
Run 31/50
Run 32/50
Run 33/50
Run 34/50
Run 35/50
Run 36/50
Run 37/50
Run 38/50
Run 39/50
Run 40/50
Run 41/50
Run 42/50
Run 43/50
Run 44/50
Run 45/50
Run 46/50
Run 47/50
Run 48/50
Run 49/50
Run 50/50


## 回归输出结果

In [None]:
# Summarise the per-run F1 scores of the regression network.
mean_f1 = np.mean(all_f1)
std_f1 = np.std(all_f1)  # NumPy default: population standard deviation (ddof=0)

# Plain string: the previous f-string had no placeholders.
filename = "EEG_MFDNN_F1_results.csv"

df = pd.DataFrame({
    "mean_f1": [mean_f1],
    "std_f1": [std_f1]
})

df.to_csv(filename, index=False)

# Emit the two summary lines that the recorded cell output shows, so that a
# fresh Restart-and-Run-All reproduces it.
print(f"平均 F1-score: {mean_f1:.4f}, 标准差: {std_f1:.4f}")
print(f"结果已保存到文件: {filename}")

平均 F1-score: 0.7485, 标准差: 0.1004
结果已保存到文件: EEG_MFDNN_F1_results.csv


# 分类神经网络

## 函数定义

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

# Reseed torch and NumPy before the classification section so it starts from
# the same RNG state as the earlier cells that use seed 756.
torch.manual_seed(756)
np.random.seed(756)
class ClassificationNN(nn.Module):
    """
    Fully connected network with ReLU hidden layers and a single output unit.

    The output is a raw score (no sigmoid is applied here). The first linear
    layer is also exposed as ``self.input`` so that penalties can be placed on
    its weights.

    Parameters:
        input_size (int): Number of input features after flattening.
        hidden_layer_sizes (sequence of int): Width of each hidden layer;
            must contain at least one entry.
    """

    def __init__(self, input_size, hidden_layer_sizes):
        super().__init__()

        # Layers are created in network order: first, hidden..., output.
        self.input = nn.Linear(input_size, hidden_layer_sizes[0])
        stack = [self.input, nn.ReLU()]

        for width_in, width_out in zip(hidden_layer_sizes[:-1], hidden_layer_sizes[1:]):
            stack.extend([nn.Linear(width_in, width_out), nn.ReLU()])

        stack.append(nn.Linear(hidden_layer_sizes[-1], 1))
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Flatten everything after the batch axis and return one score per sample."""
        flat = x.view(x.size(0), -1)
        return self.model(flat)

    @property
    def input_weight(self):
        """Weight matrix of the first linear layer."""
        return self.input.weight

def MFDNN_calssification(p, resp, func_cov, num_basis, layer_sizes, domain_range, epochs, val_ratio, patience, lam1=0, lam2=0, epsilon=0.001, std_resp=True):  
    """
    Train a binary-classification MFDNN on 2D functional covariates.

    The covariates are reduced to basis scores with ``integral`` and a
    ``ClassificationNN`` is fitted on them with ``BCEWithLogitsLoss`` and Adam
    (learning rate 0.001), one full-batch step per epoch.

    Parameters:
        p (int): Number of functional variables.
        resp (array-like): Binary labels. Values greater than 0 are the positive
            class and all others the negative class, so both {-1, +1} and
            {0, 1} encodings are accepted.
        func_cov (numpy.ndarray): Functional covariate data with shape:
            - If p=1: (N, T1, T2)
            - If p>1: (p, N, T1, T2)
        num_basis (tuple): Number of basis functions for each dimension (M1, M2).
        layer_sizes (list): Number of neurons in each hidden layer.
        domain_range (list): Forwarded to ``integral`` and ``smooth_penalty``.
        epochs (int): Maximum number of training epochs.
        val_ratio (float or None): Fraction of samples held out for validation;
            ``None`` trains on all samples and records no validation losses.
        patience (int or None): Early-stopping patience. Only takes effect when
            a validation split exists.
        lam1 (float, optional): Weight of the group penalty on the first-layer
            weights (one group per functional variable). Not applied when
            ``lam1 == 0`` or ``p == 1``. Defaults to 0.
        lam2 (float, optional): Weight of the smoothness penalty built from
            ``smooth_penalty``. Defaults to 0.
        epsilon (float, optional): Minimum validation-loss improvement that
            resets the patience counter. Defaults to 0.001.
        std_resp (bool, optional): Kept for signature compatibility and ignored:
            standardising class labels would push the targets outside [0, 1],
            which ``BCEWithLogitsLoss`` requires.

    Returns:
        tuple: (train_losses, validation_losses, model, l21_norm)
            - train_losses (list of float): Unpenalised training loss per epoch.
            - validation_losses (list of float): Validation loss per epoch
              (empty when ``val_ratio`` is None).
            - model (ClassificationNN): Trained network returning logits.
            - l21_norm (torch.Tensor): Column-wise (p == 1) or per-variable
              (p > 1) L2 norms of the first-layer weights.
    """
    # Project the functional covariates onto the basis.
    A = integral(func_cov, num_basis, domain_range)  # Shape: (N, p, M1*M2) or (N, M1*M2)
    S = smooth_penalty(func_cov, num_basis, domain_range)
    S = torch.tensor(S, dtype=torch.float32)

    # Input width of the network.
    if len(A.shape) == 2:  # p=1 case
        M = A.shape[1]
        input_size = M
    else:  # p>1 case
        M = A.shape[2]
        input_size = p * M

    # BCEWithLogitsLoss expects targets in [0, 1]; map the labels to {0, 1}
    # instead of standardising them.
    targets = (np.asarray(resp).reshape(-1) > 0).astype(np.float32)

    # Dataset splitting
    validationX = None
    validationy = None
    if val_ratio is not None:
        trainX, validationX, trainy, validationy = train_test_split(A, targets, test_size=val_ratio, random_state=42)
        trainX = torch.Tensor(trainX).float()
        validationX = torch.Tensor(validationX).float()
        trainy = torch.tensor(trainy, dtype=torch.float32).view(-1, 1)
        validationy = torch.tensor(validationy, dtype=torch.float32).view(-1, 1)
    else:
        trainX = torch.Tensor(A).float()
        trainy = torch.tensor(targets, dtype=torch.float32).view(-1, 1)

    # Create and train model
    model = ClassificationNN(input_size, layer_sizes)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    train_losses = []
    validation_losses = []

    best_val_loss = float('inf')
    stopping_patience = patience

    # Training loop
    for _ in range(epochs):
        optimizer.zero_grad()
        loss = criterion(model(trainX), trainy)

        weight = model.input_weight  # shape: (n1, p * M)
        smooth_term = lam2 * torch.mean(torch.sum((weight @ S) * weight, dim=1))
        if lam1 == 0 or p == 1:
            # A single functional variable has nothing to select between.
            total_loss = loss + smooth_term
        else:
            group_sq = torch.sum(weight ** 2, dim=0).view(p, M).sum(dim=1)  # one value per variable
            group_term = lam1 * torch.sum(torch.sqrt(group_sq + 1e-6))
            total_loss = loss + group_term + smooth_term

        total_loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

        # Validation tracking and early stopping need a validation split.
        if validationX is not None:
            with torch.no_grad():
                val_loss = criterion(model(validationX), validationy).item()
            validation_losses.append(val_loss)

            if patience is not None:
                if val_loss < best_val_loss and best_val_loss - val_loss >= epsilon:
                    best_val_loss = val_loss
                    stopping_patience = patience
                else:
                    stopping_patience -= 1
                    if stopping_patience <= 0:
                        break

    # Calculate L21 norm for variable selection
    if p == 1:
        l21_norm = torch.sqrt(torch.sum(model.input_weight ** 2, dim=0))
    else:
        l21_norm = torch.sqrt(torch.sum(model.input_weight ** 2, dim=0).view(p, M).sum(dim=1))

    return train_losses, validation_losses, model, l21_norm

def MFDNN_predict_classification(p, model, func_cov, num_basis, domain_range):
    """
    Run a trained classification network on new 2D functional covariates.

    Parameters:
        p (int): Number of functional variables (not used inside this function).
        model: Trained network; it is called directly on the basis scores.
        func_cov (numpy.ndarray): Functional covariate data for the test set with shape:
            - If p=1: (N, T1, T2)
            - If p>1: (p, N, T1, T2)
        num_basis (tuple): Number of basis functions for each dimension (M1, M2).
        domain_range (list): Forwarded unchanged to ``integral``.

    Returns:
        torch.Tensor: The model's raw outputs (logits); no sigmoid or
        thresholding is applied here.
    """
    # Same basis projection as used for training.
    basis_scores = integral(func_cov, num_basis, domain_range)
    return model(torch.Tensor(basis_scores).float())

## 分类50次循环

In [None]:
import warnings

# One entry per run: test F1 and the selected penalty weights.
all_f1 = []
all_best_lam1 = []
all_best_lam2 = []

for run_idx in range(frun):
    print(f"Run {run_idx+1}/{frun}")

    train_indices = train_indices_list.iloc[:, run_idx].to_numpy() - 1
    test_indices  = test_indices_list.iloc[:, run_idx].to_numpy() - 1

    X_train = np.transpose(EEG_x[:, :, train_indices], (2, 0, 1))
    X_test  = np.transpose(EEG_x[:, :, test_indices], (2, 0, 1))
    y_train = EEG_y[train_indices]
    y_test  = EEG_y[test_indices]

    # The cross-entropy loss needs 0/1 targets; the labels are stored as -1/+1.
    y_train_binary = (y_train > 0).astype(np.float32)

    # Grid search with the classification trainer. select_best_hyperparameters
    # is not used here because it trains the regression network (MFDNN).
    # Scoring matches that helper: lowest validation loss, or the mean of the
    # last ten training losses when no validation losses were recorded.
    best_lam1, best_lam2, best_model = None, None, None
    best_score = np.inf
    for lam1 in lam1_values:
        for lam2 in lam2_values:
            try:
                train_losses, val_losses, model, _ = MFDNN_calssification(
                    p=p, resp=y_train_binary, func_cov=X_train,
                    num_basis=model_params['num_basis'],
                    layer_sizes=model_params['layer_sizes'],
                    domain_range=domain_range,
                    epochs=model_params['epochs'],
                    val_ratio=model_params['val_ratio'],
                    patience=model_params['patience'],
                    lam1=lam1, lam2=lam2,
                    std_resp=False  # class labels must not be standardised
                )
                if len(val_losses) > 0:
                    score = float(min(val_losses))
                else:
                    score = float(np.mean(train_losses[-10:]))
            except Exception as exc:
                warnings.warn(
                    f"Run {run_idx+1}: lam1={lam1}, lam2={lam2} failed: {exc!r}",
                    RuntimeWarning,
                )
                continue

            # Skip diverged fits (NaN/inf) and keep the first best on ties.
            if np.isfinite(score) and score < best_score:
                best_score = score
                best_lam1, best_lam2, best_model = lam1, lam2, model

    if best_model is None:
        raise RuntimeError(f"Run {run_idx+1}: no hyperparameter combination trained successfully")

    logits = MFDNN_predict_classification(p, best_model, X_test, model_params['num_basis'], domain_range)
    # A positive logit means predicted probability above 0.5, i.e. class +1.
    y_pred = torch.where(logits.flatten() > 0, 1, -1).numpy()

    f1 = f1_score(y_test, y_pred, pos_label=1)

    all_f1.append(f1)
    all_best_lam1.append(best_lam1)
    all_best_lam2.append(best_lam2)


Run 1/50
Run 2/50
Run 3/50
Run 4/50
Run 5/50
Run 6/50
Run 7/50
Run 8/50
Run 9/50
Run 10/50
Run 11/50
Run 12/50
Run 13/50
Run 14/50
Run 15/50
Run 16/50
Run 17/50
Run 18/50
Run 19/50
Run 20/50
Run 21/50
Run 22/50
Run 23/50
Run 24/50
Run 25/50
Run 26/50
Run 27/50
Run 28/50
Run 29/50
Run 30/50
Run 31/50
Run 32/50
Run 33/50
Run 34/50
Run 35/50
Run 36/50
Run 37/50
Run 38/50
Run 39/50
Run 40/50
Run 41/50
Run 42/50
Run 43/50
Run 44/50
Run 45/50
Run 46/50
Run 47/50
Run 48/50
Run 49/50
Run 50/50
平均 F1-score: 0.7485, 标准差: 0.1004


## 分类输出结果

In [None]:

# Summarise the per-run F1 scores of the classification network.
mean_f1 = np.mean(all_f1)
std_f1 = np.std(all_f1)  # NumPy default: population standard deviation (ddof=0)

# Plain string: the previous f-string had no placeholders.
filename = "EEG_MFDNN_calssification_F1_results.csv"

df = pd.DataFrame({
    "mean_f1": [mean_f1],
    "std_f1": [std_f1]
})

df.to_csv(filename, index=False)

# Emit the two summary lines that the recorded cell output shows, so that a
# fresh Restart-and-Run-All reproduces it.
print(f"平均 F1-score: {mean_f1:.4f}, 标准差: {std_f1:.4f}")
print(f"结果已保存到文件: {filename}")

平均 F1-score: 0.7555, 标准差: 0.0879
结果已保存到文件: EEG_MFDNN_calssification_F1_results.csv
