# Setup

Install the necessary libraries in your Colab notebook environment and connect to your hosted Neo4J Sandbox.

In [181]:
!pip install neo4j pyspark



In [153]:
ip = "54.172.14.140"
bolt_port = "7687"
username = "neo4j"
password = "rifle-sponsor-beliefs"

In [154]:
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://" + ip + ":" + bolt_port, auth=(username, password))

print(driver.address) # your-sandbox-ip:your-sandbox-bolt-port

54.172.14.140:7687


In [182]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [183]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [184]:
save_folder = '/content/gdrive/My Drive/IASD_vacations/IASD_link_prediction/link-prediction/notebooks/data/'

# Objective

We are going to train a binary classifier to predict whether a link should exist between two *Author* nodes.

Each pair of *Author*s will be described with a feature vector and labeled with either 1 (if these two authors have collaborated) or 0 (if they have not).

# Feature Engineering

Let's generate features for our link prediction classifier. These features will describe a pair of *Author*s by using:
- graph topology measures
- community detection measures


We will identify the nodes by their ID, compute graph measures for these nodes in Neo4J, and return a DataFrame with these new features describing each pair of nodes in the train and test sets.

Load the CSV files saved in the train/test notebook.

In [146]:
df_train_under = spark.read.csv(save_folder + 'df_train_under.csv/*.csv', header=True, inferSchema=True).cache()
df_test_under = spark.read.csv(save_folder + 'df_test_under.csv/*.csv', header=True, inferSchema=True).cache()

First, for Neo4J to work with our train and test pairs, we need to pass them as lists of dictionaries. Each list is sent to a Neo4J query as a parameter, and the query can then access every element of the list and its attributes.

- Transform each DataFrame into a list of dictionaries of the following form:

```
[
  {
  "node1": 15589,
  "node2": 2567,
  "label": 1
  } ,
  ... ,
  {
  "node1": 5466,
  "node2": 78122,
  "label": 0
  }
]

```

In [None]:
### Using Python's list comprehension syntax

df_train_under_pairs = [{"node1": ... , "node2": ... , "label": ...}  for ... , ... , ... in df_train_under. ...]
df_test_under_pairs = [{"node1": ... , "node2": ... , "label": ...}  for ... , ... , ... in df_test_under. ...]

In [147]:
#@title Solution

df_train_under_pairs = [{"node1": node1, "node2": node2, 'label':label}  for node1, node2, label in df_train_under.select("node1", "node2", "label").collect()]
df_test_under_pairs = [{"node1": node1, "node2": node2, 'label':label}  for node1, node2, label in df_test_under.select("node1", "node2", "label").collect()]

## Generating graphy features

We will start by creating 3 features extracted from the graph topology to describe each pair of nodes: 
- [common neighbors](https://neo4j.com/docs/graph-data-science/current/alpha-algorithms/common-neighbors/)
- [preferential attachment](https://neo4j.com/docs/graph-data-science/current/alpha-algorithms/preferential-attachment/)
- [total neighbors](https://neo4j.com/docs/graph-data-science/current/alpha-algorithms/total-neighbors/)
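
As a rough illustration of what these three measures compute, the sketch below evaluates them on a toy undirected graph in plain Python. The adjacency dictionary and the function name `topology_features` are made up for this example; in the notebook itself, the values come from the `gds.alpha.linkprediction.*` functions.

```python
# Toy undirected graph as an adjacency dict (hypothetical data, not the co-author graph)
toy_graph = {
    "a": {"b", "c", "d"},
    "b": {"a", "c"},
    "c": {"a", "b"},
    "d": {"a"},
}

def topology_features(graph, x, y):
    """Return the three pair measures for nodes x and y."""
    neighbors_x = graph[x]
    neighbors_y = graph[y]
    return {
        "cn": len(neighbors_x & neighbors_y),      # common neighbors: size of the intersection
        "pa": len(neighbors_x) * len(neighbors_y),  # preferential attachment: product of degrees
        "tn": len(neighbors_x | neighbors_y),      # total neighbors: size of the union
    }

print(topology_features(toy_graph, "b", "d"))  # {'cn': 1, 'pa': 2, 'tn': 2}
```

Here `b` and `d` share the single neighbor `a`, have degrees 2 and 1, and together reach two distinct neighbors.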

We want a final DataFrame with the following structure:

| node1 | node2 | label | cn | pa | tn |
|-------|-------|-------|----|----|----|

By using the *UNWIND* clause, we can manipulate each element in a list as an individual row in Cypher.

For example:
```
# A list of student dictionaries (ID(s) returns an integer, so the ids are integers)
my_list = [{"id": 1, "age": 28}, {"id": 2, "age": 35}]

# A parameterized query that retrieves each student's name from their attributes
query = """
  UNWIND $list_of_students AS student // A dollar sign denotes a query parameter
    MATCH (s:Student)
    WHERE ID(s) = student.id AND s.age = student.age
    RETURN s.name
"""

with driver.session() as session:
    result = session.run(query=query, parameters={"list_of_students": my_list})
```
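
To build intuition for what *UNWIND* followed by *MATCH* does, the sketch below mimics the same logic in plain Python. It is only an analogy: `toy_nodes` is a made-up stand-in for the graph, and `unwind_and_match` is a hypothetical helper, not part of the Neo4J driver.

```python
# Toy stand-in for the graph: internal node ID -> properties (made-up data)
toy_nodes = {
    1: {"name": "Ada", "age": 28},
    2: {"name": "Grace", "age": 35},
}

list_of_students = [
    {"id": 1, "age": 28},
    {"id": 2, "age": 35},
    {"id": 3, "age": 41},  # no node has this ID
]

def unwind_and_match(students, nodes):
    """Emulate: UNWIND ... AS student MATCH (s) WHERE ... RETURN s.name"""
    rows = []
    for student in students:                  # UNWIND: one row per list element
        node = nodes.get(student["id"])       # MATCH (s) WHERE ID(s) = student.id
        if node is not None and node["age"] == student["age"]:  # AND s.age = student.age
            rows.append({"s.name": node["name"]})               # RETURN s.name
    return rows

print(unwind_and_match(list_of_students, toy_nodes))
# [{'s.name': 'Ada'}, {'s.name': 'Grace'}]
```

The third list element yields no row because nothing matches it. The same thing happens in Cypher when a *MATCH* finds no node, which is why the row counts are checked after each feature step.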

- Complete the following function to compute the 3 graph measures for pairs of nodes.





In [136]:
def add_graphy_features(pairs, rel_type):
    # Common neighbors
    query = """
      UNWIND $... AS ...
        MATCH (p1) WHERE ... = ...
        MATCH (p2) WHERE ... = ...
        RETURN 
          ... AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.commonNeighbors(... , ... , {relationshipQuery: $...}) AS cn
    """
    params = {
        "...": ... , 
        "...": ...
    }
    with driver.session() as session:
        result = session.run(query, params)
        features_cn = spark.createDataFrame([dict(record) for record in result]) 

    # Preferential attachment
    query = """
      UNWIND $... AS ...
        MATCH (p1) WHERE ... = ...
        MATCH (p2) WHERE ... = ...
        RETURN 
          ... AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.preferentialAttachment(... , ... , {relationshipQuery: $...}) AS pa
    """
    params = {
        "...": ... , 
        "...": ...
    }
    with driver.session() as session:
        result = session.run(query, params)
        features_pa = spark.createDataFrame([dict(record) for record in result])

    # Total neighbors
    query = """
      UNWIND $... AS ...
        MATCH (p1) WHERE ... = ...
        MATCH (p2) WHERE ... = ...
        RETURN 
          ... AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.totalNeighbors(... , ... , {relationshipQuery: $...}) AS tn
    """
    params = {
        "...": ... , 
        "...": ...
    }
    with driver.session() as session:
        result = session.run(query, params)
        features_tn = spark.createDataFrame([dict(record) for record in result])  

    # Join the three feature dfs
    final_df = ... .join(
        ... , on=[...], how='...'
        ).join(
        ... , on=[...], how='...'
        )

    return final_df

In [51]:
#@title Hint

def add_graphy_features(pairs, rel_type):
    # Common neighbors
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = ...
        MATCH (p2) WHERE ID(p2) = ...
        RETURN 
          ... AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.commonNeighbors(... , ... , {relationshipQuery: $relType}) AS cn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_cn = spark.createDataFrame([dict(record) for record in result]) 

    # Preferential attachment
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = ...
        MATCH (p2) WHERE ID(p2) = ...
        RETURN 
          ... AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.preferentialAttachment(... , ... , {relationshipQuery: $relType}) AS pa
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_pa = spark.createDataFrame([dict(record) for record in result])

    # Total neighbors
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = ...
        MATCH (p2) WHERE ID(p2) = ...
        RETURN 
          ... AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.totalNeighbors(... , ... , {relationshipQuery: $relType}) AS tn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_tn = spark.createDataFrame([dict(record) for record in result])  

    # Join the three feature dfs
    final_df = ... .join(
        ... , on=["node1", "node2", "label"], how='inner'
        ).join(
        ... , on=["node1", "node2", "label"], how='inner'
        )

    return final_df

In [171]:
#@title Solution

def add_graphy_features(pairs, rel_type):
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = pair.node2
        RETURN 
          pair.node1 AS node1,
          pair.node2 AS node2,
          pair.label AS label,
          gds.alpha.linkprediction.commonNeighbors(p1, p2, {relationshipQuery: $relType}) AS cn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_cn = spark.createDataFrame([dict(record) for record in result]) 

    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = pair.node2
        RETURN 
          pair.node1 AS node1,
          pair.node2 AS node2,
          pair.label AS label,
          gds.alpha.linkprediction.preferentialAttachment(p1, p2, {relationshipQuery: $relType}) AS pa
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_pa = spark.createDataFrame([dict(record) for record in result])

    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = pair.node2
        RETURN 
          pair.node1 AS node1,
          pair.node2 AS node2,
          pair.label AS label,
          gds.alpha.linkprediction.totalNeighbors(p1, p2, {relationshipQuery: $relType}) AS tn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_tn = spark.createDataFrame([dict(record) for record in result])  

    final_df = features_cn.join(
        features_pa, on=["node1", "node2", "label"], how='inner'
        ).join(
        features_tn, on=["node1", "node2", "label"], how='inner'
        )

    return final_df
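
The three queries in the solution differ only in the function called in the `RETURN` clause. One possible variant, sketched below, computes all three measures in a single query, which would avoid the two joins. It assumes the same `gds.alpha.linkprediction.*` functions used above are available. The function name `fetch_graphy_features` is hypothetical, and the driver is passed in explicitly so the sketch does not rely on notebook globals.

```python
def fetch_graphy_features(driver, pairs, rel_type):
    """Return one dict per pair with the cn, pa and tn measures (single-query sketch)."""
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = pair.node2
        RETURN
          pair.node1 AS node1,
          pair.node2 AS node2,
          pair.label AS label,
          gds.alpha.linkprediction.commonNeighbors(p1, p2, {relationshipQuery: $relType}) AS cn,
          gds.alpha.linkprediction.preferentialAttachment(p1, p2, {relationshipQuery: $relType}) AS pa,
          gds.alpha.linkprediction.totalNeighbors(p1, p2, {relationshipQuery: $relType}) AS tn
    """
    params = {"pairs": pairs, "relType": rel_type}
    with driver.session() as session:
        result = session.run(query, params)
        # Materialize the records while the session is still open
        return [dict(record) for record in result]

# Possible usage in this notebook (requires the live driver and Spark session):
# rows = fetch_graphy_features(driver, df_train_under_pairs, "CO_AUTHOR_EARLY")
# df_train_under_graph_f = spark.createDataFrame(rows)
```

The trade-off is that the step-by-step version is easier to debug one measure at a time, while the single query makes one round trip per dataset instead of three.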

Let's apply the function to our training and test DataFrames and do a quick sanity check: the number of rows should be unchanged.



In [172]:
print('Counts before applying graph features engineering: ')
print(df_train_under.count())
print(df_test_under.count())

df_train_under_graph_f = add_graphy_features(df_train_under_pairs, "CO_AUTHOR_EARLY")
df_test_under_graph_f = add_graphy_features(df_test_under_pairs, "CO_AUTHOR_LATE")

print('Counts after applying graph features engineering: ')
print(df_train_under_graph_f.count())
print(df_test_under_graph_f.count())

Counts before applying graph features engineering: 
162234
121380
Counts after applying graph features engineering: 
162234
121380


Let's see how it looks:

In [173]:
df_train_under_graph_f.filter(F.col('label') == 0).show(5)
df_test_under_graph_f.filter(F.col('label') == 1).show(5)

+-----+------+-----+---+----+----+
|node1| node2|label| cn|  pa|  tn|
+-----+------+-----+---+----+----+
|  975|224292|    0|0.0|36.0|20.0|
|  979|205473|    0|2.0|65.0|16.0|
|  980| 58023|    0|1.0|27.0|11.0|
|  980|191694|    0|0.0| 9.0| 6.0|
| 1028|  9854|    0|1.0|24.0|10.0|
+-----+------+-----+---+----+----+
only showing top 5 rows

+------+------+-----+---+----+----+
| node1| node2|label| cn|  pa|  tn|
+------+------+-----+---+----+----+
| 84788| 84790|    1|2.0| 9.0| 4.0|
|117647|117649|    1|5.0|48.0| 9.0|
|139973|236514|    1|1.0|10.0| 6.0|
| 77249|188365|    1|4.0|60.0|13.0|
|129886|129887|    1|1.0| 6.0| 4.0|
+------+------+-----+---+----+----+
only showing top 5 rows



## Generating community features

Community detection algorithms evaluate how a group is clustered or partitioned. Nodes are considered more similar to nodes that fall in their community than to nodes in other communities.

We will extract one feature based on a community detection algorithm:

- [Louvain community](https://neo4j.com/docs/graph-data-science/current/algorithms/louvain/)

With `includeIntermediateCommunities: true`, the Louvain algorithm returns the intermediate communities it found in the graph in addition to the final one. We will add a property to each node containing the first community that the algorithm found for that node. This property constitutes a categorical feature: the first community to which each node belongs. Since we are working with pairs of nodes, we will derive a binary *'same_community_louvain'* feature (True or False) to further describe each pair.

Note that we need to restrict the execution of the Louvain community detection algorithm to the train and test subgraphs separately.
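
The derived feature described above can be illustrated in plain Python. This is a minimal sketch on made-up data: the `intermediate_ids` dictionary and the `same_community` helper are hypothetical, and returning 0.0 when a node has no community is an assumption of this sketch.

```python
# Hypothetical Louvain output with intermediate communities:
# node ID -> [first-level community, ..., final community]
intermediate_ids = {
    1: [10, 3],
    2: [10, 3],
    3: [11, 3],
    4: [12, 7],
}

# Keep only the first level for each node, as described above
first_community = {node: ids[0] for node, ids in intermediate_ids.items()}

def same_community(communities, node1, node2):
    """Return 1.0 if both nodes have the same community, else 0.0."""
    c1 = communities.get(node1)
    c2 = communities.get(node2)
    if c1 is None or c2 is None:
        # Assumption: a node without a community never matches another node
        return 0.0
    return 1.0 if c1 == c2 else 0.0

print(same_community(first_community, 1, 2))  # 1.0
print(same_community(first_community, 1, 3))  # 0.0
```

Nodes 1 and 3 end up in the same final community (3) in this toy data, but their first-level communities differ, so the feature is 0.0 for that pair.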

- Set a property on each node in the *CO_AUTHOR_EARLY* subgraph, containing the first community ID that the Louvain algorithm found for this node.

In [158]:
query = """
  CALL gds.louvain.stream({
    nodeProjection: 'Author',
    relationshipProjection: {
      CO_AUTHOR_EARLY: {
        type: 'CO_AUTHOR_EARLY',
        orientation: 'UNDIRECTED'
      }
    },
    includeIntermediateCommunities: true
  })
  YIELD nodeId, communityId, intermediateCommunityIds
  WITH gds.util.asNode(nodeId) AS node, intermediateCommunityIds[0] AS smallestCommunity
  SET node.louvainTrain = smallestCommunity;
  """

with driver.session() as session:
    display(session.run(query).consume().counters)

{'properties_set': 80299}

- Similarly, set a property on each node in the *CO_AUTHOR_LATE* subgraph, containing the first community ID that the Louvain algorithm found for this node.

In [159]:
query = """
  CALL gds.louvain.stream({
    nodeProjection: 'Author',
    relationshipProjection: {
      CO_AUTHOR_LATE: {
        type: 'CO_AUTHOR_LATE',
        orientation: 'UNDIRECTED'
      }
    },
    includeIntermediateCommunities: true
  })
  YIELD nodeId, communityId, intermediateCommunityIds
  WITH gds.util.asNode(nodeId) AS node, intermediateCommunityIds[0] AS smallestCommunity
  SET node.louvainTest = smallestCommunity;
  """

with driver.session() as session:
    display(session.run(query).consume().counters)

{'properties_set': 80299}

Now, each node in our graph contains 2 new properties. What are the names of these properties? 


**Hint:** Feel free to use the Neo4J Browser if it doesn't look intuitive from the code blocks above.

In [None]:
#@title Solution

"""
- louvainTrain: the ID of the first community found for the node when using CO_AUTHOR_EARLY edges only
- louvainTest:  the ID of the first community found for the node when using CO_AUTHOR_LATE edges only
"""

Let's now build a derived feature that expresses, for each pair, whether the two nodes belong to the same Louvain community.

- Complete the function below to create this derived feature for each pair of nodes

In [None]:
def add_community_feature(pairs, louvain_prop):
    query = """
      UNWIND ... AS ...
      MATCH (p1) WHERE ...
      MATCH (p2) WHERE ...
      RETURN 
        ... AS node1,
        ... AS node2,
        ... AS label,
        gds.alpha.linkprediction.sameCommunity(... , ... , ...) AS sl
    """
    params = {
    "...": ... ,
    "...": ...
    }
    with driver.session() as session:
        result = session.run(query, params)
        features_sl = spark.createDataFrame([dict(record) for record in result])
    
    return features_sl

In [164]:
#@title Solution

def add_community_feature(pairs, louvain_prop):
    query = """
      UNWIND $pairs AS pair
      MATCH (p1) WHERE ID(p1) = pair.node1
      MATCH (p2) WHERE ID(p2) = pair.node2
      RETURN 
        pair.node1 AS node1,
        pair.node2 AS node2,
        pair.label AS label,
        gds.alpha.linkprediction.sameCommunity(p1, p2, $louvainProp) AS sl
    """
    params = {
    "pairs": pairs,
    "louvainProp": louvain_prop
    }
    with driver.session() as session:
        result = session.run(query, params)
        features_sl = spark.createDataFrame([dict(record) for record in result])
    
    return features_sl

Let's apply the function to our training and test DataFrames and do a quick sanity check: the number of rows should be unchanged.

In [174]:
print('Counts before applying community feature engineering: ')
print(df_train_under.count())
print(df_test_under.count())

df_train_under_community_f = add_community_feature(df_train_under_pairs, "louvainTrain")
df_test_under_community_f = add_community_feature(df_test_under_pairs, "louvainTest")

print('Counts after applying community feature engineering: ')
print(df_train_under_community_f.count())
print(df_test_under_community_f.count())

Counts before applying community feature engineering: 
162234
121380
Counts after applying community feature engineering: 
162234
121380


Let's see how it looks:

In [178]:
df_train_under_community_f.filter(F.col('label') == 0).show(5)
df_test_under_community_f.filter(F.col('label') == 1).show(5)

+-----+-----+------+---+
|label|node1| node2| sl|
+-----+-----+------+---+
|    0|  974| 36332|0.0|
|    0|  980|258610|0.0|
|    0| 1004|209887|0.0|
|    0| 1005|191172|0.0|
|    0| 1028|  9854|0.0|
+-----+-----+------+---+
only showing top 5 rows

+-----+------+------+---+
|label| node1| node2| sl|
+-----+------+------+---+
|    1|  1606|  1611|1.0|
|    1| 78935| 78936|1.0|
|    1|117647|117649|1.0|
|    1|140285|140286|1.0|
|    1|187648|187649|1.0|
+-----+------+------+---+
only showing top 5 rows



# Save train and test DataFrames

- Join the graph topology features df with the community feature df.

In [179]:
#@title Solution

print('Counts before joining df: ')
print(df_train_under_graph_f.count())
print(df_train_under_community_f.count())

print(df_test_under_graph_f.count())
print(df_test_under_community_f.count())

df_train_under = df_train_under_graph_f.join(df_train_under_community_f, on=['node1', 'node2', 'label'], how='inner')
df_test_under = df_test_under_graph_f.join(df_test_under_community_f, on=['node1', 'node2', 'label'], how='inner')

print('Counts after joining df: ')
print(df_train_under.count())
print(df_test_under.count())

Counts before joining df: 
162234
162234
121380
121380
Counts after joining df: 
162234
121380


Save our final train and test DataFrames to CSV files for use in the next notebook.


In [180]:
df_train_under.write.csv(save_folder +  "df_train_under_all.csv", mode='overwrite', header=True)
df_test_under.write.csv(save_folder + "df_test_under_all.csv", mode='overwrite', header=True)

Please check that both datasets have been written to your Drive at the desired location because we are going to need them later for model training and testing.
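
One way to run this check from the notebook is sketched below. It assumes Spark's default behavior of writing each DataFrame as a directory containing `part-*.csv` files plus a `_SUCCESS` marker file; the helper name `spark_csv_written` is hypothetical.

```python
import glob
import os

def spark_csv_written(output_dir):
    """Return True if output_dir looks like a completed Spark CSV output."""
    # Assumption: a finished write leaves a _SUCCESS marker in the directory
    has_marker = os.path.isfile(os.path.join(output_dir, "_SUCCESS"))
    # Assumption: the data itself is stored in one or more part-*.csv files
    has_parts = bool(glob.glob(os.path.join(output_dir, "part-*.csv")))
    return has_marker and has_parts

# Possible usage in this notebook:
# print(spark_csv_written(save_folder + "df_train_under_all.csv"))
# print(spark_csv_written(save_folder + "df_test_under_all.csv"))
```

If either call returns `False`, rerunning the write cell or browsing the folder in Drive should show what is missing.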