In [2]:
import random
import torch
from sklearn.model_selection import KFold
import math
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np
import torch.nn as nn
import torch.nn.functional as F

In [3]:
import torch
import torch.nn as nn
from torchsummary import summary

# 定义 SEBlock 类
class SEBlock(nn.Module):
    def __init__(self, pool_type, in_channels, reduction):
        super(SEBlock, self).__init__()
        self.global_pool = nn.AdaptiveAvgPool2d(1) if pool_type == "avg" else nn.AdaptiveMaxPool2d(1)
        self.fc1 = nn.Conv2d(in_channels, in_channels // reduction, kernel_size=1)
        self.fc2 = nn.Conv2d(in_channels // reduction, in_channels, kernel_size=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        scale = self.global_pool(x)
        scale = self.fc1(scale)
        scale = nn.ReLU()(scale)
        scale = self.fc2(scale)
        scale = self.sigmoid(scale)
        return x * scale

# 定义 M_FANet 类
class M_FANet(nn.Module):
    def __init__(self, Ks, Ft, Kt, D, Nc):
        super(M_FANet, self).__init__()

        self.pointwiseconv2d = nn.Conv2d(in_channels=9, out_channels=1, kernel_size=1, stride=1, padding=0, bias=False)
        
        # 确保 padding 为整数
        padding = Ks // 2  # 使用整数进行padding计算
        self.conv2d1 = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(Ks, 1),  padding='same')
        
        self.TSconv = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=Ft, kernel_size=(1, Kt), padding=(0, Kt // 2)),  # padding 需要是一个元组
            nn.BatchNorm2d(Ft),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Conv2d(in_channels=Ft, out_channels=D * Ft, kernel_size=(30, 1), groups=Ft),
            nn.AvgPool2d(kernel_size=(1, 4))
        )
        
        self.SEBlock = SEBlock("avg", D * Ft, 16)
        
        self.TConv = nn.Sequential(
            nn.Conv2d(in_channels=D * Ft, out_channels=D * Ft, kernel_size=(1, 25), groups=D * Ft, padding=(0, 12)),
            nn.AvgPool2d(kernel_size=(1, 8))
        )
        
        self.classifier = nn.Conv2d(D * Ft, Nc, (1, 6))

    def forward(self, x):
        x = self.pointwiseconv2d(x)
        x = self.conv2d1(x)
        x = self.TSconv(x)
        x = self.SEBlock(x)
        x = self.TConv(x)
        x = self.classifier(x)
        return x

In [4]:
#导入数据
#rest1
datapath1=r'D:\JQ_YJS\分频段\filtered_rest1.npy' 
data1=np.load(datapath1)
print(data1.shape)


#rest2
datapath2=r'D:\JQ_YJS\分频段\filtered_rest2.npy' 
data2=np.load(datapath2)
print(data2.shape)

data_all=np.concatenate((data1,data2),axis=0)
#标签制作
label_all = torch.cat([torch.zeros(6344), torch.ones(6589)]).long()  # 标签：前10505个为0，后10505个为1
print(label_all.shape)
print(data_all.shape)

(6344, 9, 30, 200)
(6589, 9, 30, 200)
torch.Size([12933])
(12933, 9, 30, 200)


In [5]:
# 创建十折交叉验证
kfold = KFold(n_splits=10, shuffle=True, random_state=42)

# 保存结果的列表
historys = []
test_pred = []
test_real = []
accuracy, precision, recall, f1score = [], [], [], []
batchsz = 16
num_epochs = 100

# 进行十折交叉验证
for fold, (train_ind, test_ind) in enumerate(kfold.split(data_all, label_all)):
    print('fold号:', fold + 1)

    # 每一折验证前都要打乱训练集样本顺序
    n = len(train_ind)
    A = np.linspace(0, n - 1, n, dtype=int)
    random.shuffle(A)

    # 构建训练集、验证集、测试集
    epoch_train = data_all[train_ind[A[:int(0.8 * n)]]]
    epoch_val = data_all[train_ind[A[int(0.8 * n):]]]
    epoch_test = data_all[test_ind]
    label_train = label_all[train_ind[A[:int(0.8 * n)]]]
    label_val = label_all[train_ind[A[int(0.8 * n):]]]
    label_test = label_all[test_ind]

    # 转换为Tensor并创建DataLoader
    train_dataset = TensorDataset(torch.tensor(epoch_train, dtype=torch.float32), torch.tensor(label_train, dtype=torch.long))
    val_dataset = TensorDataset(torch.tensor(epoch_val, dtype=torch.float32), torch.tensor(label_val, dtype=torch.long))
    
    train_loader = DataLoader(train_dataset, batch_size=batchsz, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batchsz, shuffle=False)

    # 选择、创建模型
    model = M_FANet(Ks=4, Ft=16, Kt=3, D=2, Nc=1)
    #print(model)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # 配置模型训练
    criterion = nn.BCEWithLogitsLoss()  # 使用交叉熵损失
    
    #optimizer = optim.SGD(model.parameters(), lr=1e-4)
    optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

    # 开始训练模型
    print('开始训练!!')
    history = {'train_loss': [], 'val_loss': []}

    for epoch in range(num_epochs):  # 训练100个epoch
        model.train()
        running_loss = 0.0

        # 训练阶段
        for inputs, targets in train_loader:
            #print(f"训练批次输入数据形状: {inputs.shape}")
            inputs = inputs.to(device)
            
            targets = targets.to(device)
            optimizer.zero_grad()
            
            outputs = model(inputs)
            targets = targets.view(-1).float() 
            #targets=targets.view(-1,1).float()
            loss = criterion(outputs.view(-1), targets)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_train_loss = running_loss / len(train_loader)
        history['train_loss'].append(avg_train_loss)

        #print('Epoch:{}/{}'.format(epoch+1,num_epochs))
        
        # 计算训练集的指标
        with torch.no_grad():
            train_pred = model(torch.tensor(epoch_train, dtype=torch.float32).to(device))
            train_pred = (torch.sigmoid(train_pred) > 0.5).cpu().numpy().astype(int)
            print("label_train shape:", label_train.shape)
            print("train_pred shape:", train_pred.shape)
            # 将 train_pred 进行维度压缩
            train_pred = train_pred.squeeze()


            acc_train = accuracy_score(label_train, train_pred)
            pre_train = precision_score(label_train, train_pred, average='macro')
            rec_train = recall_score(label_train, train_pred, average='macro')
            f1_train = f1_score(label_train, train_pred, average='macro')

        # 输出训练集指标
       
        # 验证阶段
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:
                

                inputs = inputs.to(device)
            
                targets = targets.to(device)
                outputs = model(inputs)
                #targets=targets.view(-1,1).float()
                targets = targets.view(-1).float()
                loss = criterion(outputs.view(-1), targets)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(val_loader)
        
        history['val_loss'].append(avg_val_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, VAL Loss: {avg_val_loss:.4f}')
        #print(f'Epoch [{epoch+1}/{num_epochs}], VAL Loss: {avg_val_loss:.4f}')
        print(f"$$训练集准确率 accuracy: {acc_train}",f"$$训练集精确率 precision: {pre_train}",f"$$训练集召回率 recall: {rec_train}",f"$$训练集 F1 评分 f1_score: {f1_train}")
        


        # 保存训练记录
        historys.append(history)
        

        # 计算、保存测试结果
        model.eval()
        with torch.no_grad():
            pred_test = model(torch.tensor(epoch_test, dtype=torch.float32).to(device))
            
            pred_test = (torch.sigmoid(pred_test) > 0.5).cpu().numpy().astype(int)


        # 保存预测结果和真实结果
        test_pred.append(pred_test)
        test_real.append(label_test)
        # 将 train_pred 进行维度压缩
        pred_test = pred_test.squeeze()

        # 计算准确率，精确率，召回率，F1评分
        acc = accuracy_score(label_test, pred_test)
        pre = precision_score(label_test, pred_test, average='macro')
        rec = recall_score(label_test, pred_test, average='macro')
        f1 = f1_score(label_test, pred_test, average='macro')
        
        accuracy.append(acc)
        precision.append(pre)
        recall.append(rec)
        f1score.append(f1)
        
        print(f"@@测试集准确率 accuracy: {acc}",f"@@测试集精确率 precision: {pre}",f"@@测试集召回率 recall: {rec}",f"@@测试集 F1 评分 f1_score: {f1}")
        

# 将每一折 history 中误差结果保存（训练集和测试集，用于反映训练过程）    
loss_train = []
loss_val = []
for history_s in historys:
    loss_val.append(history_s['val_loss'])
    loss_train.append(history_s['train_loss'])

fold号: 1


  train_dataset = TensorDataset(torch.tensor(epoch_train, dtype=torch.float32), torch.tensor(label_train, dtype=torch.long))
  val_dataset = TensorDataset(torch.tensor(epoch_val, dtype=torch.float32), torch.tensor(label_val, dtype=torch.long))


开始训练!!


  return F.conv2d(input, weight, bias, self.stride,


label_train shape: torch.Size([9311])
train_pred shape: (9311, 1, 1, 1)
Epoch [1/100], Train Loss: 0.6595, VAL Loss: 0.6182
$$训练集准确率 accuracy: 0.7763935130490818 $$训练集精确率 precision: 0.7990873378712452 $$训练集召回率 recall: 0.7731594576725744 $$训练集 F1 评分 f1_score: 0.7705894674357904
@@测试集准确率 accuracy: 0.7217928902627512 @@测试集精确率 precision: 0.7708636584182436 @@测试集召回率 recall: 0.7240682339449541 @@测试集 F1 评分 f1_score: 0.7099001614156752
label_train shape: torch.Size([9311])
train_pred shape: (9311, 1, 1, 1)
Epoch [2/100], Train Loss: 0.5478, VAL Loss: 0.5394
$$训练集准确率 accuracy: 0.7753195145526797 $$训练集精确率 precision: 0.7931689250610968 $$训练集召回率 recall: 0.7780730316367512 $$训练集 F1 评分 f1_score: 0.7729307551723485
@@测试集准确率 accuracy: 0.7774343122102009 @@测试集精确率 precision: 0.7784048355193671 @@测试集召回率 recall: 0.777090500764526 @@测试集 F1 评分 f1_score: 0.7770743157396042
label_train shape: torch.Size([9311])
train_pred shape: (9311, 1, 1, 1)
Epoch [3/100], Train Loss: 0.4866, VAL Loss: 0.5101
$$训练集准确率 accu

KeyboardInterrupt: 

In [None]:
# 假设 accuracy 列表包含了 1000 个准确率值
# 每一折包含 100 个 epoch
num_folds = 10
epochs_per_fold = 100

# 打印每一折的最大准确率和对应的epoch
for fold in range(num_folds):
    # 获取当前折的准确率列表
    start_idx = fold * epochs_per_fold
    end_idx = (fold + 1) * epochs_per_fold
    fold_accuracy = accuracy[start_idx:end_idx]

    # 找到最大准确率和对应的epoch
    max_acc = max(fold_accuracy)
    max_epoch = fold_accuracy.index(max_acc) + 1  # +1 因为索引从0开始

    # 打印结果
    print(f'第{fold + 1}折准确率最高为{max_acc:.4f}，对应的epoch为{max_epoch}')