# Author:

- Huu Khang Nguyen - 7402909
- hkn878@uowmail.edu.au


# Environment:

- Python 3.10.6
- Ubuntu 22.04.2 LTS x86_64

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext

23/05/30 22:46:53 WARN Utils: Your hostname, huukhang1512-B550I-AORUS-PRO-AX resolves to a loopback address: 127.0.1.1; using 192.168.0.162 instead (on interface wlp6s0)
23/05/30 22:46:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/30 22:46:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/30 22:46:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Load data into RDD

In [3]:
# Download the webpage dataset into the local machine
WEB_PAGE_DATASET = './gr0.California.txt'

# Read into RDD
lines = spark_context.textFile(WEB_PAGE_DATASET)

In [4]:
lines_map = lines.map(lambda line: line.strip().split())

In [5]:
def add_1(line):
    (_, src, dest) = line.strip().split()
    return (src, dest, 1)

In [6]:
def process_web_page(line):
    (_, id, url) = line.strip().split()
    return (id, url)

In [7]:
web_page = lines.filter(lambda line: "n" in line[0]).map(process_web_page)
web_page_graph = lines.filter(lambda line: "e" in line[0]).map(add_1)


In [8]:
len(web_page.collect())

                                                                                

9664

# Reduce Step

## Reduce by out-degree

### Out-degree

In [9]:
from operator import add
out_degree_map = web_page_graph \
    .map(lambda x: (x[0], x[2])) \
    .union(web_page.map(lambda x: (x[0], 0))) \
    .reduceByKey(add)


## Generate the transition matrix as a BlockMatrix

In [10]:
import numpy as np
from pyspark.mllib.linalg.distributed import *
from pyspark.mllib.linalg import *

In [11]:
beta = 0.85
total_web_pages = web_page.count()  #size of transition matrix (number of pages)
n_blocks = 5 #number of blocks per row/column (5 for 5x5 and 5x1 block matrices)
block_size = int(np.ceil(total_web_pages/n_blocks)) # size of each block

In [12]:
total_web_pages

9664

In [13]:
block_size

1933

### Create transition matrix

Inner join to get the path (src, dest) along with the total out degrees. Innner join will remove dangling node (node with no outdegree as well)

In [14]:
joined_total_out_degrees = out_degree_map.join(web_page_graph)

In [15]:
joined_total_out_degrees.take(5)

[('4', (23, '597')),
 ('4', (23, '598')),
 ('4', (23, '599')),
 ('4', (23, '600')),
 ('4', (23, '601'))]

In [16]:
tm_rdd = joined_total_out_degrees.map(lambda vertex: (vertex[1][1], vertex[0], 1/vertex[1][0])) #create transition matrix ([row_indx, col_indx, tran_prob])

In [17]:
tm_rdd.take(10)

[('597', '4', 0.043478260869565216),
 ('598', '4', 0.043478260869565216),
 ('599', '4', 0.043478260869565216),
 ('600', '4', 0.043478260869565216),
 ('601', '4', 0.043478260869565216),
 ('602', '4', 0.043478260869565216),
 ('603', '4', 0.043478260869565216),
 ('604', '4', 0.043478260869565216),
 ('605', '4', 0.043478260869565216),
 ('606', '4', 0.043478260869565216)]

In [18]:
tm = CoordinateMatrix(tm_rdd.map(lambda x: MatrixEntry(*x)), total_web_pages, total_web_pages).toBlockMatrix(n_blocks,n_blocks)
tm.cache()
tm.blocks.take(5)

[((25, 183), SparseMatrix(5, 5, [0, 1, 1, 1, 1, 1], [3], [0.3333], 0)),
 ((11, 500),
  SparseMatrix(5, 5, [0, 0, 1, 1, 2, 3], [4, 4, 4], [0.0769, 0.5, 0.1111], 0)),
 ((26, 301), SparseMatrix(5, 5, [0, 0, 0, 0, 1, 1], [3], [0.0667], 0)),
 ((59, 599), SparseMatrix(5, 5, [0, 0, 0, 0, 1, 1], [3], [0.1667], 0)),
 ((16, 692), SparseMatrix(5, 5, [0, 0, 0, 1, 1, 1], [4], [0.3333], 0))]

## Generate rank vector as a BlockMatrix

In [19]:
rv_rdd0 = spark_context.parallelize(range(total_web_pages)).map(lambda x : [x, 0, 1/total_web_pages])
print('Initial rank vector:', rv_rdd0.map(lambda x: x[2]).collect())

Initial rank vector: [0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.00010347682119205298, 0.0001034768211920

In [20]:
ranked_vector = CoordinateMatrix(rv_rdd0.map(lambda x: MatrixEntry(*x)), total_web_pages, 1).toBlockMatrix(n_blocks,1)

In [21]:
ranked_vector.blocks.count() # should matched with 

                                                                                

1933

In [22]:
tm.blocks.count()

11031

## Compute the PageRank iteratively

In [23]:
def print_vec(rdd):
    return rdd.map(lambda x : x[1]).reduce(lambda x,y: np.concatenate((x,y), axis=None))

def create_dense_matrix(block):
    return (block[0], DenseMatrix(block_size, 1, block[1]))

num_it = 10 # number of iteration

for i in range(num_it):
    ranked_vector = tm.multiply(ranked_vector)
    ranked_vector_rdd = ranked_vector.blocks.map(lambda x: (x[0], x[1].toArray()))
    # Adjust the rank vector with teleportation
    ranked_vector_rdd = ranked_vector_rdd.map(lambda x: (x[0], x[1]*beta + (1-beta)/total_web_pages))
    total = ranked_vector_rdd.map(lambda x: np.sum(x[1])).reduce(add)
    if total > 0:
        ranked_vector_rdd = ranked_vector_rdd.map(lambda x: (x[0], x[1]/total))
    else:
        print("rank vector sum is 0")
        break
    print('rank vector after {} iteration: {}'.format(i+1, print_vec(ranked_vector_rdd)))
    ranked_vector = BlockMatrix(ranked_vector_rdd.map(lambda x: (x[0], DenseMatrix(n_blocks,1,x[1]))), numRows=total_web_pages, rowsPerBlock=n_blocks, colsPerBlock=1)


                                                                                

rank vector after 1 iteration: [3.01056709e-05 3.01056709e-05 3.01056709e-05 ... 3.01056709e-05
 3.01056709e-05 3.01056709e-05]
rank vector after 2 iteration: [8.34276617e-04 8.26903134e-04 8.26903134e-04 ... 3.13067387e-05
 3.13067387e-05 3.13067387e-05]
rank vector after 3 iteration: [4.32597729e-04 4.23959803e-04 4.23959803e-04 ... 3.52683471e-05
 3.52683471e-05 3.52683471e-05]
rank vector after 4 iteration: [7.44946534e-04 7.37118595e-04 7.37118595e-04 ... 2.83710728e-05
 2.83710728e-05 2.83710728e-05]
rank vector after 5 iteration: [3.74771044e-04 3.68953326e-04 3.68953326e-04 ... 2.62114086e-05
 2.62114086e-05 2.62114086e-05]
rank vector after 6 iteration: [4.29474904e-04 4.24517734e-04 4.24517734e-04 ... 2.41744694e-05
 2.41744694e-05 2.41744694e-05]
rank vector after 7 iteration: [2.36742816e-04 2.32384264e-04 2.32384264e-04 ... 2.30461715e-05
 2.30461715e-05 2.30461715e-05]
rank vector after 8 iteration: [2.27588919e-04 2.23633886e-04 2.23633886e-04 ... 2.19363683e-05
 2.19363

In [60]:
local_matrix = ranked_vector.toLocalMatrix()

In [66]:
result = local_matrix.toArray()[0:20]

In [67]:
result

array([[9.96845719e-03],
       [1.69129593e-03],
       [5.15955419e-05],
       [5.80125363e-04],
       [4.42809473e-05],
       [2.19926958e-05],
       [1.42897707e-02],
       [2.05068124e-05],
       [2.21879878e-04],
       [5.63692981e-04],
       [6.12246875e-03],
       [8.44298915e-05],
       [1.47097819e-04],
       [2.05068124e-05],
       [4.30443858e-04],
       [7.46029083e-05],
       [7.30179660e-05],
       [1.13340960e-02],
       [2.22049649e-05],
       [3.41793438e-04]])

In [78]:
for idx, rank in enumerate(result):
    page_name = web_page.filter(lambda page: page[0] == str(idx)).collect()[0][1]
    print(page_name, rank)

http://www.berkeley.edu/ [0.00996846]
http://www.caltech.edu/ [0.0016913]
http://www.realestatenet.com/ [5.15955419e-05]
http://www.ucsb.edu/ [0.00058013]
http://www.washingtonpost.com/wp-srv/national/longterm/50states/ca.htm [4.42809473e-05]
http://www-ucpress.berkeley.edu/ [2.19926958e-05]
http://www.ucr.edu/ [0.01428977]
http://www.tegnetcorporation.com/ [2.05068124e-05]
http://www.research.digital.com/SRC/virtual-tourist/California.html [0.00022188]
http://www.leginfo.ca.gov/calaw.html [0.00056369]
http://www.csun.edu/ [0.00612247]
http://www.calpoly.edu/ [8.44298915e-05]
http://www.calbar.org/ [0.0001471]
http://ideas.uqam.ca/ideas/data/fthcalaec.html [2.05068124e-05]
http://www.sen.ca.gov/ [0.00043044]
http://www.csupomona.edu/ [7.46029083e-05]
http://www.csuchico.edu/ [7.3017966e-05]
http://www.calacademy.org/ [0.0113341]
http://cwis.usc.edu/ [2.22049649e-05]
http://www.usc.edu/CMSI/CalifSF/ [0.00034179]
