# Co-author Network Link Prediction - Tuning Node2vec model

https://cs.stanford.edu/~jure/pubs/node2vec-kdd16.pdf


## Experiment 1 - Binary Operator

https://stellargraph.readthedocs.io/en/stable/demos/link-prediction/node2vec-link-prediction.html#refs

In [6]:
# this could be rewritten to take in the function as an argument
def binary_operator(name,u,v):
    
    def operator_hadamard(u, v):
        return u * v

    def operator_l1(u, v):
        return np.abs(u - v)

    def operator_l2(u, v):
        return (u - v) ** 2

    def operator_avg(u, v):
        return (u + v) / 2.0

    if name=="hadamard":
        return operator_hadamard(u,v)
    elif name=="l1":
        return operator_l1(u,v)
    elif name=="average":
        return operator_avg(u,v)
    elif name=="l2":
        return operator_l2(u,v)
    

In [8]:
import pandas as pd

## The Coauthorship Graph

https://neo4j.com/blog/cypher-load-json-from-url/

In [9]:
from py2neo import  Graph, Node

graphdb = Graph(scheme="bolt", host="localhost", port=7687, secure=False, auth=('neo4j', 'test'))


## Spark

In [10]:
import findspark
findspark.init()

In [11]:
sc.stop()

NameError: name 'sc' is not defined

In [12]:
import pyspark
sc = pyspark.SparkContext(appName="link_prediction")

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("link_prediction") \
    .getOrCreate()


In [14]:
import pandas as pd 
test_df=pd.read_csv("data/testdf.csv")
training_df=pd.read_csv("data/trainingdf.csv")

In [15]:
training_data=spark.createDataFrame(training_df)

In [16]:
training_data.rdd.getNumPartitions()

4

In [11]:
#training_data = training_data.repartition(2000)
#training_data.rdd.getNumPartitions()

2000

In [12]:
training_data.show(n=5)

+-----+-----+-----+
|node1|node2|label|
+-----+-----+-----+
| 2625| 2628|    1|
| 2175| 2175|    0|
|  115|  116|    1|
| 1542| 1543|    1|
| 1994| 1998|    1|
+-----+-----+-----+
only showing top 5 rows



In [13]:
#training_data.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0| 2400|
|    1| 2400|
+-----+-----+



In [17]:
test_data=spark.createDataFrame(test_df)

In [15]:
#test_data = training_data.repartition(2000)
#test_data.rdd.getNumPartitions()

2000

In [None]:
test_data.groupby("label").count().show()

In [18]:
# create a graph projection
graphdb.run("""CALL gds.graph.create('early_graph',
    'Author', 
    {
        CO_AUTHOR_EARLY: {
                type: 'CO_AUTHOR_EARLY',
                orientation: 'UNDIRECTED'
                }
                }
                )""")

<py2neo.database.Cursor at 0x7fc04beab110>

In [19]:
graphdb.run("""CALL gds.graph.list""").evaluate()

'early_graph'

In [20]:
# create a graph projection
graphdb.run("""CALL gds.graph.create('late_graph',
    'Author', 
    {
        CO_AUTHOR: {
                type: 'CO_AUTHOR',
                orientation: 'UNDIRECTED'
                }
                }
                )""")

<py2neo.database.Cursor at 0x7fc04b28ec50>

In [None]:
# for some reason doesnt have this algorithm
# graphdb.run("""CALL gds.alpha.node2vec.stream('early_graph', {dimensions: 2})""")

In [21]:
import gensim.models
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import StructField, StructType,IntegerType

def apply_node2vec_features(data, graph_name, walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            output_col_name):
    
    params = {
    "pairs": [{"node1": row["node1"], "node2": row["node2"]}
    for row in data.collect()],
    "steps": walk_length,
    "walks": num_walks,
    "size": dimensions,
    "graph_name": graph_name
    }

    query=("""
    UNWIND $pairs as pair
    MATCH (p:Author) WHERE id(p) = pair.node1 OR id(p) = pair.node2
    WITH DISTINCT p
    CALL gds.alpha.randomWalk.stream($graph_name,{
        start: id(p),
        steps: $steps,
        walks: $walks
    })
    YIELD nodeIds
    RETURN [id in nodeIds | toString(id)] as walks
    """)

    random_walks=graphdb.run(query, params).to_series()
    
    model=gensim.models.Word2Vec(random_walks, sg=1, window=window_size, size=dimensions, min_count=1,
                                 workers=workers,iter=num_iter)

    vectors=[{"node1":row["node1"],
            "node2": row["node2"],
            output_col_name: Vectors.dense(
                binary_operator(operator_name, model.wv[str(row["node1"])], model.wv[str(row["node2"])]))
            } for row in data.collect()]
    
    schema = StructType([
        StructField('node1', IntegerType()),
        StructField('node2', IntegerType()),
        StructField(output_col_name, VectorUDT())])

    features=spark.createDataFrame(vectors, schema)
    return data.join(features, ["node1", "node2"])

In [None]:
p = 1.0
q = 1.0
dimensions = 128
num_walks = 10
walk_length = 80
window_size = 10
num_iter = 1
workers = 2
operator_name="hadamard"
training_data = apply_node2vec_features(training_data, 'early_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "hadamard_model")

In [None]:
training_data.show(n=5)

In [None]:
operator_name="average"
training_data = apply_node2vec_features(training_data, 'early_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "average_model")

In [None]:
operator_name="l1"
training_data = apply_node2vec_features(training_data, 'early_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "l1_model")

In [None]:
operator_name="l2"
training_data = apply_node2vec_features(training_data, 'early_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "l2_model")

In [None]:
training_data.show(n=3)

In [None]:
operator_name="hadamard"
test_data = apply_node2vec_features(test_data, 'late_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "hadamard_model")

In [None]:
operator_name="average"
test_data = apply_node2vec_features(test_data, 'late_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "average_model")

In [None]:
operator_name="l1"
test_data = apply_node2vec_features(test_data, 'late_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "l1_model")

In [None]:
operator_name="l2"
test_data = apply_node2vec_features(test_data, 'late_graph', walk_length, num_walks, dimensions, 
                            window_size, p, q, num_iter, workers, operator_name,
                            "l2_model")

In [None]:
test_data.show(n=3)

In [22]:
from pyspark.ml.feature import RFormula

rForm = RFormula()

from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder()\
    .addGrid(rForm.formula, [
    "label ~ hadamard_model",
    "label ~ average_model",
    "label ~ l1_model",
    "label ~ l2_model"])\
    .build()

params

[{Param(parent='RFormula_9b6d36284952', name='formula', doc='R model formula'): 'label ~ hadamard_model'},
 {Param(parent='RFormula_9b6d36284952', name='formula', doc='R model formula'): 'label ~ average_model'},
 {Param(parent='RFormula_9b6d36284952', name='formula', doc='R model formula'): 'label ~ l1_model'},
 {Param(parent='RFormula_9b6d36284952', name='formula', doc='R model formula'): 'label ~ l2_model'}]

In [23]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

rf = RandomForestClassifier(labelCol="label", 
        featuresCol="features",
        numTrees=30, maxDepth=10)

stages=[rForm, rf]
pipeline=Pipeline().setStages(stages)

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()\
    .setMetricName("areaUnderROC")\
    .setRawPredictionCol("prediction")\
    .setLabelCol("label")

In [25]:
import mycrossvalidator as mycv
from importlib import reload  
reload(mycv)

<module 'mycrossvalidator' from '/home/areias/Documents/DataScience/graphs/mycrossvalidator.py'>

In [None]:
cv = mycv.MyCrossValidator(parallelism=2)\
    .setEstimator(pipeline)\
    .setEvaluator(evaluator)\
    .setEstimatorParamMaps(params)\
    .setCollectSubModels(True)

In [26]:
from pyspark.sql.functions import lit

In [27]:
training_data=training_data.withColumn("fold",lit(0))

In [28]:
training_data=training_data.withColumn("test",lit(0))

In [29]:
test_data=test_data.withColumn("fold",lit(0))
test_data=test_data.withColumn("test",lit(1))

In [None]:
all_df=training_data.union(test_data)

In [None]:
mycvfitted, foldstats = cv.fit(all_df)

In [None]:
mycvfitted.avgMetrics

In [None]:
foldstats

In [26]:
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt  
%matplotlib inline

def get_roc_curve(model,test_data):
    
    predictions=model.transform(test_data)
    
    preds = predictions.select('label','probability')\
    .rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))\
    .collect()
     
    y_score, y_true = zip(*preds)
    fpr, tpr, thresholds = roc_curve(y_true, y_score, pos_label = 1)
    
    return fpr,tpr,thresholds

In [None]:
fpr_h,tpr_h,thresholds_h = get_roc_curve(mycvfitted.subModels[0][0], test_data)

In [None]:
fpr_a,tpr_a,thresholds_a = get_roc_curve(mycvfitted.subModels[0][1], test_data)

In [None]:
fpr_l1,tpr_l1,thresholds_l1 = get_roc_curve(mycvfitted.subModels[0][2], test_data)

In [None]:
fpr_l2,tpr_l2,thresholds_l2 = get_roc_curve(mycvfitted.subModels[0][3], test_data)

In [27]:
from cycler import cycler
def create_roc_plot():
    plt.style.use('classic')
    fig = plt.figure(figsize=(8, 6))
    plt.title('ROC Curve')
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.rc('axes', prop_cycle=(cycler('color',
    ['r', 'g', 'b', 'c', 'm', 'y', 'k'])))
    plt.plot([0, 1], [0, 1], linestyle='--', label='Random score (AUC = 0.50)')
    return plt, fig

def add_curve(plt, title, fpr, tpr, roc):
    plt.plot(fpr, tpr, label=f"{title} (AUC = {roc:0.2})")
    

In [None]:
plt,fig = create_roc_plot()

add_curve(plt, "Hadamard",
    fpr_h, tpr_h,
    mycvfitted.avgMetrics[0])

add_curve(plt, "Average",
    fpr_a, tpr_a,
    mycvfitted.avgMetrics[1])

add_curve(plt, "L1",
    fpr_l1, tpr_l1,
    mycvfitted.avgMetrics[2])

add_curve(plt, "L2",
    fpr_l2, tpr_l2,
    mycvfitted.avgMetrics[3])

plt.legend(loc='lower right')
plt.show()

## Experiment 2 - Return parameter *p* and In-out parameter *q*

In [38]:
training_data.show(n=1)

+-----+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|node1|node2|label|              model0|              model1|              model2|              model3|              model4|              model5|              model6|              model7|              model8|              model9|             model10|             model11|             model12|             model13|             model14|             model15|
+-----+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------

In [None]:
training_data=training_data.drop("hadamard_model", "average_model", "l1_model", "l2_model")
test_data=test_data.drop("hadamard_model", "average_model", "l1_model", "l2_model")

In [30]:
dimensions = 12
num_walks = 10
walk_length = 80
window_size = 10
num_iter = 1
workers = 2
operator_name="l1"

In [31]:
from sklearn.model_selection import ParameterGrid
grid = [{'p': [0.25],'q': [0.25]}]
list(ParameterGrid(grid))

[{'p': 0.25, 'q': 0.25}]

In [32]:
import numpy as np
for item in enumerate(ParameterGrid(grid)):
    model_name="model"+str(item[0])
    training_data = apply_node2vec_features(training_data, 'early_graph', walk_length, num_walks, dimensions, 
                            window_size, item[1]['p'], item[1]['q'], num_iter, workers, operator_name,
                            model_name)
    test_data = apply_node2vec_features(test_data, 'late_graph', walk_length, num_walks, dimensions, 
                            window_size, item[1]['p'], item[1]['q'], num_iter, workers, operator_name,
                            model_name)

In [33]:
["label ~ model"+str(i) for i in range(1)]

['label ~ model0']

In [34]:
params = ParamGridBuilder()\
    .addGrid(rForm.formula, ["label ~ model"+str(i) for i in range(1)])\
    .build()
params

[{Param(parent='RFormula_9b6d36284952', name='formula', doc='R model formula'): 'label ~ model0'}]

In [40]:
training_data=training_data.withColumn("fold",lit(0))
training_data=training_data.withColumn("test",lit(0))

test_data=test_data.withColumn("fold",lit(0))
test_data=test_data.withColumn("test",lit(1))

In [35]:
all_df=training_data.union(test_data)

In [36]:
all_df.show(n=2)

+-----+-----+-----+----+----+--------------------+
|node1|node2|label|fold|test|              model0|
+-----+-----+-----+----+----+--------------------+
|   22|   25|    1|   0|   0|[0.11191907525062...|
|  412| 2326|    0|   0|   0|[0.02781665325164...|
+-----+-----+-----+----+----+--------------------+
only showing top 2 rows



In [42]:
#all_df = all_df.repartition(2000)

In [37]:
all_df.rdd.getNumPartitions()

400

In [38]:
cv = mycv.MyCrossValidator()\
    .setEstimator(pipeline)\
    .setEvaluator(evaluator)\
    .setEstimatorParamMaps(params)\
    .setCollectSubModels(False)

In [39]:
mycvfitted, foldstats = cv.fit(all_df)

In [40]:
mycvfitted.avgMetrics

[0.8271662041319765]

In [41]:
foldstats

[[0.8271662041319765]]

In [43]:
import pickle

with open('/home/areias/Documents/DataScience/graphs/results.pkl', 'wb') as f:
    pickle.dump(mycvfitted.avgMetrics, f)

In [44]:
mycvfitted.bestModel

PipelineModel_3abf13843c5f