In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from captum.attr import IntegratedGradients
import shap

# 基础模块
class TCNLayer(nn.Module):
    def __init__(self, input_size, hidden_size, kernel_size, dilation, dropout):
        super(TCNLayer, self).__init__()
        self.conv = nn.Conv1d(input_size, hidden_size, kernel_size, dilation=dilation, padding=(kernel_size - 1) * dilation // 2)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.conv(x)
        x = self.dropout(x)
        x = self.relu(x)
        return x


class TCN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, kernel_size=3, dropout=0.2):
        super(TCN, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(num_layers):
            dilation = 2 ** i
            in_channels = input_size if i == 0 else hidden_size
            self.layers.append(TCNLayer(in_channels, hidden_size, kernel_size, dilation, dropout))

    def forward(self, x):
        # 输入为 (batch_size, seq_len, input_size)，调整为 (batch_size, input_size, seq_len)
        x = x.permute(0, 2, 1)
        for layer in self.layers:
            x = layer(x)
        return x.permute(0, 2, 1)  # 返回 (batch_size, seq_len, hidden_size)


class iTransformer(nn.Module):
    def __init__(self, input_size, hidden_size, num_heads, num_layers, dropout=0.1):
        super(iTransformer, self).__init__()
        self.embedding = nn.Linear(input_size, hidden_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=num_heads, dropout=dropout)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.projection = nn.Linear(hidden_size, 1)  # 输出维度为 1，预测单个目标值

    def forward(self, x):
        x = self.embedding(x)
        x = self.encoder(x)
        return self.projection(x).squeeze(-1)


class TimeSeriesPredictor(nn.Module):
    def __init__(self, input_size, tcn_hidden_size, tcn_layers, transformer_hidden_size, transformer_heads, transformer_layers):
        super(TimeSeriesPredictor, self).__init__()
        self.tcn = TCN(input_size, tcn_hidden_size, tcn_layers)
        self.transformer = iTransformer(tcn_hidden_size, transformer_hidden_size, transformer_heads, transformer_layers)

    def forward(self, x):
        tcn_out = self.tcn(x)
        transformer_out = self.transformer(tcn_out)
        return transformer_out


In [4]:
import numpy as np

# 生成时间序列数据
def generate_data(num_samples, seq_len, num_features):
    x = np.random.rand(num_samples, seq_len, num_features)
    y = np.mean(x, axis=1)  # 目标是预测序列均值
    return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)


# 训练数据
num_samples = 1000
seq_len = 30
num_features = 10
x_train, y_train = generate_data(num_samples, seq_len, num_features)
x_test, y_test = generate_data(200, seq_len, num_features)


In [5]:
# 定义超参数
input_size = num_features
tcn_hidden_size = 32
tcn_layers = 2
transformer_hidden_size = 64
transformer_heads = 4
transformer_layers = 2
epochs = 20
lr = 1e-3

# 初始化模型
model = TimeSeriesPredictor(input_size, tcn_hidden_size, tcn_layers, transformer_hidden_size, transformer_heads, transformer_layers)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.MSELoss()

# 训练
model.train()
for epoch in range(epochs):
    optimizer.zero_grad()
    output = model(x_train)
    loss = loss_fn(output, y_train)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")


Using a target size (torch.Size([1000, 10])) that is different to the input size (torch.Size([1000, 30])). This will likely lead to incorrect results due to broadcasting. Please ensure they have the same size.


RuntimeError: The size of tensor a (30) must match the size of tensor b (10) at non-singleton dimension 1

In [None]:
# 创建 SHAP 解释器
explainer = shap.DeepExplainer(model, x_train[:100])  # 使用部分训练数据
shap_values = explainer.shap_values(x_test[:10])

# 可视化 SHAP 值
shap.summary_plot(shap_values, x_test[:10].numpy())


In [None]:
# 创建 Integrated Gradients 解释器
ig = IntegratedGradients(model)

# 计算特定样本的特征重要性
sample = x_test[0].unsqueeze(0)  # 单个样本
attributions, delta = ig.attribute(sample, target=0, return_convergence_delta=True)

# 可视化重要性
import matplotlib.pyplot as plt
plt.bar(range(seq_len), attributions.squeeze(0).mean(dim=1).detach().numpy())
plt.xlabel("Time Steps")
plt.ylabel("Feature Importance")
plt.title("Integrated Gradients Attribution")
plt.show()


In [None]:
# 测试模型
model.eval()
with torch.no_grad():
    predictions = model(x_test)

# 可视化预测结果
import matplotlib.pyplot as plt
plt.plot(y_test[:50].numpy(), label="True")
plt.plot(predictions[:50].numpy(), label="Predicted")
plt.legend()
plt.title("Prediction vs True")
plt.show()
