In [17]:
import torch
from torch import nn
from torchviz import make_dot
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GATConv
import numpy as np
from torch.utils.data import Dataset
from torch_geometric.loader import DataLoader
import shap

# 过滤警告
import warnings
warnings.filterwarnings("ignore")

In [3]:
class GAT_MODEL(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, edge_features_dim, drop_rate=0.5, inner_weights=None, outer_weights=None):
        super(GAT_MODEL, self).__init__()
        self.inner_weights = inner_weights
        self.outer_weights = outer_weights

        # Linear层
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)

        # GAT层
        self.conv1 = nn.ModuleList([GATConv(
            -1, 
            hidden_size, 
            heads=8, 
            dropout=drop_rate,
            concat=True,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True) for _ in range(6)])
      
        self.conv_out = GATConv(
            hidden_size*8, 
            output_size, 
            heads=1, 
            dropout=0.2,
            concat=False,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
    def forward(self, x, edge_index, edge_attr, edge_weight, batch):
        # 去除特殊值填充
        maskx = (x != 1000).float()
        x = x * maskx
        maske = (edge_attr != 1000).float()
        edge_attr = edge_attr * maske

        # # 对edge_attr进行加权，每一个权重值乘以一行特征(pls r2作为权重计算指标)
        # edge_attr = edge_attr * edge_weight.view(-1, 1)

        # # 对x进行加权，元素对应相乘（pls模型的权重）
        # x = x.reshape(-1, 8, x.shape[1])
        # x = torch.cat([x[i, :, :] * self.inner_weights for i in range(x.shape[0])], dim=0)

        # # 对edge_attr进行加权，元素对应相乘（pls模型的权重）
        # edge_attr = edge_attr.reshape(-1, 56, edge_attr.shape[1])
        # edge_attr = torch.cat([edge_attr[i, :, :] * self.outer_weights for i in range(edge_attr.shape[0])], dim=0)

        # GAT层
        for conv in self.conv1:
            x = conv(x, edge_index, edge_attr)
            x = F.elu(x)
            x = F.dropout(x, p=0.5, training=self.training)

        x = self.conv_out(x, edge_index, edge_attr)
        
        # print(x.shape)
        # print(batch, batch.shape)
        # assert False
        x = x.reshape(-1, 8)
        # print(x.shape)
        # 沿第二个维度平均池化
        x = torch.mean(x, dim=1, keepdim=True)
        # print(x.shape)
        assert x.shape[1] == 1
        return x
    
    def loss(self, output, target):
        target = torch.tensor(np.array(target), dtype=torch.float32)
        # print(output.shape, target.shape)
        
        output = output.to('cuda')
        target = target.to('cuda')
        loss = F.mse_loss(output, target)
        # print(output, target)
        # print(loss)
        # assert False
        return loss
    
# 创建模型实例
model = GAT_MODEL(
    input_size=4005, 
    hidden_size=32, 
    output_size=1,
    edge_features_dim=4500)

# 保存模型参数
torch.save(model.state_dict(), 'template_model.pth')

In [4]:
class GraphDataset(Dataset):
    def __init__(self, data_list):
        self.data_list = data_list

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        return self.data_list[idx]

In [None]:
# 加载模型参数
# 加载权重'models/92_best_best.pth'
model_path = 'models/92_best_best.pth'
model = torch.load(model_path)

# 读取测试数据
# 测试数据路径'dataset/test_dataset.pt'
test_data_path = 'dataset/test_dataset.pt'
test_data = torch.load(test_data_path)

# dataloader
test_loader = DataLoader(
    GraphDataset(test_data),
    batch_size=1,
    shuffle=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# # 计算梯度
# model.eval()
# for data in test_loader:
#     data = data.to(device)
#     out = model(data.x, data.edge_index, data.edge_attr, data.edge_weight, data.batch)
#     y = model.loss(out, data.y)
#     y.backward(retain_graph=True)
#     break

# # 使用 torchviz 生成计算图，并显示梯度数值
# dot = make_dot(y, params=dict(model.named_parameters()), show_attrs=False, show_saved=True)
# dot.format = 'png'
# dot.attr(dpi='300') 
# dot.render('model92计算图')

In [39]:
import torch
import torch.nn.functional as F
from captum.attr import IntegratedGradients

# 假设 model 是你的 GAT 模型实例，test_loader 是你的测试数据加载器
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
model.eval()  # 设置模型为评估模式

# 获取测试数据
for data in test_loader:
    test_data = data
    break

# 定义一个函数来预测模型输出
def model_predict(data1, data2, data3, data4):
    print('1')
    data1 = data1.to(device)
    data2 = data2.to(device)
    data3 = data3.to(device)
    data4 = data4.to(device)
    
    with torch.no_grad():
        out = model(data1, data2, data3, None, data4)
    return out

# 使用 Integrated Gradients 计算特征重要性
ig = IntegratedGradients(model_predict)
X = tuple((test_data.x, test_data.edge_index, test_data.edge_attr, test_data.batch))
attributions, delta = ig.attribute(X, target=0, return_convergence_delta=True)

# 可视化特征重要性
import matplotlib.pyplot as plt
import numpy as np

# 将 attributions 转换为 numpy 数组
attributions = attributions.cpu().detach().numpy()

# 绘制特征重要性图
plt.figure(figsize=(10, 6))
plt.bar(range(attributions.shape[1]), np.mean(np.abs(attributions), axis=0))
plt.xlabel('Feature')
plt.ylabel('Importance')
plt.title('Feature Importance using Integrated Gradients')
plt.show()

1


IndexError: The shape of the mask [56] at index 0 does not match the shape of the indexed tensor [2800, 4500] at index 0

In [None]:
class GAT_MODEL(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, edge_features_dim, drop_rate=0.5, inner_weights=None, outer_weights=None):
        super(GAT_MODEL, self).__init__()
        self.inner_weights = inner_weights
        self.outer_weights = outer_weights

        # GAT层
        self.conv1 = GATConv(
            -1, 
            hidden_size, 
            heads=8, 
            dropout=drop_rate,
            concat=True,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
        self.conv2 = GATConv(
            -1, 
            hidden_size, 
            heads=8, 
            dropout=drop_rate,
            concat=True,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
        self.conv3 = GATConv(
            -1, 
            hidden_size, 
            heads=8, 
            dropout=drop_rate,
            concat=True,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
        self.conv4 = GATConv(
            -1, 
            hidden_size, 
            heads=8, 
            dropout=drop_rate,
            concat=True,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
        self.conv5 = GATConv(
            -1, 
            hidden_size, 
            heads=8, 
            dropout=drop_rate,
            concat=True,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
        self.conv6 = GATConv(
            -1, 
            hidden_size, 
            heads=8, 
            dropout=drop_rate,
            concat=True,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
        self.conv_out = GATConv(
            hidden_size*8, 
            output_size, 
            heads=1, 
            dropout=0.2,
            concat=False,
            negative_slope=0.2,
            add_self_loops=True,
            edge_dim=edge_features_dim,
            bias=True,
            residual=True)
        
    def forward(self, x, edge_index, edge_attr, edge_weight, batch):
        # 去除特殊值填充
        maskx = (x != 1000).float()
        x = x * maskx
        maske = (edge_attr != 1000).float()
        edge_attr = edge_attr * maske

        # # 对edge_attr进行加权，每一个权重值乘以一行特征(pls r2作为权重计算指标)
        # edge_attr = edge_attr * edge_weight.view(-1, 1)

        # # 对x进行加权，元素对应相乘（pls模型的权重）
        # x = x.reshape(-1, 8, x.shape[1])
        # x = torch.cat([x[i, :, :] * self.inner_weights for i in range(x.shape[0])], dim=0)

        # # 对edge_attr进行加权，元素对应相乘（pls模型的权重）
        # edge_attr = edge_attr.reshape(-1, 56, edge_attr.shape[1])
        # edge_attr = torch.cat([edge_attr[i, :, :] * self.outer_weights for i in range(edge_attr.shape[0])], dim=0)

        # GAT层
        x = self.conv1(x, edge_index, edge_attr)
        x = F.elu(x)
        x = F.dropout(x, p=0.5, training=self.training)

        x = self.conv2(x, edge_index, edge_attr)
        x = F.elu(x)
        x = F.dropout(x, p=0.5, training=self.training)

        x = self.conv3(x, edge_index, edge_attr)
        x = F.elu(x)
        x = F.dropout(x, p=0.5, training=self.training)

        x = self.conv4(x, edge_index, edge_attr)
        x = F.elu(x)
        x = F.dropout(x, p=0.5, training=self.training)

        x = self.conv5(x, edge_index, edge_attr)
        x = F.elu(x)
        x = F.dropout(x, p=0.5, training=self.training)

        x = self.conv6(x, edge_index, edge_attr)
        x = F.elu(x)
        x = F.dropout(x, p=0.5, training=self.training)

        x = self.conv_out(x, edge_index, edge_attr)
        
        # print(x.shape)
        # print(batch, batch.shape)
        # assert False
        x = x.reshape(-1, 8)
        # print(x.shape)
        # 沿第二个维度平均池化
        x = torch.mean(x, dim=1, keepdim=True)
        # print(x.shape)
        assert x.shape[1] == 1
        return x
    
    def loss(self, output, target):
        target = torch.tensor(np.array(target), dtype=torch.float32)
        # print(output.shape, target.shape)
        
        output = output.to('cuda')
        target = target.to('cuda')
        loss = F.mse_loss(output, target)
        # print(output, target)
        # print(loss)
        # assert False
        return loss

In [15]:
import netron

# 使用 netron 可视化模型
netron.start('template_model.pth')

Serving 'template_model.pth' at http://localhost:8080


('localhost', 8080)