In [42]:
import igraph as ig
import numpy as np
import pandas as pd
import scipy.sparse as sp
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from torch_geometric.nn import MessagePassing, global_mean_pool

# Define the Apple Silicon device
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print(device)

mps


In [43]:
## 1. 数据预处理
# 读取网络数据
edge_file = "../data/raw/european/powergridEU_E.csv"
coord_file = "../data/raw/european/powergridEU_V.csv"

# 读取数据
edges = pd.read_csv(edge_file, header=None).values.tolist()
coords = pd.read_csv(coord_file, header=None)  # 节点坐标数据

# 创建无向图
g = ig.Graph.TupleList(edges, directed=False)
g.vs["name"] = [int(v) for v in g.vs["name"]]  # 节点名转为整数

num_nodes, num_edges = g.vcount(), g.ecount()
print("Number of nodes:", num_nodes, "Number of edges:", num_edges)

Number of nodes: 13478 Number of edges: 16922


In [44]:
distances_upper = sp.load_npz("../data/processed/distances.npz").toarray()
distances = distances_upper + distances_upper.T  # 恢复对称性
print("distances:", distances.shape)
print("-" * 20)

adj_sparse = sp.load_npz("../data/processed/adj_sparse.npz")
print("邻接矩阵 adj_sparse:", adj_sparse.shape)
print("-" * 20)

B1 = sp.load_npz("../data/processed/B1.npz")
print("节点-边关联矩阵 B1:", B1.shape)
print("-" * 20)

L1_tilde = sp.load_npz("../data/processed/L1_tilde.npz")
L1_tilde = torch.tensor(L1_tilde.toarray(), dtype=torch.float32)  # 转换为密集矩阵
print("Hodge 1-Laplacian 矩阵 L1_tilde:", L1_tilde.shape)
print("-" * 20)

X = np.load("../data/processed/X.npy", allow_pickle=True)
print("X:", X.shape)

distances: (13478, 13478)
--------------------
邻接矩阵 adj_sparse: (13478, 13478)
--------------------
节点-边关联矩阵 B1: (13478, 16922)
--------------------
Hodge 1-Laplacian 矩阵 L1_tilde: torch.Size([16922, 16922])
--------------------
X: (13478, 5)


In [36]:
## 数据预处理
# 模拟节点特征（例如电压、功率等）
node_features = torch.randn(num_nodes, 3)  # 3维节点特征
print(node_features.shape)

# 模拟边特征（例如电阻、电抗等）
edge_features = torch.randn(num_edges, 2)  # 2维边特征
print(edge_features.shape)

# 构建边索引，将edges转换为PyTorch张量
edges = pd.read_csv(edge_file, header=None).to_numpy()
edge_index = torch.tensor(edges, dtype=torch.long).T
print(edge_index.shape)

# 添加批次信息（假设只有一个图）
batch = torch.zeros(num_nodes, dtype=torch.long)
print(batch.shape)

torch.Size([13478, 3])
torch.Size([16922, 2])
torch.Size([2, 16922])
torch.Size([13478])


In [75]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# HoSC 单层模块：高阶单纯形卷积
class HigherOrderSimplicialConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.theta = nn.Linear(in_channels, out_channels)  # 权重矩阵 Θ_H [in_channels, out_channels]
        self.bn = nn.BatchNorm1d(out_channels)            # 批量归一化层，稳定训练

    def forward(self, Z_H, L1_tilde):
        # Z_H [M, in_channels]：输入边嵌入
        # L1_tilde [M, M]：Hodge 1-Laplacian
        Z_theta = self.theta(Z_H)                     # 线性变换 [M, out_channels]
        Z_conv = torch.sparse.mm(L1_tilde, Z_theta) if L1_tilde.is_sparse else torch.mm(L1_tilde, Z_theta)  # 卷积 [M, out_channels]
        Z_psi = F.relu(self.bn(Z_conv))               # 非线性变换（BatchNorm + ReLU）[M, out_channels]
        Z_max, _ = torch.max(Z_psi, dim=1, keepdim=True)  # 特征维度最大值 [M, 1]
        return Z_max

# HoSC 模块：多层高阶单纯形卷积
class HoSC(nn.Module):
    def __init__(self, input_dim, hidden_dims):
        super().__init__()
        self.layers = nn.ModuleList()                 # 存储多层 HoSC
        current_dim = input_dim                       # 初始输入维度
        for dim in hidden_dims:
            self.layers.append(HigherOrderSimplicialConv(current_dim, dim))  # 添加单层 [current_dim, dim]
            current_dim = 1                           # 每层输出固定为 1

    def forward(self, edge_attr, L1_tilde):
        # edge_attr [M, input_dim]：边特征
        # L1_tilde [M, M]：Hodge 1-Laplacian
        Z_list = []                                   # 存储各层输出
        Z_H = edge_attr                               # 初始边嵌入 [M, input_dim]
        for layer in self.layers:
            Z_H = layer(Z_H, L1_tilde)                # 单层 HoSC 输出 [M, 1]
            Z_list.append(Z_H)
        return torch.cat(Z_list, dim=1)               # 拼接所有层输出 [M, len(hidden_dims)]

# GNN 模块：节点特征提取
class GNN(nn.Module):
    def __init__(self, input_dim, hidden_dims):
        super().__init__()
        self.layers = nn.ModuleList()                 # 存储多层 GCN
        current_dim = input_dim                       # 初始输入维度
        for dim in hidden_dims:
            self.layers.append(nn.Linear(current_dim, dim))  # 线性层 [current_dim, dim]
            current_dim = dim                         # 更新维度

    def forward(self, X_n, A_tilde):
        # X_n [n, input_dim]：节点特征
        # A_tilde [n, n]：归一化邻接矩阵
        H = X_n                                       # 初始节点嵌入 [n, input_dim]
        for layer in self.layers:
            H = torch.mm(A_tilde, H)                  # 邻域聚合 [n, current_dim]
            H = F.relu(H @ layer.weight.T + layer.bias)  # 线性变换与激活 [n, next_dim]
        return H                                      # 最终节点嵌入 [n, hidden_dims[-1]]

# PPGN-HoSC 主模型
class PPGN_HoSC(nn.Module):
    def __init__(self, n_nodes, node_features, edge_features, node_hidden, edge_hidden):
        super().__init__()
        self.n_nodes = n_nodes                        # 节点数 n，例如 100
        self.gnn = GNN(node_features, node_hidden)    # GNN 模块，输入 node_features，输出 hidden_dims[-1]
        self.hosc = HoSC(edge_features, edge_hidden)  # HoSC 模块，输入 edge_features，输出 len(edge_hidden)
        # 全连接层
        self.fc1 = nn.Linear(node_hidden[-1] + len(edge_hidden), 2 * n_nodes)  # [d_{L_n} + L_e, 2n]
        self.fc2 = nn.Linear(2 * n_nodes, n_nodes)    # [2n, n]

    def forward(self, data):
        # 输入数据提取
        X_n = data.x                                  # 节点特征 [n, node_features]，例如 [100, 3]
        X_e = data.edge_attr                          # 边特征 [M, edge_features]，例如 [150, 1]
        A_tilde = data.A_tilde                        # 邻接矩阵 [n, n]，例如 [100, 100]
        L1_tilde = data.L1_tilde                      # Hodge 1-Laplacian [M, M]，例如 [150, 150]
        B1 = data.B1                                  # 关联矩阵 [n, M]，例如 [100, 150]

        # 节点特征提取
        H_n = self.gnn(X_n, A_tilde)                  # GNN 输出 [n, d_{L_n}]，例如 [100, 8]

        # 边特征与高阶拓扑建模
        Z_H = self.hosc(X_e, L1_tilde)                # HoSC 输出 [M, L_e]，例如 [150, 3]

        # 边到节点映射
        H_e = torch.sparse.mm(B1, Z_H) if B1.is_sparse else torch.mm(B1, Z_H)  # [n, L_e]，例如 [100, 3]

        # 特征融合
        H = torch.cat([H_n, H_e], dim=1)              # 拼接 [n, d_{L_n} + L_e]，例如 [100, 11]

        # 全局变换
        f = F.relu(self.fc1(H))                       # 第一层全连接 [n, 2n]，例如 [100, 200]
        f = self.fc2(f)                               # 第二层全连接 [n, n]，例如 [100, 100]

        # 概率输出
        z_p = F.sigmoid(f)                            # 独立概率 [n, n]，每个元素在 [0,1]
        return z_p[0]                                 # 单样本输出 [n]，例如 [100]

# 示例运行
n, M = 100, 150  # 节点数和边数
data = type('Data', (), {
    'x': torch.randn(n, 3),          # 节点特征 [100, 3]
    'edge_attr': torch.randn(M, 1),  # 边特征 [150, 1]
    'A_tilde': torch.randn(n, n),    # 邻接矩阵 [100, 100]
    'L1_tilde': torch.randn(M, M),   # Hodge 1-Laplacian [150, 150]
    'B1': torch.randn(n, M)          # 关联矩阵 [100, 150]
})()
model = PPGN_HoSC(n_nodes=n, node_features=3, edge_features=1, node_hidden=[32, 16, 8], edge_hidden=[16, 8, 1])
z_p = model(data)
print(f"Output shape: {z_p.shape}")  # 输出形状：[100]
print(f"Sample probabilities: {z_p[:5]}")  # 示例前五个概率
print(f"Sum of probabilities: {z_p.sum().item()}")  # 概率总和（无约束）

Output shape: torch.Size([100])
Sample probabilities: tensor([2.0985e-09, 3.4853e-11, 1.0000e+00, 3.7305e-16, 4.1692e-01],
       grad_fn=<SliceBackward0>)
Sum of probabilities: 46.11359405517578
