In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 2.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=49823da9f1d68bca6d87bcbcd166f4a3d4c85618679831f0eb2b1f4a4b298636
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as max_

In [4]:
# Name the application, then attach to the already-running SparkContext or
# start a new one. The configuration has to be handed to getOrCreate() to take
# effect; it is only applied when a new context is actually created.
conf = SparkConf().setAppName("HITS")
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
# Raw edge list as an RDD of text lines (one edge per line); the path is
# relative to the notebook's working directory.
edge_list_path = "drive/MyDrive/A3-data/graph-full.txt"
input_edges = sc.textFile(edge_list_path)

def parse_edge(edge):
    """Turn one tab-separated line into a ``(first, second)`` tuple of ints.

    Only the first two tab-separated fields are read; any further fields are
    ignored. Raises ``ValueError`` for a non-integer field and ``IndexError``
    when the line has fewer than two fields.
    """
    fields = edge.split("\t")
    first = int(fields[0])
    second = int(fields[1])
    return first, second

def parse_edge_2(edge):
    """Turn one tab-separated line into a ``(second, first)`` tuple of ints.

    Same parsing as a plain left-to-right read of the first two fields, but
    the pair is returned swapped. Raises ``ValueError`` for a non-integer
    field and ``IndexError`` when the line has fewer than two fields.
    """
    fields = edge.split("\t")
    # The second field is converted first, mirroring the order of the result.
    second = int(fields[1])
    first = int(fields[0])
    return second, first

# Deduplicated edges as parsed pairs, plus the same edges with the two
# endpoints swapped.
edge_pairs = input_edges.map(parse_edge).distinct()
reversed_pairs = input_edges.map(parse_edge_2).distinct()

# Every node id that appears at either end of an edge.
nodes = edge_pairs.flatMap(lambda endpoints: endpoints).distinct()

# Adjacency form: key -> iterable of the nodes paired with it. Cached because
# both RDDs are re-read on every pass of the iteration below.
edges = edge_pairs.groupByKey().cache()
reversed_edges = reversed_pairs.groupByKey().cache()

# Start every node with a hub score of 1.0 and ship the lookup table to the
# executors as a broadcast variable.
hubs = nodes.map(lambda node_id: (node_id, 1.0))
hubs_dict = hubs.collectAsMap()
hubs_sc = sc.broadcast(hubs_dict)

def compute_contribs(neighbors, rank):
    """Lazily yield a ``(neighbor, rank)`` pair for each item in ``neighbors``.

    The same ``rank`` value is paired with every neighbor, in iteration order.
    """
    yield from ((target, rank) for target in neighbors)

iterations = 40


def _propagate_scores(adjacency, scores_bc):
    """Push broadcast scores along ``adjacency`` and rescale so the maximum is 1.

    Args:
        adjacency: pair RDD of ``(node, neighbors)`` as produced by ``groupByKey``.
        scores_bc: broadcast dict mapping a node to its current score.

    Returns:
        ``(scores, peak)`` where ``scores`` is a pair RDD of
        ``(neighbor, total / peak)``, ``total`` being the sum of the scores of
        every node that lists that neighbor, and ``peak`` is the largest total.
    """
    # The broadcast and the maximum are bound as locals instead of being read
    # from module-level names inside the lambdas, so each RDD stays tied to
    # the values of the pass that built it regardless of when Spark
    # serialises the closures.
    totals = adjacency.flatMap(
        lambda entry: compute_contribs(entry[1], scores_bc.value.get(entry[0]))
    ).reduceByKey(lambda x, y: x + y)
    peak = totals.values().max()
    return totals.mapValues(lambda total: total / peak), peak


auths_sc = None  # no authority broadcast exists before the first pass

for iteration in range(iterations):
    # Authority update: each node sums the hub scores of the nodes whose
    # adjacency list in `edges` contains it.
    auths, max_auth = _propagate_scores(edges, hubs_sc)
    auths_dict = auths.collectAsMap()
    stale_sc, auths_sc = auths_sc, sc.broadcast(auths_dict)
    if stale_sc is not None:
        # Release executor copies of the superseded table. unpersist() (not
        # destroy()) is used so Spark can still re-send it if an older RDD
        # ever has to be recomputed.
        stale_sc.unpersist()

    # Hub update: the mirror step over `reversed_edges`, fed by the authority
    # scores that were just computed.
    hubs, max_hub = _propagate_scores(reversed_edges, auths_sc)
    hubs_dict = hubs.collectAsMap()
    stale_sc, hubs_sc = hubs_sc, sc.broadcast(hubs_dict)
    stale_sc.unpersist()

def _print_ranking(title, ranked):
    """Print ``title`` on one line, then each entry of ``ranked`` on its own line."""
    print(title)
    for entry in ranked:
        print(entry)


top_n = 5  # how many nodes to list at each end of a ranking

# Bring the final (node, score) pairs to the driver, highest score first.
hub_scores = sorted(hubs.collect(), key=lambda pair: -pair[1])
auth_scores = sorted(auths.collect(), key=lambda pair: -pair[1])

# Slices, unlike indexing, simply yield fewer rows when a ranking holds fewer
# than top_n entries. The bottom slice is reversed so the lowest score is
# printed first.
_print_ranking(f"Top {top_n} nodes by hub score:", hub_scores[:top_n])
_print_ranking(f"Bottom {top_n} nodes by hub score:", hub_scores[-top_n:][::-1])
_print_ranking(f"Top {top_n} nodes by authority score:", auth_scores[:top_n])
_print_ranking(f"Bottom {top_n} nodes by authority score:", auth_scores[-top_n:][::-1])



Top 5 nodes by hub score:
(840, 1.0)
(155, 0.9499618624906541)
(234, 0.8986645288972263)
(389, 0.863417110184379)
(472, 0.8632841092495217)
Bottom 5 nodes by hub score:
(23, 0.04206685489093653)
(835, 0.05779059354433016)
(141, 0.0645311764622518)
(539, 0.06602659373418493)
(889, 0.07678413939216454)
Top 5 nodes by authority score:
(893, 1.0)
(16, 0.9635572849634397)
(799, 0.9510158161074016)
(146, 0.9246703586198443)
(473, 0.8998661973604049)
Bottom 5 nodes by authority score:
(19, 0.05608316377607618)
(135, 0.06653910487622794)
(462, 0.075442286246419)
(24, 0.08171239406816944)
(910, 0.08571673456144875)
