In [16]:
import torch
import torch.nn as nn
class BasicUnit(nn.Module):
    """
        提取长短期特征
        输入通道数：32
        输出通道数：32
        参数：
        input_channels, output_channels, data
        返回：
        特征图 Tensor (N*C*L)
    """

    def __init__(self, input_channels, output_channels):
        super(BasicUnit, self).__init__()
        # 使用3x1的卷积核
        self.conv1 = nn.Conv1d(input_channels, output_channels, kernel_size=3,padding=1)  # 第一组卷积
        self.conv2 = nn.Conv1d(input_channels, output_channels, kernel_size=3,padding=1)  # 第二组卷积
        self.conv3 = nn.Conv1d(input_channels, output_channels, kernel_size=3,padding=1)  # 第三组卷积
        self.conv4 = nn.Conv1d(input_channels, output_channels, kernel_size=3,padding=1)  # 第四组卷积

    def forward(self, x):
        A = torch.tanh(self.conv1(x))  # A = tanh(x * w1 + b1)
        B = torch.sigmoid(self.conv2(x))  # B = σ(x * w2 + b2)
        C = torch.tanh(self.conv3(x))  # C = tanh(x * w3 + b3)
        D = torch.sigmoid(self.conv4(x))  # D = σ(x * w4 + b4)

        E = A * B + C * D  # E = A ⊗ B + C ⊗ D
        return E


class BrainAnalysisModule(nn.Module):
    def __init__(self, input_channels, output_channels,time_step):
        super(BrainAnalysisModule, self).__init__()
        self.fc_input_channels=output_channels*time_step
        self.fc_output_channels=1
        self.basic_unit1 = BasicUnit(input_channels, output_channels)  # 第一个基本单元
        self.basic_unit2 = BasicUnit(output_channels, output_channels)  # 第二个基本单元
        self.fc = nn.Linear(self.fc_input_channels, self.fc_output_channels)  # 添加全连接层
        # self.fc = nn.Conv1d(output_channels, 8, kernel_size=1)  # 1x1卷积层替代全连接层

    def forward(self, x):
        output1 = self.basic_unit1(x)  # 经过第一个基本单元
        print("output1", output1.shape)
        output2 = self.basic_unit2(output1)  # 以output1作为第二个单元的输入
        print("output2", output2.shape)
        output2_flat = output2.view(output2.size(0), -1)  # 将输出展平成2D形状，适用于全连接层
        print("output2_flat", output2_flat.shape)
        final_output = self.fc(output2_flat)  # 经过全连接层
        # final_output = self.fc(output2)  # 经过全连接层
        return final_output

# 示例使用
if __name__ == "__main__":
    # 假设输入数据形状为 (批大小, 通道数, 序列长度)
    input_data = torch.randn(40, 32, 17)  # 批大小为 40，通道数为 32，序列长度为 3000
    brain_module = BrainAnalysisModule(input_channels=32, output_channels=32,time_step=17)  # 创建模块实例
    output = brain_module(input_data)  # 前向传播
    print(output.shape)  # 打印输出形状
    print(output[0])


output1 torch.Size([40, 32, 17])
output2 torch.Size([40, 32, 17])
output2_flat torch.Size([40, 544])
torch.Size([40, 1])
tensor([-0.0422], grad_fn=<SelectBackward0>)


In [2]:
class AnticipationModule:
    def __init__(self):
        pass
    
    def forward(self, predicted_modes):
        """
        计算重建预测值
        输入通道数：8
        输出通道数：1
        参数：
        predicted_modes: List[torch.Tensor] - 各个分解模式的预测值
        返回：
        torch.Tensor - 重建预测值
        """
        # 将所有模式的预测值求和
        reconstructed_prediction = sum(predicted_modes)
        return reconstructed_prediction

# 示例：使用 AnticipationModule
# if __name__ == "__main__":
#     # 假设有8个模式的预测值
#     predicted_modes = [torch.rand(1) for _ in range(8)]  # 生成随机预测值作为示例
# 
#     anticipation_module = AnticipationModule()
#     reconstructed_prediction = anticipation_module.forward(predicted_modes)
# 
#     print("重建预测值：", reconstructed_prediction.item())
#     print("预测值的维度：", reconstructed_prediction.size())  # 或者使用 reconstructed_prediction.shape

重建预测值： 3.883603096008301
预测值的维度： torch.Size([1])


In [None]:
from torch.utils.data import DataLoader
from torch import optim
class ModelTrainer:
    def __init__(self, train_data, val_data, test_data, input_channels, output_channels, 
                 batch_size=40, num_epochs=200, initial_lr=0.01, min_lr=0.00001):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.train_data = train_data
        self.val_data = val_data
        self.test_data = test_data
        self.input_channels = input_channels
        self.output_channels = output_channels
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.initial_lr = initial_lr
        self.min_lr = min_lr

        # 初始化模型
        self.brain_analysis_module = BrainAnalysisModule(input_channels, output_channels).to(self.device)
        self.anticipation_module = AnticipationModule().to(self.device)

        # 定义损失函数和优化器
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(list(self.brain_analysis_module.parameters()) + 
                                     list(self.anticipation_module.parameters()), 
                                     lr=self.initial_lr, weight_decay=0.01)
    def save_model(self, path):
        # torch.save({'net': self.network.state_dict(),
        #             'optimizer': self.opt.optimizer.state_dict()}, path)
    def train(self):
        print('loading train dataloader')
        train_loader = DataLoader(self.train_data, batch_size=self.batch_size, shuffle=True)
        print('loading eval dataloader')
        val_loader = DataLoader(self.val_data, batch_size=self.batch_size, shuffle=False)
        learning_rate = self.initial_lr
        decay_steps = self.num_epochs // 10

        for epoch in range(self.num_epochs):
            print('\nepoch: {0}'.format(epoch + 1))
            # 训练模式
            self.brain_analysis_module.train()
            self.anticipation_module.train()
            for inputs, targets in train_loader:
                inputs = inputs.to(self.device)  # 获取输入数据并转移到 GPU
                targets = targets.to(self.device)  # 获取目标数据并转移到 GPU

                # 前向传播
                y_hat = self.brain_analysis_module(inputs)
                final_output = self.anticipation_module(y_hat)

                # 计算损失
                loss = self.criterion(final_output, targets)  # 使用 targets 计算损失

                # 反向传播和优化
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

            # 学习率衰减
            if epoch % decay_steps == 0 and learning_rate > self.min_lr:
                learning_rate *= 0.5
                for param_group in self.optimizer.param_groups:
                    param_group['lr'] = learning_rate

            # 验证模式
            self.validate(val_loader)

            # 打印损失
            if (epoch + 1) % 10 == 0:
                print(f'Epoch [{epoch + 1}/{self.num_epochs}], Loss: {loss.item():.4f}')

    def validate(self, val_loader):
        self.brain_analysis_module.eval()
        self.anticipation_module.eval()
        val_loss = 0

        with torch.no_grad():
            for inputs in val_loader:
                inputs = inputs[0].to(self.device)
                y_hat = self.brain_analysis_module(inputs)
                final_output = self.anticipation_module(y_hat)
                loss = self.criterion(final_output, inputs)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(val_loader)
        print(f'Validation Loss: {avg_val_loss:.4f}')

# 假设已经准备好的数据集
train_data = TensorDataset(train_tensor)  # 训练集的 TensorDataset
val_data = TensorDataset(val_tensor)      # 验证集的 TensorDataset
test_data = TensorDataset(test_tensor)    # 测试集的 TensorDataset

# 创建模型训练器实例
trainer = ModelTrainer(train_data=train_data, 
                       val_data=val_data, 
                       test_data=test_data, 
                       input_channels=input_channels, 
                       output_channels=output_channels, 
                       batch_size=40, 
                       num_epochs=200, 
                       initial_lr=0.01, 
                       min_lr=0.00001)

# 训练模型
trainer.train()
