# In [None]:
import logging
import math
import operator

from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import SQLContext
from graphframes import *    # Spark GraphX does not support Python

if __name__ == "__main__":
    sc = SparkContext()
    sqlContext = SQLContext(sc)

    # Load the space-separated edge list ("src dst" per line) from a text file.
    eRDD = sc.textFile("/path/to/data/text/file")
    # Strip every field so that edge endpoints are spelled exactly like the
    # (stripped) vertex ids built below; otherwise stray whitespace such as a
    # trailing "\r" makes an edge point at a vertex id that does not exist.
    e_split_RDD = eRDD.map(
        lambda line: [field.strip() for field in line.split(" ")])
    eSchema = StructType([StructField('src', StringType(), True),
                          StructField('dst', StringType(), True)])
    eDF = sqlContext.createDataFrame(e_split_RDD, eSchema)

    # The vertex set is every distinct id that appears as either endpoint.
    v0_RDD = e_split_RDD.map(lambda p: (str(p[0].strip()),))
    v1_RDD = e_split_RDD.map(lambda p: (str(p[1].strip()),))
    v_RDD = v0_RDD.union(v1_RDD)
    v_DF = sqlContext.createDataFrame(v_RDD, schema=["id"]).dropDuplicates(["id"])
    g = GraphFrame(v_DF, eDF)

    # The edge table is queried by name, once per node lookup, so register it
    # under "edges_table" and cache it.
    eDF.registerTempTable("edges_table")
    eDF.cache()

    # NOTE(review): process_the_nodes and its helpers are defined further down
    # in this file; run top-to-bottom as a plain script, this call executes
    # before those definitions and raises NameError. Move this block to the
    # end of the file (or the definitions above it) -- TODO confirm layout.
    process_the_nodes()


def process_the_nodes():
    """Rank link predictions for every vertex of the graph.

    Reads the vertex ids from the module-level ``v_DF`` DataFrame and calls
    ``get_ranked_raw_predictions`` once per vertex.

    Returns:
        dict: Maps each vertex id to the value returned by
        ``get_ranked_raw_predictions`` for that vertex (a list of
        ``(node, node_score)`` tuples sorted by score). Callers that ignore
        the return value are unaffected.
    """
    v_list = v_DF.rdd.map(lambda row: row.id).collect()
    # Keep each ranking instead of overwriting a throwaway local per vertex.
    return {v: get_ranked_raw_predictions(root_node=v) for v in v_list}
        
    
def get_ranked_raw_predictions(root_node):
    """Score and rank the candidate link predictions for ``root_node``.

    Candidates come from ``get_raw_predictions``; each one is scored against
    the root's neighbour set with ``get_adamicadar_score``.

    Args:
        root_node: Id of the vertex to predict links for.

    Returns:
        list: ``(node, node_score)`` tuples sorted by score, highest first.
        The list is empty when there are no candidates.
    """
    raw_preds = get_raw_predictions(root_node=root_node)
    nbh_df = get_neighbours(root_node=root_node)
    # The root's neighbour set is the same for every candidate: build it once.
    root_nbhrs = set(nbh_df.rdd.map(lambda row: row.dst).collect())

    raw_pred_score = [
        (node, get_adamicadar_score(nbhrs_x_node=root_nbhrs, y_node=node))
        for node in raw_preds
    ]

    # Sort once, after every candidate has been scored; this also yields an
    # empty list (rather than an unbound local) when there are no candidates.
    return sorted(raw_pred_score, key=operator.itemgetter(1), reverse=True)


def get_raw_predictions(root_node):
    """Collect the candidate nodes reachable in two hops from ``root_node``.

    Args:
        root_node: Id of the vertex whose candidates are wanted.

    Returns:
        set: Neighbours of the root's neighbours, excluding the root's direct
        neighbours and the root itself.
    """
    first_hop = get_neighbours(root_node).rdd.map(lambda row: row.dst).collect()

    second_hop = set()
    for hop_node in first_hop:
        hop_rows = get_neighbours(hop_node).rdd.map(lambda row: row.dst).collect()
        second_hop.update(hop_rows)

    # Already-linked nodes and the root itself are not predictions.
    return second_hop - set(first_hop) - {root_node}


def get_neighbours(root_node):
    """Return the distinct neighbours of ``root_node``, ignoring edge direction.

    Looks the node up in the registered ``edges_table`` through the
    module-level ``sqlContext``. The lookup uses column comparisons rather
    than an interpolated SQL string, so ids containing quotes or other SQL
    syntax are matched literally.

    Args:
        root_node: Id of the vertex whose neighbours are wanted.

    Returns:
        DataFrame: A single column named ``dst`` holding each neighbour id
        once.
    """
    # src/dst are string columns, so compare against the id rendered as text.
    node_id = "%s" % root_node
    edges = sqlContext.table("edges_table")

    outgoing = edges.filter(edges["src"] == node_id).select("dst")
    incoming = edges.filter(edges["dst"] == node_id).select("src")

    # unionAll matches columns by position and keeps the first frame's names,
    # so the combined column is called dst.
    return outgoing.unionAll(incoming).dropDuplicates()


def determine_score(n):
    """Return the weight ``1 / ln(n)`` for a node that has ``n`` neighbours.

    Args:
        n: Neighbour count of the node being weighted.

    Returns:
        float: ``1 / ln(n)`` when ``n > 1``. ``0.0`` when ``n <= 1``, where
        the logarithm is zero, negative or undefined, and also when ``n`` is
        not a number (the error is logged), so that such a node adds nothing
        to a summed score.
    """
    try:
        if n <= 1:
            # ln(1) == 0 would divide by zero; ln(n) is undefined for n <= 0.
            return 0.0
        return 1.0 / math.log(n)
    except (TypeError, ValueError) as e:
        # Best effort: a malformed count is logged and scored as zero rather
        # than aborting the whole ranking.
        logging.error(e)
        return 0.0

  
def get_adamicadar_score(nbhrs_x_node, y_node):
    """Score a candidate node against a precomputed neighbour set.

    Args:
        nbhrs_x_node: Neighbour ids of the first node.
        y_node: Id of the candidate node.

    Returns:
        float: Sum of ``determine_score(neighbour count)`` over every node
        that neighbours both the first node and ``y_node``; ``0.0`` when they
        share no neighbour.
    """
    y_rows = get_neighbours(y_node).rdd.map(lambda row: row.dst).collect()
    shared = set(y_rows).intersection(nbhrs_x_node)

    # Each shared neighbour contributes a weight based on its neighbour count.
    weights = (determine_score(get_neighbours(z).count()) for z in shared)
    return sum(weights, float(0))
  