In [11]:
import h5py
import torch
from torch import nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torchsummary import summary

In [12]:
# 1. 加载数据
with h5py.File('C:/Users/liusi/Desktop/research/transformer/dataset.h5', 'r') as f:
    input_data = f['input_data'][:]  # 读取输入数据

    # 检查labels是否为标量数据集
    if f['labels'].shape == ():  # 如果是标量
        labels = f['labels'][()]  # 读取单个标量值
    else:
        labels = f['labels'][:]  # 读取数组

print("input_data shape:", input_data.shape)
print("labels shape:", labels.shape)



input_data shape: (20000, 1, 101, 2)
labels shape: (20000,)


In [13]:
# 2. 创建自定义数据集
class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = torch.FloatTensor(labels)  # 转换为浮点型

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# 3. 划分训练和验证集
X_train, X_val, y_train, y_val = train_test_split(input_data, labels, test_size=0.2, random_state=42)

# 创建Dataset对象
train_dataset = CustomDataset(X_train, y_train)
val_dataset = CustomDataset(X_val, y_val)

# 4. 创建DataLoader
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)

In [14]:
class CNN(nn.Module):
    def __init__(self, num_classes, in_channels):
        super(CNN, self).__init__()
        self.cnn1 = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=40, kernel_size=(2, 2), stride=1, padding=0), # 输出尺寸：(N, 40, H-1, W-1)
            nn.BatchNorm2d(40),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, 1), stride=(1, 1)) # 这个池化层可能不会改变尺寸，取决于实际需求
        )
        self.cnn2 = nn.Sequential(
            nn.Conv2d(in_channels=40, out_channels=1, kernel_size=(1, 1), stride=1, padding=0), # 输出尺寸：(N, 1, H-1, W-1)
            nn.BatchNorm2d(1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, 1), stride=(1, 1))  # 这个池化层可能不会改变尺寸，取决于实际需求
        )
        # 在此全连接层之前，需要知道上一层的输出尺寸
        # 假设上一层的输出尺寸是 (1, H', W')
        # 全连接层的输入尺寸将是 1 * H' * W'
        self.fc1 = nn.Linear(1 * 100 * 1, 2)  # 全连接层有2个神经元
        self.fc2 = nn.Linear(2, 1)  # 对于回归问题，输出层有1个神经元

    def forward(self, x):
        out = self.cnn1(x)
        out = self.cnn2(out)
        out = out.view(out.size(0), -1)  # 展平
        out = F.relu(self.fc1(out))  # 应用ReLU激活函数
        out = self.fc2(out)
        return out


In [15]:
# 模型实例化
num_classes = 5  # 类别数
in_channel = 1   # 输入通道数

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNN(num_classes=num_classes, in_channels=in_channel).to(device)
summary(model, input_size=(in_channel, 101, 2))


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 40, 100, 1]             200
       BatchNorm2d-2           [-1, 40, 100, 1]              80
              ReLU-3           [-1, 40, 100, 1]               0
         MaxPool2d-4           [-1, 40, 100, 1]               0
            Conv2d-5            [-1, 1, 100, 1]              41
       BatchNorm2d-6            [-1, 1, 100, 1]               2
              ReLU-7            [-1, 1, 100, 1]               0
         MaxPool2d-8            [-1, 1, 100, 1]               0
            Linear-9                    [-1, 2]             202
           Linear-10                    [-1, 1]               3
Total params: 528
Trainable params: 528
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.13
Params size (MB): 0.00
Estimated Total Siz

##### 训练模型

In [16]:
learning_rate = 1e-4
# 使用均方误差损失函数（MSE）对于回归任务
criterior = nn.MSELoss()
# 或者使用平均绝对误差损失函数（MAE）
# criterior = nn.L1Loss()
# criterior = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [17]:
num_epochs = 100
for epoch in range(num_epochs):
    for batch_idx, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # forward
        outputs = model(images)
        loss = criterior(outputs, labels)

        # backward
        optimizer.zero_grad() # 梯度清零
        loss.backward() # 反向传播

        # update
        optimizer.step() # 更新参数

        if (batch_idx+1) % 250 == 0:
            print(f"{epoch+1}/{num_epochs}, step {batch_idx+1}/{len(train_loader)}, loss = {loss.item():.4f}")

1/100, step 250/250, loss = 0.6233
2/100, step 250/250, loss = 0.6441
3/100, step 250/250, loss = 0.4406
4/100, step 250/250, loss = 0.3793
5/100, step 250/250, loss = 0.5811
6/100, step 250/250, loss = 0.3985
7/100, step 250/250, loss = 0.4840
8/100, step 250/250, loss = 0.4693
9/100, step 250/250, loss = 0.5658
10/100, step 250/250, loss = 0.5249
11/100, step 250/250, loss = 0.5683
12/100, step 250/250, loss = 0.4525
13/100, step 250/250, loss = 0.4835
14/100, step 250/250, loss = 0.5239
15/100, step 250/250, loss = 0.5277
16/100, step 250/250, loss = 0.4825
17/100, step 250/250, loss = 0.3460
18/100, step 250/250, loss = 0.3615
19/100, step 250/250, loss = 0.5871
20/100, step 250/250, loss = 0.5903
21/100, step 250/250, loss = 0.6963
22/100, step 250/250, loss = 0.6650
23/100, step 250/250, loss = 0.4868
24/100, step 250/250, loss = 0.5556
25/100, step 250/250, loss = 0.6523
26/100, step 250/250, loss = 0.4807
27/100, step 250/250, loss = 0.6556
28/100, step 250/250, loss = 0.4932
2

##### Model evaluation

In [18]:
# 1. 加载测试数据
with h5py.File('C:/Users/liusi/Desktop/research/transformer/dataset_test.h5', 'r') as f:
    test_input_data = f['input_data'][:]  # 假设测试集的特征数据键是 'input_data'
    test_labels = f['labels'][:]  # 假设测试集的标签数据键是 'labels'

# 2. 创建测试集 Dataset
test_dataset = CustomDataset(test_input_data, test_labels)

# 3. 创建测试集 DataLoader
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)  # 通常测试集不需要打乱

In [22]:
# 测试
# 计算回归模型性能的指标
total_loss = 0.0
total_mae = 0.0
n_samples = 0

model.eval()  # 将模型设置为评估模式
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        outputs = outputs.squeeze()  # 移除单一维度

        # 计算损失
        loss = criterior(outputs, labels)
        total_loss += loss.item() * images.size(0)

        # 计算平均绝对误差（MAE）
        mae = torch.abs(outputs - labels).mean()
        total_mae += mae.item() * images.size(0)

        n_samples += images.size(0)

# 计算平均损失
mean_loss = total_loss / n_samples

# 计算平均绝对误差
mean_mae = total_mae / n_samples

print(f'Mean Loss (MSE): {mean_loss}')
print(f'Mean Absolute Error (MAE): {mean_mae}')



Mean Loss (MSE): 0.5082457321166992
Mean Absolute Error (MAE): 0.57504308385849


In [19]:
# 分类问题的测试
total = 0
correct = 0
for images, labels in test_loader:
    images = images.to(device)
    labels = labels.to(device)

    outputs = model(images)
    predicted = torch.argmax(outputs, dim=1)

    total += labels.shape[0]
    correct += (predicted == labels).sum().item()
    
print(f'{correct}/{total}, accuracy = {correct/total}')

7041/20000, accuracy = 0.35205
