## Assignment 2 - Task 2

**Dataset**: Webpage links (gr0.epa)
**Dataset information**:
This dataset contains about 4800 webpages and their hyperlinks. The records with the “n” flag contain (unique) IDs and URLs of webpages. Those with the “e” flag represent hyperlinks between webpages. Essentially, this dataset models a “mini-WWW”.

**Objective**:
The objective of this task is to implement a PageRank algorithm to rank the webpages in the dataset.

In [1]:
# Needed so that Spark works from a Jupyter notebook: findspark.init() must
# run before the `pyspark` imports in the following cells.
# NOTE(review): presumably this locates the local Spark install and puts
# PySpark on sys.path - confirm against the findspark documentation.
import findspark
findspark.init()

In [2]:
# Initialize the Spark session.
# The master URL is set through `master()` (Spark property `spark.master`).
# The key used previously, "spark-master", is not a Spark property, so the
# requested "local" master was never actually applied.
# Use "local[*]" instead if all local cores should be used.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("CSCI316-ass2")
    .master("local")
    .getOrCreate()
)
# Bare expression so the notebook displays the session summary
spark

In [3]:
# Import relevant packages
from pyspark import SparkContext
from pyspark.sql import Row
import pyspark.sql.functions as pyf
from pyspark.sql.types import *
from pyspark.mllib.linalg.distributed import *
from pyspark.mllib.linalg import *

In [4]:
# Get the active SparkContext (or create one); later cells use it to build RDDs
sc = SparkContext.getOrCreate()

# Read the space-separated, header-less file into a DataFrame, letting Spark
# infer the column types, then display the resulting schema
gr0_CD = (
    spark.read
    .options(inferSchema=True, header=False, sep=' ')
    .csv("gr0.epa")
)
gr0_CD.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)



In [5]:
# Split the data into two parts - the url part ("n" records) and the link
# part ("e" records).
# URL schema
schema_id = StructType([
    StructField("ID", StringType(), False),
    StructField("URL", StringType(), False)
])

# Link schema. DESTINATION is declared as a string because `_c2` is read as a
# string column (it is shared with the URLs of the "n" records).
schema_link = StructType([
    StructField("SRC", IntegerType(), False),
    StructField("DESTINATION", StringType(), False),
])

# Both DataFrames are built from the filtered RDD rather than from
# `.collect()`-ed rows, so the records are not pulled to the driver and
# shipped back to the executors just to attach a schema.

# Link DataFrame with DESTINATION cast to Integer Type (as DEST)
link_rows = gr0_CD.where(gr0_CD['_c0'] == 'e').drop('_c0').rdd
gr0_link = spark.createDataFrame(link_rows, schema_link)
gr0_link = gr0_link.withColumn("DEST", gr0_link["DESTINATION"].cast("int")).drop("DESTINATION")

# URL DataFrame
id_rows = gr0_CD.where(gr0_CD['_c0'] == 'n').drop('_c0').rdd
gr0_id = spark.createDataFrame(id_rows, schema_id)

# Compute the weight on each link for each source node:
# 1 / (number of links leaving that source)
gr0_weight = gr0_link.groupBy("SRC").agg((1 / pyf.count('DEST')).alias('WEIGHT')).orderBy('SRC')
# Attach the per-source weight to every individual link
link_weight = gr0_link.join(gr0_weight, on=['SRC'], how='inner')
link_weight.show(5)

+----+----+-------------------+
| SRC|DEST|             WEIGHT|
+----+----+-------------------+
| 463|1508|                1.0|
| 471| 252|                1.0|
|1088|1194|                1.0|
|1342|1207|0.14285714285714285|
|1342|2151|0.14285714285714285|
+----+----+-------------------+
only showing top 5 rows



In [6]:
# PageRank settings
MAX_ITER = 20              # Upper bound on the number of iterations
ERROR_PARAM = 0.005        # Convergence threshold on the summed absolute change
TELEPORT_CONST = 0.85      # Factor applied to every link weight
id_count = gr0_id.count()  # Number of URLs

In [7]:
# Compute the page rank matrix with deadends.
# Entry (DEST, SRC) holds the link weight of SRC scaled by TELEPORT_CONST.
link_rdd = link_weight.rdd.map(lambda x: (x.DEST, x.SRC, x.WEIGHT * TELEPORT_CONST))
link_rdd.cache()
# Size the matrix from the number of webpages instead of a hard-coded 4772 so
# it always matches the id_count-long vectors built in the following cells.
# Like those vectors, this assumes page IDs run from 0 to id_count - 1.
link_matrix = CoordinateMatrix(
    link_rdd.map(lambda x: MatrixEntry(*x)),
    numRows=id_count,
    numCols=id_count,
)

# Convert the transition matrix of the web graph to a BlockMatrix.
# NOTE: the arguments of toBlockMatrix are rowsPerBlock and colsPerBlock, so
# this produces blocks of 4 x 4 entries (not a 4 x 4 grid of blocks). The
# vectors multiplied with it must use matching block sizes.
link_matrix = link_matrix.toBlockMatrix(4, 4)

In [8]:
# All-ones row vector (1 x id_count), stored with blocks of 1 row x 4 columns
identity_row_rdd = sc.parallelize([[0, col, 1] for col in range(id_count)])
identity_row_rdd.cache()
identity_row_vector = CoordinateMatrix(
    identity_row_rdd.map(lambda t: MatrixEntry(t[0], t[1], t[2]))
).toBlockMatrix(1, 4)

# All-ones column vector (id_count x 1), stored with blocks of 4 rows x 1 column
identity_col_rdd = sc.parallelize([[row, 0, 1] for row in range(id_count)])
identity_col_rdd.cache()
identity_col_vector = CoordinateMatrix(
    identity_col_rdd.map(lambda t: MatrixEntry(t[0], t[1], t[2]))
).toBlockMatrix(4, 1)

In [9]:
# Normalization row vector: every one of its id_count entries is 1 / id_count,
# so multiplying it with a column vector gives that vector's sum / id_count
norm_rdd = sc.parallelize([[0, col, 1 / id_count] for col in range(id_count)])
norm_rdd.cache()
norm_vector = CoordinateMatrix(
    norm_rdd.map(lambda t: MatrixEntry(t[0], t[1], t[2]))
).toBlockMatrix(1, 4)

# Normalization scalar: a 1 x 1 matrix holding the single value 1 / id_count
scala_rdd = sc.parallelize([[0, 0, 1 / id_count]])
scala_rdd.cache()
scala_vector = CoordinateMatrix(
    scala_rdd.map(lambda t: MatrixEntry(t[0], t[1], t[2]))
).toBlockMatrix(1, 1)

In [16]:
# Set the initial rank vector to a uniform vector (every page gets 1 / id_count)
uniform_rdd = sc.parallelize([[i, 0, 1 / id_count] for i in range(id_count)])
uniform_rdd.cache()
rank_vector = CoordinateMatrix(uniform_rdd.map(lambda x: MatrixEntry(*x)))

# The (evolving) rank vector is stored as a BlockMatrix with blocks of
# 4 rows x 1 column, matching the 4-column blocks of link_matrix.
block_vector = rank_vector.toBlockMatrix(4, 1)
block_vector.cache()

# Defined up front so the following cell still has a vector to report if
# MAX_ITER is 0 and the loop body never runs.
vector_new = block_vector

# Page Rank Algorithm
for i in range(MAX_ITER):
    # Propagate rank along the links (weights already include TELEPORT_CONST)
    vector_new = link_matrix.multiply(block_vector)
    # BlockMatrix operations are lazy and this product is consumed twice below
    # (once for its sum, once in the addition). Cache it so it, and every
    # earlier iteration it depends on, is not recomputed.
    vector_new.cache()
    # Rank mass missing from vector_new, i.e. (1 - sum(vector_new)) / id_count.
    # This covers the teleport share as well as what leaks out via dead ends.
    norm_scala = scala_vector.subtract(norm_vector.multiply(vector_new))
    # Spread that missing mass uniformly over all pages
    vector_new = vector_new.add(identity_col_vector.multiply(norm_scala))
    # Cached because it feeds both the convergence check and the next iteration
    vector_new.cache()
    # Stop when the rank vector converges (with the error parameter): the
    # summed absolute change between two consecutive vectors is compared.
    difference = block_vector.subtract(vector_new)
    # collect() fetches every entry in one job; no entry count has to be guessed
    diff_vector = difference.toCoordinateMatrix().entries.collect()
    diff_values = [abs(row.value) for row in diff_vector]
    total_diff = sum(diff_values)
    print('Iteration ', (i + 1), ' - Difference: ', total_diff)
    if total_diff < ERROR_PARAM:
        break
    block_vector = vector_new

Iteration  1  - Difference:  0.3732926014706724
Iteration  2  - Difference:  0.09761719349349852
Iteration  3  - Difference:  0.038346948829111015
Iteration  4  - Difference:  0.016518039086530042
Iteration  5  - Difference:  0.0112352333786202
Iteration  6  - Difference:  0.008143260675869868
Iteration  7  - Difference:  0.0062537178308174814
Iteration  8  - Difference:  0.004913888314557025


In [17]:
# Return the ranking scores of the first 20 webpages (i.e., webpages from ID 0 to ID 19).
# collect() fetches every entry of the final rank vector, so the number of
# webpages no longer has to be hard-coded here.
result = vector_new.toCoordinateMatrix().entries.collect()
# entry.i is the row index of the column vector, i.e. the webpage ID
first_20 = [(entry.i, entry.value) for entry in result if 0 <= entry.i < 20]
first_20.sort(key=lambda tup: tup[0])
print('(ID, RANK)')
for tup in first_20:
    print(tup)

(ID, RANK)
(0, 0.0001468037937985509)
(1, 0.0001468037937985509)
(2, 0.0001468037937985509)
(3, 0.0006480517541762962)
(4, 0.0001468037937985509)
(5, 0.00020935218034406075)
(6, 0.0004467917834527771)
(7, 0.0001468037937985509)
(8, 0.0009417640018460071)
(9, 0.0004279767198264681)
(10, 0.00039699733998059035)
(11, 0.0004121475106143708)
(12, 0.000690102389368309)
(13, 0.0002719005668895706)
(14, 0.00035642018516509737)
(15, 0.0001468037937985509)
(16, 0.0008644134438251674)
(17, 0.00020935218034406075)
(18, 0.00022722314792849216)
(19, 0.0001468037937985509)


In [18]:
# Shut down the SparkContext now that the ranking has been reported
sc.stop()