<a href="https://colab.research.google.com/github/Sharon-Tseng/ISOM3770_Big_Data/blob/main/assignment2_pagerank_Tseng%2CYungYi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ISOM 3370 Assignment 2  

PageRank is an algorithm used by Google to rank web pages in its search engine results. Because PageRank is iterative and reuses the same link structure in every iteration, it is a good fit for Spark, which can keep that data cached in memory across iterations. In this question, you will implement the PageRank algorithm with PySpark by completing the following code cells.
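The update that this notebook implements in its iteration cell uses no damping factor: every page starts with a score of 1.0, and in each iteration a page $q$ splits its current score evenly among its outgoing links, so a page's new score is the sum of the shares it receives. Writing $r_t(p)$ for the score of page $p$ after iteration $t$ and $N(q)$ for the set of pages that $q$ links to (notation introduced here for illustration):

$$
r_{t+1}(p) = \sum_{q \,:\, p \in N(q)} \frac{r_t(q)}{\lvert N(q) \rvert}, \qquad r_0(p) = 1.0
$$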

In [None]:
# install Java 11 (headless JDK)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# download Spark 3.4.0 + Hadoop 3
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz

# install findspark
!pip install -q findspark

# set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("TestApp").setMaster("local[*]")
# reuse the running SparkContext if this cell is re-run, instead of failing
sc = SparkContext.getOrCreate(conf=conf)

First, we create the sample network, a list of links, as a Spark RDD.

Each element has the format **source_page target_page**, meaning that source_page links to target_page. The RDD contains the following links:


page1 page3

page2 page1

page4 page1

page3 page1

page4 page2

page3 page4
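To see how these links drive the computation, here is the first iteration worked by hand. Every page starts with a score of 1.0 and splits it evenly across its outgoing links (page1 and page2 have one each, page3 and page4 have two each), so a page's new score is the sum of the shares it receives:

$$
\begin{aligned}
\text{page1} &: \tfrac{1.0}{1}\ (\text{from page2}) + \tfrac{1.0}{2}\ (\text{from page3}) + \tfrac{1.0}{2}\ (\text{from page4}) = 2.0 \\
\text{page2} &: \tfrac{1.0}{2}\ (\text{from page4}) = 0.5 \\
\text{page3} &: \tfrac{1.0}{1}\ (\text{from page1}) = 1.0 \\
\text{page4} &: \tfrac{1.0}{2}\ (\text{from page3}) = 0.5
\end{aligned}
$$

The total score is still 4.0; repeating this update 10 times produces the final scores printed at the end of the notebook.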

Run the following cells and complete the ones marked TO COMPLETE.

In [None]:
# create the RDD from an in-memory list of links and persist it in memory.
rdd = sc.parallelize(['page1 page3', 'page2 page1', 'page4 page1', 'page3 page1', 'page4 page2', 'page3 page4']).persist()

In [None]:
rdd.collect()

['page1 page3',
 'page2 page1',
 'page4 page1',
 'page3 page1',
 'page4 page2',
 'page3 page4']

In [None]:
# this is how we compute the contribution score for each link.
# for example, if a page's rank is 1.0 and it has 2 outgoing links, each link contributes 0.5.
# (neighbors=[page1, page2], rank=1.0)  => [(page1, 0.5), (page2, 0.5)]
def computeContribs(neighbors, rank):
    for neighbor in neighbors:
        yield (neighbor, rank/len(neighbors))
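Because `computeContribs` is a plain Python generator, it can be checked outside Spark. A minimal usage example reproducing the case in the comment above (the function is copied so the snippet runs on its own):

```python
def computeContribs(neighbors, rank):
    # split the page's rank evenly across its outgoing links
    for neighbor in neighbors:
        yield (neighbor, rank / len(neighbors))

# a page with rank 1.0 linking to page1 and page2
print(list(computeContribs(['page1', 'page2'], 1.0)))
# [('page1', 0.5), ('page2', 0.5)]
```

Since `flatMap` accepts any function that returns an iterable, the generator's pairs are emitted one `(neighbor, contribution)` record per link.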

In [None]:
# TO COMPLETE: for each page, output its outgoing neighbors in a list.
# For example, page3 has two outgoing links, to page1 and page4, so the output will be (page3, [page1, page4]).
# The content of result (the links RDD) is (order may vary):
# page4 ['page1', 'page2']
# page2 ['page1']
# page3 ['page1', 'page4']
# page1 ['page3']

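result = (rdd.map(lambda line: line.split())         # 'page1 page3' -> ['page1', 'page3']
             .map(lambda node: (node[0], node[1]))   # -> ('page1', 'page3')
             .groupByKey()                           # -> ('page1', <iterable of targets>)
             .mapValues(list)                        # -> ('page1', ['page3'])
             .persist())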


for (source, dest) in result.collect():
  print(f'{source} {dest}')

page2 ['page1']
page1 ['page3']
page4 ['page1', 'page2']
page3 ['page1', 'page4']

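To check the grouped links without Spark, the same split-and-group step can be mirrored in plain Python with a dictionary of lists. This is only a sketch for verification (the names `edges` and `links` are illustrative); a Python `dict` keeps insertion order, so the pages may print in a different order than Spark's output.

```python
from collections import defaultdict

edges = ['page1 page3', 'page2 page1', 'page4 page1',
         'page3 page1', 'page4 page2', 'page3 page4']

# mirror of map(split) -> groupByKey -> mapValues(list)
links = defaultdict(list)
for line in edges:
    source, target = line.split()
    links[source].append(target)

for source, dest in links.items():
    print(f'{source} {dest}')
# page1 ['page3']
# page2 ['page1']
# page4 ['page1', 'page2']
# page3 ['page1', 'page4']
```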

In [None]:
# TO COMPLETE: for each page, initialize its pagerank score to be 1.0
# The content of ranksRDD is:
# [('page4', 1.0), ('page2', 1.0), ('page3', 1.0), ('page1', 1.0)]

ranksRDD = sc.parallelize([('page4', 1.0), ('page2', 1.0), ('page3', 1.0), ('page1', 1.0)])
ranksRDD.collect()

[('page4', 1.0), ('page2', 1.0), ('page3', 1.0), ('page1', 1.0)]
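The cell above hard-codes the four pages. A sketch of deriving the initial scores from the links instead, in plain Python: collecting every page that appears as a source or a target also covers pages that only receive links. In PySpark, an analogous step would be something like `rdd.flatMap(lambda line: line.split()).distinct().map(lambda page: (page, 1.0))`; that line is a suggestion rather than the assignment's reference answer, and its output order is not guaranteed.

```python
edges = ['page1 page3', 'page2 page1', 'page4 page1',
         'page3 page1', 'page4 page2', 'page3 page4']

# every page that appears as a source or a target, sorted for a stable order
pages = sorted({page for line in edges for page in line.split()})

# initial PageRank score of 1.0 for each page
ranks = {page: 1.0 for page in pages}
print(ranks)
# {'page1': 1.0, 'page2': 1.0, 'page3': 1.0, 'page4': 1.0}
```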

In [None]:
# TO COMPLETE: PageRank is an iterative algorithm.
# In each iteration, every page's PageRank score is updated.
# Refer to lecture slides for method details.
unique_pages = ranksRDD.keys().collect()

for iteration in range(10):
    # join each page's outgoing links with its current rank:
    # (page, (neighbors, rank)) -> one (neighbor, rank / len(neighbors)) pair per link
    contribution = result.join(ranksRDD).flatMap(
        lambda x: computeContribs(x[1][0], x[1][1])
    )
    # update each page's PageRank by summing the contributions of all its incoming links
    new_pr = contribution.reduceByKey(lambda a, b: a + b)

    # a page with no incoming links receives no contribution and would be missing from new_pr;
    # the left outer join over all pages keeps it, with a score of 0.0
    ranksRDD = sc.parallelize(unique_pages)\
            .map(lambda page: (page, None))\
            .leftOuterJoin(new_pr)\
            .mapValues(lambda x: x[1] if x[1] is not None else 0.0)\
            .persist()
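As a cross-check of the Spark loop, the same 10 iterations can be run in plain Python on the sample graph. This sketch hard-codes the adjacency lists produced by the links cell and the starting score of 1.0; a page that receives no contributions gets 0.0, mirroring the left outer join above. The variable names are illustrative.

```python
from collections import defaultdict

# outgoing links of each page, as produced by the links cell
# (every page in this sample has at least one outgoing link)
links = {
    'page1': ['page3'],
    'page2': ['page1'],
    'page3': ['page1', 'page4'],
    'page4': ['page1', 'page2'],
}
ranks = {page: 1.0 for page in links}

for iteration in range(10):
    # each page splits its current rank evenly over its outgoing links
    contributions = defaultdict(float)
    for page, neighbors in links.items():
        for neighbor in neighbors:
            contributions[neighbor] += ranks[page] / len(neighbors)
    # pages with no incoming contribution fall back to 0.0
    ranks = {page: contributions.get(page, 0.0) for page in ranks}

for page, pr in sorted(ranks.items()):
    print(f'{page}:{pr}')
# page1:1.4296875
# page2:0.375
# page3:1.484375
# page4:0.7109375
```

All intermediate scores here are multiples of 1/128, so the floating-point sums are exact and match the Spark results regardless of summation order.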

In [None]:
# Print the final PageRank score of each page. The expected result is
# [('page4', 0.7109375), ('page2', 0.375), ('page3', 1.484375), ('page1', 1.4296875)]

for (page, pr) in ranksRDD.collect():
  print(f'{page}:{pr}')

page2:0.375
page3:1.484375
page1:1.4296875
page4:0.7109375
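A quick sanity check on these scores: every page in the sample graph has at least one outgoing link, so each iteration only redistributes rank among the pages, and the total stays at its initial value of 4.0 (four pages at 1.0 each).

```python
# final scores printed above
final_ranks = {'page1': 1.4296875, 'page2': 0.375,
               'page3': 1.484375, 'page4': 0.7109375}

# total rank is conserved because no page lacks outgoing links
total = sum(final_ranks.values())
print(total)  # 4.0
```

If some page had no outgoing links, it would not appear in `result`, the inner `join` would drop its rank in every iteration, and this total would shrink.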
