In [1]:
import findspark
# Machine-specific Spark install location — change this path for your environment.
findspark.init('/home/ek/spark-2.4.4-bin-hadoop2.7')
import pyspark
import os
# Machine-specific JDK location exported as JAVA_HOME for this process.
java8_location= '/usr/lib/jvm/java-8-openjdk-amd64' # Set your own
os.environ['JAVA_HOME'] = java8_location
# NOTE(review): udf, Image and deque are not used by any cell visible here — confirm before removing.
from pyspark.sql.functions import udf
from IPython.display import Image
from collections import deque

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName('HW').getOrCreate()
# NOTE(review): this option targets DataFrame/SQL shuffles; the cells below use
# the RDD API, so it may have no effect on them — confirm before relying on it.
spark.conf.set('spark.sql.shuffle.partitions',6)

Structure of the data: each line holds one directed edge as `FromNodeId<TAB>ToNodeId`; lines starting with `#` are comments.

In [4]:
# Load the edge list, dropping '#' comment lines and blank lines.
# (Indexing x[0] on an empty line would raise IndexError inside the Spark job,
# so the filter uses startswith() and an explicit blank-line check instead.)
dataset = spark.sparkContext.textFile('web-Stanford_small.txt')\
                            .filter(lambda x: x.strip() != '' and not x.startswith('#'))\
                            .map(lambda x: x.split('\t'))
# each record is now a list of strings: [fromNode, toNode]

In [5]:
# Turn each [from, to] pair of strings into a tuple of integer node ids.
from_to = dataset.map(lambda edge: (int(edge[0]), int(edge[1])))

In [6]:
from_to.groupByKey().count() # number of distinct source nodes, i.e. nodes with at least one outgoing edge

198

In [7]:
def explode(row):
    """Yield every element of `row`, one at a time, in order."""
    yield from row

In [8]:
# Every distinct node id (whether it appears as a source or a target), stored
# as (node, None) pairs and persisted because later cells reuse this RDD.
all_nodes = (
    dataset.flatMap(explode)
           .distinct()
           .map(lambda node_id: (int(node_id), None))
           .persist()
)

In [9]:
all_nodes.count() # total number of distinct node ids (sources and targets)

1587

In [10]:
# Left-join every known node with its outgoing edges so that nodes without any
# outgoing edge are kept too (their successor is None), then drop the
# placeholder that came from all_nodes.
from_to = all_nodes.leftOuterJoin(from_to).map(
    lambda joined: (joined[0], joined[1][1])
)

In [11]:
# Preview the joined (node, successor) pairs; nodes without outgoing edges carry None.
from_to.take(5)

[(98628, 2), (204604, 2), (241596, 2), (78056, None), (182456, None)]

In [12]:
def check(x):
    """Return True if `x` contains at least one element that is not None.

    Returns False (rather than falling off the end and returning None) when
    `x` is empty or holds only None values. Note that falsy values such as 0
    still count as "not None".
    """
    return any(i is not None for i in x)

In [13]:
# Collapse the joined pairs into node -> (out_degree, [successors]).
# Nodes whose grouped values contain nothing but None get (0, []).
from_to = from_to.groupByKey().map(
    lambda grouped: (grouped[0], (len(grouped[1]), list(grouped[1])))
    if check(grouped[1])
    else (grouped[0], (0, []))
)

In [14]:
# Preview: each entry is (node, (out_degree, [successors])).
from_to.take(10)

[(98628, (1, [2])),
 (204604, (1, [2])),
 (241596, (1, [2])),
 (78056, (0, [])),
 (182456, (0, [])),
 (28140, (0, [])),
 (47352, (0, [])),
 (192120, (0, [])),
 (215600, (0, [])),
 (4, (4, [35716, 96512, 186750, 225872]))]

In [15]:
# Total node count; used for the uniform initial rank and for spreading
# redistributed rank evenly in calculate_page_rank.
N_NODES = all_nodes.count()

In [25]:
# Each node maps to a two-slot list: slot 0 is the current ("old") rank and
# slot 1 accumulates the next ("new") rank. Every node starts at 1/N_NODES.
page_rank = all_nodes.map(lambda entry: (entry[0], [1 / N_NODES, 0]))

In [26]:
# Preview the initial ranks: [old, new] per node, with old = 1/N_NODES.
page_rank.take(5)

[(1, [0.000630119722747322, 0]),
 (15409, [0.000630119722747322, 0]),
 (17794, [0.000630119722747322, 0]),
 (25202, [0.000630119722747322, 0]),
 (53625, [0.000630119722747322, 0])]

In [27]:
# Bring the rank table to the driver as a plain dict: node -> [old, new].
# Note that this rebinds `page_rank` from an RDD to a dict.
page_rank = page_rank.collectAsMap()

In [28]:
# Bring the adjacency data to the driver as a plain dict:
# node -> (out_degree, [successors]).
transition_matrix = from_to.collectAsMap()

In [29]:
# Fraction of a node's rank that calculate_page_rank forwards along its
# outgoing links; whatever is not forwarded is spread evenly over all nodes.
taxation_hyperparameter = 0.8

In [30]:
def total_sum_of_ratings(t = 1, ranks=None):
    """Return the sum of one rank slot over all nodes.

    Parameters
    ----------
    t : int, default 1
        Which slot of each node's two-element rank list to add up:
        0 for the current ("old") rank, 1 for the rank being accumulated ("new").
    ranks : dict, optional
        Mapping ``node -> [old_rank, new_rank]``. Defaults to the notebook-level
        ``page_rank`` table, so existing calls behave as before.

    Returns
    -------
    The total of slot ``t`` across every node (0 for an empty table).
    """
    if ranks is None:
        # Resolved at call time so the function always sees the current table.
        ranks = page_rank
    return sum(slots[t] for slots in ranks.values())

In [31]:
def calculate_page_rank(taxation_hyperparameter, epochs, ranks=None, graph=None):
    """Run `epochs` iterations of taxed PageRank, updating the rank table in place.

    In every iteration each node forwards ``taxation_hyperparameter`` times its
    current rank, split evenly over its successors. Whatever rank was not
    forwarded (the taxed share plus the rank of nodes without successors) is
    then spread uniformly over all nodes, so the ranks keep summing to 1.

    Parameters
    ----------
    taxation_hyperparameter : float
        Fraction of a node's rank that is passed along its outgoing links.
    epochs : int
        Number of iterations to run; 0 (or a negative value) leaves the table
        untouched.
    ranks : dict, optional
        Mapping ``node -> [current_rank, next_rank]`` that is mutated in place.
        ``next_rank`` must be 0 on entry and is reset to 0 after each iteration.
        Defaults to the notebook-level ``page_rank`` table (with ``N_NODES`` as
        the node count); for an explicit table the node count is ``len(ranks)``.
    graph : dict, optional
        Mapping ``node -> (out_degree, [successors])``. Defaults to the
        notebook-level ``transition_matrix``.

    Returns
    -------
    None. Results are read from slot 0 of each entry in the rank table.
    Because the table is mutated, calling this again continues from the
    current ranks rather than restarting.
    """
    if ranks is None:
        # Notebook default: operate on the global tables built in earlier cells.
        ranks, n_nodes = page_rank, N_NODES
    else:
        n_nodes = len(ranks)
    if graph is None:
        graph = transition_matrix

    for _ in range(epochs):
        # Distribute the taxed share of every node's current rank to its successors.
        for node, (out_degree, successors) in graph.items():
            # Nodes without successors forward nothing; the out_degree guard
            # also keeps an inconsistent (0, [..]) entry from dividing by zero.
            if not successors or out_degree == 0:
                continue
            share = taxation_hyperparameter * ranks[node][0] / out_degree
            for successor in successors:
                ranks[successor][1] += share

        # Rank that was not forwarded is shared equally by all nodes.
        distributed = sum(slots[1] for slots in ranks.values())
        leaked_share = (1 - distributed) / n_nodes
        for slots in ranks.values():
            slots[0] = slots[1] + leaked_share
            slots[1] = 0

In [32]:
# Run 5 iterations. The global page_rank table is updated in place, so
# re-running this cell continues from the current ranks instead of restarting.
calculate_page_rank(taxation_hyperparameter, 5)

In [33]:
# Sanity check: the current ranks (slot 0) should add up to approximately 1.
total_sum_of_ratings(0) 

0.9999999999999939