In [1]:
# processing pipeline
# (1) create feature vector for each vertex-vertex pair
# (2) data set is entire list of vertex_i-vertex_j pairs labled 0 (not connected) or 1 (connected)
# (3) split data into test and training data
# (4) train classifier on test data (feature vector plus label)
# (5) evaluate classifier on training data
#
# HERE we actually solve the following problem: given some features, is there an edge vertex_i-vertex_j 
# or not in the graph
# BUT in the original sense, link prediction tries to identify future vertex_i-vertex_j edges, that do not exist yet;
# this would require time-dependent graph data showing how the graph evolves over time; 
# and because feature values change with the graph, they would require being continuously updated 
# for training and testing data

In [8]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.2-spark3.0-s_2.12 pyspark-shell'

In [37]:
import findspark
findspark.init()

In [38]:
import pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField

In [13]:
spark = SparkSession.builder \
            .master("local[4]") \
            .appName("linkprediction") \
            .getOrCreate() 



:: loading settings :: url = jar:file:/home/sarah/miniconda3/envs/Ml_Base/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/sarah/.ivy2/cache
The jars for the packages stored in: /home/sarah/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-811998f2-5336-427e-8c54-1ea0bba71c99;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.0-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
downloading https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar ...
	[SUCCESSFUL ] graphframes#graphframes;0.8.2-spark3.0-s_2.12!graphframes.jar (96ms)
:: resolution report :: resolve 3931ms :: artifacts dl 108ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.0-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number|

In [39]:
# make sure to create this dir first
spark.sparkContext.setCheckpointDir('/home/sarah/spark/_checkpoints')

In [40]:
from graphframes import GraphFrame

# Read Data

## Extract Edges


In [43]:
edgesDF = spark.read.csv("/home/sarah/spark/Documents/facebook/0.edges", header=False, sep=' ')
edgesDF = edgesDF.withColumnRenamed("_c0", "src")
edgesDF = edgesDF.withColumnRenamed("_c1", "dst")
# convert from string to long
edgesDF = edgesDF.withColumnRenamed("src", "src_old")
edgesDF = edgesDF.withColumn("src", col("src_old").cast("int"))
edgesDF = edgesDF.withColumnRenamed("dst", "dst_old")
edgesDF = edgesDF.withColumn("dst", col("dst_old").cast("int"))
edgesDF = edgesDF.selectExpr("src", "dst")

edgesDF.printSchema()

rows = edgesDF.take(5)
for row in rows:
    print(row)

print(edgesDF.count())

root
 |-- src: integer (nullable = true)
 |-- dst: integer (nullable = true)

Row(src=236, dst=186)
Row(src=122, dst=285)
Row(src=24, dst=346)
Row(src=271, dst=304)
Row(src=176, dst=9)
5038


## Extract Vertices

Selects the 'src' column from edgesDF and the 'dst' column from edgesDF, combines them using the union() method, and then removes duplicate values using the distinct() method

In [45]:

verticesDF = edgesDF.select('src').union(edgesDF.select('dst')).distinct() 

verticesDF = verticesDF.withColumnRenamed('src','id') # renames source to id
verticesDF.printSchema()

rows = verticesDF.take(5)
for row in rows:
    print(row)


print(verticesDF.count())

nbrVertices = verticesDF.agg({'id': 'max'}).collect()[0]['max(id)']

print("highest vertex id",nbrVertices)

Row(src=148)
Row(src=243)
Row(src=31)
Row(src=85)
Row(src=137)
root
 |-- id: integer (nullable = true)

Row(id=148)
Row(id=243)
Row(id=31)
Row(id=85)
Row(id=137)
333
highest vertex id 347


## Combine to graph

In [29]:
graph = GraphFrame(verticesDF, edgesDF)
print("network with",graph.vertices.count(),"vertices and",graph.edges.count(),"edges")
print("only",graph.edges.count()/(graph.vertices.count()*graph.vertices.count())*100,"% of all potential edges exist in this network!")

network with 333 vertices and 5038 edges
only 4.543282020759499 % of all potential edges exist in this network!


# Feature Engineering

In [None]:
# TODO implement additional features

## Common Neighbors

The code is intended to find the neighbors of each vertex and store them in the neighbors dictionary. The length of the neighbors dictionary would indicate the number of vertices for which neighbors were found

In [30]:
neighbors = {}
# Find all paths in graph from v to n
neighbors_df = graph.find('(v)-[e]->(n)').select('v','n')
neighbors_df.printSchema()

for v in range(1, nbrVertices + 1):
    neighbors_v_df = neighbors_df.filter(col('v')['id'] == v).select('n')
    if neighbors_v_df == None:
        neighbors[v] = []
    else:
        neighbors[v] = [n['n']['id'] for n in neighbors_v_df.collect()]
        
print(len(neighbors))

root
 |-- v: struct (nullable = false)
 |    |-- id: integer (nullable = true)
 |-- n: struct (nullable = false)
 |    |-- id: integer (nullable = true)

347


Broadcast variables allow you to efficiently share data across all nodes (executors) in a Spark cluster. By broadcasting the neighbors dictionary, you make it available to all tasks running on the cluster without the need to transfer the data over the network for each task.

In [31]:
neighbors_bc = spark.sparkContext.broadcast(neighbors)

In [32]:
def feature_neighbors(node):
    global neighbors_bc 
    if node in neighbors_bc.value:
        return neighbors_bc.value[node]
    return []

udf_feature_neighbors = udf(feature_neighbors, ArrayType(IntegerType()))

In [33]:
def feature_common_neighbors(neighbors_node1, neighbors_node2):
    common_neighbors = [node for node in neighbors_node1 if node in neighbors_node2] 
    return len(common_neighbors)

udf_feature_common_neighbors = udf(feature_common_neighbors, IntegerType())

# Train and Test Data

In [34]:
# create DataFrame with all (existing and non-existing) edges 

# existing edges are labeled as 1
edgesDF = edgesDF.withColumn("label", lit(1.0))
#edgesDF = edgesDF.withColumnRenamed("label", "label_old")
#edgesDF = edgesDF.withColumn("label", col("label_old").cast("float"))
edgesDF = edgesDF.selectExpr("src", "dst", "label")

# non-existing edges are labeled as 0
# TODO develop a pure dataframe solution
nonEdges = []

vertices = verticesDF.collect()
for v1 in vertices:
    for v2 in vertices:
        nonEdges.append((v1['id'],v2['id']))
            
nonEdgesDF = spark.createDataFrame(nonEdges, ['src','dst'])
nonEdgesDF = nonEdgesDF.exceptAll(edgesDF.drop('label')) # sets all edges to 0 except the ones that actually exist
nonEdgesDF = nonEdgesDF.withColumn("label", lit(0.0))
#nonEdgesDF = nonEdgesDF.withColumnRenamed("label", "label_old")
#nonEdgesDF = nonEdgesDF.withColumn("label", col("label_old").cast("int"))
nonEdgesDF = nonEdgesDF.selectExpr("src", "dst", "label")
                        
edgesDF = edgesDF.union(nonEdgesDF)
edgesDF.printSchema()

root
 |-- src: long (nullable = true)
 |-- dst: long (nullable = true)
 |-- label: double (nullable = false)



In [35]:
# add feature vector for each edge
# feature 1: common neighbors

edgesDF = edgesDF.withColumn("src_neighbors", udf_feature_neighbors("src"))
edgesDF = edgesDF.withColumn("dst_neighbors", udf_feature_neighbors("dst"))
edgesDF = edgesDF.withColumn("feature_common_neighbors", udf_feature_common_neighbors("src_neighbors", "dst_neighbors"))

assembler = VectorAssembler(
    inputCols=[x for x in edgesDF.columns if x.startswith("feature")],
    outputCol='features')

edgesDF = assembler.transform(edgesDF)

edgesDF.printSchema()
print(edgesDF.take(5))

root
 |-- src: long (nullable = true)
 |-- dst: long (nullable = true)
 |-- label: double (nullable = false)
 |-- src_neighbors: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- dst_neighbors: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- feature_common_neighbors: integer (nullable = true)
 |-- features: vector (nullable = true)



[Stage 2472:>                                                       (0 + 1) / 1]

[Row(src=236, dst=186, label=1.0, src_neighbors=[30, 84, 248, 318, 200, 133, 25, 26, 1, 67, 252, 314, 141, 315, 322, 21, 213, 276, 224, 122, 304, 272, 303, 13, 257, 121, 280, 69, 271, 88, 62, 105, 297, 169, 142, 186], dst_neighbors=[25, 88, 271, 331, 122, 223, 113, 272, 98, 325, 303, 252, 123, 26, 56, 277, 222, 213, 142, 104, 323, 188, 322, 239, 221, 128, 236, 21, 170, 200, 67, 285, 62, 45, 203, 59, 9, 345, 178, 199, 55, 109, 341], feature_common_neighbors=15, features=DenseVector([15.0])), Row(src=122, dst=285, label=1.0, src_neighbors=[251, 136, 239, 281, 67, 232, 109, 119, 213, 276, 45, 98, 66, 9, 322, 315, 104, 62, 156, 297, 113, 345, 176, 5, 277, 26, 186, 169, 21, 271, 304, 332, 325, 188, 236, 235, 224, 252, 344, 161, 342, 170, 272, 3, 56, 303, 55, 25, 261, 200, 203, 31, 123, 280, 142, 128, 284, 274, 141, 248, 323, 285], dst_neighbors=[188, 332, 322, 203, 252, 239, 26, 25, 113, 272, 67, 232, 345, 142, 98, 10, 109, 261, 342, 56, 211, 122, 119, 60, 186, 9, 304, 221, 196, 185, 303, 2

                                                                                

In [36]:
# split in train and test data
# TODO should be stratified
edgesDF = edgesDF.select("features","label")
edgesTrainDF, edgesTestDF = edgesDF.randomSplit([0.90, 0.10])
edgesTrainDF.count(), edgesTestDF.count()

                                                                                

(99712, 11177)

In [None]:
edgesTrainDF.printSchema()
edgesTrainDF.take(5)

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



[Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0)]

In [None]:
edgesTestDF.printSchema()
edgesTestDF.take(5)

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



[Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0),
 Row(features=DenseVector([0.0]), label=1.0)]

# Link Prediction Classifier

In [None]:
random_forest = RandomForestClassifier(featuresCol = "features", labelCol = "label")
random_forest_model = random_forest.fit(edgesTrainDF)

# Evaluate

In [None]:
test_random_forest_predictions_df = random_forest_model.transform(edgesTestDF)

In [None]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol='prediction', 
    metricName='accuracy')
result = evaluator.evaluate(test_random_forest_predictions_df)
print("overall accuracy",result)

overall accuracy 0.9640307523338825


In [None]:
test_random_forest_predictions_df.printSchema()
test_random_forest_predictions_df.take(5)

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



[Row(features=DenseVector([0.0]), label=1.0, rawPrediction=DenseVector([19.6964, 0.3036]), probability=DenseVector([0.9848, 0.0152]), prediction=0.0),
 Row(features=DenseVector([0.0]), label=1.0, rawPrediction=DenseVector([19.6964, 0.3036]), probability=DenseVector([0.9848, 0.0152]), prediction=0.0),
 Row(features=DenseVector([0.0]), label=1.0, rawPrediction=DenseVector([19.6964, 0.3036]), probability=DenseVector([0.9848, 0.0152]), prediction=0.0),
 Row(features=DenseVector([0.0]), label=1.0, rawPrediction=DenseVector([19.6964, 0.3036]), probability=DenseVector([0.9848, 0.0152]), prediction=0.0),
 Row(features=DenseVector([0.0]), label=1.0, rawPrediction=DenseVector([19.6964, 0.3036]), probability=DenseVector([0.9848, 0.0152]), prediction=0.0)]

## evaluation per class

In [None]:
labels_and_predictions_rdd = test_random_forest_predictions_df.selectExpr("label", "prediction").rdd
print(labels_and_predictions_rdd.first())

Row(label=1.0, prediction=0.0)


In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(labels_and_predictions_rdd)
for label in [0, 1]:
    print("class %s precision = %s" % (label, metrics.precision(label)))
    print("class %s recall = %s" % (label, metrics.recall(label)))

class 0 precision = 0.9921418303785338
class 0 recall = 0.9708364591147787
class 1 precision = 0.3665987780040733
class 1 recall = 0.6870229007633588


In [None]:
# TODO class based evaluation
# TODO compare with baseline / random predict
# TODO try different classifier algorithms