In [4]:
"""
testing the simple heuristics Personalized PageRank, Adamic Adar and Common Neighbours

This is the directed test graph
   -> 0 -> 1 <-
   |          |
    --- 2 <---
"""

import unittest
import math

import torch
from torch import tensor
import scipy.sparse as ssp
import numpy as np

from heuristics import AA, PPR, CN, RA

In [6]:
edge_index = tensor([[0, 2, 2, 1], [1, 0, 1, 2]]).t()
edge_weight = torch.ones(edge_index.size(0), dtype=torch.int)
test_edges = tensor([[0, 1], [1, 2]]).t()
num_nodes = 3
A = ssp.csr_matrix((edge_weight, (edge_index[:, 0], edge_index[:, 1])),
                        shape=(num_nodes, num_nodes))#, dtype=np.float)
neg_test_edges = tensor([[0, 1], [2, 0]]).t()

# create a graph with 2 isomorphic nodes 2 & 3
iso_edge_index = tensor([[2, 2, 3, 3, 4, 0], [1, 4, 1, 4, 0, 1]]).t()
iso_edge_weight = torch.ones(iso_edge_index.size(0), dtype=int)
iso_test_edges = tensor([[2, 3], [0, 0]]).t()
iso_num_nodes = 5

square1 = np.array([[0, 1, 1, 2, 2, 3, 3, 0],
                    [1, 0, 2, 1, 3, 2, 0, 3]])
square2 = square1 + 4
bridge = np.array([[0, 4],
                    [4, 0]])

common_neigbours = np.array([[0, 9, 9, 4, 0, 8, 4, 8],
                                [9, 0, 4, 9, 8, 0, 8, 4]])

ei = torch.tensor(np.concatenate([square1, square2, bridge, common_neigbours], axis=1), dtype=torch.long)
ew = torch.ones(ei.size(1), dtype=int)
num_nodes = 10
A1 = ssp.csr_matrix((ew, (ei[0], ei[1])), shape=(num_nodes, num_nodes))

In [19]:
def test_CN(A, edge_index, neg_test_edges, test_edges):
    train_scores, edge_index = CN(A, edge_index)
    t1 = np.array_equal(train_scores, np.array([0, 1, 0, 0]))
    neg_scores, edge_index = CN(A, neg_test_edges)
    t2 = np.array_equal(neg_scores, np.array([1, 0]))
    pos_scores, edge_index = CN(A, test_edges)
    t3 = np.array_equal(pos_scores, np.array([0, 0]))
    return t1, t2, t3

def test_AA(A, edge_index, neg_test_edges, test_edges):
    train_scores, edge_index = AA(A, edge_index)
    print(train_scores)
    a1 = np.allclose(train_scores, np.array([0, 1 / math.log(2), 0, 0]))
    neg_scores, edge_index = AA(A, neg_test_edges)
    a2 = np.allclose(neg_scores, np.array([1 / math.log(2), 0]))
    pos_scores, edge_index = AA(A, test_edges)
    a3 = np.allclose(pos_scores, np.array([0, 0]))
    return a1, a2, a3

def test_RA(A, edge_index, neg_test_edges, test_edges):
    train_scores, edge_index = RA(A, edge_index)
    print(train_scores)
    r1 = np.allclose(train_scores, np.array([0, 1 / 2, 0, 0]))
    neg_scores, edge_index = RA(A, neg_test_edges)
    r2 = np.allclose(neg_scores, np.array([1 / 2, 0]))
    pos_scores, edge_index = RA(A, test_edges)
    r3 = np.allclose(pos_scores, np.array([0, 0]))
    return r1, r2, r3

def test_iso_graph(A, iso_edge_weight, iso_edge_index, iso_num_nodes):
    A = ssp.csr_matrix((iso_edge_weight, (iso_edge_index[:, 0], iso_edge_index[:, 1])),
                        shape=(iso_num_nodes, iso_num_nodes))
    aa_test_scores, edge_index = AA(A, iso_test_edges)
    print(aa_test_scores)
    s1 = np.array_equal(aa_test_scores[0], aa_test_scores[1])
    cn_test_scores, edge_index = CN(A, iso_test_edges)
    print(cn_test_scores)
    s2 = np.array_equal(cn_test_scores[0], cn_test_scores[1])
    ppr_test_scores, edge_index = PPR(A, iso_test_edges)
    print(ppr_test_scores)
    s3 = np.array_equal(ppr_test_scores[0], ppr_test_scores[1])
    return s1, s2, s3

In [23]:
t_CN = test_CN(A, edge_index, neg_test_edges, test_edges)
t_AA = test_AA(A, edge_index, neg_test_edges, test_edges)
t_RA = test_CN(A, edge_index, neg_test_edges, test_edges)
t_iso = test_iso_graph(A, iso_edge_weight, iso_edge_index, iso_num_nodes)
print(t_CN, '------------------', t_AA, '------------------', t_RA, '------------------', t_iso, sep='\n')

100%|██████████| 1/1 [00:00<00:00, 1002.22it/s]


evaluated Common Neighbours for 4 edges


100%|██████████| 1/1 [00:00<00:00, 1002.46it/s]


evaluated Common Neighbours for 2 edges


100%|██████████| 1/1 [00:00<00:00, 1002.94it/s]


evaluated Common Neighbours for 2 edges


100%|██████████| 1/1 [00:00<00:00, 500.93it/s]


evaluated Adamic Adar for 4 edges
tensor([0.0000, 1.4427, 0.0000, 0.0000])


100%|██████████| 1/1 [00:00<00:00, 1003.66it/s]


evaluated Adamic Adar for 2 edges


100%|██████████| 1/1 [00:00<00:00, 1003.42it/s]


evaluated Adamic Adar for 2 edges


100%|██████████| 1/1 [00:00<00:00, 1004.86it/s]


evaluated Common Neighbours for 4 edges


100%|██████████| 1/1 [00:00<00:00, 501.59it/s]


evaluated Common Neighbours for 2 edges


100%|██████████| 1/1 [00:00<00:00, 1001.27it/s]


evaluated Common Neighbours for 2 edges


100%|██████████| 1/1 [00:00<00:00, 501.41it/s]


evaluated Adamic Adar for 2 edges
tensor([0.9102, 0.9102])


100%|██████████| 1/1 [00:00<00:00, 501.47it/s]


evaluated Common Neighbours for 2 edges
tensor([1., 1.])


100%|██████████| 2/2 [00:00<00:00, 501.53it/s]

evaluated PPR for 2 edges
tensor([0.1434, 0.1434])
(True, True, True)
------------------
(True, True, True)
------------------
(True, True, True)
------------------
(True, True, True)





In [None]:
# class HeuristicTests(unittest.TestCase):
#     def setUp(self):
#         self.edge_index = tensor([[0, 2, 2, 1], [1, 0, 1, 2]]).t()
#         self.edge_weight = torch.ones(self.edge_index.size(0), dtype=torch.int)
#         self.test_edges = tensor([[0, 1], [1, 2]]).t()
#         self.num_nodes = 3
#         self.neg_test_edges = tensor([[0, 1], [2, 0]]).t()
#         self.A = ssp.csr_matrix((self.edge_weight, (self.edge_index[:, 0], self.edge_index[:, 1])),
#                                 shape=(self.num_nodes, self.num_nodes), dtype=np.float)
#         # create a graph with 2 isomorphic nodes 2 & 3
#         self.iso_edge_index = tensor([[2, 2, 3, 3, 4, 0], [1, 4, 1, 4, 0, 1]]).t()
#         self.iso_edge_weight = torch.ones(self.iso_edge_index.size(0), dtype=int)
#         self.iso_test_edges = tensor([[2, 3], [0, 0]]).t()
#         self.iso_num_nodes = 5

#         square1 = np.array([[0, 1, 1, 2, 2, 3, 3, 0],
#                             [1, 0, 2, 1, 3, 2, 0, 3]])
#         square2 = square1 + 4
#         bridge = np.array([[0, 4],
#                            [4, 0]])

#         common_neigbours = np.array([[0, 9, 9, 4, 0, 8, 4, 8],
#                                      [9, 0, 4, 9, 8, 0, 8, 4]])

#         self.ei = torch.tensor(np.concatenate([square1, square2, bridge, common_neigbours], axis=1), dtype=torch.long)
#         ew = torch.ones(self.ei.size(1), dtype=int)
#         num_nodes = 10
#         self.A1 = ssp.csr_matrix((ew, (self.ei[0], self.ei[1])), shape=(num_nodes, num_nodes))

#     def test_CN(self):
#         train_scores, edge_index = CN(self.A, self.edge_index)
#         self.assertTrue(np.array_equal(train_scores, np.array([0, 1, 0, 0])))
#         neg_scores, edge_index = CN(self.A, self.neg_test_edges)
#         self.assertTrue(np.array_equal(neg_scores, np.array([1, 0])))
#         pos_scores, edge_index = CN(self.A, self.test_edges)
#         self.assertTrue(np.array_equal(pos_scores, np.array([0, 0])))

#     def test_AA(self):
#         train_scores, edge_index = AA(self.A, self.edge_index)
#         print(train_scores)
#         self.assertTrue(np.allclose(train_scores, np.array([0, 1 / math.log(2), 0, 0])))
#         neg_scores, edge_index = AA(self.A, self.neg_test_edges)
#         self.assertTrue(np.allclose(neg_scores, np.array([1 / math.log(2), 0])))
#         pos_scores, edge_index = AA(self.A, self.test_edges)
#         self.assertTrue(np.allclose(pos_scores, np.array([0, 0])))

#     def test_RA(self):
#         train_scores, edge_index = RA(self.A, self.edge_index)
#         print(train_scores)
#         self.assertTrue(np.allclose(train_scores, np.array([0, 1 / 2, 0, 0])))
#         neg_scores, edge_index = RA(self.A, self.neg_test_edges)
#         self.assertTrue(np.allclose(neg_scores, np.array([1 / 2, 0])))
#         pos_scores, edge_index = RA(self.A, self.test_edges)
#         self.assertTrue(np.allclose(pos_scores, np.array([0, 0])))

#     def test_iso_graph(self):
#         A = ssp.csr_matrix((self.iso_edge_weight, (self.iso_edge_index[:, 0], self.iso_edge_index[:, 1])),
#                            shape=(self.iso_num_nodes, self.iso_num_nodes))
#         aa_test_scores, edge_index = AA(A, self.iso_test_edges)
#         print(aa_test_scores)
#         self.assertTrue(aa_test_scores[0] == aa_test_scores[1])
#         cn_test_scores, edge_index = CN(A, self.iso_test_edges)
#         print(cn_test_scores)
#         self.assertTrue(cn_test_scores[0] == cn_test_scores[1])
#         ppr_test_scores, edge_index = PPR(A, self.iso_test_edges)
#         print(ppr_test_scores)
#         self.assertTrue(ppr_test_scores[0] == ppr_test_scores[1])