#### Name: `Jike Lu`
#### AndrewID: `jikelu`

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases such as 3.3.2 are served from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark


Below is the outline of PySpark code for calculating the PageRank of a graph expressed in the edge vector representation. For this part of the assignment:

1. Complete the code below
2. Write docstring comments for all functions documenting what they do
3. Test your code on other graphs we've worked with in previous assignments

I recommend reviewing (i) the Excel spreadsheet calculation we did to simulate the PageRank computation and (ii) the slides on PageRank. Make sure you understand the algorithm well before starting this exercise.
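For reference, the per-iteration update computed by `calc_partials`, `calc_contribs`, and `calc_ranks` below can be written as follows, where $N$ is the number of nodes, $d_i$ is the out-degree of node $i$ (`len(links)`), and $\beta$ is the damping factor (`beta`). A node with no in-links, such as `D` in `graph-1.txt`, should still receive the $(1-\beta)/N$ term.

$$
r_j^{(t+1)} = \beta \sum_{i \to j} \frac{r_i^{(t)}}{d_i} + \frac{1 - \beta}{N},
\qquad r_j^{(0)} = \frac{1}{N}
$$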

Rather than just stating "Write PySpark code to calculate PageRank," I've provided some helper functions and sample output along the way as guidance. To facilitate grading, please stick with these functions.

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext()

# page rank

In [2]:
def parse_line(line):
    """Parse a line 'SRC DST1 DST2 ...' into (SRC, [DST1, DST2, ...])."""
    nodes = line.split()
    return (nodes[0], nodes[1:])

In [3]:
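def read_graph(fname):
    """Load an adjacency-list file as an RDD of (node, [out-neighbours]) pairs."""
    # Skip blank lines, which would make parse_line raise IndexError.
    return sc.textFile(fname).filter(lambda line: line.strip()).map(parse_line)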

In [4]:
read_graph('graph-1.txt').collect()

[('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
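To check the parsing logic without a Spark cluster, the same `parse_line` can be applied to plain strings. The `sample_lines` below are a hypothetical reconstruction of `graph-1.txt`, inferred from the collected output above (a source node followed by its whitespace-separated out-neighbours); the list comprehension stands in for `sc.textFile(...).map(parse_line)`.

```python
def parse_line(line):
    """Parse 'SRC DST1 DST2 ...' into (SRC, [DST1, DST2, ...])."""
    nodes = line.split()
    return (nodes[0], nodes[1:])

# Hypothetical contents of graph-1.txt, reconstructed from the collected RDD above.
sample_lines = ["A B C", "B C", "C A", "D C", ""]

# Plain-Python stand-in for the RDD pipeline; blank lines are skipped because
# parse_line would raise IndexError on them.
graph = [parse_line(line) for line in sample_lines if line.strip()]
print(graph)
# [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
```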

In [5]:
def init_ranks(graph):
    """Give every node (source or destination) the same initial rank 1/N."""
    nodes = graph.flatMap(lambda x: [x[0]] + x[1]).distinct()  # all unique nodes
    num_nodes = nodes.count()
    return nodes.map(lambda node: (node, 1.0 / num_nodes))

In [6]:
g = read_graph('graph-1.txt')
init_ranks(g).collect()

[('C', 0.25), ('A', 0.25), ('B', 0.25), ('D', 0.25)]
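A plain-Python analogue of `init_ranks` (a sketch, with the graph as a list instead of an RDD) makes it clear why destinations are included in the node set: a node that only receives links, like the hypothetical `E` below, would otherwise never get a starting rank.

```python
def init_ranks(graph):
    """Plain-Python analogue of the RDD init_ranks: uniform 1/N over all nodes."""
    nodes = {src for src, _ in graph} | {dst for _, dsts in graph for dst in dsts}
    return {node: 1.0 / len(nodes) for node in sorted(nodes)}

graph = [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
print(init_ranks(graph))  # {'A': 0.25, 'B': 0.25, 'C': 0.25, 'D': 0.25}

# Hypothetical graph where E only receives links: it still gets a starting rank.
print(init_ranks([('A', ['E'])]))  # {'A': 0.5, 'E': 0.5}
```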

In [12]:
def calc_partials(x):
    """Split a node's rank evenly among its out-links.

    x is a joined record (node, ([out-neighbours], rank)); returns a list of
    (neighbour, rank / out_degree) pairs, or [] for a node with no out-links.
    """
    node, (links, rank) = x
    if not links:  # no outgoing links: nothing to distribute
        return []
    share = rank / len(links)
    return [(link, share) for link in links]


In [8]:
def calc_contribs(ranks, graph):
    """Join each node's out-links with its rank and emit (destination, contribution) pairs."""
    return graph.join(ranks).flatMap(calc_partials)
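The join-then-flatMap step can be traced in plain Python. In this sketch `ranks` is a dict and `graph` a list, and the comprehension mimics an inner `join` (sources without a rank are skipped); on `graph-1.txt` it yields the same pairs as `c1` further down, though Spark does not guarantee that ordering.

```python
def calc_partials(x):
    """Split a node's rank evenly among its out-links."""
    node, (links, rank) = x
    if not links:
        return []
    share = rank / len(links)
    return [(link, share) for link in links]

def calc_contribs(ranks, graph):
    """Plain-Python analogue of graph.join(ranks).flatMap(calc_partials).

    ranks is a dict {node: rank}; graph is a list of (node, [out-neighbours]).
    Like an inner join, sources without a rank are skipped.
    """
    joined = [(node, (links, ranks[node])) for node, links in graph if node in ranks]
    return [pair for record in joined for pair in calc_partials(record)]

graph = [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
ranks = {'A': 0.25, 'B': 0.25, 'C': 0.25, 'D': 0.25}
print(calc_contribs(ranks, graph))
# [('B', 0.125), ('C', 0.125), ('C', 0.25), ('A', 0.25), ('C', 0.25)]
```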

In [9]:
g = read_graph('graph-1.txt')
r = init_ranks(g)

In [10]:
print(g.collect())
print(r.collect())

[('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
[('C', 0.25), ('A', 0.25), ('B', 0.25), ('D', 0.25)]


Note that when calculating the contributions, A, B, C, and D are each also paired with 0. Why?

In [13]:
c1 = calc_contribs(r, g)
print("Joined RDD:", g.join(r).collect())

c1.collect()

Joined RDD: [('A', (['B', 'C'], 0.25)), ('B', (['C'], 0.25)), ('C', (['A'], 0.25)), ('D', (['C'], 0.25))]


[('B', 0.125), ('C', 0.125), ('C', 0.25), ('A', 0.25), ('C', 0.25)]
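The zero pairs matter because `reduceByKey` only emits keys that occur in at least one pair. `D` has no in-links, so without a `(D, 0.0)` pair it disappears from the next ranks (the iteration-0 ranks in the `pagerank` output contain only A, B, and C), and because the next `graph.join(ranks)` then finds no rank for `D`, its link to `C` stops contributing as well. A plain-Python sketch (no Spark; `calc_partials_with_zero` and `sum_by_key` are hypothetical names) compares the two cases on the joined records printed above:

```python
from collections import defaultdict

def calc_partials_with_zero(x):
    """Hypothetical variant of calc_partials that also emits (node, 0.0) for the source."""
    node, (links, rank) = x
    pairs = [(node, 0.0)]  # keeps `node` as a key even if nothing links to it
    if links:
        share = rank / len(links)
        pairs.extend((link, share) for link in links)
    return pairs

def sum_by_key(pairs):
    """Plain-Python stand-in for reduceByKey(lambda x, y: x + y)."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

joined = [('A', (['B', 'C'], 0.25)), ('B', (['C'], 0.25)),
          ('C', (['A'], 0.25)), ('D', (['C'], 0.25))]

with_zero = sum_by_key(p for rec in joined for p in calc_partials_with_zero(rec))
print(sorted(with_zero))  # ['A', 'B', 'C', 'D']  (D is kept with total 0.0)

# Without the zero pairs, D never appears as a key and drops out of the ranks.
without_zero = sum_by_key((link, rank / len(links))
                          for _, (links, rank) in joined for link in links)
print(sorted(without_zero))  # ['A', 'B', 'C']
```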

In [14]:
def calc_ranks(contribs, num_nodes, beta):
    """Sum contributions per node and apply damping: beta * sum + (1 - beta) / N."""
    return (contribs.reduceByKey(lambda x, y: x + y)
                    .mapValues(lambda contrib: beta * contrib + (1 - beta) / num_nodes))

In [15]:
# N is the number of distinct nodes (r.count()), not the number of lines in the graph file
calc_ranks(c1, num_nodes=r.count(), beta=0.85).collect()

[('B', 0.14375), ('A', 0.25), ('C', 0.56875)]
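The arithmetic behind this output can be reproduced by hand. A plain-Python sketch of `calc_ranks` (a dict-based stand-in for `reduceByKey` followed by `mapValues`), applied to the collected `c1` with N = 4 and beta = 0.85:

```python
from collections import defaultdict

def calc_ranks(contribs, num_nodes, beta):
    """Plain-Python analogue: sum contributions per node, then beta * sum + (1 - beta) / N."""
    totals = defaultdict(float)
    for node, contrib in contribs:
        totals[node] += contrib
    return {node: beta * total + (1 - beta) / num_nodes for node, total in totals.items()}

# Contributions c1 as collected above.
c1 = [('B', 0.125), ('C', 0.125), ('C', 0.25), ('A', 0.25), ('C', 0.25)]
new_ranks = calc_ranks(c1, num_nodes=4, beta=0.85)
for node in sorted(new_ranks):
    print(node, round(new_ranks[node], 5))
# A 0.25
# B 0.14375
# C 0.56875
```

For example, C receives 0.125 + 0.25 + 0.25 = 0.625, so its new rank is 0.85 × 0.625 + 0.15 / 4 = 0.56875. The three values sum to 0.9625; the missing 0.0375 = (1 - 0.85) / 4 is exactly the share `D` would have received had a pair keyed by `D` been emitted.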

In [18]:
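def pagerank(fname, beta=0.85, n=10):
    """Run n iterations of PageRank on the graph in fname and return [(node, rank)].

    beta is the damping factor; the debug prints show each iteration's
    contributions and ranks.
    """
    graph = read_graph(fname).cache()  # reused in every iteration
    print("Graph:", graph.collect())  # Debug

    ranks = init_ranks(graph)
    print("Initial Ranks:", ranks.collect())  # Debug
    # N = all distinct nodes (sources and destinations), counted once up front
    num_nodes = ranks.count()

    for i in range(n):
        contribs = calc_contribs(ranks, graph)
        print(f"Contributions at iteration {i}:", contribs.collect())  # Debug

        ranks = calc_ranks(contribs, num_nodes=num_nodes, beta=beta)
        print(f"Ranks at iteration {i}:", ranks.collect())  # Debug

    return ranks.collect()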


In [19]:
sorted(pagerank('graph-1.txt', beta=0.85, n=10))

Graph: [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
Initial Ranks: [('C', 0.25), ('A', 0.25), ('B', 0.25), ('D', 0.25)]
Contributions at iteration 0: [('B', 0.125), ('C', 0.125), ('C', 0.25), ('A', 0.25), ('C', 0.25)]
Ranks at iteration 0: [('B', 0.14375), ('A', 0.25), ('C', 0.56875)]
Contributions at iteration 1: [('A', 0.56875), ('B', 0.125), ('C', 0.125), ('C', 0.14375)]
Ranks at iteration 1: [('C', 0.2659375), ('A', 0.5209374999999999), ('B', 0.14375)]
Contributions at iteration 2: [('C', 0.14375), ('A', 0.2659375), ('B', 0.26046874999999997), ('C', 0.26046874999999997)]
Ranks at iteration 2: [('B', 0.25889843749999997), ('C', 0.38108593749999997), ('A', 0.26354687499999996)]
Contributions at iteration 3: [('B', 0.13177343749999998), ('C', 0.13177343749999998), ('C', 0.25889843749999997), ('A', 0.38108593749999997)]
Ranks at iteration 3: [('A', 0.36142304687499993), ('B', 0.149507421875), ('C', 0.36957109374999997)]
Contributions at iteration 4: [('B', 0.1807115234

[('A', 0.3132179320754508),
 ('B', 0.17004084065059966),
 ('C', 0.31595982835913006)]
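In this result `D` is missing and the three ranks sum to about 0.799 (0.3132 + 0.1700 + 0.3160) rather than 1: once `D` drops out after iteration 0, both its teleport share and its contribution to `C` are lost in every later iteration. A plain-Python sketch of the same pipeline with the `(node, 0.0)` pairs added (`pagerank_local` is a hypothetical name, not one of the assignment's helpers) keeps all four nodes and, because `graph-1.txt` has no dangling nodes, conserves the total rank:

```python
from collections import defaultdict

def pagerank_local(graph, beta=0.85, n=10):
    """Hypothetical plain-Python PageRank mirroring the RDD pipeline, with (node, 0.0) pairs.

    graph: list of (node, [out-neighbours]); returns {node: rank}.
    """
    nodes = {src for src, _ in graph} | {dst for _, dsts in graph for dst in dsts}
    num_nodes = len(nodes)
    ranks = {node: 1.0 / num_nodes for node in nodes}           # init_ranks
    for _ in range(n):
        totals = defaultdict(float)
        for node, links in graph:                                # graph.join(ranks)
            if node not in ranks:
                continue
            totals[node] += 0.0                                  # keep node as a key
            for link in links:                                   # calc_partials
                totals[link] += ranks[node] / len(links)
        ranks = {node: beta * total + (1 - beta) / num_nodes    # calc_ranks
                 for node, total in totals.items()}
    return ranks

graph = [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
ranks = pagerank_local(graph, beta=0.85, n=10)
for node in sorted(ranks):
    print(node, round(ranks[node], 4))
print("total:", round(sum(ranks.values()), 6))  # 1.0 for this graph
```

`D` settles at (1 - 0.85) / 4 = 0.0375 from the first iteration onward, since it only ever receives the teleport term.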