In [22]:
import pandas as pd
import re
import warnings
warnings.filterwarnings('ignore')


In [2]:
from pyspark.sql.types import *
from pyspark.sql.types import ArrayType, StringType, LongType, StructType,  StructField, IntegerType, FloatType
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from operator import add

In [3]:
# Reuse the session supplied by the kernel (e.g. a pyspark shell) or create one,
# so the notebook also runs from a fresh kernel where `spark` is not predefined.
spark = SparkSession.builder.getOrCreate()
# Turn off the Arrow-based path for Spark <-> pandas conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

In [4]:
# Toy link graph: each tuple is (page id, ids of the pages it links to).
data2 = [
    (1, [2, 4]),
    (2, [1, 3, 5]),
    (3, [4]),
    (4, [1, 2]),
    (5, []),
]

# `id` is an integer column; `next` is an array of long integers. Both are nullable.
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("next", ArrayType(elementType=LongType()), True),
    ]
)

ForwardDF = spark.createDataFrame(data=data2, schema=schema)
ForwardPDF = ForwardDF.toPandas()
# Same frame with the number of outgoing links appended as `n_next`.
ForwardDF_WC = ForwardDF.withColumn("n_next", F.size("next"))
ForwardPDF_WC = ForwardDF_WC.toPandas()

                                                                                

In [5]:
def reverseId(id, links):
    """Invert the outgoing links of one page.

    Args:
        id: Identifier of the source page.
        links: Iterable of target page ids; ``None`` is treated as "no links".

    Returns:
        A list of ``(target_id, id)`` tuples, one per entry in ``links``.
        The list is empty when there are no links.
    """
    # The `next` column is declared nullable, so a row may carry None here;
    # calling len() on it would raise a TypeError.
    if links is None:
        return []
    return [(tgt_id, id) for tgt_id in links]

In [6]:
# RDD view of the forward-link DataFrame, used for the RDD-level inversion of the links.
ForwardRDD = ForwardDF_WC.rdd

In [7]:
# Emit one (target, source) pair per link, then gather the sources of every target
# into a list.
ReverseRDD = (
    ForwardRDD
    .flatMap(lambda row: reverseId(row.id, row.next))
    .groupByKey()
    .mapValues(list)
)

In [8]:
# Turn the (page, predecessors) pairs into a DataFrame and append the number of
# predecessors as `n_prev`.
reverseDF = spark.createDataFrame(ReverseRDD, schema=["id1", "prev"])
reverseDF_WC = reverseDF.withColumn("n_prev", F.size("prev"))

In [9]:
# Inspect the inverted graph: each page id with the list of pages that link to it.
reverseDF.show()

+---+------+
|id1|  prev|
+---+------+
|  1|[2, 4]|
|  2|[1, 4]|
|  3|   [2]|
|  4|[1, 3]|
|  5|   [2]|
+---+------+



In [10]:
# Combine forward and reverse links into a single frame with one row per page.
# The join starts from the forward frame and is a LEFT join so that pages with
# no incoming links (absent from reverseDF_WC) are kept, with an empty `prev`
# list and n_prev = 0, instead of silently disappearing from the graph.
n_pages = ForwardDF_WC.count()
# Uniform initial rank 1/N instead of a constant that only fits five pages.
initial_rank = 1.0 / n_pages if n_pages else 0.0

ReverseDF = (
    ForwardDF_WC
    .join(reverseDF_WC, ForwardDF_WC.id == reverseDF_WC.id1, "left")
    .select(
        "id",
        F.coalesce("prev", F.array().cast("array<bigint>")).alias("prev"),
        F.coalesce("n_prev", F.lit(0)).alias("n_prev"),
        "next",
        "n_next",
    )
    .withColumn("rank", F.lit(initial_rank))
)
ReverseDF.show()
ReversePDF = ReverseDF.toPandas()

+---+------+------+---------+------+----+
| id|  prev|n_prev|     next|n_next|rank|
+---+------+------+---------+------+----+
|  5|   [2]|     1|       []|     0| 0.2|
|  1|[2, 4]|     2|   [2, 4]|     2| 0.2|
|  3|   [2]|     1|      [4]|     1| 0.2|
|  2|[1, 4]|     2|[1, 3, 5]|     3| 0.2|
|  4|[1, 3]|     2|   [1, 2]|     2| 0.2|
+---+------+------+---------+------+----+



                                                                                

In [12]:
# Sanity check: selecting a single column still returns a DataFrame.
print(type(ReverseDF.select("prev")))

<class 'pyspark.sql.dataframe.DataFrame'>


In [13]:
def new_pagerank(list_of_ids, pagerank):
    """Compute the rank a page receives from its predecessors.

    For every predecessor id the predecessor's current rank is divided by its
    number of outgoing links, and the resulting shares are added up.

    Args:
        list_of_ids: Ids of the pages that link to the page being scored.
        pagerank: pandas DataFrame with the columns ``id``, ``rank`` and
            ``n_next`` (number of outgoing links), one row per page.

    Returns:
        The summed contribution as a Python ``float`` (``0.0`` when
        ``list_of_ids`` is empty).

    Raises:
        KeyError: If an id in ``list_of_ids`` is not present in ``pagerank``.
        ZeroDivisionError: If a listed predecessor has ``n_next == 0``.
    """
    # Index by page id once so each predecessor is a direct label lookup rather
    # than a string-built query that rescans the frame.
    by_id = pagerank.set_index("id")

    new_page_rank = 0.0
    for k in list_of_ids:
        # `.at` yields a scalar; float() keeps plain-Python division semantics.
        r = float(by_id.at[k, "rank"])
        s = float(by_id.at[k, "n_next"])
        new_page_rank += r / s
    return float(new_page_rank)

In [14]:
# Reduce the single-column `rank` Rows of the pages that have no outgoing links.
sum2 = (
    ReverseDF
    .where(F.col("n_next") == 0)
    .select("rank")
    .rdd
    .reduce(add)
)
# Add up the values held by the reduced result.
a = 0.0
for i in sum2:
    a += i
# Note: the raw reduce result is printed, not the accumulated total `a`.
print(sum2)





Row(rank=0.2)


                                                                                

In [15]:
def new_pagerank_nosuccessors(pagerank, size):
    """Spread the rank of pages without successors over the pages that have some.

    The ranks of all rows with ``n_next == 0`` are added up, divided by
    ``size - 1`` and added to the rank of every row with ``n_next != 0``.
    Rows with ``n_next == 0`` keep their rank unchanged.

    Args:
        pagerank: Spark DataFrame with at least the columns ``rank`` and ``n_next``.
        size: Total number of pages in the graph.

    Returns:
        A Spark DataFrame with the same columns as ``pagerank``: the rows without
        successors followed by the updated rows with successors.

    NOTE(review): pages without successors do not receive a share from other
    such pages; confirm whether that is intended when several exist.
    """
    withNextDF = pagerank.filter(pagerank["n_next"] != 0)
    withoutNextDF = pagerank.filter(pagerank["n_next"] == 0)

    # Aggregate in Spark: the sum is NULL (None) when no page lacks successors,
    # whereas reducing an empty RDD raises an error.
    dangling_total = withoutNextDF.agg(F.sum("rank")).first()[0] or 0.0

    # With a single page there is no other page to share with; this also avoids
    # dividing by zero.
    share = dangling_total / (size - 1) if size > 1 else 0.0

    withNextDF2 = withNextDF.withColumn("rank", F.col("rank") + F.lit(share))
    return withoutNextDF.unionByName(withNextDF2)

In [16]:
# Preview the redistribution on the initial ranks; the page count is taken from
# the frame itself instead of being hard-coded.
new_pagerank_nosuccessors(ReverseDF, ReverseDF.count()).show()

                                                                                

+---+------+------+---------+------+----+
| id|  prev|n_prev|     next|n_next|rank|
+---+------+------+---------+------+----+
|  5|   [2]|     1|       []|     0| 0.2|
|  1|[2, 4]|     2|   [2, 4]|     2|0.25|
|  3|   [2]|     1|      [4]|     1|0.25|
|  2|[1, 4]|     2|[1, 3, 5]|     3|0.25|
|  4|[1, 3]|     2|   [1, 2]|     2|0.25|
+---+------+------+---------+------+----+



In [19]:
MAX_ITERATIONS = 20   # hard cap on the number of PageRank sweeps
TOLERANCE = 0.0001    # iterate while the mean absolute rank change exceeds this

count = 0
temp = 2  # mean absolute rank change of the last sweep; starts above TOLERANCE
PageRankPDF = ReversePDF
size = PageRankPDF.shape[0]
PrevRankPDF = PageRankPDF

while (count < MAX_ITERATIONS) and (temp > TOLERANCE):
    # Contribution received through incoming links. The UDF is rebuilt on every
    # sweep so that it is bound to the PageRankPDF produced by the previous one.
    udf_new_pagerank = udf(lambda l: new_pagerank(l, PageRankPDF), FloatType())
    NewPageRankDF = ReverseDF.select("id", udf_new_pagerank("prev").alias("rank"), "n_next")
    # Then the contribution of the pages without successors.
    NewPageRankDF_2 = new_pagerank_nosuccessors(NewPageRankDF, size)
    PageRankPDF = NewPageRankDF_2.toPandas()

    # Compare ranks page by page: align both frames on `id` rather than on the
    # positional row index, since row order is not guaranteed to match between
    # separately collected frames.
    new_ranks = PageRankPDF.set_index("id")["rank"]
    old_ranks = PrevRankPDF.set_index("id")["rank"]
    temp = (new_ranks - old_ranks).abs().sum() / size

    PrevRankPDF = PageRankPDF
    count += 1

# Ranking from the last sweep, highest rank first.
FinalPageRankDF = NewPageRankDF_2.orderBy(F.desc("rank")).select("id", "rank")

                                                                                

In [21]:
# Display the final ranking (ordered by descending rank).
FinalPageRankDF.show()




+---+-------------------+
| id|               rank|
+---+-------------------+
|  4|   0.31666667945683|
|  2| 0.2166666705161333|
|  1|0.18333333916962147|
|  3|0.08333333767950535|
|  5|0.06666667014360428|
+---+-------------------+



