In [None]:
import random

import hnswlib
import numpy as np
import torch
from tqdm import tqdm

# Seed every random number generator the notebook relies on
def set_seed(seed):
    """Seed Python's, NumPy's and PyTorch's random number generators.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Python's built-in generator is seeded too, so that any library code
    # drawing from `random` is reproducible, not only NumPy/PyTorch calls.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds the generators of all CUDA devices.
    torch.cuda.manual_seed_all(seed)

set_seed(42)

## Load data

In [None]:
def load_glove_embeddings(file_path):
    """Read word vectors from a whitespace-separated text file.

    Each non-blank line is expected to hold a token followed by its vector
    components, e.g. ``word 0.1 0.2 ...``.

    Args:
        file_path: Path to the UTF-8 encoded embeddings file.

    Returns:
        dict mapping each token to a 1-D ``np.float32`` array. If a token
        occurs more than once, the last occurrence wins.

    Raises:
        ValueError: If a component cannot be parsed as a float, or if a line's
            vector length differs from the first vector read.
    """
    embeddings = {}
    expected_dim = None
    with open(file_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            parts = line.split()
            if not parts:
                # Blank line (typically a trailing newline at EOF): nothing to parse.
                continue
            word = parts[0]
            vector = np.array(parts[1:], dtype=np.float32)
            if expected_dim is None:
                expected_dim = vector.shape[0]
            elif vector.shape[0] != expected_dim:
                # Fail here with the offending line instead of much later, when
                # the vectors are stacked into a single matrix.
                raise ValueError(
                    f"{file_path}:{line_no}: expected {expected_dim} values "
                    f"for {word!r}, got {vector.shape[0]}"
                )
            embeddings[word] = vector
    return embeddings

# Location of the pre-trained GloVe file
glove_path = '../data/glove.6B/glove.6B.300d.txt'  # Update this path as needed
embeddings = load_glove_embeddings(glove_path)

# Vocabulary and embedding matrix; row i of `vectors` belongs to words[i]
# (dicts iterate keys and values in the same insertion order).
words = list(embeddings)
vectors = np.array(list(embeddings.values()))

## Build HNSW index

In [None]:
import hnswlib

dim = vectors.shape[1]
num_elements = vectors.shape[0]

# Initialize hnswlib index (cosine space; ids are the row numbers of `vectors`)
index = hnswlib.Index(space='cosine', dim=dim)
index.init_index(max_elements=num_elements, M=16, ef_construction=200, random_seed=42)
index.add_items(vectors, np.arange(num_elements))
index.set_ef(50)

# Retrieve neighbors for each node
k = 10  # Number of neighbors
# Every query vector is itself stored in the index, so it normally comes back
# as its own nearest neighbour. Ask for one extra hit and drop that self-match,
# so each node keeps k genuine neighbours and no (i, i) edge is created.
raw_labels, raw_distances = index.knn_query(vectors, k=k + 1)
# Compare ids as signed 64-bit integers to avoid mixed signed/unsigned promotion.
raw_labels = raw_labels.astype(np.int64)
node_ids = np.arange(num_elements, dtype=np.int64)
keep = raw_labels != node_ids[:, None]
# The search is approximate: a row that did not return its own id still has
# k + 1 hits marked, so drop its last hit to leave exactly k per row.
keep[keep.all(axis=1), -1] = False
labels = raw_labels[keep].reshape(num_elements, k)
distances = raw_distances[keep].reshape(num_elements, k)

# Build edge list: row 0 holds source nodes, row 1 their neighbours
edge_index = np.stack([np.repeat(node_ids, k), labels.reshape(-1)])  # Shape: [2, num_edges]

## Prepare dataset

In [None]:
import torch
from torch_geometric.data import Data
from torch_geometric.utils import negative_sampling

# Node features and graph connectivity as tensors
x = torch.tensor(vectors, dtype=torch.float32)
edge_index = torch.tensor(edge_index, dtype=torch.int64)

# Bundle both into a PyTorch Geometric graph object
data = Data(x=x, edge_index=edge_index)

# Request as many negative samples as there are edges in the graph
num_pos_edges = edge_index.shape[1]
neg_edge_index = negative_sampling(
    edge_index=edge_index,
    num_nodes=num_elements,
    num_neg_samples=num_pos_edges,
)


In [None]:
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv

class GCNLinkPredictor(nn.Module):
    """Two-layer GCN encoder with an MLP decoder that scores node pairs.

    Args:
        in_channels: Size of the input node features (fed to the first GCNConv).
        hidden_channels: Size of both GCN layers' outputs and of the decoder's
            hidden layer.
    """

    def __init__(self, in_channels, hidden_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        # The decoder consumes the concatenated embeddings of both endpoints,
        # hence the 2 * hidden_channels input width.
        self.link_predictor = nn.Sequential(
            nn.Linear(2 * hidden_channels, hidden_channels),
            nn.ReLU(),
            nn.Linear(hidden_channels, 1)
        )

    def encode(self, x, edge_index):
        """Compute node embeddings from features ``x`` over ``edge_index``.

        Applies conv1, a ReLU, then conv2; no activation follows the second layer.
        """
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        return x

    def decode(self, z, edge_index):
        """Score each node pair in ``edge_index`` using embeddings ``z``.

        Args:
            z: Node embeddings, indexed by node id along the first dimension.
            edge_index: Pair of index tensors ``(src, dst)`` of equal length.

        Returns:
            1-D tensor with one sigmoid-activated score per pair.
        """
        src, dst = edge_index
        pair_features = torch.cat([z[src], z[dst]], dim=1)
        # Squeeze only the trailing unit dimension: a bare squeeze() would also
        # collapse a single-pair batch to a 0-d tensor and break shape agreement
        # with the label vector.
        return torch.sigmoid(self.link_predictor(pair_features)).squeeze(-1)


In [None]:

# Model, plus the optimizer and loss used to fit it
model = GCNLinkPredictor(in_channels=dim, hidden_channels=128)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = nn.BCELoss()

# Positive pairs are the graph's edges; neg_edge_index is reused as-is
pos_edge_index = edge_index

# Supervision set: positives first, then negatives, with matching 1/0 targets
num_pos = pos_edge_index.size(1)
num_neg = neg_edge_index.size(1)
train_edge_index = torch.cat((pos_edge_index, neg_edge_index), dim=1)
train_labels = torch.cat((torch.ones(num_pos), torch.zeros(num_neg)))


In [None]:

# Full-batch training: each epoch encodes the whole graph once
num_epochs = 100
log_every = 10

model.train()
for epoch in range(1, num_epochs + 1):
    optimizer.zero_grad()

    # Forward pass: node embeddings, then one score per supervised pair
    z = model.encode(data.x, data.edge_index)
    preds = model.decode(z, train_edge_index)
    loss = criterion(preds, train_labels)

    # Backward pass and parameter update
    loss.backward()
    optimizer.step()

    if not epoch % log_every:
        print(f'Epoch {epoch}, Loss: {loss.item():.4f}')


In [None]:
# Evaluation
# NOTE: this scores the same train_edge_index pairs used in the training loop,
# so the printed figure is training accuracy, not a held-out estimate.
model.eval()
with torch.no_grad():
    z = model.encode(data.x, data.edge_index)
    preds = model.decode(z, train_edge_index)
    # Threshold the scores at 0.5 to obtain hard 0/1 predictions
    predicted = preds.gt(0.5).float()
    num_correct = torch.eq(predicted, train_labels).sum().item()
    accuracy = num_correct / train_labels.size(0)
    print(f'Accuracy: {accuracy:.4f}')