In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import TransformerConv
from torch_geometric.data import Data
from torch_geometric.utils import dense_to_sparse, get_laplacian, to_scipy_sparse_matrix
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from torch.utils.tensorboard import SummaryWriter
from scipy.sparse.linalg import eigs

In [2]:
# --- 1. 超参数配置 (新增 pos_encoding_dim) ---
hparams = {
    'dataset': 'car',
    'threshold_pos': 500,
    'threshold_neg': 20000,
    'hidden_channels': 16,
    'heads': 4,
    'pos_encoding_dim': 8,  # 【新增】为每个图计算8维的位置编码
    'learning_rate': 0.005,
    'weight_decay': 5e-4,
    'epochs': 150,
    'dropout': 0.5
}

In [3]:
# --- 2. TensorBoard 设置 ---
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
log_dir_name = f"../runs/{hparams['dataset']}_transformer_pe_d={hparams['pos_encoding_dim']}_{timestamp}"
writer = SummaryWriter(log_dir_name)
print(f"TensorBoard 日志将保存在: {log_dir_name}")

TensorBoard 日志将保存在: ../runs/car_transformer_pe_d=8_20250926-145830


In [4]:
# --- 3. 【核心修改】新增位置编码计算函数 ---
def get_laplacian_positional_encoding(edge_index, num_nodes, pos_encoding_dim):
    """计算拉普拉斯特征向量位置编码"""
    # 转换为 SciPy 稀疏矩阵
    adj = to_scipy_sparse_matrix(edge_index, num_nodes=num_nodes)
    
    # 计算归一化拉普拉斯矩阵
    from sklearn.utils.extmath import safe_sparse_dot
    from scipy.sparse import spdiags
    
    n_nodes = adj.shape[0]
    adj = adj.astype(float)
    
    # 计算度矩阵的逆平方根
    d = np.array(adj.sum(axis=1)).flatten()
    d_inv_sqrt = 1. / np.sqrt(d)
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
    d_inv_sqrt_matrix = spdiags(d_inv_sqrt, 0, n_nodes, n_nodes)
    
    # I - D^-1/2 * A * D^-1/2
    laplacian = spdiags(np.ones(n_nodes), 0, n_nodes, n_nodes) - d_inv_sqrt_matrix @ adj @ d_inv_sqrt_matrix

    # 特征分解，寻找最小的 k 个特征值对应的特征向量
    # k 设为 pos_encoding_dim + 1 是因为第一个特征向量（对应0特征值）通常是常数，需要忽略
    try:
        # 'SM' 表示寻找绝对值最小的特征值
        _, eigvecs = eigs(laplacian, k=pos_encoding_dim + 1, which='SM', tol=1e-5)
        eigvecs = np.real(eigvecs[:, 1:]) # 忽略第一个特征向量
    except Exception as e:
        print(f"特征分解失败: {e}。使用随机向量作为备用。")
        eigvecs = np.random.rand(num_nodes, pos_encoding_dim)

    return torch.from_numpy(eigvecs.astype(np.float32))

In [5]:
# --- 4. 数据加载与预处理函数 (修改以加入位置编码) ---
def load_and_prepare_data(dataset_name, threshold_pos, threshold_neg, pos_encoding_dim):
    base_path = f'../data/{dataset_name}/'
    
    # ... (加载 x_features, a_plus_pos, a_plus_neg 的代码与之前相同) ...
    features_path = f"{base_path}{dataset_name}.data.cleaned.csv"
    x_numpy = np.loadtxt(features_path, delimiter=',')
    x_features = torch.tensor(x_numpy, dtype=torch.float)
    num_nodes = x_features.shape[0]

    adj_matrix_pos_path = f"{base_path}{dataset_name}_A_plus_UG.csv"
    a_plus_pos_numpy = np.loadtxt(adj_matrix_pos_path, delimiter=',')
    a_plus_pos = torch.tensor(a_plus_pos_numpy, dtype=torch.float)
    a_plus_pos[a_plus_pos <= threshold_pos] = 0
    a_plus_pos.fill_diagonal_(0)
    edge_index_pos, _ = dense_to_sparse(a_plus_pos)

    adj_matrix_neg_path = f"{base_path}{dataset_name}_A_negative_UG.csv"
    a_plus_neg_numpy = np.loadtxt(adj_matrix_neg_path, delimiter=',')
    a_plus_neg = torch.tensor(a_plus_neg_numpy, dtype=torch.float)
    a_plus_neg[a_plus_neg <= threshold_neg] = 0
    a_plus_neg.fill_diagonal_(0)
    edge_index_neg, _ = dense_to_sparse(a_plus_neg)

    # --- 【核心修改】计算并拼接位置编码 ---
    print("正在计算正概念图的位置编码...")
    pos_enc_pos = get_laplacian_positional_encoding(edge_index_pos, num_nodes, pos_encoding_dim)
    
    print("正在计算负概念图的位置编码...")
    pos_enc_neg = get_laplacian_positional_encoding(edge_index_neg, num_nodes, pos_encoding_dim)

    # 将原始特征与两种位置编码拼接
    x_enhanced = torch.cat([x_features, pos_enc_pos, pos_enc_neg], dim=1)
    print(f"原始特征维度: {x_features.shape[1]}")
    print(f"增强后特征维度: {x_enhanced.shape[1]}")

    # ... (加载标签 y 和数据划分的代码与之前相同) ...
    labels_path = f"{base_path}{dataset_name}.data"
    if dataset_name == 'car':
        column_names = ["buying", "maint", "doors", "persons", "lug_boot", "safety", "class"]
        df = pd.read_csv(labels_path, header=None, names=column_names)
        labels_numpy = df['class'].values
    else:
        df = pd.read_csv(f"{base_path}{dataset_name}.data.csv")
        labels_numpy = df.iloc[:, -1].values
        
    encoder = LabelEncoder()
    y_numpy = encoder.fit_transform(labels_numpy)
    y = torch.tensor(y_numpy, dtype=torch.long)
    if num_nodes != len(y): y = y[:num_nodes]

    # 【修改】Data对象现在使用增强后的特征 x_enhanced
    data = Data(x=x_enhanced, y=y,
                edge_index_pos=edge_index_pos,
                edge_index_neg=edge_index_neg)

    num_train = int(num_nodes * 0.6)
    num_val = int(num_nodes * 0.2)
    indices = torch.randperm(num_nodes)
    data.train_mask = torch.zeros(num_nodes, dtype=torch.bool); data.train_mask[indices[:num_train]] = True
    data.val_mask = torch.zeros(num_nodes, dtype=torch.bool); data.val_mask[indices[num_train:num_train + num_val]] = True
    data.test_mask = torch.zeros(num_nodes, dtype=torch.bool); data.test_mask[indices[num_train + num_val:]] = True
    
    return data, len(np.unique(y_numpy))

In [6]:
# --- 5. 定义模型 (输入维度 in_channels 会改变) ---
class DualConceptTransformer(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, heads=1, dropout=0.5):
        super(DualConceptTransformer, self).__init__()
        self.dropout = dropout
        self.pos_conv = TransformerConv(in_channels, hidden_channels, heads=heads)
        self.neg_conv = TransformerConv(in_channels, hidden_channels, heads=heads)
        self.fusion_layer = nn.Linear(hidden_channels * heads * 2, out_channels)

    def forward(self, x, edge_index_pos, edge_index_neg):
        h_pos = self.pos_conv(x, edge_index_pos)
        h_pos = F.relu(h_pos)
        h_pos = F.dropout(h_pos, p=self.dropout, training=self.training)
        
        h_neg = self.neg_conv(x, edge_index_neg)
        h_neg = F.relu(h_neg)
        h_neg = F.dropout(h_neg, p=self.dropout, training=self.training)
        
        h_combined = torch.cat([h_pos, h_neg], dim=1)
        out = self.fusion_layer(h_combined)
        return out


In [7]:
# --- 6. 实例化数据和模型 ---
data, num_classes = load_and_prepare_data(hparams['dataset'], 
                                          hparams['threshold_pos'], 
                                          hparams['threshold_neg'],
                                          hparams['pos_encoding_dim'])

# 【修改】模型的 in_channels 现在是增强后的特征维度
model = DualConceptTransformer(in_channels=data.num_node_features, 
                               hidden_channels=hparams['hidden_channels'], 
                               out_channels=num_classes,
                               heads=hparams['heads'],
                               dropout=hparams['dropout'])

optimizer = torch.optim.Adam(model.parameters(), lr=hparams['learning_rate'], weight_decay=hparams['weight_decay'])
criterion = torch.nn.CrossEntropyLoss()

正在计算正概念图的位置编码...
正在计算负概念图的位置编码...
原始特征维度: 25
增强后特征维度: 41


  d_inv_sqrt = 1. / np.sqrt(d)


In [8]:
# --- 7. 训练与评估函数 (与之前相同) ---
def train(epoch):
    model.train()
    optimizer.zero_grad()
    out = model(data.x, data.edge_index_pos, data.edge_index_neg)
    loss = criterion(out[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    writer.add_scalar('Loss/train', loss.item(), epoch)
    return loss.item()

def evaluate(epoch):
    model.eval()
    with torch.no_grad():
        out = model(data.x, data.edge_index_pos, data.edge_index_neg)
        pred = out.argmax(dim=1)
        
        train_acc = (pred[data.train_mask] == data.y[data.train_mask]).sum().item() / data.train_mask.sum().item()
        val_acc = (pred[data.val_mask] == data.y[data.val_mask]).sum().item() / data.val_mask.sum().item()
        test_acc = (pred[data.test_mask] == data.y[data.test_mask]).sum().item() / data.test_mask.sum().item()

        writer.add_scalar('Accuracy/train', train_acc, epoch)
        writer.add_scalar('Accuracy/validation', val_acc, epoch)
        writer.add_scalar('Accuracy/test', test_acc, epoch)
        
        return train_acc, val_acc, test_acc

In [9]:
# --- 8. 主训练循环 (与之前相同) ---
print("\n--- 开始训练 (带位置编码的双概念格 Graph Transformer) ---")
for epoch in range(1, hparams['epochs'] + 1):
    loss = train(epoch)
    if epoch % 1 == 0:
        train_acc, val_acc, test_acc = evaluate(epoch)
        print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Test Acc: {test_acc:.4f}')

# --- 训练完成后 ---
final_train_acc, final_val_acc, final_test_acc = evaluate(hparams['epochs'])
print(f'--- 训练完成 ---')
print(f'最终测试集准确率: {final_test_acc:.4f}')

metrics = {'accuracy/final_train': final_train_acc, 'accuracy/final_validation': final_val_acc, 'accuracy/final_test': final_test_acc}
writer.add_hparams(hparams, metrics)
writer.close()


--- 开始训练 (带位置编码的双概念格 Graph Transformer) ---
Epoch: 001, Loss: 1.4115, Train Acc: 0.8427, Val Acc: 0.8058, Test Acc: 0.8588
Epoch: 002, Loss: 1.2205, Train Acc: 0.8388, Val Acc: 0.7942, Test Acc: 0.8617
Epoch: 003, Loss: 1.0470, Train Acc: 0.8282, Val Acc: 0.7739, Test Acc: 0.8530
Epoch: 004, Loss: 0.8856, Train Acc: 0.8195, Val Acc: 0.7507, Test Acc: 0.8415
Epoch: 005, Loss: 0.7484, Train Acc: 0.8147, Val Acc: 0.7449, Test Acc: 0.8357
Epoch: 006, Loss: 0.6325, Train Acc: 0.8272, Val Acc: 0.7681, Test Acc: 0.8444
Epoch: 007, Loss: 0.5433, Train Acc: 0.8504, Val Acc: 0.8000, Test Acc: 0.8646
Epoch: 008, Loss: 0.4829, Train Acc: 0.8639, Val Acc: 0.8348, Test Acc: 0.8876
Epoch: 009, Loss: 0.4429, Train Acc: 0.8678, Val Acc: 0.8435, Test Acc: 0.8876
Epoch: 010, Loss: 0.3909, Train Acc: 0.8736, Val Acc: 0.8435, Test Acc: 0.8905
Epoch: 011, Loss: 0.3630, Train Acc: 0.9044, Val Acc: 0.8754, Test Acc: 0.9135
Epoch: 012, Loss: 0.3306, Train Acc: 0.9218, Val Acc: 0.9014, Test Acc: 0.9395
Epoch: 