In [1]:
import pandas as pd
import torch.nn as nn
from torch.nn.utils import weight_norm
import torch


class Chomp1d(nn.Module):
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """
        其实这就是一个裁剪的模块，裁剪多出来的padding
        """
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):
    def __init__(self,
                 n_inputs,
                 n_outputs,
                 kernel_size,
                 stride,
                 dilation,
                 padding,
                 dropout=0.2):
        """
        相当于一个Residual block

        :param n_inputs: int, 输入通道数
        :param n_outputs: int, 输出通道数
        :param kernel_size: int, 卷积核尺寸
        :param stride: int, 步长，一般为1
        :param dilation: int, 膨胀系数
        :param padding: int, 填充系数
        :param dropout: float, dropout比率
        """
        super(TemporalBlock, self).__init__()
        # 经过conv1，输出的size其实是(Batch, input_channel, seq_len + padding)
        self.conv1 = weight_norm(nn.Conv1d(n_inputs,
                                           n_outputs,
                                           kernel_size,
                                           stride=stride,
                                           padding=padding,
                                           dilation=dilation))
        # 裁剪掉多出来的padding部分，维持输出时间步为seq_len
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = weight_norm(nn.Conv1d(n_outputs,
                                           n_outputs,
                                           kernel_size,
                                           stride=stride,
                                           padding=padding,
                                           dilation=dilation))

        #  裁剪掉多出来的padding部分，维持输出时间步为seq_len
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        self.net = nn.Sequential(self.conv1,
                                 self.chomp1,
                                 self.relu1,
                                 self.dropout1,
                                 self.conv2,
                                 self.chomp2,
                                 self.relu2,
                                 self.dropout2)
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        self.init_weights()

    def init_weights(self):
        """
        参数初始化

        :return:
        """
        self.conv1.weight.data.normal_(0, 0.01)
        self.conv2.weight.data.normal_(0, 0.01)
        if self.downsample is not None:
            self.downsample.weight.data.normal_(0, 0.01)

    def forward(self, x):
        """
        :param x: size of (Batch, input_channel, seq_len)
        :return:
        """
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)


class TemporalConvNet(nn.Module):
    def __init__(self,
                 num_inputs,
                 num_channels,
                 kernel_size=2,
                 dropout=0.2):
        """
        TCN，目前paper给出的TCN结构很好的支持每个时刻为一个数的情况，即sequence结构，
        对于每个时刻为一个向量这种一维结构，勉强可以把向量拆成若干该时刻的输入通道，
        对于每个时刻为一个矩阵或更高维图像的情况，就不太好办。

        :param num_inputs: int， 输入通道数
        :param num_channels: list，每层的hidden_channel数，例如[25,25,25,25]表示有4个隐层，每层hidden_channel数为25
        :param kernel_size: int, 卷积核尺寸
        :param dropout: float, drop_out比率
        """
        super(TemporalConvNet, self).__init__()
        layers = []
        num_levels = len(num_channels)
        for i in range(num_levels):
            # 膨胀系数：1，2，4，8……
            dilation_size = 2 ** i
            # 确定每一层的输入通道数
            in_channels = num_inputs if i == 0 else num_channels[i - 1]
            # 确定每一层的输出通道数
            out_channels = num_channels[i]
            layers += [TemporalBlock(in_channels,
                                     out_channels,
                                     kernel_size,
                                     stride=1,
                                     dilation=dilation_size,
                                     padding=(kernel_size - 1) * dilation_size,
                                     dropout=dropout)]

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """
        输入x的结构不同于RNN，一般RNN的size为(Batch, seq_len, channels)或者(seq_len, Batch, channels)，
        这里把seq_len放在channels后面，把所有时间步的数据拼起来，当做Conv1d的输入尺寸，实现卷积跨时间步的操作，
        很巧妙的设计。

        :param x: size of (Batch, input_channel, seq_len)
        :return: size of (Batch, output_channel, seq_len)
        """
        return self.network(x)

In [2]:
class TCN(nn.Module):
    def __init__(self, input_size, output_size, num_channels, kernel_size=2, dropout=0.2):
        super(TCN, self).__init__()
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size, dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)
        self.double()

    def forward(self, x):
        # x: (batch_size, input_size, seq_len)
        y = self.tcn(x)
        # y: (batch_size, num_channels[-1], seq_len)
        y = y.transpose(1, 2).contiguous()
        # y: (batch_size, seq_len, num_channels[-1])
        y = self.linear(y[:, -1, :])
        # 取最后一个时间步的输出，y: (batch_size, output_size)
        return y

In [3]:
from torch.utils.data import Dataset, DataLoader
import os
import numpy as np


# 自定义数据集类
class MyDataset(Dataset):
    def __init__(self, score_path, source_dir):
        self.score_df = pd.read_excel(score_path)
        self.data_dict = {}
        for i, row in (self.score_df).iterrows():
            file_path = os.path.join(source_dir, str(row['folder_name']), str(row['file_name']))
            self.data_dict[file_path] = row['score']
        # 获取所有数据的路径
        self.file_paths = list(self.data_dict.keys())

    def __len__(self):
        return len(self.score_df)

    def __getitem__(self, index):
        file_path = self.file_paths[index]
        # 把第一列设置为索引，输出的时候忘记了
        data_df = pd.read_csv(file_path, index_col=0)
        data = np.array(data_df)
        data = torch.from_numpy(data.T)
        label = self.score_df.loc[index, 'score']
        return data, label


In [4]:
score_path = r'.\input\score.xlsx'
source_dir = r'.\input'
dataset = MyDataset(score_path=score_path, source_dir=source_dir)

In [5]:
import torch.optim as optim


def train(model, dataset, epochs, batch_size, lr):
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs):
        epoch_loss = 0.0
        for inputs, targets in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), targets)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss / len(dataloader)}")


def test(model, dataset, batch_size):
    model.eval()
    test_dataloader = DataLoader(dataset, batch_size=batch_size)
    criterion = nn.MSELoss()

    test_loss = 0.0
    y_pred = []
    y_true = []
    with torch.no_grad():
        for inputs, targets in test_dataloader:
            outputs = model(inputs)
            print(outputs)
            print(y_pred)
            y_pred.append(outputs.numpy())
            y_true.append(targets.numpy())
            loss = criterion(outputs.squeeze(), targets)
            test_loss += loss.item() * len(inputs)

    y_pred = np.array(y_pred)
    y_true = np.array(y_true)
    diff = np.abs(y_pred - y_true)
    results = pd.DataFrame({'label': y_true.squeeze(), 'prediction': y_pred.squeeze(), 'abs_diff': diff.squeeze()})

    test_loss /= len(dataset)
    print(f"Test Loss: {test_loss}")
    return results


In [6]:
# from torchsummary import summary
#
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = TCN(input_size=20, output_size=1, num_channels=[16, 32, 64]).to(device)
# print(device)
# summary(model, input_size=(20, 441))

In [7]:
if __name__ == '__main__':
    input_size = 20  # 输入序列的维度
    output_size = 1  # 输出序列的维度
    num_channels = [32, 64, 128]  # 每层的隐藏层通道数
    kernel_size = 5  # 卷积核尺寸
    dropout = 0.2  # Dropout比率
    lr = 0.001  # 学习率
    batch_size = 64  # 批次大小
    num_epochs = 200  # 训练轮数

    # 实例化网络
    model = TCN(input_size, output_size, num_channels, kernel_size, dropout)

    # 将数据划分为训练集和测试集
    train_size = int(len(dataset) * 0.9)
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

    train(model, dataset=train_dataset, epochs=num_epochs, batch_size=batch_size, lr=lr)
    # df = test(model, dataset=test_dataset, batch_size=batch_size)

Epoch 1/200, Loss: 8.1867039733215
Epoch 2/200, Loss: 3.34919529259127
Epoch 3/200, Loss: 3.9153269428034907
Epoch 4/200, Loss: 2.560476190176488
Epoch 5/200, Loss: 2.024712985014739
Epoch 6/200, Loss: 2.1076122955797376
Epoch 7/200, Loss: 2.646794076218676
Epoch 8/200, Loss: 2.0253385278429588
Epoch 9/200, Loss: 1.4933864935182484
Epoch 10/200, Loss: 1.5101631297108773
Epoch 11/200, Loss: 1.7746998039241424
Epoch 12/200, Loss: 1.498765478060875
Epoch 13/200, Loss: 1.5702293216483791
Epoch 14/200, Loss: 1.3369098879435002
Epoch 15/200, Loss: 1.8225452163842444
Epoch 16/200, Loss: 1.5264417279744376
Epoch 17/200, Loss: 1.69829809908145
Epoch 18/200, Loss: 1.2859709538696567
Epoch 19/200, Loss: 1.4754642712794475
Epoch 20/200, Loss: 1.446673045338254
Epoch 21/200, Loss: 1.4680793742350755
Epoch 22/200, Loss: 1.4295110875007688
Epoch 23/200, Loss: 1.7182332440378856
Epoch 24/200, Loss: 1.2620974382327779
Epoch 25/200, Loss: 1.378017627928397
Epoch 26/200, Loss: 1.230534687230379
Epoch 27/