In [None]:
# Environment setup (shell escapes): install PySpark, PyDrive (used by the
# Google Drive download cells below) and a headless Java 8 JDK; JAVA_HOME is
# pointed at a Java 8 path in the next cell.
!pip install pyspark
# -U upgrades PyDrive if it is already present; -q keeps pip's output short.
!pip install -U -q PyDrive
# -qq is apt's quiet flag.
!apt install openjdk-8-jdk-headless -qq

openjdk-8-jdk-headless is already the newest version (8u312-b07-0ubuntu1~18.04).
The following packages were automatically installed and are no longer required:
  libnvidia-common-460 nsight-compute-2020.2.0
Use 'apt autoremove' to remove them.
0 upgraded, 0 newly installed, 0 to remove and 42 not upgraded.


In [None]:
import os

# Export JAVA_HOME for this process (and anything it launches), pointing at
# the Java 8 JDK path — presumably where the apt-installed openjdk-8 package
# lives; confirm on the target image.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64")

In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# Step 1: run the Colab authentication flow for the current user.
auth.authenticate_user()
# Step 2: give PyDrive the application-default credentials
# (presumably the ones made available by the Colab step above — confirm).
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# Step 3: `drive` is the client the download cell uses to fetch the input file.
drive = GoogleDrive(gauth)

In [None]:
# Google Drive file ID of the input edge list. It is stored under a descriptive
# name so that the built-in id() is not shadowed for the rest of the notebook.
graph_file_id = '13tN7nzAV_Lk79NLm4KRGVO7735T9R5nn'
downloaded = drive.CreateFile({'id': graph_file_id})
# Save the file's contents locally as 'graph-full.txt', the path the Spark
# loading cell reads from.
downloaded.GetContentFile('graph-full.txt')

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import re
import numpy as np

In [None]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [None]:
# PageRank parameters.
beta = 0.8           # weight on link-following contributions; (1 - beta) / n is the teleport share
n = 1000             # length of the rank vector; node ids are used as 1-based positions in it
max_iterations = 40  # number of passes of the update loop

# Each input line is split on tabs into a tuple of fields (first field = edge
# source, second = destination); duplicate pairs are dropped.
data = sc.textFile("graph-full.txt")
edges = data.map(lambda line: tuple(line.split('\t'))).distinct()

# Flip every pair so it is keyed by destination, since the matrix is M[j][i]
# rather than M[i][j].
r_edges = edges.map(lambda pair: pair[::-1])
# Author's note: 8161 distinct pairs in graph-full.txt.

# Count of distinct edges per first field, i.e. the out-degree of each source
# node, brought back to the driver as a mapping keyed by the (string) node id.
outgoing_edge_count = edges.countByKey()


In [None]:
#Borrowed this function from https://github.com/seahyinghang8/asst5/blob/master/pageRank.py
def valueToVec(nodes, deg, n):
  """Build one 1 x n row of the transition matrix.

  For every node in `nodes` that has an entry in `deg`, the column at
  position `int(node) - 1` is set to `1 / deg[node]`; all other columns stay 0.

  Args:
    nodes: iterable of node ids (strings or ints) whose integer value is a
      1-based position in the row.
    deg: mapping from node id (same type as the items of `nodes`) to its
      out-degree.
    n: number of columns in the returned row.

  Returns:
    A NumPy array of shape (1, n).

  Raises:
    IndexError: if a node id is larger than `n`.
    ValueError: if a node id cannot be parsed as an integer.
  """
  vec = np.zeros((1, n))
  for node in nodes:
    if node not in deg:
      continue
    # Compare the *parsed* id. The ids read from the edge file are strings,
    # so testing the raw value against 0 is always true and lets id "0"
    # (or a negative id) wrap around to a column at the end of the row.
    position = int(node)
    if position < 1:
      continue
    vec[0][position - 1] = 1 / deg[node]
  return vec


#M = r_edges.groupByKey().mapValues(list).collect()
#print(M[0])
#print(M[0][1]) # k, v edge-pairs
#print(type(M[0][0]))
#print(type(int(M[0][0])))

In [None]:
# Group the reversed edges by destination, then turn each destination's set of
# source nodes into a 1 x n row holding 1/out-degree in every source's column.
sources_by_destination = r_edges.groupByKey()
M = sources_by_destination.map(
    lambda entry: (entry[0], valueToVec(entry[1], outgoing_edge_count, n))
)

In [None]:
# Start from the uniform distribution: every node holds rank 1/n.
r = np.ones((n, 1)) / n
# Independent copy: a plain `r_prev = r` would make both names refer to the
# same buffer, so any in-place write to r would also rewrite the "previous"
# vector.
r_prev = r.copy()
# Rank that every node receives from random teleportation on each iteration.
tp_chance = (1 - beta) / n

In [None]:
for iteration in range(max_iterations):
  # collect() runs the Spark job immediately, so every row product is evaluated
  # against r_prev as it stands at the start of this iteration.
  contributions = M.map(lambda x: (x[0], (x[1].dot(r_prev) * beta))).collect()

  # Build this iteration's vector in a fresh array instead of resetting r in
  # place: r and r_prev can be the same object, and an in-place reset would
  # overwrite the previous ranks as well.
  r = np.full((n, 1), tp_chance)  # every node starts with its teleport share
  for (k, v) in contributions:
    # v is the 1x1 result of a (1, n) . (n, 1) product; .item() unwraps it to a
    # Python float rather than relying on NumPy's deprecated implicit
    # array-to-scalar conversion on assignment.
    r[int(k) - 1, 0] += v.item()

  r_prev = r

In [None]:
#https://numpy.org/doc/stable/reference/generated/numpy.argsort.html
sort_ind = np.argsort(r, axis=0) #Sorts data along given axis

# r is an (n, 1) column, so sort_ind is one too. Indexing with the flattened
# forms yields true scalars instead of 1-element arrays, which avoids NumPy's
# deprecated implicit array-to-scalar conversion inside float().
ascending_ids = sort_ind.ravel()  # 0-based positions, lowest score first
scores = r.ravel()


def _score_rows(indices):
  """Return (score, 1-based node id) tuples for the given 0-based positions.

  Both tuple members are plain Python numbers so that str() of a row prints
  e.g. `(0.002, 263)` regardless of how the installed NumPy formats its scalars.
  """
  return [(scores[j].item(), int(j) + 1) for j in indices]


print("Top 5 nodes")
print("Pagerank score,  id")
# Highest score first.
top_rows = _score_rows(ascending_ids[::-1][:5])
print('\n'.join(map(str, top_rows)))

print()
print("Bottom 5 nodes")
print("PageRank score,  id")
# The five lowest scores, listed from the largest of them down to the smallest.
bottom_rows = _score_rows(ascending_ids[:5][::-1])
print('\n'.join(map(str, bottom_rows)))

Top 5 nodes
Pagerank score,  id
(0.0020202911815182184, 263)
(0.0019433415714531497, 537)
(0.0019254478071662627, 965)
(0.001852634016241731, 243)
(0.0018273721700645144, 285)

Bottom 5 nodes
PageRank score,  id
(0.00038779848719291705, 408)
(0.0003548153864930145, 424)
(0.00035314810510596274, 62)
(0.0003513568937516577, 93)
(0.0003286018525215297, 558)
