In [1]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GATConv
from torch_geometric.data import Data, DataLoader
import networkx as nx
import numpy as np
import random
import wandb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score, f1_score
import copy
import pandas as pd
from transformers import AutoTokenizer, AutoModel

In [2]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [None]:
# Load the language model: tokenizer and weights both come from the local
# ./chatglm2-6b directory.
# NOTE(review): trust_remote_code=True allows Python code shipped with the
# checkpoint to be executed - confirm the directory comes from a trusted source.
tokenizer = AutoTokenizer.from_pretrained(
    "./chatglm2-6b",
    trust_remote_code=True,
)
model = AutoModel.from_pretrained(
    "./chatglm2-6b",
    trust_remote_code=True,
    device_map="auto",
)
# Put the model in evaluation mode.
model.eval()

In [None]:
def generate_paths_and_expansion(G, nodes, cutoff=2):
    """Build the edge index of a subgraph around a set of seed nodes.

    The subgraph is assembled in two stages:

    1. For every pair of seeds that are both in ``G``, the nodes of one
       shortest path between them are collected. The path is searched from the
       first seed to the second and, if none exists, in the opposite direction.
    2. Every collected node is expanded with all nodes reachable from it within
       ``cutoff`` hops.

    Args:
        G: networkx graph to search.
        nodes: sequence of seed node ids. Seeds that are not in ``G`` are
            ignored instead of raising during the expansion stage.
        cutoff: maximum hop distance used for the neighbourhood expansion
            (defaults to 2).

    Returns:
        ``torch.long`` tensor of shape ``(2, E)`` holding the edges of the
        induced subgraph, expressed with the node ids used in ``G``. When the
        subgraph has no edges the shape is ``(2, 0)``.
    """
    kept = set(nodes)
    # Only seeds that exist in G can take part in path search or expansion.
    seeds = [node for node in nodes if node in G]

    for position, first in enumerate(seeds):
        for second in seeds[position + 1:]:
            # Try first -> second, then second -> first; keep the first hit.
            for source, target in ((first, second), (second, first)):
                try:
                    kept.update(nx.shortest_path(G, source=source, target=target))
                except nx.NetworkXNoPath:
                    continue
                break

    # Grow the node set by the cutoff-hop neighbourhood of every kept node.
    expanded_nodes = set(kept)
    for node in kept:
        if node in G:
            reachable = nx.single_source_shortest_path_length(G, node, cutoff=cutoff)
            expanded_nodes.update(reachable.keys())

    subgraph = G.subgraph(expanded_nodes).copy()
    edge_list = list(subgraph.edges)
    if not edge_list:
        # No edges: return an explicitly shaped empty edge list.
        return torch.empty((2, 0), dtype=torch.long)
    return torch.tensor(edge_list, dtype=torch.long).t().contiguous()

In [3]:
# Step 1: load the knowledge-graph triples and build the id mappings / graph tensors
df = pd.read_csv(r'./data/trademark_KG.csv', encoding='utf-8')

# Give every distinct entity a consecutive integer id, in order of first
# appearance across the head column followed by the tail column.
entities = {
    entity: entity_id
    for entity_id, entity in enumerate(pd.concat([df['head_entity'], df['tail_entity']]).unique())
}
entity_counter = len(entities)  # number of ids handed out; kept for cells that read it

# Reverse mapping: id -> entity name
id_to_entity = {entity_id: entity for entity, entity_id in entities.items()}

# (head_id, tail_id) -> relationship label. A pair that occurs more than once
# keeps the label of its last row, while `edges` keeps one entry per row.
relationships = {}
edges = []

for head, tail, relation, rel_law in zip(
    df['head_entity'], df['tail_entity'], df['relationship'], df['rel_law']
):
    head_id = entities[head]
    tail_id = entities[tail]

    # rel_law may be missing (NaN) or parsed by pandas as a number, so convert
    # to str before testing for blank instead of calling .strip() on it directly.
    if pd.notna(rel_law) and str(rel_law).strip():
        relationships[(head_id, tail_id)] = f"{relation}({rel_law})"
    else:
        relationships[(head_id, tail_id)] = f"{relation}"

    edges.append((head_id, tail_id))

# One constant (zero) feature per node.
x = torch.zeros(len(entities), 1)
# reshape(-1, 2) keeps the result at shape (2, 0) even when there are no rows.
edge_index = torch.tensor(edges, dtype=torch.long).reshape(-1, 2).t().contiguous()
G_data = Data(x=x, edge_index=edge_index)

# Look up an entity name by its id
def get_entity_name(entity_id):
    """Return the name stored for ``entity_id``, or "Entity not found" when the id is unknown."""
    if entity_id in id_to_entity:
        return id_to_entity[entity_id]
    return "Entity not found"

# Look up an entity id by its name
def get_entity_id(entity_name):
    """Return the id assigned to ``entity_name``, or "Entity not found" when the name is unknown."""
    if entity_name in entities:
        return entities[entity_name]
    return "Entity not found"

# Look up the relationship between two entities by their ids
def get_relationship(entity_id1, entity_id2):
    """Return the label stored for the directed pair ``(entity_id1, entity_id2)``.

    Returns "No relationship found" when that exact (head, tail) pair has no entry.
    """
    pair = (entity_id1, entity_id2)
    if pair in relationships:
        return relationships[pair]
    return "No relationship found"
    
# Report the number of entities, labelled pairs and edges, one per line.
print(len(entities), len(relationships), len(edges), sep='\n')

426
449
449


In [5]:
# Step 2: load the graph neural network model
class GAT(torch.nn.Module):
    """Stack of GATConv layers producing node embeddings, with dot-product edge scoring.

    Layer layout: one input layer (``in_channels`` -> ``hidden_channels`` with
    ``heads`` heads), ``num_layers - 2`` hidden layers, and one single-head
    output layer emitting ``out_channels`` features. A ``num_layers`` below 2
    still creates the input and output layers.

    Args:
        in_channels: number of input features per node.
        hidden_channels: per-head width of the input and hidden layers.
        out_channels: width of the final node embedding.
        heads: number of attention heads in every layer except the last.
        num_layers: requested total number of GATConv layers.
        dropout: dropout probability applied after every layer except the last.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, heads, num_layers, dropout):
        super().__init__()
        self.num_layers = num_layers
        self.dropout = dropout

        # Layers after the first take hidden_channels * heads input features.
        multi_head_width = hidden_channels * heads
        layers = [GATConv(in_channels, hidden_channels, heads=heads)]
        layers.extend(
            GATConv(multi_head_width, hidden_channels, heads=heads)
            for _ in range(num_layers - 2)
        )
        layers.append(GATConv(multi_head_width, out_channels, heads=1))
        self.convs = torch.nn.ModuleList(layers)

    def forward(self, data):
        """Return node embeddings for ``data`` (an object exposing ``x`` and ``edge_index``)."""
        x, edge_index = data.x, data.edge_index
        for conv in self.convs[:-1]:
            x = F.relu(conv(x, edge_index))
            # Dropout is only active while the module is in training mode.
            x = F.dropout(x, p=self.dropout, training=self.training)
        return self.convs[-1](x, edge_index)

    def edge_prediction(self, data):
        """Score every edge in ``data.edge_index`` by the dot product of its endpoint embeddings.

        Returns a 1-D tensor with one raw (pre-sigmoid) score per edge.
        """
        embeddings = self.forward(data)
        edge_index = data.edge_index.long()
        heads_emb = embeddings[edge_index[0]]
        tails_emb = embeddings[edge_index[1]]
        return (heads_emb * tails_emb).sum(dim=1)

    def edge_index_prediction(self, data):
        """Return the positions in ``data.edge_index`` of edges predicted as present.

        An edge counts as present when the sigmoid of its score rounds to 1.
        The result is a plain Python list of edge positions, so the scores are
        computed without gradient tracking.
        """
        with torch.no_grad():
            probabilities = torch.sigmoid(self.edge_prediction(data))
        return torch.nonzero(torch.round(probabilities) == 1).flatten().tolist()
    
# Hyperparameters
# NOTE(review): presumably these are the values 'model.pth' was trained with;
# load_state_dict below fails if the layer shapes do not match - confirm.
in_channels = 1
hidden_channels = 16
out_channels = 8
heads = 4
num_layers = 4
dropout = 0.3

GAT_model = GAT(
    in_channels=in_channels,
    hidden_channels=hidden_channels,
    out_channels=out_channels,
    heads=heads,
    num_layers=num_layers,
    dropout=dropout,
).to(device)

# `device` is a torch.device, which never compares equal to the string "cuda",
# so no string comparison is used to pick the load path. map_location=device
# places the checkpoint tensors on the target device for both CPU and GPU.
# NOTE(review): torch.load unpickles the file - only load trusted checkpoints.
GAT_model.load_state_dict(torch.load('model.pth', map_location=device))

# The model is used for inference from here on: switch to eval mode so dropout
# is disabled and predictions are deterministic. The trailing ';' keeps Jupyter
# from echoing the module repr as cell output.
GAT_model.eval();

In [12]:
# Step 3: retrieval
# Build a directed graph from the knowledge-graph edge index
edgelist = [tuple(pair) for pair in G_data.edge_index.numpy().T]
G = nx.DiGraph(edgelist)
# Seed entities (ids) to retrieve around
points = [0, 1]
# Classic graph search: shortest paths between the seeds plus neighbourhood expansion
subgraph_edge_index = generate_paths_and_expansion(G, points)
sub_numpy = subgraph_edge_index.numpy()
print(sub_numpy)

[[  0   0   0   0   0   0   0   0   0   0   1   1   1   1   1   1   3   3
    3   3   5   6  18  18  18  18  18  18  18  18  18  18  25  25  25  25
   25  25  25  25  25  25  25  25  25  25  23  64]
 [133 134 135 136 137 277   5   6 424 425 138 139 140   3  25  18 141 142
  143 144 146 147 169 170 168 171  20  76 172 173 174  19 181  26  27  28
   29 255  58  59  23  60 273  64  92  96  20 277]]
