In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import networkx as nx
from torch_geometric.data import Data
from torch_geometric.nn import GATConv
from scipy.sparse import coo_matrix
import numpy as np
from torch_geometric.utils import to_networkx
import random
from heapdict import heapdict

# 定义GAT模型
class GAT(torch.nn.Module):
    """Two-layer graph attention encoder producing 16-channel node embeddings.

    Layer 1 is a ``GATConv`` with ``num_heads`` heads of 16 channels each, so
    its output width is ``16 * num_heads``. Layer 2 is a single-head
    ``GATConv`` with ``concat=False`` and 16 output channels. A rate of 0.6 is
    used both for the feature dropout in ``forward`` and for the ``dropout``
    argument handed to each ``GATConv``.

    Args:
        num_features: Width of the input node feature vectors.
        num_heads: Number of attention heads in the first layer.
    """

    def __init__(self, num_features, num_heads=8):
        super().__init__()
        first_layer_width = 16 * num_heads
        self.gat1 = GATConv(num_features, 16, heads=num_heads, dropout=0.6)
        self.gat2 = GATConv(first_layer_width, 16, heads=1, concat=False, dropout=0.6)

    def forward(self, data):
        """Encode the nodes of ``data`` (reads ``data.x`` and ``data.edge_index``)."""
        edge_index = data.edge_index
        # Feature dropout is only active in training mode.
        hidden = F.dropout(data.x, p=0.6, training=self.training)
        hidden = F.elu(self.gat1(hidden, edge_index))
        hidden = F.dropout(hidden, p=0.6, training=self.training)
        return self.gat2(hidden, edge_index)

# 定义MLP模型
class MLP(nn.Module):
    """Feed-forward scorer: input_size -> 32 -> 16 -> 1 with a sigmoid output.

    Args:
        input_size: Width of each input row.
    """

    def __init__(self, input_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 32)
        self.fc2 = nn.Linear(32, 16)
        self.fc3 = nn.Linear(16, 1)

    def forward(self, x):
        """Return one value in (0, 1) per input row (last dimension has size 1)."""
        hidden = x
        # Both hidden layers share the same ReLU activation.
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return torch.sigmoid(self.fc3(hidden))
    
class MLPClassifier(nn.Module):
    """Binary classifier head: input_size -> 64 -> 32 -> 1 with a sigmoid output.

    Args:
        input_size: Width of each input row.
    """

    def __init__(self, input_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 64)  # first hidden layer
        self.fc2 = nn.Linear(64, 32)          # second hidden layer
        self.fc3 = nn.Linear(32, 1)           # output layer

    def forward(self, x):
        """Return one probability-like value in (0, 1) per input row."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        # The sigmoid keeps the output between 0 and 1.
        return torch.sigmoid(self.fc3(hidden))

# 函数以采样与边数相同数量的不存在的边
def sample_non_edges(dataset):
    """Sample absent directed node pairs, as many as each graph has edges.

    For every graph the candidate pool is all ordered pairs ``(i, j)`` with
    ``i != j`` that do not appear as a column of ``edge_index``. The sample is
    drawn without replacement using the global ``random`` module, so seed it
    for reproducible results.

    Args:
        dataset: Iterable of graph objects exposing ``num_nodes`` and an
            ``edge_index`` tensor whose columns are ``(source, target)`` pairs.

    Returns:
        list[list[tuple[int, int]]]: One list of sampled pairs per graph. Each
        list has ``edge_index.size(1)`` entries, or fewer when the graph is so
        dense that fewer absent pairs exist.
    """
    non_edges_samples = []

    for data in dataset:
        num_nodes = data.num_nodes
        all_possible_pairs = {(i, j) for i in range(num_nodes) for j in range(num_nodes) if i != j}
        existing_edges = {tuple(edge) for edge in data.edge_index.t().tolist()}
        non_edges = list(all_possible_pairs - existing_edges)
        num_edges = data.edge_index.size(1)

        # random.sample raises ValueError when asked for more items than the
        # population holds, which happens for dense graphs; cap the request.
        sample_size = min(num_edges, len(non_edges))
        non_edges_samples.append(random.sample(non_edges, sample_size))

    return non_edges_samples

def generate_edge_embeddings(dataset, embeddings):
    """Build labelled node-pair vectors (edges first, then sampled non-edges) per graph.

    Each vector is ``[emb(u), emb(v), label]`` where ``label`` is 1.0 for an
    existing edge and 0.0 for a sampled non-edge. Existing edges are
    de-duplicated after sorting their endpoints, so each undirected edge
    contributes one vector, in order of first appearance in ``edge_index``.
    Non-edges come from ``sample_non_edges`` and are kept only when their
    sorted form is not an existing edge.

    Args:
        dataset: Iterable of graph objects with an ``edge_index`` tensor.
        embeddings: Iterable of per-graph node-embedding tensors, aligned with
            ``dataset`` and indexable by node id.

    Returns:
        list[list[torch.Tensor]]: For each graph, the edge vectors followed by
        the non-edge vectors.
    """

    def _labelled_pair(node_embeddings, pair, label):
        # Concatenate both endpoint embeddings and append the label as a final feature.
        return torch.cat([node_embeddings[pair[0]], node_embeddings[pair[1]], torch.tensor([label])])

    per_graph_vectors = []

    for data, embedding in zip(dataset, embeddings):
        seen_edges = set()

        # Existing edges: sort endpoints so both directions map to one key.
        positive_vectors = []
        for raw_edge in data.edge_index.t().numpy():
            edge = tuple(sorted(raw_edge))
            if edge in seen_edges:
                continue
            positive_vectors.append(_labelled_pair(embedding, edge, 1.0))
            seen_edges.add(edge)

        # Sampled non-edges: same endpoint ordering, skipped if they collide with an edge.
        sorted_non_edges = (tuple(sorted(raw_pair)) for raw_pair in sample_non_edges([data])[0])
        negative_vectors = [
            _labelled_pair(embedding, pair, 0.0)
            for pair in sorted_non_edges
            if pair not in seen_edges
        ]

        per_graph_vectors.append(positive_vectors + negative_vectors)

    return per_graph_vectors

def calculate_MVC(graph, UB=9999999, C=None):
    """Find a minimum vertex cover of ``graph`` by branch and bound.

    The search branches on a maximum-degree vertex ``v``: either all of its
    neighbours join the cover, or ``v`` itself does. The input graph is never
    modified; each branch works on a copy.

    Args:
        graph: A networkx-style graph providing ``edges()``, ``degree()``,
            ``neighbors()``, ``copy()``, ``remove_node()`` and
            ``remove_nodes_from()``.
        UB: Upper bound on the cover size; a branch whose partial cover has
            already reached this size is not explored further.
        C: Partial cover accumulated so far. ``None`` (the default) starts
            from an empty cover.

    Returns:
        set: The smallest cover found. When ``graph`` has no edges, ``C``
        itself is returned.
    """
    # A fresh set per call: a shared mutable default would be returned as-is
    # for edgeless graphs and could be corrupted by the caller.
    if C is None:
        C = set()

    if len(graph.edges()) == 0:
        return C

    v, _ = max(graph.degree(), key=lambda a: a[1])

    # Branch 1: cover every edge at v by taking all of v's neighbours.
    C1 = C.copy()
    neighbors = set(graph.neighbors(v))
    C1.update(neighbors)
    graph_1 = graph.copy()
    graph_1.remove_nodes_from(neighbors)
    if len(C1) < UB:
        C1 = calculate_MVC(graph_1, UB, C1)

    # Branch 2: take v itself; tighten the bound with branch 1's result.
    C2 = C.copy()
    C2.add(v)
    graph_2 = graph.copy()
    graph_2.remove_node(v)
    if len(C2) < UB:
        C2 = calculate_MVC(graph_2, min(UB, len(C1)), C2)

    # min() returns the first argument on ties, so C1 must stay first: a
    # pruned (incomplete) C2 is never shorter than the C1 that bounded it.
    return min(C1, C2, key=len)

# 计算图级嵌入：对所有节点的嵌入进行平均
def get_graph_embedding(embeddings):
    """Mean-pool each graph's node embeddings into one graph-level vector.

    Args:
        embeddings: Iterable of tensors, one per graph, with nodes along dim 0.

    Returns:
        torch.Tensor: The per-graph means stacked along a new leading dimension.
    """
    pooled_vectors = [node_embeddings.mean(dim=0) for node_embeddings in embeddings]
    return torch.stack(pooled_vectors)


創建資料集

In [2]:
# Experiment configuration: tune these instead of editing literals below.
SEED = 42
NUM_GRAPHS = 50
NUM_NODES = 50
EDGE_PROB = 0.15

# Seed every RNG the notebook draws from (graph generation, non-edge sampling,
# model initialisation, Bernoulli decisions) so a fresh run is reproducible.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Build the dataset of random Erdos-Renyi graphs.
dataset = []

for graph_idx in range(NUM_GRAPHS):
    # A distinct, deterministic seed per graph keeps the graphs different from
    # each other but identical across runs.
    G = nx.erdos_renyi_graph(NUM_NODES, EDGE_PROB, seed=SEED + graph_idx)
    adj_matrix = nx.adjacency_matrix(G)
    adj_matrix = coo_matrix(adj_matrix)

    # The symmetric adjacency matrix yields both directions of every edge.
    row = torch.from_numpy(adj_matrix.row.astype(np.int64))
    col = torch.from_numpy(adj_matrix.col.astype(np.int64))
    edge_index = torch.stack([row, col], dim=0)

    x = torch.eye(G.number_of_nodes())  # one-hot node features

    data = Data(x=x, edge_index=edge_index)
    dataset.append(data)

# One-hot features mean the feature width equals the node count.
model = GAT(num_features=NUM_NODES)

# Compute node embeddings with the untrained model.
model.eval()
embeddings = []
with torch.no_grad():
    for data in dataset:
        embedding = model(data)  # [num_nodes, 16]
        embeddings.append(embedding)

產生edge embedding，並透過MLP決定哪幾個邊要修改

In [3]:
# Build the labelled edge / non-edge vectors for every graph.
# combined_embeddings_per_graph[g] is a list of 1-D tensors of length
# 2 * node_embedding_size + 1 (two node embeddings plus the label).
combined_embeddings_per_graph = generate_edge_embeddings(dataset, embeddings)

# Derive the width from the actual GAT output instead of hard-coding it.
node_embedding_size = embeddings[0].size(1)
embedding_size = 2 * node_embedding_size + 1  # two node embeddings plus one label entry

# Instantiate the MLP that scores each candidate pair.
mlp = MLP(input_size=embedding_size)

# Score every pair, then draw a Bernoulli decision (1 = modify this pair).
mlp_predictions_per_graph = []
mlp_decisions_per_graph = []

mlp.eval()
with torch.no_grad():
    for graph_embeddings in combined_embeddings_per_graph:
        graph_embeddings_tensor = torch.stack(graph_embeddings)
        # squeeze(-1) drops only the trailing size-1 dimension, so a graph with
        # a single candidate pair still yields a 1-D tensor that can be iterated.
        probabilities = mlp(graph_embeddings_tensor).squeeze(-1)
        bernoulli = torch.distributions.Bernoulli(probabilities)
        decisions = bernoulli.sample()

        mlp_predictions_per_graph.append(probabilities)
        mlp_decisions_per_graph.append(decisions)



修改圖

In [4]:
# Apply the MLP decisions to every graph.
# NOTE(review): the non-edges below are re-sampled here, so they are not the
# same random pairs that generate_edge_embeddings scored. Aligning them exactly
# would require generate_edge_embeddings to expose the pairs it sampled.
modified_graphs = []

for data, decisions, non_edges_samples in zip(dataset, mlp_decisions_per_graph, sample_non_edges(dataset)):
    G = nx.Graph()
    G.add_nodes_from(range(data.num_nodes))

    # Add the original edges and rebuild the list of unique undirected edges in
    # the order generate_edge_embeddings scored them (sorted endpoints, first
    # appearance in edge_index). Decision i refers to unique_edges[i], not to
    # column i of edge_index, which lists both directions of every edge.
    unique_edges = []
    seen_edges = set()
    for u, v in data.edge_index.t().tolist():
        edge = (u, v) if u <= v else (v, u)
        G.add_edge(*edge)
        if edge not in seen_edges:
            seen_edges.add(edge)
            unique_edges.append(edge)

    # Mirror the non-edge filter used when the vectors were built.
    candidate_non_edges = []
    for raw_pair in non_edges_samples:
        pair = tuple(sorted(raw_pair))
        if pair not in seen_edges:
            candidate_non_edges.append(pair)

    num_edge_decisions = len(unique_edges)

    # Walk over the decisions: edges come first, non-edges after them.
    for i, decision in enumerate(decisions):
        if decision.item() != 1:
            continue  # 0 means "leave this pair unchanged"

        if i < num_edge_decisions:
            # Decision on an existing edge: remove it.
            edge = unique_edges[i]
            if G.has_edge(*edge):
                G.remove_edge(*edge)
        else:
            # Decision on a non-edge: add it.
            non_edge_idx = i - num_edge_decisions
            if non_edge_idx >= len(candidate_non_edges):
                # The re-sampled list can be shorter than the scored one when
                # the filter drops a different number of pairs; skip the excess.
                continue
            non_edge = candidate_non_edges[non_edge_idx]
            if not G.has_edge(*non_edge):
                G.add_edge(*non_edge)

    modified_graphs.append(G)


將新的圖轉換成embedding

In [5]:
# Convert the modified networkx graphs back into PyG Data objects.
modified_dataset = []
for G in modified_graphs:
    # G.edges lists every undirected edge once. reshape(-1, 2) keeps a valid
    # (0, 2) shape when the graph has no edges at all.
    undirected_edges = torch.tensor(list(G.edges), dtype=torch.long).reshape(-1, 2)
    forward_index = undirected_edges.t()
    # Add the reverse direction of every edge so the modified graphs use the
    # same two-directional edge_index layout as the original dataset.
    edge_index = torch.cat([forward_index, forward_index.flip(0)], dim=1).contiguous()

    # One-hot node features, as for the original graphs.
    x = torch.eye(G.number_of_nodes())

    data = Data(x=x, edge_index=edge_index)
    modified_dataset.append(data)

# Embed the modified graphs with the same GAT model.
model.eval()  # make sure dropout is disabled
modified_embeddings = []
with torch.no_grad():
    for data in modified_dataset:
        embedding = model(data)
        modified_embeddings.append(embedding)

# Embed the original graphs again with the same model for the comparison below.
original_embeddings = []
with torch.no_grad():
    for data in dataset:
        embedding = model(data)
        original_embeddings.append(embedding)

# modified_embeddings / original_embeddings now hold per-graph node embeddings.

# Pool both sets into graph-level embeddings.
modified_graph_embeddings = get_graph_embedding(modified_embeddings)
original_graph_embeddings = get_graph_embedding(original_embeddings)

計算loss function

In [None]:
# Cosine similarity between each modified graph and its original; used for the
# similarity loss.
cos = nn.CosineSimilarity(dim=1)
cosine_similarities = cos(modified_graph_embeddings, original_graph_embeddings)
# cosine_similarities holds one value per (modified, original) graph pair.

combined_embeddings = [torch.cat((mod_emb, orig_emb)) for mod_emb, orig_emb in zip(modified_graph_embeddings, original_graph_embeddings)]
labels = []
for mod_graph, orig_graph in zip(modified_graphs, dataset):
    # modified_graphs already contains networkx graphs, so it is passed
    # directly; to_networkx only applies to the PyG Data objects in dataset.
    mod_mvc = len(calculate_MVC(mod_graph))
    orig_mvc = len(calculate_MVC(to_networkx(orig_graph, to_undirected=True)))
    # Label 1 when the modification preserved the minimum vertex cover size.
    label = 1 if mod_mvc == orig_mvc else 0
    labels.append(label)

# Stack the pair embeddings and labels into tensors.
combined_embeddings_tensor = torch.stack(combined_embeddings)
labels_tensor = torch.tensor(labels)

# Instantiate the MLP classifier used for the classification loss.
input_size = combined_embeddings_tensor.size(1)
classifier = MLPClassifier(input_size)

In [None]:
# Train the classifier on the (modified, original) graph-embedding pairs.

# Loss function: binary cross-entropy on the sigmoid output.
criterion = nn.BCELoss()

# Training mode.
classifier.train()

# Training hyperparameters.
epochs = 10
learning_rate = 0.001
optimizer = torch.optim.Adam(classifier.parameters(), lr=learning_rate)

for epoch in range(epochs):
    total_loss = 0

    for combined_emb, label in zip(combined_embeddings_tensor, labels_tensor):
        combined_emb = combined_emb.unsqueeze(0)  # add the batch dimension -> [1, features]
        # BCELoss requires input and target of identical shape; the classifier
        # returns [1, 1], so the target is reshaped to [1, 1] as a float.
        label = label.float().view(1, 1)

        # Forward pass.
        classifier_output = classifier(combined_emb)
        classification_loss = criterion(classifier_output, label)

        # Similarity loss between the two halves (modified | original).
        # NOTE(review): the embeddings were computed under torch.no_grad(), so
        # this term adds to the reported loss but sends no gradient to the
        # classifier.
        modified_emb, original_emb = torch.split(combined_emb, split_size_or_sections=combined_emb.size(1)//2, dim=1)
        similarity_loss = 1 - cos(modified_emb, original_emb).mean()

        # Total loss.
        loss = classification_loss + similarity_loss
        print(f"loss = {loss:.4f}")
        # Backward pass and optimisation step.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss / len(combined_embeddings_tensor):.4f}")
