天池心跳信号预测
https://tianchi.aliyun.com/competition/entrance/531883/information

In [None]:
import torch
from torch.utils import data as Data
import pandas as pd
import os
import csv
import numpy as np
import torch.nn as nn
import time
from torch.optim.lr_scheduler import CosineAnnealingLR

设置参数

In [None]:
batch_size = 256  # mini-batch size handed to the DataLoaders
train_split = 0.9  # fraction of the data used for training
valid_split = 0.1  # fraction of the data held out for validation
epoch = 130  # number of training epochs

读取数据集

In [None]:
# Dataset adapter that exposes (features, label) pairs to a DataLoader
class DataAdapter(Data.Dataset):
    """Map-style dataset that stores features and labels as tensors.

    Args:
        X: 2-D array-like (samples x features); stored as float32.
        Y: 1-D array-like of integer class labels; stored as int64.

    Raises:
        ValueError: If ``X`` and ``Y`` hold a different number of samples.
    """

    def __init__(self, X, Y):
        super().__init__()
        # torch.as_tensor accepts lists, NumPy arrays and tensors of any
        # dtype, whereas the legacy FloatTensor/LongTensor constructors
        # reject tensors whose dtype differs from the target type.
        self.X = torch.as_tensor(X, dtype=torch.float32)  # features as float
        self.Y = torch.as_tensor(Y, dtype=torch.int64)  # labels as int
        if len(self.X) != len(self.Y):
            raise ValueError(
                'X and Y must have the same number of samples, got %d and %d'
                % (len(self.X), len(self.Y))
            )

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.X[index, :], self.Y[index]

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)


def read_data(batch_size, train_split, valid_split, train_path=None):
    """Load the training CSV and build shuffled train/validation loaders.

    Args:
        batch_size: Batch size used by both DataLoaders.
        train_split: Fraction of the samples assigned to the training set.
        valid_split: Unused; the validation set always receives the samples
            left over after the training split. Kept for call compatibility.
        train_path: Optional path of the training CSV. Defaults to the
            original Google Drive location.

    Returns:
        tuple: ``(train_loader, valid_loader)``.
    """
    if train_path is None:
        train_path = r'/content/drive/MyDrive/Colab/DeepLearning/心跳信号预测/train.csv'  # training file path

    signal = []
    label = []

    # The csv module is used instead of pandas because splitting the feature
    # column with pandas ran out of memory. newline='' is what the csv module
    # expects from the file objects it reads.
    with open(train_path, 'r', newline='') as f:
        for row in csv.DictReader(f):
            signal.append([float(num) for num in row['heartbeat_signals'].split(',')])  # comma-separated samples
            label.append(int(float(row['label'])))  # label is written as a float string

    dataset = DataAdapter(signal, label)
    train_size = int(len(signal) * train_split)
    valid_size = len(signal) - train_size
    # Random split into training and validation subsets
    train_dataset, valid_dataset = Data.random_split(dataset, [train_size, valid_size])

    train_loader = Data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    valid_loader = Data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

    return train_loader, valid_loader


# Helpers used to re-shuffle the training and validation sets
def _subset_tensors(subset):
    """Return the ``(X, Y)`` tensors selected by a ``Subset`` of a DataAdapter."""
    picked = torch.as_tensor(subset.indices, dtype=torch.long)
    return subset.dataset.X[picked], subset.dataset.Y[picked]


def shuffle_data(train_loader, valid_loader, valid_split, batch_size):
    """Merge the two subsets and draw a fresh random train/validation split.

    Both loaders are expected to wrap a ``Subset`` (as produced by
    ``random_split``) whose underlying dataset exposes ``X`` and ``Y`` tensors.

    Args:
        train_loader: DataLoader over the current training subset.
        valid_loader: DataLoader over the current validation subset.
        valid_split: Fraction of the merged data assigned to validation.
        batch_size: Batch size for the new DataLoaders.

    Returns:
        tuple: ``(train_loader, valid_loader)`` over the new split.
    """
    # ``subset.dataset`` is the FULL dataset behind a Subset, so concatenating
    # ``train_loader.dataset.dataset`` with ``valid_loader.dataset.dataset``
    # would duplicate every sample and leak training samples into validation.
    # Gather only the rows each subset actually owns instead.
    train_X, train_Y = _subset_tensors(train_loader.dataset)
    valid_X, valid_Y = _subset_tensors(valid_loader.dataset)
    X = torch.cat((train_X, valid_X), 0)
    Y = torch.cat((train_Y, valid_Y), 0)
    dataset = DataAdapter(X, Y)  # rebuild the merged dataset

    valid_size = int(len(dataset) * valid_split)
    train_size = len(dataset) - valid_size
    train_dataset, valid_dataset = Data.random_split(dataset, [train_size, valid_size])

    train_loader = Data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    valid_loader = Data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    return train_loader, valid_loader

训练一个epoch

In [None]:
# Training routine: one pass over the training loader
def train_model(train_loader, model, criterion, optimizer, device):
    """Train ``model`` for one epoch.

    Args:
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        model: Network to optimise; switched to training mode.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model parameters.
        device: Device used for the batches only when the model has no
            parameters; otherwise batches follow the model's own device.

    Returns:
        tuple: ``(mean_loss, mean_acc)``, the ``np.mean`` of the per-batch
        loss and per-batch accuracy.
    """
    model.train()

    # Send batches to wherever the model lives instead of hard-coding
    # ``.cuda()``, which fails on CPU-only machines.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    train_loss = []
    train_acc = []

    for batch in train_loader:
        inputs, labels = batch[0].to(target), batch[1].to(target)

        outputs = model(inputs)  # forward pass
        _, pred = outputs.max(1)  # index of the highest score per sample
        acc = (pred == labels).sum().item() / len(labels)

        loss = criterion(outputs, labels)
        optimizer.zero_grad()  # clear gradients from the previous step
        loss.backward()  # back-propagate
        optimizer.step()  # update the weights

        train_loss.append(loss.item())
        train_acc.append(acc)

    return np.mean(train_loss), np.mean(train_acc)

测试函数

In [None]:
# Evaluation routine; same structure as the training routine
def test_model(test_loader, criterion, model, device):
    """Evaluate ``model`` on ``test_loader`` without updating it.

    Args:
        test_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        model: Network to evaluate; switched to eval mode.
        device: Device used for the batches only when the model has no
            parameters; otherwise batches follow the model's own device.

    Returns:
        tuple: ``(mean_loss, mean_acc)``, the ``np.mean`` of the per-batch
        loss and per-batch accuracy.
    """
    model.eval()

    # Send batches to wherever the model lives instead of hard-coding
    # ``.cuda()``, which fails on CPU-only machines.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    test_loss = []
    test_acc = []

    # No back-propagation happens here, so skip building the autograd graph.
    with torch.no_grad():
        for batch in test_loader:
            inputs, labels = batch[0].to(target), batch[1].to(target)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            _, pred = outputs.max(1)
            acc = (pred == labels).sum().item() / len(labels)

            test_loss.append(loss.item())
            test_acc.append(acc)

    return np.mean(test_loss), np.mean(test_acc)


# The name starts with ``test_``, so pytest would otherwise try to collect
# this helper as a test case.
test_model.__test__ = False

定义模型

In [None]:
# Model definition
class CNN(nn.Module):
    """1-D convolutional classifier with two parallel dense branches.

    ``forward`` takes a 2-D tensor ``(batch, length)`` and returns
    ``(batch, 4)`` raw scores. The dense layers expect 6400 flattened
    features after the second pooling step, which a signal length of 205
    satisfies (presumably the competition's signal length; TODO confirm).
    """

    def __init__(self):
        super().__init__()
        # Convolution stack: kernel 3 / padding 1, then kernel 5 / padding 2,
        # so every convolution preserves the sequence length.
        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv1d(64, 64, kernel_size=5, stride=1, padding=2)
        self.conv5 = nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2)
        self.conv6 = nn.Conv1d(128, 128, kernel_size=5, stride=1, padding=2)
        # Parameter-free layers shared across the forward pass
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.softmax = nn.Softmax(dim=1)  # not used by forward()
        self.dropout = nn.Dropout(p=0.5)
        self.flatten = nn.Flatten()
        # Dense head: a wide branch (fc1) and a bottleneck branch (fc21 -> fc22)
        self.fc1 = nn.Linear(in_features=6400, out_features=256)
        self.fc21 = nn.Linear(in_features=6400, out_features=16)
        self.fc22 = nn.Linear(in_features=16, out_features=256)
        self.fc3 = nn.Linear(in_features=256, out_features=4)

    def forward(self, x):
        """Return the 4 class scores for each signal in ``x``."""
        n_samples, n_points = x.size(0), x.size(1)
        feats = x.view(n_samples, 1, n_points)  # add the channel dimension

        # First stage: four conv + ReLU layers, then pooling
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            feats = self.relu(conv(feats))
        feats = self.maxpool(feats)

        # Second stage: two conv + ReLU layers, then pooling
        for conv in (self.conv5, self.conv6):
            feats = self.relu(conv(feats))
        feats = self.maxpool(feats)

        feats = self.flatten(self.dropout(feats))

        # Sum the ReLU branch and the sigmoid bottleneck branch before the
        # final classification layer.
        wide = self.relu(self.fc1(feats))
        narrow = self.sigmoid(self.fc22(self.relu(self.fc21(feats))))
        return self.fc3(wide + narrow)

预测函数

In [None]:
def predict_ali_testset(batch_size, model, device):
    """Predict the test set and write the one-hot submission file.

    Reads ``testA.csv`` and ``sample_submit.csv`` from the input directory,
    predicts a class for every signal, and writes ``sample_submit.csv`` with
    columns ``label_0`` .. ``label_3`` and ``id`` to the output directory.
    The model is switched to eval mode.

    Args:
        batch_size: Batch size used for inference.
        model: Trained network returning 4 class scores per sample.
        device: Device used for the batches only when the model has no
            parameters; otherwise batches follow the model's own device.

    Raises:
        ValueError: If the sample submission row count differs from the
            number of predictions.
    """
    ipath = r'/content/drive/MyDrive/Colab/DeepLearning/心跳信号预测'  # input data directory
    opath = r'/content/drive/MyDrive/Colab/DeepLearning/心跳信号预测'  # output submission directory
    num_classes = 4
    first_id = 100000  # ids in the submission start here

    with open(os.path.join(ipath, 'testA.csv'), 'r', newline='') as f:
        signal = [
            [float(num) for num in row['heartbeat_signals'].split(',')]  # comma-separated samples
            for row in csv.DictReader(f)
        ]

    # Labels are unknown at prediction time; zeros are placeholders only.
    test_set = DataAdapter(signal, [0] * len(signal))
    test_loader = Data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)

    # Follow the model's device instead of hard-coding ``.cuda()``.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    model.eval()  # make sure dropout is disabled while predicting
    batch_preds = []
    with torch.no_grad():
        for batch in test_loader:
            outputs = model(batch[0].to(target))
            _, pred = outputs.max(1)
            batch_preds.append(pred.cpu().numpy())

    preds = np.concatenate(batch_preds) if batch_preds else np.empty(0, dtype=np.int64)

    # One-hot encode: row i gets a 1 in the column of its predicted class.
    one_hot = np.zeros((len(preds), num_classes), dtype=np.int64)
    one_hot[np.arange(len(preds)), preds] = 1

    result = pd.read_csv(os.path.join(ipath, 'sample_submit.csv'))  # template for the output file
    if len(result) != len(preds):
        raise ValueError(
            'sample_submit.csv has %d rows but %d predictions were produced'
            % (len(result), len(preds))
        )
    for cls in range(num_classes):
        result['label_%d' % cls] = one_hot[:, cls]
    result['id'] = np.arange(first_id, first_id + len(result))
    result.to_csv(os.path.join(opath, 'sample_submit.csv'), index=False)

    print('预测文件写入完成')

整合代码

In [None]:
if __name__ == '__main__':

    # One device for the model and the data. The previous "cuda:1" selection
    # disagreed with the ``.cuda()`` placement, which uses the default GPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(device)
    model = CNN().to(device)  # initialise the model
    criterion = nn.CrossEntropyLoss()
    # The checkpoint is written and later read from this single path.
    best_model_path = '/content/drive/MyDrive/Colab/DeepLearning/心跳信号预测/best_model.pt'

    print('******开始读取数据******')

    start = time.time()
    train_loader, valid_loader = read_data(batch_size, train_split, valid_split)
    end = time.time()
    run = end - start
    print('[1]load： %.5f sec' % run)

    print('******开始打乱数据******')

    # Re-shuffle the training and validation sets
    train_loader, valid_loader = shuffle_data(train_loader, valid_loader, valid_split, batch_size)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)  # Adam optimizer
    clr = CosineAnnealingLR(optimizer, T_max=150)  # cosine-annealing learning-rate schedule
    best_loss = float('inf')

    print('******开始训练模型******')
    start = time.time()
    # A separate loop variable keeps the global ``epoch`` setting intact.
    for epoch_idx in range(epoch):
        start_time = time.time()
        train_loss, train_acc = train_model(train_loader, model, criterion, optimizer, device)
        clr.step()  # advance the learning-rate schedule
        time_all = time.time() - start_time  # training time only, validation excluded
        valid_loss, valid_acc = test_model(valid_loader, criterion, model, device)
        print('- Epoch: %d - Train_loss: %.5f - Train_acc: %.5f - Val_loss: %.5f - Val_acc: %.5f - T_Time: %.3f 当前学习率：%f'
              % (epoch_idx, train_loss, train_acc, valid_loss, valid_acc, time_all, optimizer.param_groups[0]['lr']))

        if valid_loss < best_loss:
            best_loss = valid_loss
            # Save the best weights so the prediction step below loads the
            # model trained in this run rather than a stale or missing file.
            torch.save(model.state_dict(), best_model_path)

    torch.cuda.empty_cache()
    end = time.time()
    run = end - start
    print('[2]train： %.5f sec' % run)

    print('******开始预测数据******')

    model = CNN()
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    model.eval()
    model.to(device)
    predict_ali_testset(batch_size, model, device)  # write the submission file
    print('[3] prediction done')
    print('the end')

cuda:1
******开始预测数据******
预测文件写入完成
[3] prediction done
the end
