# Homework 3, Question 2
## Implementing PageRank and HITS

### Setup

In [1]:
!pip install pyspark
!pip install -U -q PyDrive2
#the output 'xxx is not a symbolic link' will not affect your implementation or execution
#to fix 'xxx is not a symbolic link', you can uncomment the lines starting from !mv xxxx
#you may need to replace xxx.11 with the correct version if other errors come up after colab update
#to get the correct version, use !ls /usr/local/lib to find out
!mv /usr/local/lib/libtbbmalloc_proxy.so.2 /usr/local/lib/libtbbmalloc_proxy.so.2.backup
!mv /usr/local/lib/libtbbmalloc.so.2 /usr/local/lib/libtbbmalloc.so.2.backup
!mv /usr/local/lib/libtbbbind_2_5.so.3 /usr/local/lib/libtbbbind_2_5.so.3.backup
!mv /usr/local/lib/libtbb.so.12 /usr/local/lib/libtbb.so.12.backup
!mv /usr/local/lib/libtbbbind_2_0.so.3 /usr/local/lib/libtbbbind_2_0.so.3.backup
!mv /usr/local/lib/libtbbbind.so.3 /usr/local/lib/libtbbbind.so.3.backup
!ln -s /usr/local/lib/libtbbmalloc_proxy.so.2.11 /usr/local/lib/libtbbmalloc_proxy.so.2
!ln -s /usr/local/lib/libtbbmalloc.so.2.11 /usr/local/lib/libtbbmalloc.so.2
!ln -s /usr/local/lib/libtbbbind_2_5.so.3.11 /usr/local/lib/libtbbbind_2_5.so.3
!ln -s /usr/local/lib/libtbb.so.12.11 /usr/local/lib/libtbb.so.12
!ln -s /usr/local/lib/libtbbbind_2_0.so.3.11 /usr/local/lib/libtbbbind_2_0.so.3
!ln -s /usr/local/lib/libtbbbind.so.3.11 /usr/local/lib/libtbbbind.so.3
#If error related to the above execution occurs, you can try commenting out the above 12 lines under pip install PyDrive2 (not included)

# !sudo ldconfig
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=e30a30b2fcbf28a40ef38a8da0abfb16c1bf737fd83e08ee4d6c512fb794e619
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.7/46.7 kB[0m [31m821.6 kB/s[0m eta [36m0:00:00[0m
[?25hThe following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss

In [2]:
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [3]:
id='1Qm0rNAE-X3eiqhHb8nK7KcnipV61voNH'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('graph-small.txt')

id='1G3WFiu1X8Wb6PE6Di0V_EFHY4vxAEQwB'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('graph-full.txt')

In [4]:
# Let's import the libraries we will need
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import DoubleType, ArrayType, FloatType

In [5]:
# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

### (a) PageRank Implementation - Sanity check with a smaller dataset, graph-small.txt

In [39]:
data = sc.textFile("graph-small.txt")

# Create a list of all egdes
data = data.map(lambda x : list(int(i) for i in x.split('\t'))).collect()

# List of unique edges
edges = []
for edge in data:
    if edge not in edges:
        edges.append(edge)

edgesRDD = sc.parallelize(edges)

In [7]:
# Create a list of nodes
nodesRDD = edgesRDD.flatMap(lambda edge: edge).distinct()

# Create a dictionary to store the outgoing degree of each node
out_degree_dict = edgesRDD.map(lambda edge: (edge[0], 1)).reduceByKey(lambda a, b: a + b).collectAsMap()

# Function to calculate the matrix element
def calculate_matrix_element(edge):
    source, dest = edge
    out_degree = out_degree_dict.get(source)
    if out_degree == 0:
        return (dest, source, 0.0)
    else:
        return (dest, source, 1 / out_degree)

# Calculate matrix elements in a form of (i, j, el)
matrix_elementsRDD = edgesRDD.map(lambda edge : calculate_matrix_element(edge))

# Create a dictionary of matrix elements of form of (i, j) : el
matrix_elements_dict = {(i, j): el for (i, j, el) in matrix_elementsRDD.collect()}

In [8]:
# Function to create matrix elements with default value 0 for missing elements
def create_matrix(index_tuple):
    i, j = index_tuple
    el = matrix_elements_dict.get((i, j), 0)
    return i, j, el

# Create a matrix of all indices i, j from 1 to 100 and store them in RDD as (i, j, el)
indices = sc.parallelize([(i, j) for i in range(1, 101) for j in range(1, 101)])
matrix_elementsRDD = indices.map(lambda index : create_matrix(index)).map(lambda x : (x[0], x[1], x[2]))

In [9]:
# Create 100-by-100 matrix and store it as RDD
matrix = [[0] * 100 for _ in range(100)]
for i, j, el in matrix_elementsRDD.collect():
    matrix[i - 1][j - 1] = el

matrixRDD = sc.parallelize(matrix)

In [10]:
# Initial vector r
r = [1/100] * 100

# Set beta to 0.8
beta = 0.8
n = 100

# Perform matrix-vector multiplication
def iteration(v1, v2):
    result = (1 - beta) / n
    for i, j in zip(v1, v2):
        result += beta * i * j
    return result

# Iterate 40 times
for _ in range(40):
  r = matrixRDD.map(lambda line: iteration(line, r)).collect()

np.array(r).argmax()

52

In [11]:
r[52]

0.03573120223267159

### (a) PageRank Implementation

In [40]:
data = sc.textFile("graph-full.txt")

# Create a list of all egdes
data = data.map(lambda x : list(int(i) for i in x.split('\t'))).collect()

# List of unique edges
edges = []
for edge in data:
    if edge not in edges:
        edges.append(edge)

edgesRDD = sc.parallelize(edges)

In [13]:
# Create a list of nodes
nodesRDD = edgesRDD.flatMap(lambda edge: edge).distinct()

# Create a dictionary to store the outgoing degree of each node
out_degree_dict = edgesRDD.map(lambda edge: (edge[0], 1)).reduceByKey(lambda a, b: a + b).collectAsMap()

# Calculate matrix elements in a form of (i, j, el)
matrix_elementsRDD = edgesRDD.map(lambda edge : calculate_matrix_element(edge))

# Create a dictionary of matrix elements of form of (i, j) : el
matrix_elements_dict = {(i, j): el for (i, j, el) in matrix_elementsRDD.collect()}

In [14]:
# Create a matrix of all indices i, j from 1 to 1000 and store them in RDD as (i, j, el)
indices = sc.parallelize([(i, j) for i in range(1, 1001) for j in range(1, 1001)])
matrix_elementsRDD = indices.map(lambda index : create_matrix(index)).map(lambda x : (x[0], x[1], x[2]))

In [15]:
# Create 1000-by-1000 matrix and store it as RDD
matrix = [[0] * 1000 for _ in range(1000)]
for i, j, el in matrix_elementsRDD.collect():
    matrix[i - 1][j - 1] = el

matrixRDD = sc.parallelize(matrix)

In [16]:
# Initial vector of all ones
r = [1/1000] * 1000

# Set beta to 0.8
beta = 0.8
n = 1000

# Perform matrix-vector multiplication
def iteration(v1, v2):
    result = (1 - beta) / n
    for i, j in zip(v1, v2):
        result += beta * i * j
    return result

for _ in range(40):
  r = matrixRDD.map(lambda line: iteration(line, r)).collect()

In [17]:
indexed_list = [(el, idx) for idx, el in enumerate(r)]
sorted_list = sorted(indexed_list, key=lambda x: x[0], reverse=True)
indices_of_max_elements = sorted_list[:5]
indices_of_min_elements = sorted_list[-5:]

In [18]:
indices_of_max_elements

[(0.0020202911815182193, 262),
 (0.0019433415714531503, 536),
 (0.001925447807166263, 964),
 (0.0018526340162417312, 242),
 (0.0018273721700645148, 284)]

In [19]:
indices_of_min_elements

[(0.00038779848719291705, 407),
 (0.00035481538649301454, 423),
 (0.0003531481051059628, 61),
 (0.00035135689375165774, 92),
 (0.0003286018525215297, 557)]

### (b) HITS Implementation - Sanity check with a smaller dataset, graph-small.txt

In [20]:
data = sc.textFile("graph-small.txt")

# Create a list of all egdes
data = data.map(lambda x : list(int(i) for i in x.split('\t'))).collect()

# List of unique edges
edges = []
for edge in data:
    if edge not in edges:
        edges.append(edge)

edgesRDD = sc.parallelize(edges).map(lambda x : (x[0], x[1], 1))

In [21]:
# Create a dictionary of matrix elements of form of (i, j) : el
matrix_elements_dict = {(i, j): el for (i, j, el) in edgesRDD.collect()}

In [22]:
# Create a matrix of all indices i, j from 1 to 100 and store them in RDD as (i, j, el)
indices = sc.parallelize([(i, j) for i in range(1, 101) for j in range(1, 101)])
matrix_elementsRDD = indices.map(lambda index : create_matrix(index)).map(lambda x : (x[0], x[1], x[2]))

In [23]:
# Create 100-by-100 matrix and store it as RDD
matrix = [[0] * 100 for _ in range(100)]
for i, j, el in matrix_elementsRDD.collect():
    matrix[i - 1][j - 1] = el

matrixRDD = sc.parallelize(matrix)

In [24]:
transpose = [list(x) for x in np.array(matrix).T]
transposeRDD = sc.parallelize(transpose)

In [25]:
def dot_product(v1, v2):
    result = 0
    for i, j in zip(v1, v2):
        result += i * j
    return result

In [26]:
# Initial vector h
h = [1] * 100

for _ in range(40):
    # Compute vector a and scale so the largest value in the vector a has value 1
    a = transposeRDD.map(lambda line: dot_product(line, h))
    m_a = a.max()
    a = a.map(lambda x : x / m_a).collect()

    # Compute vector h and scale so the largest value in the vector h has value 1
    h = matrixRDD.map(lambda line: dot_product(line, a))
    m_h = h.max()
    h = h.map(lambda x : x / m_h).collect()

In [27]:
np.array(h).argmax()

58

In [28]:
np.array(a).argmax()

65

### (b) HITS Implementation

In [47]:
matrix_elementsRDD.take(3)

[(1, 1, 0), (1, 2, 1), (1, 3, 0)]

In [44]:
data = sc.textFile("graph-full.txt")

# Create a list of all egdes
data = data.map(lambda x : list(int(i) for i in x.split('\t'))).collect()

# List of unique edges
edges = []
for edge in data:
    if edge not in edges:
        edges.append(edge)

edgesRDD = sc.parallelize(edges).map(lambda x : (x[0], x[1], 1))

# Create a dictionary of matrix elements of form of (i, j) : el
matrix_elements_dict = {(i, j): el for (i, j, el) in edgesRDD.collect()}

# Create a matrix of all indices i, j from 1 to 1000 and store them in RDD as (i, j, el)
indices = sc.parallelize([(i, j) for i in range(1, 1001) for j in range(1, 1001)])
matrix_elementsRDD = indices.map(lambda index : create_matrix(index)).map(lambda x : (x[0], x[1], x[2]))

# Create 1000-by-1000 matrix and store it as RDD
matrix = [[0] * 1000 for _ in range(1000)]
for i, j, el in matrix_elementsRDD.collect():
    matrix[i - 1][j - 1] = el

matrixRDD = sc.parallelize(matrix)

# Transpose of matrix
transpose = [list(x) for x in np.array(matrix).T]
transposeRDD = sc.parallelize(transpose)

# Initial vector h
h = [1] * 100

for _ in range(40):
    # Compute vector a and scale so the largest value in the vector a has value 1
    a = transposeRDD.map(lambda line: dot_product(line, h))
    m_a = a.max()
    a = a.map(lambda x : x / m_a).collect()

    # Compute vector h and scale so the largest value in the vector h has value 1
    h = matrixRDD.map(lambda line: dot_product(line, a))
    m_h = h.max()
    h = h.map(lambda x : x / m_h).collect()

In [30]:
indexed_list = [(el, idx) for idx, el in enumerate(h)]
sorted_list = sorted(indexed_list, key=lambda x: x[0], reverse=True)
indices_of_max_elements_h = sorted_list[:5]
indices_of_min_elements_h = sorted_list[-5:]

In [31]:
indices_of_max_elements_h

[(1.0, 839),
 (0.949961862490654, 154),
 (0.8986645288972263, 233),
 (0.8634171101843793, 388),
 (0.8632841092495216, 471)]

In [32]:
indices_of_min_elements_h

[(0.07678413939216452, 888),
 (0.0660265937341849, 538),
 (0.06453117646225179, 140),
 (0.05779059354433016, 834),
 (0.042066854890936534, 22)]

In [33]:
indexed_list = [(el, idx) for idx, el in enumerate(a)]
sorted_list = sorted(indexed_list, key=lambda x: x[0], reverse=True)
indices_of_max_elements_a = sorted_list[:5]
indices_of_min_elements_a = sorted_list[-5:]

In [34]:
indices_of_max_elements_a

[(1.0, 892),
 (0.96355728496344, 15),
 (0.9510158161074022, 798),
 (0.9246703586198445, 145),
 (0.8998661973604051, 472)]

In [35]:
indices_of_min_elements_a

[(0.08571673456144879, 909),
 (0.08171239406816946, 23),
 (0.07544228624641902, 461),
 (0.06653910487622794, 134),
 (0.0560831637760762, 18)]