#### Dependencies
____

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql.functions import col, asc

#### Create Spark Session
_____

In [3]:
# Reuse the active session if one exists, otherwise start a new one named
# 'CustomPageRank'.
spark = (
    SparkSession.builder
    .appName('CustomPageRank')
    .getOrCreate()
)

#### Load RDD
____

In [18]:
# One RDD element per line of the adjacency-list file.
adjacency_path = '../resources/02AdjacencyList.txt'
rdd = spark.sparkContext.textFile(adjacency_path)
# cache() is persist() with the default MEMORY_ONLY level; it returns the RDD,
# so its repr is shown as the cell output.
rdd.cache()

../resources/02AdjacencyList.txt MapPartitionsRDD[16142] at textFile at NativeMethodAccessorImpl.java:0

#### Create Vertex and Edge DataFrames
_____

In [19]:
# Parse the adjacency list. Each line is "<node> <neighbour> <neighbour> ...".
# str.split() with no argument splits on any run of whitespace and ignores
# leading/trailing whitespace, so repeated or trailing spaces/tabs never yield
# empty-string ids. Blank lines (no tokens) are skipped entirely.
v_list = []
e_list = []

for node in rdd.map(lambda line: line.split()).filter(lambda tokens: len(tokens) > 0).collect():
    src, *neighbours = node
    v_list.append((src, 'vertice_' + src))
    e_list.extend((src, dst) for dst in neighbours)

In [22]:
# Explicit datatype-string schemas (instead of a list of column names) let
# Spark build the DataFrames even when a list is empty, where schema inference
# from the data would raise.
vertices = spark.createDataFrame(v_list, 'id: string, name: string')
edges = spark.createDataFrame(e_list, 'src: string, dst: string')

# Both DataFrames are queried many times by the PageRank cells below, so keep
# them cached.
vertices.persist()
edges.persist()

DataFrame[src: string, dst: string]

In [23]:
# Print the first 20 vertices (the show() default) as a text table.
vertices.show(n=20)

+---+---------+
| id|     name|
+---+---------+
|  1|vertice_1|
|  2|vertice_2|
|  3|vertice_3|
|  4|vertice_4|
|  5|vertice_5|
+---+---------+



In [24]:
# Print the first 20 edges (the show() default) as a text table.
edges.show(n=20)

+---+---+
|src|dst|
+---+---+
|  1|  2|
|  2|  3|
|  2|  4|
|  3|  4|
|  4|  1|
|  4|  5|
|  5|  3|
+---+---+



#### PageRank DataFrame
___

In [25]:
# Iteration 0: every page starts with the same rank, 1/N, for both the raw
# pagerank and the normalised perc_rank. count() is a Spark job, so it is run
# once here rather than twice per vertex inside the comprehension.
num_vertices = vertices.count()
initial_rank = 1 / num_vertices if num_vertices else 0.0

l_rank = [(vertex[0], 0, initial_rank, initial_rank) for vertex in vertices.select('id').collect()]

# Explicit schema so an empty vertex set still yields a (typed, empty) DataFrame.
pagerank = spark.createDataFrame(
    l_rank, 'page: string, iteration: long, pagerank: double, perc_rank: double')

pagerank.show()

+----+---------+--------+---------+
|page|iteration|pagerank|perc_rank|
+----+---------+--------+---------+
|   1|        0|     0.2|      0.2|
|   2|        0|     0.2|      0.2|
|   3|        0|     0.2|      0.2|
|   4|        0|     0.2|      0.2|
|   5|        0|     0.2|      0.2|
+----+---------+--------+---------+



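#### Perform PageRank
______

Each iteration computes, for every page $p$, $PR(p) = (1 - d) + d \sum_{q \to p} \frac{PR(q)}{L(q)}$. Here $d$ is the damping factor, the sum runs over the pages $q$ that link to $p$, $PR(q)$ is taken from the previous iteration, and $L(q)$ is the number of outbound links of $q$. `perc_rank` is each page's rank divided by the sum of all ranks in that iteration.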

In [26]:
def calc_page_rank(edge, damping_factor, edges_df=None, pagerank_df=None):
    """Compute the next PageRank value for a single page.

    Applies PR(p) = (1 - d) + d * sum(PR(q) / out(q)) over every page q with an
    edge q -> p, reading PR(q) from the latest iteration stored in the pagerank
    DataFrame.

    Args:
        edge: id of the page (vertex) whose rank is computed. The name is kept
            for backward compatibility; it is a vertex id, not an edge.
        damping_factor: the damping factor d.
        edges_df: DataFrame with ``src``/``dst`` columns. Defaults to the
            notebook-global ``edges``.
        pagerank_df: DataFrame with ``page``/``iteration``/``pagerank``
            columns. Defaults to the notebook-global ``pagerank``.

    Returns:
        Tuple ``(page_id, next_iteration, new_rank)``. A page with no inbound
        links gets ``1 - damping_factor``.
    """
    if edges_df is None:
        edges_df = edges
    if pagerank_df is None:
        pagerank_df = pagerank

    # Each iteration appends one row for every page, so the global maximum is
    # the latest iteration. Computing it before the loop also gives pages with
    # no inbound links a defined iteration number.
    last_iteration = pagerank_df.agg({'iteration': 'max'}).collect()[0][0]
    latest = pagerank_df.filter(pagerank_df['iteration'] == last_iteration)

    total_rank = 0.0
    # Column comparisons instead of f-string SQL: ids containing quotes are
    # compared as values rather than spliced into an expression.
    for inbound in edges_df.filter(edges_df['dst'] == edge).collect():
        src = inbound['src']
        last_rank = latest.filter(latest['page'] == src).select('pagerank').collect()[0][0]
        total_outbound = edges_df.filter(edges_df['src'] == src).count()
        total_rank += last_rank / total_outbound

    return (edge, last_iteration + 1, (1 - damping_factor) + (damping_factor * total_rank))

In [27]:
# Synchronous PageRank iterations. Every page's new rank is computed from the
# previous iteration's ranks, and the iteration is appended to `pagerank` only
# after all pages have been processed.
# NOTE: this cell extends the global `pagerank`. Re-running it without first
# re-running the "PageRank DataFrame" cell continues from the last iteration.
damping_factor = 0.85
iterations = 29

# The vertex set does not change between iterations, so collect it once.
page_ids = [row['id'] for row in vertices.select('id').collect()]

for _ in range(iterations):
    rows = [calc_page_rank(page_id, damping_factor) for page_id in page_ids]

    # Name the columns explicitly. `union` matches columns by position, and
    # these line up with page / iteration / pagerank of `pagerank`.
    new_rows = spark.createDataFrame(rows, ['page', 'iteration', 'pagerank'])
    sum_pr = new_rows.agg({'pagerank': 'sum'}).collect()[0][0]
    # perc_rank: this iteration's ranks rescaled to sum to 1.
    new_rows = new_rows.withColumn('perc_rank', new_rows['pagerank'] / sum_pr)

    pagerank = pagerank.union(new_rows)

#### PageRank Results
______

In [28]:
# The highest iteration number holds the final ranks. The aggregation yields
# exactly one row, so first() is equivalent to collect()[0].
last_iteration = pagerank.agg({'iteration': 'max'}).first()[0]

final_ranks = pagerank.where(pagerank['iteration'] == last_iteration)
final_ranks.show()

+----+---------+------------------+-------------------+
|page|iteration|          pagerank|          perc_rank|
+----+---------+------------------+-------------------+
|   1|       29|0.7723196821160265|0.15558130445306423|
|   2|       29|0.8056407707925519|0.16229373010028633|
|   3|       29|1.1476149730879313|0.23118332817014126|
|   4|       29|1.4661954349708077| 0.2953603328234441|
|   5|       29|0.7723196821160265|0.15558130445306423|
+----+---------+------------------+-------------------+



#### Save To Disk
______

In [32]:
# Write every iteration's ranks as CSV. The default save mode ("errorifexists")
# fails when the directory already exists, so overwrite to keep the notebook
# re-runnable. No header row is written (Spark's CSV default).
pagerank.write.mode("overwrite").format("csv").save("../output/custom_page_rank")