In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Start (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('Advanced-HW2')
    .getOrCreate()
)

In [2]:
# Load the edge list as raw text: one row per line, in a single `value` column.
pr_sdf = spark.read.format("text").load('pr_graph.txt')

In [3]:
# Split each line on the space and keep the first two fields as integer
# node ids: field 0 is the edge's source, field 1 is its destination.
edge_fields = F.split(pr_sdf.value, ' ')
pr_sdf = pr_sdf.select(
    edge_fields[0].cast('int').alias('from_node'),
    edge_fields[1].cast('int').alias('to_node'),
)
pr_sdf.show()

+---------+-------+
|from_node|to_node|
+---------+-------+
|        1|      2|
|        1|      3|
|        1|      4|
|        1|      5|
|        2|      3|
|        2|      5|
|        3|      2|
|        4|      5|
|        5|      1|
|        5|      6|
|        5|      7|
|        6|      7|
|        7|      6|
|        7|      2|
|        7|      7|
|        5|      4|
+---------+-------+



In [4]:
def pagerank(G, num_iter):
    """Compute damped PageRank scores for a directed edge list.

    Every round replaces each node's score with
    ``0.85 * sum(score(src) / out_degree(src)) + 0.15`` taken over the edges
    ``src -> node``.

    Parameters
    ----------
    G : pyspark.sql.DataFrame
        One row per edge, with columns ``from_node`` and ``to_node``.
    num_iter : int
        Number of update rounds. With ``0`` (or a negative value) the
        initial scores are returned untouched.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``node`` and ``pagerank``. After at least one round the rows
        are sorted by ``node`` and contain only nodes that occur as a
        ``to_node`` of some edge. The result is lazy: nothing is computed
        until the caller runs an action on it.

    Notes
    -----
    The function neither caches nor unpersists anything and registers no temp
    views, so the caller's storage levels and session catalog are untouched.
    Because the returned DataFrame is lazy, anything unpersisted before
    returning could not be reused when the caller finally triggers an action.
    """
    # Weight of an edge = 1 / out-degree of its source node. This is a native
    # column expression, so no rows are shipped to Python worker processes.
    out_weights = (
        G.groupBy('from_node')
        .count()
        .select('from_node', (F.lit(1.0) / F.col('count')).alias('weight'))
    )
    weighted_edges = G.join(out_weights, on='from_node', how='inner')

    # NOTE(review): G.count() is the number of edges (rows), not the number of
    # distinct nodes. Kept as-is so the computed scores do not change; confirm
    # that 1 / edge-count is the intended starting value.
    edge_count = G.count()
    # An empty edge list yields an empty result instead of a ZeroDivisionError;
    # the placeholder value is never attached to any row in that case.
    initial_rank = 1 / edge_count if edge_count else 0.0

    # Only nodes with at least one outgoing edge receive a starting score.
    ranks = (
        G.select(F.col('from_node').alias('node'))
        .dropDuplicates()
        .withColumn('pagerank', F.lit(initial_rank))
    )

    for _ in range(num_iter):
        # Push each source's score along its outgoing edges, then total the
        # contributions arriving at every destination and apply the damping.
        ranks = (
            ranks.join(weighted_edges, F.col('node') == F.col('from_node'), 'inner')
            .groupBy('to_node')
            .agg(
                (
                    F.lit(0.85) * F.sum(F.col('pagerank') * F.col('weight'))
                    + F.lit(0.15)
                ).alias('pagerank')
            )
            .withColumnRenamed('to_node', 'node')
        )

    # Sort once at the end rather than inside every round; the intermediate
    # orderings were discarded by the next join/aggregation anyway.
    if num_iter > 0:
        ranks = ranks.orderBy('node')

    return ranks

## Step 5: Run PageRank on the example graph

In [5]:
# Number of rows in the edge DataFrame (one row per edge).
pr_sdf.count()

16

In [6]:
# Display the parsed edge list that is passed to pagerank() below.
pr_sdf.show()

+---------+-------+
|from_node|to_node|
+---------+-------+
|        1|      2|
|        1|      3|
|        1|      4|
|        1|      5|
|        2|      3|
|        2|      5|
|        3|      2|
|        4|      5|
|        5|      1|
|        5|      6|
|        5|      7|
|        6|      7|
|        7|      6|
|        7|      2|
|        7|      7|
|        5|      4|
+---------+-------+



In [7]:
# Five PageRank rounds over the edge list, printed in ascending node order.
ranks_sdf = pagerank(pr_sdf, 5)
ranks_sdf.orderBy("node").show()

+----+-------------------+
|node|           pagerank|
+----+-------------------+
|   1|0.29434346987900906|
|   2| 0.7901554423964857|
|   3|0.49717473280894897|
|   4| 0.3519121121540776|
|   5| 0.7712117128233261|
|   6| 0.5055626720192206|
|   7| 0.8778237446376815|
+----+-------------------+

