## **Configurations**

In [1]:
# Runtime setup via shell commands: install PySpark, upgrade/install PyDrive
# (quietly), and install the headless OpenJDK 8 JDK with apt.
# NOTE(review): no versions are pinned, so a fresh runtime may resolve to
# different releases than the ones this notebook was originally run with.
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u362-ga-0ubuntu1~20.04.1).
0 upgraded, 0 newly installed, 0 to remove and 24 not upgraded.


In [2]:
import os
# Export JAVA_HOME for this process (and anything it spawns) so it points at the
# OpenJDK 8 directory -- presumably the JDK installed by the apt command above,
# which PySpark needs in order to launch its JVM.
os.environ.update({"JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64"})

In [3]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# Step 1: authenticate the current Colab user (google.colab.auth).
# NOTE(review): presumably this opens an interactive sign-in prompt -- confirm
# before running this notebook unattended.
auth.authenticate_user()
# Step 2: hand the application-default Google credentials to a PyDrive auth object.
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# Step 3: build the Drive client on top of those credentials.
# `drive` is not referenced by any cell visible in this part of the notebook.
drive = GoogleDrive(gauth)

In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [6]:
# Spark configuration: serve the Spark web UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Get the running SparkContext, or start one with `conf` if none exists yet.
# Calling the SparkContext constructor directly raises
# "Cannot run multiple SparkContexts at once" when this cell is re-run;
# getOrCreate keeps the cell idempotent. `conf` is only used when a new
# context actually has to be created.
sc = SparkContext.getOrCreate(conf=conf)

# Get (or create) the SparkSession; it reuses the active SparkContext.
spark = SparkSession.builder.getOrCreate()

## **Q1: PageRank**

In [31]:
# Read the graph file as an RDD of raw text lines. The path is relative, so
# "graph-full.txt" must be present in the current working directory.
rdd = sc.textFile(name="graph-full.txt")

In [32]:
# PageRank parameters used by the cells below.
n = 1_000            # number of nodes; node ids run 1..n and each rank starts at 1/n
beta = 0.8           # weight on the rank received through links; (1 - beta)/n is added to every node
max_iteration = 40   # number of update rounds to run

In [33]:
def _parse_edge(line):
    """Convert one whitespace-separated text line into a (source, destination) pair of ints."""
    fields = line.split()
    return int(fields[0]), int(fields[1])


# Directed edges as (source, destination); distinct() collapses repeated edges
# so a link listed several times in the file is only counted once.
edges = rdd.map(_parse_edge).distinct()

# The same edges keyed by destination instead: (destination, source).
reverse_edges = edges.map(lambda edge: (edge[1], edge[0]))

In [34]:
# Out-degree of every source node as an RDD of (node, count).
# Counting with mapValues + reduceByKey combines partial counts before the
# shuffle, instead of groupByKey shipping every neighbour id across the
# cluster only to take the length of the group. The resulting counts are the same.
out_degrees = edges.mapValues(lambda _: 1)\
                   .reduceByKey(lambda a, b: a + b)

# Plain dict {node: out_degree} on the driver; the PageRank loop reads it
# from inside its map lambda.
out_degrees_dict = out_degrees.collectAsMap()

In [35]:
# Initial rank vector as a list of (node, rank) pairs: every node id 1..n
# starts with the same rank, 1/n.
uniform_rank = 1 / n
r = [(node, uniform_rank) for node in range(1, n + 1)]

In [36]:
# PageRank update, repeated max_iteration times:
#   r_new[j] = beta * sum over edges i -> j of (r_old[i] / out_degree[i]) + (1 - beta) / n
#
# `r` is a plain list of (node, rank) pairs before, during and after the loop.
# Changes from the previous version of this cell:
#   * Old and new ranks are no longer merged with
#     union(...).reduceByKey(lambda x, y: y). That function is not commutative,
#     while reduceByKey requires an associative and commutative function, so
#     whether the new or the old rank survived depended on merge order.
#   * A node with no incoming edge now gets the teleport share (1 - beta) / n
#     instead of silently keeping its previous rank.
#   * The per-iteration parallelize -> collectAsMap -> union -> collect round
#     trip is gone; only the edge aggregation runs on Spark.
teleport = (1 - beta) / n  # loop-invariant share added to every node

for i in range(max_iteration):
    # Driver-side snapshot {node: rank} of the current ranks; the lambda below
    # captures it, so it is shipped to the workers with the job.
    r_dict = dict(r)

    # reverse_edges holds (destination, source): each source passes
    # rank / out_degree to the destination, and the contributions are summed
    # per destination before applying beta and the teleport share.
    update_r = reverse_edges.map(lambda x: (x[0], r_dict[x[1]] / out_degrees_dict[x[1]]))\
                            .reduceByKey(lambda x, y: x + y)\
                            .mapValues(lambda s: beta * s + teleport)
    new_ranks = update_r.collectAsMap()

    # Rebuild the rank list over the same node ids, in the same order.
    r = [(node, new_ranks.get(node, teleport)) for node in r_dict]

In [37]:
# Distribute the final (node, rank) list under a new name. Rebinding `r`
# itself to an RDD made this cell fail when re-run, and broke the iteration
# cell if it was re-run afterwards, because both expect `r` to be a list.
r_rdd = sc.parallelize(r)

# Full ordering by rank, highest first; the top five are its first five rows.
sorted_r = r_rdd.sortBy(lambda x: x[1], ascending=False)
top_five_r = sorted_r.take(5)

# The five lowest ranks, in ascending order. takeOrdered selects them itself,
# so it runs on the unsorted RDD rather than re-evaluating the full sort.
bottom_five_r = r_rdd.takeOrdered(5, key=lambda x: x[1])

print("the top 5 node ids with the highest PageRank scores: ", "\n", top_five_r)
print("the bottom 5 node ids with the lowest PageRank scores: ", "\n", bottom_five_r)

the top 5 node ids with the highest PageRank scores:  
 [(263, 0.002020291181518219), (537, 0.0019433415714531497), (965, 0.0019254478071662631), (243, 0.001852634016241731), (285, 0.0018273721700645144)]
the bottom 5 node ids with the lowest PageRank scores:  
 [(558, 0.0003286018525215297), (93, 0.00035135689375165774), (62, 0.00035314810510596274), (424, 0.00035481538649301454), (408, 0.00038779848719291705)]
