In [None]:
#!/usr/bin/python3
import sys
sys.path.insert(0, '/home/marco_dossena/PHD/KGEmbeddings/codes')
import argparse

import numpy as np
import torch
import os
from collections import defaultdict
import random
from tqdm.notebook import tqdm

from codes.model import KGEModel
from codes.dataloader import TrainDataset, TestDataset
from codes.triplets import TripletsEngine

# --- Configuration ---
# The dimensionality of your embeddings (e.g., 50, 100, 200)
EMBEDDING_DIM = 256
# The number of nearest neighbors to retrieve in each search step
K_NEIGHBORS = 25
K_RESULTS = 10
MODEL_PATH = "/home/marco_dossena/PHD/KGEmbeddings/models/TransE_FB15k_0/"
# MODEL_PATH = "/home/marco_dossena/PHD/KGEmbeddings/models/RotatE_FB15k_0/"
DICTS_DIR = '/home/marco_dossena/PHD/KGEmbeddings/data/FB15k'

# Seed every RNG the notebook can draw from. Seeding only the stdlib `random`
# module leaves NumPy (used to sample the evaluation triples) non-deterministic.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

number_of_entities = 14951
number_of_relations = 1345

# Settings consumed by the project's evaluation helpers.
args = {
    "model": "TransE",
    "hidden_dim": EMBEDDING_DIM,
    "gamma": 24.0,
    "double_entity_embedding": False,
    "double_relation_embedding": False,
    "do_train": False,
    "test_batch_size": 512,
    "cpu_num": 16,
    # Follow the detected device instead of hard-coding True, so a CPU-only
    # machine is not asked to use CUDA.
    # NOTE(review): presumably read by the project's test step to move batches
    # to the GPU -- confirm against codes/model.py.
    "cuda": device == 'cuda',
    "test_log_steps": 1000,
    "nentity": number_of_entities,
    "nrelation": number_of_relations,
    "mode": "tail-batch"
}

class DictToObject:
    """Expose the entries of a dictionary as attributes of an instance."""

    def __init__(self, dictionary):
        # Bind each key onto the instance so callers can write `obj.key`.
        for name in dictionary:
            setattr(self, name, dictionary[name])

# Replace the plain dict with an attribute-style wrapper so settings read as args.model, args.mode, etc.
args = DictToObject(args)

In [None]:
# Build the model with the configured hyper-parameters and place it on the
# selected device before restoring the trained weights.
kge_model = KGEModel(
    model_name=args.model,
    nentity=number_of_entities,
    nrelation=number_of_relations,
    hidden_dim=args.hidden_dim,
    gamma=args.gamma,
    double_entity_embedding=args.double_entity_embedding,
    double_relation_embedding=args.double_relation_embedding
).to(device)

print("Loading checkpoint...")
# map_location remaps tensors that were saved on a GPU onto the active device,
# so the checkpoint also loads on a CPU-only machine.
checkpoint = torch.load(os.path.join(MODEL_PATH, 'checkpoint'), map_location=device)
init_step = checkpoint['step']
kge_model.load_state_dict(checkpoint['model_state_dict'])

# Training state is only restored when training is enabled (off by default here).
if args.do_train:
    current_learning_rate = checkpoint['current_learning_rate']
    warm_up_steps = checkpoint['warm_up_steps']
    # optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# Exported embedding matrices, kept as CPU tensors for the vector-space search.
entity_embedding = torch.from_numpy(np.load(os.path.join(MODEL_PATH, 'entity_embedding.npy')))
relation_embedding = torch.from_numpy(np.load(os.path.join(MODEL_PATH, 'relation_embedding.npy')))

kg = TripletsEngine(DICTS_DIR, from_splits=True)

In [None]:
# Chat with an interesting discussion on this: https://chatgpt.com/c/68c0325c-18c8-832b-b7fe-3eb459d9c9b8
# TODO: implement predict for RotatE

def predict(head_id, relation_id, tail_id, entity_embeddings, relation_embeddings, mode="tail-batch", top_k=10):
    """Rank entities by L1 distance to the translated query point (TransE-style).

    In "head-batch" mode the query point is ``tail - relation`` (searching for
    the head); in any other mode it is ``head + relation`` (searching for the
    tail). The entity ID that is not needed for the chosen mode is ignored.

    Args:
        head_id: Index of the head entity (used unless mode is "head-batch").
        relation_id: Index of the relation.
        tail_id: Index of the tail entity (used only in "head-batch" mode).
        entity_embeddings: 2-D tensor with one row per entity.
        relation_embeddings: Tensor with one row per relation, same width as
            the entity embeddings.
        mode: "head-batch" to predict heads; anything else predicts tails.
        top_k: Number of nearest entities to return. Values larger than the
            number of entities are clamped instead of raising.

    Returns:
        Tuple ``(best_ids, distances)``: indices of the closest entities in
        ascending order of distance, and the matching L1 distances.
    """
    rel = relation_embeddings[relation_id]

    if mode == "head-batch":
        target = entity_embeddings[tail_id] - rel
    else:
        target = entity_embeddings[head_id] + rel

    # L1 distance from the query point to every entity
    distances = torch.norm(entity_embeddings - target, p=1, dim=1)

    # torch.topk raises when k exceeds the number of candidates, so clamp it.
    k = min(top_k, distances.shape[0])
    nearest = torch.topk(distances, k, largest=False)
    return nearest.indices, nearest.values

In [None]:
# metrics = kge_model.test_step(kge_model, kg.triplets[kg.train_set][:1000], kg.triplets, args)
# ids = np.random.randint(0, len(kg.triplets[kg.test_set]), size=1)[0]
# target_head, target_relation, target_tail = kg.triplets[kg.test_set][ids]

# metrics = kge_model.single_test_step(kge_model, (target_head, target_relation, target_tail), kg.triplets, args)

# print("Target triplet:", (int(target_head), int(target_relation), int(target_tail)))

# if args.mode == 'head-batch':
#     print("Target: ", int(target_head))
# elif args.mode == 'tail-batch':
#     print("Target: ", int(target_tail))

# print(metrics)

# top_ids, dists = predict(int(target_head), int(target_relation), int(target_tail), entity_embedding, relation_embedding, mode=args.mode, top_k=K_NEIGHBORS)

# try:
#     print(torch.argwhere(top_ids == target_tail if args.mode == 'tail-batch' else top_ids == target_head).item() + 1)
# except:
#     print("Not found in top {} neighbors".format(K_NEIGHBORS))

In [None]:
# Lookup tables built from every triple in the knowledge graph:
#   h2t: (head, relation) -> list of tails
#   r2h: (relation, tail) -> list of heads
h2t, r2h = defaultdict(list), defaultdict(list)

for triple in kg.triplets:
    head, relation, tail = triple
    h2t[head, relation].append(tail)
    r2h[relation, tail].append(head)

In [None]:
n = 100
# Draw the evaluation sample from a dedicated seeded generator so the reported
# metrics are reproducible when this cell is re-run.
rng = np.random.default_rng(42)
ids = rng.integers(0, len(kg.triplets), size=n)
mrr = []
recall = []
num_entities = entity_embedding.shape[0]

# TODO: batch the vector-space search

for triplet_idx in tqdm(ids):  # avoid shadowing the builtin `id`
    target_head, target_relation, target_tail = kg.triplets[triplet_idx]

    # Every known correct answer for this query, not just the sampled one.
    if args.mode == 'head-batch':
        targets = r2h[(target_relation, target_tail)]
    else:
        targets = h2t[(target_head, target_relation)]

    # De-duplicate so a repeated triple cannot inflate the recall denominator.
    target_ids = torch.unique(torch.tensor([int(t) for t in targets]))

    metrics = kge_model.single_test_step(kge_model, (target_head, target_relation, target_tail), kg.triplets, args)
    mrr.append(metrics['MRR'])

    # Retrieve 1.5x the number of correct answers (at least K_NEIGHBORS), but
    # never more candidates than there are entities.
    top_k = min(max(K_NEIGHBORS, int(len(target_ids) * 1.5)), num_entities)
    top_ids, dists = predict(int(target_head), int(target_relation), int(target_tail), entity_embedding, relation_embedding, mode=args.mode, top_k=top_k)

    # Recall: fraction of the correct answers found among the retrieved neighbors.
    recall.append(torch.isin(top_ids, target_ids).sum().item() / len(target_ids))

print(f"Average MRR over {n} random triplets: {np.mean(mrr)}")
print(f"Average Recall over {n} random triplets: {np.mean(recall)}")