In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xf spark-3.3.2-bin-hadoop2.tgz

In [2]:
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

import findspark
findspark.init("spark-3.3.2-bin-hadoop2")# SPARK_HOME

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [3]:
def mapper(line):
    from_node, to_node = map(int, line.strip().split('\t'))

    # Adjacency list
    yield (from_node, (to_node, 0))

    # Emit to add to PageRank
    yield (to_node, 1)

def reducer(values):
    total_pr = 0
    adjalist = []

    for v in values:
        if isinstance(v, tuple):  # Add to adjacency list
            adjalist.append(v[0])
        else:  # Add to PageRank
            total_pr += v

    return (total_pr, adjalist)

def main():
    conf = SparkConf().setAppName("PageRank")
    sc = SparkContext(conf=conf)

    input = sc.textFile("web-Stanford.txt").filter(lambda line: not line.startswith("#"))

    mapped = input.flatMap(mapper).groupByKey()

    reduced = mapped.mapValues(list).mapValues(reducer)

    results = reduced.collect()
    for result in results:
        print(result)

if __name__ == "__main__":
    main()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
(216857, (2, [89073, 134832, 212100, 231363, 255852]))
(227547, (2, [89073, 134832, 212100, 223516, 231363]))
(216881, (2, [152355, 212114, 267728]))
(227527, (2, [152355, 212114, 223509]))
(223509, (2, [152355, 227527, 231940]))
(212121, (0, [251876, 272765]))
(212127, (1, [50785, 60210, 89073, 93989, 235381]))
(216873, (1, [50785, 60210, 89073, 93989, 235381]))
(231951, (1, [50785, 60210, 89073, 93989, 235381]))
(236749, (1, [50785, 60210, 89073, 93989, 235381]))
(242975, (1, [50785, 60210, 89073, 93989, 97934, 235381]))
(216825, (2, [212132, 243185]))
(243185, (2, [216825, 238712]))
(212139, (0, [224029, 226411]))
(223395, (2, [134735, 216834, 227638]))
(221961, (2, [89073, 134832, 212165, 224628, 231363]))
(223459, (2, [89073, 134832, 216788, 227606, 231363]))
(267793, (2, [91620, 192935, 271754, 278679]))
(212181, (2, [133133, 216775, 238743]))
(216775, (2, [133133, 212181, 223439]))
(238743, (2, [133133, 212181, 243

In [4]:
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

class Mapper:
    def __init__(self):
        self.H = {}

    def map(self, line):
        from_node, to_node = map(int, line.strip().split('\t'))

        # Adjacency list
        yield (from_node, (to_node, 0))

        # Add to PageRank in H
        self.H[to_node] = self.H.get(to_node, 0) + 1

    def close(self):
        for to_node, pr_add in self.H.items():
            yield (to_node, pr_add)

def reducer(values):
    total_pr = 0
    adjalist = []
    

    for v in values:
        if isinstance(v, tuple):  # Add to adjacency list
            adjalist.append(v[0])
        else:  # Add to PageRank
            total_pr += v

    return (total_pr, adjalist)

def main(sc):
    input = sc.textFile("web-Stanford.txt").filter(lambda line: not line.startswith("#"))

    mapper = Mapper()

    mapped = input.flatMap(mapper.map).union(sc.parallelize(mapper.close())).groupByKey()

    reduced = mapped.mapValues(list).mapValues(reducer)

    results = reduced.collect()
    for result in results:
        print(result)

if __name__ == "__main__":
    main(sc)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
(206447, (0, [250026, 261241]))
(206459, (0, [34573, 38342, 81435, 105607, 167295, 198090, 214128, 226411, 234704, 245659]))
(206463, (0, [226411]))
(252855, (0, [206472]))
(206731, (0, [76114, 84428, 89511, 146224, 170281, 170578, 173057, 185718, 203662, 204925, 226411, 258860, 266335]))
(264111, (0, [206491, 230014, 234341, 274255]))
(225487, (0, [206494, 265699]))
(274911, (0, [93778, 206517, 244093]))
(229503, (0, [22258, 206520, 234789]))
(229495, (0, [206525, 231550, 234795]))
(209191, (0, [206546]))
(206599, (0, [14628, 17781, 62478, 77999, 96745, 120708, 131346, 137632, 168274, 176790, 181701, 183004, 218757, 221087, 229139, 247241, 250751, 259455, 268537]))
(206607, (0, [48972, 76387, 209222]))
(206615, (0, [84428, 144489, 170281, 170578, 173057, 185718, 204925, 226411, 266335]))
(229471, (0, [67756, 69358, 206628, 234834, 241454]))
(216295, (0, [212656, 228076, 243520]))
(206651, (0, [84906, 115987, 116041]))
(2