# PageRank на Spark RDD

### Шаг №1
Создайте `SparkContext`

In [None]:
import random
SPARK_UI_PORT = random.choice(range(10000, 10200))
print(f"Spark UI port: {SPARK_UI_PORT}")

In [None]:
from pyspark import SparkConf, SparkContext

In [None]:
conf = SparkConf()
conf.set("spark.ui.port", SPARK_UI_PORT)
conf.set("spark.driver.memory", "512m")
conf.set("spark.executor.instances", "2")
conf.set("spark.executor.cores", "1")

sc = SparkContext(master="yarn", conf=conf)

### Шаг №2
1. Прочитайте граф из файла `/data/spark/lecture05/graph.tsv`
2. Создайте RDD, в которой граф будет представлен парами вершин
3. Убедитесь, что граф совпадает с рисунком на доске

In [None]:
raw_graph = sc.textFile("/data/spark/lecture05/graph.tsv")
graph = raw_graph.map(lambda x: tuple(x.split("\t")))\
                 .distinct()\
                 .cache()
graph.collect()

### Шаг №3
Создайте RDD с первоначальными pagerank всех уникальных вершин

In [None]:
vertices = graph.map(lambda x: x[0]).union(graph.map(lambda x: x[1])).distinct()

num_vertices = vertices.count()

ranks = vertices.map(lambda x: (x, 1 / num_vertices))
ranks.collect()

### Шаг №4
Создайте RDD, которая берет RDD с вершинами, объединяет ее с RDD с pagerank. В результате должна получится PairRDD, где ключ - это уникальная вершина, а значение - это все вершины, на которые она ссылаются и ее текущий pagerank

In [None]:
links = graph.groupByKey().mapValues(list).cache()

contributions = links.join(ranks)
contributions.collect()

### Шаг №5
Реализуйте функцию, которая рассчитывает pagerank для всех вершин, на которые ссылается данная вершина. Функция должна быть итератором, который возвращает вершину и ее pagerank

In [None]:
def computeContribs(neighbours, pagerank):
    num = len(neighbours)
    
    for n in neighbours:
        yield (n, pagerank / num)

### Шаг №6
Обновите RDD с pagerank значениями, посчитанными с помощью функции из предыдущего шага

In [None]:
ranks = contributions.flatMap(lambda x: computeContribs(x[1][0], x[1][1]))\
                     .reduceByKey(lambda x, y: x + y)

ranks.collect()

### Шаг №7
Напишите цикл, который проводит несколько итераций вычисления pagerank и на каждой печатает номер итерации и текущие pagerank

In [None]:
iterations = 10
raw_graph = sc.textFile("/data/spark/lecture05/graph.tsv")
graph = raw_graph.map(lambda x: tuple(x.split("\t")))\
                 .distinct()\
                 .cache()
vertices = graph.map(lambda x: x[0]).union(graph.map(lambda x: x[1])).distinct()
num_vertices = vertices.count()
ranks = vertices.map(lambda x: (x, 1 / num_vertices))

for i in range(iterations):
    links = graph.groupByKey().mapValues(list).cache()
    contributions = links.join(ranks)
    ranks = contributions.flatMap(lambda x: computeContribs(x[1][0], x[1][1]))\
                         .reduceByKey(lambda x, y: x + y)
    print("Iteration {0}: current pagerank {1}".format(i, sorted(ranks.collect(), key=lambda x: x[0])))

### Шаг №8
Не забудьте остановить SparkContext

In [None]:
sc.stop()