In [38]:
"""
    Алгоритм PageRank (PR) - это итеративный алгоритм, используемый поисковой системой Google для ранжирования веб-страниц в результатах поисковой системы.
    Источники: 1) Изучаем Spark. Молниеносный анализ данных, стр. 91-93
               2) https://russianblogs.com/article/9289124253/
"""
from pyspark.sql import SparkSession
sc=SparkSession.builder.getOrCreate().sparkContext

In [39]:
# Алгоритм оперирует двумя наборами данных: один - с элементами типа (pageID, linkList), где linkList -  страницы, на которые ссылается страница pageID
links=sc.parallelize([('A', ('B', 'C', 'D')),
                      ('B', ('A','C')),
                      ('C', ('A',)),
                      ('D', ('B','C'))])

In [40]:
links.collect()

[('A', ('B', 'C', 'D')), ('B', ('A', 'C')), ('C', ('A',)), ('D', ('B', 'C'))]

In [41]:
# второй - с элементами типа (pageID, rank), содержащими текущий ранг соответствующих страниц
c=links.count()
ranks=links.mapValues(lambda r: float(1/c))

In [27]:
ranks.collect()

[('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]

In [42]:
contributions=links.join(ranks).persist()

In [43]:
sorted(contributions.collect())

[('A', (('B', 'C', 'D'), 0.25)),
 ('B', (('A', 'C'), 0.25)),
 ('C', (('A',), 0.25)),
 ('D', (('B', 'C'), 0.25))]

In [35]:
# весь алгоритм
def func(x):
    pageID, (links_, rank) = x
    l = [(k, rank) for k in links_]
    return l
for _ in range(10):
    ranks=links.join(ranks).flatMap(lambda x: func(x)).reduceByKey(lambda x,y: x+y).mapValues(lambda v: 0.15+0.85*v)

In [36]:
sorted(ranks.collect())

[('A', 143.39906214410976),
 ('B', 111.34080962752634),
 ('C', 168.3575530484062),
 ('D', 73.63726137581848)]

In [51]:
links=sc.parallelize([('A', ('B', 'C', 'D')),
                      ('B', ('A','C')),
                      ('C', ('A',)),
                      ('D', ('B','C'))])
c=links.count()
ranks=links.mapValues(lambda r: float(1/c))

In [52]:
for _ in range(1):
    ranks=links.join(ranks)
ranks.collect()

[('A', (('B', 'C', 'D'), 0.25)),
 ('B', (('A', 'C'), 0.25)),
 ('C', (('A',), 0.25)),
 ('D', (('B', 'C'), 0.25))]

In [53]:
def func(x):
    pageID, (links_, rank) = x
    l = [(k, rank) for k in links_]
    return l
ranks=ranks.flatMap(lambda x: func(x))
ranks.collect()

[('B', 0.25),
 ('C', 0.25),
 ('D', 0.25),
 ('A', 0.25),
 ('C', 0.25),
 ('A', 0.25),
 ('B', 0.25),
 ('C', 0.25)]

In [54]:
ranks=ranks.reduceByKey(lambda x,y: x+y)
ranks.collect()

[('B', 0.5), ('A', 0.5), ('C', 0.75), ('D', 0.25)]

In [55]:
ranks=ranks.mapValues(lambda v: 0.15+0.85*v)
ranks.collect()

[('B', 0.575), ('A', 0.575), ('C', 0.7875), ('D', 0.3625)]