In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# Input edge list used by every later cell (also used as the prefix of output files).
# Swap the two assignments to switch to the alternative file.
# filename = "email-Eu-core-temporal.txt"
filename = "email-Eu-core-temporal-Dept3.txt"

In [3]:
import pyspark
import itertools
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Window 
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix, RowMatrix
import csv
import numpy as np
import pandas as pd

# Read data

In [4]:

# Local, single-process Spark configuration.
conf = SparkConf().setMaster("local").setAppName("My App")

# getOrCreate() returns the already-running context when this cell is executed
# again; constructing SparkContext(conf=conf) a second time in the same process
# raises because only one active SparkContext is allowed.
sc = SparkContext.getOrCreate(conf=conf)

# create a spark session (attaches to the context obtained above)
spark = SparkSession.builder.appName('link-prediction').getOrCreate()


In [5]:
# Parse the space-delimited file into a list of integer rows.
with open(filename) as edge_file:
  reader = csv.reader(edge_file, delimiter=' ')
  data = [[int(field) for field in record] for record in reader]

# Wrap the parsed rows in a Spark DataFrame with columns u, v, t.
columns = ["u", "v", "t"]
df = spark.createDataFrame(data=data, schema=columns)

df.toPandas()

Unnamed: 0,u,v,t
0,11,39,0
1,48,54,2635
2,69,60,4249
3,69,60,13362
4,33,84,16943
...,...,...,...
12211,26,46,45301515
12212,46,26,45301553
12213,16,73,45304269
12214,30,60,45305184


# Select based on timeframe

In [6]:
# Split on the t column: rows with t below `train_end` form the training set,
# rows with train_end <= t < test_end form the test set.
train_end, test_end = 20_000_000, 22_000_000
timestamp = F.col("t")

train_df = df.filter(timestamp < train_end)
test_df = df.filter((timestamp >= train_end) & (timestamp < test_end))

train_df.toPandas(), test_df.toPandas()

(       u   v         t
 0     11  39         0
 1     48  54      2635
 2     69  60      4249
 3     69  60     13362
 4     33  84     16943
 ...   ..  ..       ...
 5152  23  88  19987782
 5153  87  56  19988698
 5154  43  67  19989289
 5155  16  15  19989592
 5156  64  85  19993413
 
 [5157 rows x 3 columns],
       u   v         t
 0    54  49  20045846
 1    54  63  20045902
 2    67  43  20049149
 3    87  56  20051568
 4     2  24  20051831
 ..   ..  ..       ...
 653  60  16  21899410
 654  60  67  21899410
 655  60  54  21899410
 656  60  52  21899410
 657  60  15  21899410
 
 [658 rows x 3 columns])

In [7]:

# One row per distinct (u, v) pair of the training set; `count` is the number
# of training rows that share that pair.
edges = (
  train_df
  .drop("t")
  .groupby("u", "v")
  .count()
)

# Largest pair count, pulled back to the driver as a plain Python value.
max_count_rows = edges.select(F.max(edges["count"]).alias("maxCount")).collect()
maxCount = max_count_rows[0]["maxCount"]

edges.orderBy("u", "v").toPandas()

Unnamed: 0,u,v,count
0,0,1,8
1,0,39,22
2,0,64,6
3,0,70,9
4,0,71,50
...,...,...,...
970,88,77,1
971,88,78,1
972,88,79,1
973,88,84,2


In [8]:
# Node set: ids that occur both in the `u` column of the training set and in
# the `v` column of `edges`, sorted ascending.
senders = train_df.select("u")
receivers = edges.select("v")
nodes = (
  senders
  .intersect(receivers)
  .withColumnRenamed("u", "node")
  .orderBy("node")
)

# Save mode "ignore": nothing is written when the target path already exists.
nodes.write.mode("ignore").csv(f"{filename}_nodes.csv")

num_nodes = nodes.count()
nodes.toPandas()

Unnamed: 0,node
0,0
1,1
2,2
3,3
4,4
...,...
63,84
64,85
65,86
66,87


# Shortest Length
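Hop-count shortest paths from every node to every other node, computed with the Bellman–Ford algorithm (each edge has weight 1).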

In [9]:
# result[j, k] = number of hops on the shortest directed path from the j-th node
# to the k-th node (both in ascending node order); pairs with no path keep the
# sentinel value 10000000.
result = np.zeros((num_nodes, num_nodes))

for j, row in enumerate(nodes.collect()):
  node = row["node"]

  # Initial labels: 0 for the source, the "unreachable" sentinel for the rest.
  shortest_length = nodes\
    .withColumn("shortest_length", F.when(nodes["node"] != node, F.lit(10000000)).otherwise(0))

  # Bellman-Ford needs at most (num_nodes - 1) relaxation rounds.
  # NOTE(review): the query plan grows with every round; consider checkpointing
  # if this becomes too slow on larger inputs.
  for _ in range(max(num_nodes - 1, 0)):
    # Best label among the predecessors of every node: join(-).groupBy(-).aggregate(min)
    temp = shortest_length\
      .join(edges.select("u", "v"), shortest_length["node"] == edges["u"])\
      .drop("node")\
      .withColumnRenamed("v", "_node")\
      .groupBy("_node")\
      .agg(F.min("shortest_length").alias("temp"))

    # Left join so that nodes without any predecessor in `temp` are kept (an
    # inner join would silently drop them and shrink the distance vector);
    # for those rows `temp` is null and the current label is retained.
    shortest_length = shortest_length\
      .join(temp, shortest_length["node"] == temp["_node"], "left")\
      .withColumn(
        "new_shortest_length",
        F.when(
          temp["temp"].isNull() | (shortest_length["shortest_length"] < temp["temp"] + 1),
          shortest_length["shortest_length"]
        ).otherwise(temp["temp"] + 1)
      )\
      .drop("shortest_length")\
      .drop("temp")\
      .drop("_node")\
      .withColumnRenamed("new_shortest_length", "shortest_length")

  # Materialise the labels once, in node order, and store them as row j.
  distances = shortest_length.orderBy("node").select("shortest_length").collect()
  result[j] = [record["shortest_length"] for record in distances]
  print((j, node), result[j])

np.save(f"{filename.split('.')[0]}_shortest_length.npy", result)

# Katz

In [10]:
# Every ordered pair of nodes (join without a condition), i.e. one row per
# cell of the dense adjacency matrix.
_katz_adj = (
  nodes.withColumnRenamed("node", "_u")
  .join(nodes.withColumnRenamed("node", "_v"))
  .orderBy("_u", "_v")
)

# Damping factor derived from the largest pair count.
beta = 1 / (1.5 * maxCount)

# Edge weights scaled by beta.
katz_edges = edges.withColumn("value", edges["count"] * beta).drop("count")

# Attach the scaled weight to each node pair; pairs without an edge get 0.
same_pair = (_katz_adj["_u"] == katz_edges["u"]) & (_katz_adj["_v"] == katz_edges["v"])
katz_adj = (
  _katz_adj
  .join(katz_edges, same_pair, 'left')
  .select("_u", "_v", "value")
  .fillna(0)
  .withColumnRenamed("_u", "u")
  .withColumnRenamed("_v", "v")
)

katz_adj.orderBy("u", "v").toPandas()

Unnamed: 0,u,v,value
0,0,0,0.000000
1,0,1,0.036782
2,0,2,0.000000
3,0,3,0.000000
4,0,4,0.000000
...,...,...,...
4619,88,84,0.009195
4620,88,85,0.000000
4621,88,86,0.004598
4622,88,87,0.000000


In [11]:
# Collapse the (u, v, value) triples into one dense row per source node u.
# The ordered window makes collect_list produce growing prefixes sorted by v;
# max() over those arrays selects the complete row.
katz_w = Window.partitionBy("u").orderBy("v")

# Row index for the distributed matrix. row_number() yields consecutive values
# 1..N in ascending u order; monotonically_increasing_id() only guarantees
# increasing, unique ids and may leave gaps when the data spans several
# partitions, which would misplace matrix rows.
katz_row_order = Window.orderBy("u")

_katz_A = (
  katz_adj
  .withColumn("sorted_list", F.collect_list("value").over(katz_w))
  .groupBy("u")
  .agg(F.max("sorted_list").alias('row'))
  .withColumn("id", F.row_number().over(katz_row_order) - 1)
  .orderBy('u')
)
_katz_A.toPandas()

Unnamed: 0,u,row,id
0,0,"[0.0, 0.0367816091954023, 0.0, 0.0, 0.0, 0.0, ...",0
1,1,"[0.0, 0.0, 0.0, 0.0, 0.013793103448275862, 0.0...",1
2,2,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",2
3,3,"[0.0, 0.0, 0.0, 0.0, 0.004597701149425287, 0.0...",3
4,4,"[0.0, 0.03218390804597701, 0.0, 0.013793103448...",4
...,...,...,...
63,84,"[0.0, 0.0, 0.0, 0.0, 0.013793103448275862, 0.0...",63
64,85,"[0.0, 0.027586206896551724, 0.0, 0.0, 0.013793...",64
65,86,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",65
66,87,"[0.004597701149425287, 0.022988505747126436, 0...",66


In [12]:

def _as_square_block_matrix(indexed_rows):
  """Wrap an RDD of IndexedRow in a BlockMatrix with num_nodes x num_nodes blocks."""
  return IndexedRowMatrix(indexed_rows).toBlockMatrix(num_nodes, num_nodes)


# (id, row) records of _katz_A converted to IndexedRow objects.
katz_rows = _katz_A.select("id", "row").rdd.map(lambda record: IndexedRow(*record))

# katz_A stays fixed; katz_matrix starts as a copy of it and holds the running power.
katz_A = _as_square_block_matrix(katz_rows)
katz_matrix = _as_square_block_matrix(katz_rows)

# Accumulator initialised to an all-zero num_nodes x num_nodes matrix.
zero_rows = [IndexedRow(index, [0] * num_nodes) for index in range(num_nodes)]
katz_scores = _as_square_block_matrix(sc.parallelize(zero_rows))

katz_A.toLocalMatrix().toArray()

array([[0.        , 0.03678161, 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.0045977 ],
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.0045977 ],
       ...,
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.0045977 , 0.02298851, 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.0045977 , ..., 0.0045977 , 0.        ,
        0.        ]])

In [13]:

# Truncated power series: each pass adds the current power of katz_A to the
# accumulator and then advances katz_matrix to the next power (100 terms).
katz_terms = 100
for _ in range(katz_terms):
  katz_scores = katz_scores.add(katz_matrix)
  katz_matrix = katz_matrix.multiply(katz_A)

katz_scores_local = katz_scores.toLocalMatrix().toArray()
print(katz_scores_local)


[[1.47664494e-01 4.74668715e-02 3.50135226e-04 ... 1.11246170e-03
  3.33094743e-03 9.81781564e-03]
 [4.51172153e-04 9.72238316e-03 2.44237483e-04 ... 3.00971279e-04
  3.13465094e-03 7.72989703e-03]
 [1.88755936e-04 4.89693189e-04 8.73085328e-04 ... 3.14241082e-04
  6.15268937e-04 5.84102148e-03]
 ...
 [7.14512013e-06 1.65050412e-05 6.60635301e-06 ... 6.76024601e-05
  8.50443858e-06 2.37824313e-04]
 [6.73279570e-03 4.32413365e-02 6.94024793e-04 ... 9.17286527e-04
  1.00838080e-02 4.21860374e-03]
 [2.17270227e-03 5.51505475e-03 6.70941441e-03 ... 7.00833240e-03
  2.39966614e-03 3.08617669e-02]]


In [14]:
# Persist the accumulated Katz scores as a dense NumPy array; the output name
# is the part of `filename` before its first dot plus a fixed suffix.
katz_scores_array = katz_scores.toLocalMatrix().toArray()
np.save(f"{filename.split('.')[0]}_katz_scores.npy", katz_scores_array)

# Hitting Time

In [15]:
# Sum of the pair counts of all edges leaving each source node u.
counts = (
  edges
  .groupBy("u")
  .agg(F.sum("count").alias("total"))
  .orderBy("u")
)

counts.toPandas()

Unnamed: 0,u,total
0,0,136
1,1,45
2,2,18
3,3,11
4,4,116
...,...,...
64,84,30
65,85,90
66,86,3
67,87,86


In [16]:
# Every ordered pair of nodes (join without a condition).
_hitting_adj = (
  nodes.withColumnRenamed("node", "_u")
  .join(nodes.withColumnRenamed("node", "_v"))
)

# Pair count for each node pair; pairs without an edge get 0.
pair_matches_edge = (_hitting_adj["_u"] == edges["u"]) & (_hitting_adj["_v"] == edges["v"])
hitting_pair_counts = (
  _hitting_adj
  .join(edges, pair_matches_edge, 'left')
  .select("_u", "_v", "count")
  .fillna(0)
)

# Divide each pair count by the total of its source node; the 1e-15 term keeps
# the division defined when the total is 0.
# NOTE(review): `counts` is built from all of `edges`, including edges whose
# target is not in `nodes`, so the prob values of one u may sum to less than 1
# -- confirm this is intended.
hitting_adj = (
  hitting_pair_counts
  .join(counts, counts["u"] == hitting_pair_counts["_u"], 'left')
  .fillna(0)
  .withColumn("prob", F.expr("count / (total + 1e-15)"))
  .select("_u", "_v", "prob")
  .withColumnRenamed("_u", "u")
  .withColumnRenamed("_v", "v")
)

hitting_adj.orderBy("u", "v").toPandas()

Unnamed: 0,u,v,prob
0,0,0,0.000000
1,0,1,0.058824
2,0,2,0.000000
3,0,3,0.000000
4,0,4,0.000000
...,...,...,...
4619,88,84,0.012500
4620,88,85,0.000000
4621,88,86,0.006250
4622,88,87,0.000000


In [17]:
%%time
n = 1000
w = Window.partitionBy('u').orderBy('v')
for j, row in enumerate(nodes.collect()):
  
  node = row["node"]
  print(j, node)

  hitting_T_df = hitting_adj\
    .withColumn(
      "_prob",
      F.when(hitting_adj["u"] != node, hitting_adj["prob"])\
        .otherwise(F.when(hitting_adj["v"] != node, 0.0)\
          .otherwise(1.0)\
        )\
    )\

  hitting_T_df = hitting_T_df\
    .withColumn("sorted_list", F.collect_list("_prob").over(w))\
    .groupBy("u")\
    .agg(F.max("sorted_list").alias("row"))\
    .orderBy("u")\
    .withColumn("id", F.monotonically_increasing_id())

  hitting_T = IndexedRowMatrix(hitting_T_df.select("id", "row").rdd.map(lambda row: IndexedRow(*row)))\
    .toBlockMatrix(num_nodes, num_nodes)
  hitting_matrix = IndexedRowMatrix(hitting_T_df.select("id", "row").rdd.map(lambda row: IndexedRow(*row)))\
    .toBlockMatrix(num_nodes, num_nodes)
  hitting_scores = IndexedRowMatrix(sc.parallelize([IndexedRow(_, [0] * num_nodes) for _ in range(num_nodes)]))\
    .toBlockMatrix(num_nodes, num_nodes)

  print(hitting_T.toLocalMatrix().toArray().sum(axis=1))
  for i in range(n):
    print(i, end=" ")
    hitting_scores = hitting_scores.add(hitting_matrix)
    hitting_matrix = hitting_matrix.multiply(hitting_T)
  
  
  print(hitting_matrix.toLocalMatrix().toArray()[:, j])
  scores = n - hitting_scores.toLocalMatrix().toArray()[:, j]
  np.save(f"{filename.split('.')[0]}_hitting-time_{j}.npy", scores)
  print(scores)

0
       u   v      prob    _prob
0      0   0  0.000000  1.00000
1      0   1  0.058824  0.00000
2      0   2  0.000000  0.00000
3      0   3  0.000000  0.00000
4      0   4  0.000000  0.00000
...   ..  ..       ...      ...
4619  88  84  0.012500  0.01250
4620  88  85  0.000000  0.00000
4621  88  86  0.006250  0.00625
4622  88  87  0.000000  0.00000
4623  88  88  0.000000  0.00000

[4624 rows x 4 columns]
[1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
[1.         1.         1.         1.         1.         1.
 1.         1.         1.         1.         0.90721649 1.
 1.         1.         0.9375     0.98611111 0.92307692 1.
 0.84269663 1.         0.98305085 0.89032258 0.91666667 0.94366197
 0. 

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


[1.         0.21797345 0.21357992 0.25268439 0.21064326 0.23283197
 0.23834266 0.23177025 0.22334217 0.21396695 0.21345273 0.36334185
 0.22680876 0.20591163 0.19312132 0.23925426 0.20810137 0.23513948
 0.1852749  0.20523831 0.22353125 0.17945383 0.20481837 0.21386742
 0.         0.21764102 0.2110696  0.21393701 0.19205406 0.20956656
 0.2575824  0.22238992 0.21244182 0.20222406 0.21195461 0.22320126
 0.21043026 0.17746809 0.20275183 0.20955071 0.21340157 0.22894207
 0.22339101 0.19765759 0.21846243 0.23743415 0.19428527 0.19404709
 0.20624405 0.2225796  0.20328655 0.24755638 0.21568098 0.21137503
 0.36484406 0.38022483 0.22827143 0.21110917 0.23334789 0.22041829
 0.2407165  0.75194502 0.23137803 0.20821815 0.2000186  0.31181213
 0.23497686 0.24849188]


KeyboardInterrupt: ignored

In [18]:
  
  
  # Repeats the final statements of the hitting-time loop body. It relies on
  # j, n, hitting_matrix and hitting_scores still being defined from the
  # previous cell; none of them is set here.
  final_power_column = hitting_matrix.toLocalMatrix().toArray()[:, j]
  print(final_power_column)
  accumulated_column = hitting_scores.toLocalMatrix().toArray()[:, j]
  scores = n - accumulated_column
  np.save(f"{filename.split('.')[0]}_hitting-time_{j}.npy", scores)
  print(scores)

[1.         0.21797345 0.21357992 0.25268439 0.21064326 0.23283197
 0.23834266 0.23177025 0.22334217 0.21396695 0.21345273 0.36334185
 0.22680876 0.20591163 0.19312132 0.23925426 0.20810137 0.23513948
 0.1852749  0.20523831 0.22353125 0.17945383 0.20481837 0.21386742
 0.         0.21764102 0.2110696  0.21393701 0.19205406 0.20956656
 0.2575824  0.22238992 0.21244182 0.20222406 0.21195461 0.22320126
 0.21043026 0.17746809 0.20275183 0.20955071 0.21340157 0.22894207
 0.22339101 0.19765759 0.21846243 0.23743415 0.19428527 0.19404709
 0.20624405 0.2225796  0.20328655 0.24755638 0.21568098 0.21137503
 0.36484406 0.38022483 0.22827143 0.21110917 0.23334789 0.22041829
 0.2407165  0.75194502 0.23137803 0.20821815 0.2000186  0.31181213
 0.23497686 0.24849188]
[ 942.          992.36841053  992.72444219  990.50282824  992.90948158
  991.29116528  990.90965323  991.57960357  992.08667042  992.76792885
  992.19627218  983.4329155   991.93341672  993.11824723  993.36525868
  991.05492617  992.379843

# SimRank

Matrix multiplication: https://en.wikipedia.org/wiki/SimRank#:~:text=It%20is%20important%20to%20note,notion%20of%20similarity%20on%20relationships.