# Imports

In [None]:
from pyspark import SparkContext

# Helper Functions

In [None]:
def parse_links(line):
    """
    Parse one line of pagelinks.txt into ``(source, targets)``.

    Example: '1: [49, 5]' -> (1, [49, 5])

    Args:
        line: Text of the form ``"<source>: <list literal>"``.

    Returns:
        A ``(source, targets)`` tuple where ``source`` is an int and
        ``targets`` is the Python literal found after the first colon.

    Raises:
        IndexError: If the line contains no ':' separator.
        ValueError: If the source is not an integer or the targets are not
            a plain Python literal.
        SyntaxError: If the targets text is not valid literal syntax.
    """
    # Imported locally so the function is self-contained when shipped to workers.
    import ast

    # Split on the first colon only; anything after it belongs to the targets.
    parts = line.strip().split(":", 1)
    source = int(parts[0])
    # literal_eval accepts only literals, so data read from the input file can
    # never execute arbitrary code the way eval() would.
    targets = ast.literal_eval(parts[1].strip())
    return (source, targets)


def compute_contributions(links, ranks=None):
    """
    Compute the rank contributions that pages send to their neighbors.

    Each page splits its current rank equally among its outgoing links;
    pages without outgoing links contribute nothing.

    Args:
        links: Iterable of ``(page, (neighbors, rank))`` records. An object
            exposing ``collect()`` is also accepted and is collected first.
        ranks: Unused. Kept (now optional) so existing call sites that pass
            it positionally keep working.

    Returns:
        A list of ``(neighbor, share)`` tuples, one per outgoing link.
    """
    # Plain lists have no collect(); only call it when the input provides one.
    records = links.collect() if hasattr(links, "collect") else links

    contributions = []
    for _page, (neighbors, rank) in records:
        if not neighbors:
            # Avoid dividing by zero for pages with no outgoing links.
            continue
        share = rank / len(neighbors)
        contributions.extend((neighbor, share) for neighbor in neighbors)

    return contributions

# PageRank

## Initialize `SparkContext`

In [None]:
# Single local-mode Spark context used by every cell below.
sc = SparkContext(master="local", appName="PageRank")

## Load and parse the link structure

In [None]:
# Raw adjacency file; each non-blank line looks like "1: [49, 5]".
lines = sc.textFile("pagelinks.txt")
# Drop blank lines before parsing (an empty record would make parse_links
# fail on int("")), then cache the result because every PageRank iteration
# joins against it.
links = lines.filter(lambda line: line.strip()).map(parse_links).cache()

## Initialize all ranks to 1.0

In [None]:
def _initial_rank(_neighbors):
    """Return the starting PageRank score given to every page."""
    return 1.0


# One (page, 1.0) pair for each page that has an entry in `links`.
ranks = links.mapValues(_initial_rank)

## PageRank Iterations

In [None]:
damping_factor = 0.85
num_iterations = 10


def _spread_rank(record):
    """
    Split one page's rank evenly among its outgoing links.

    `record` is a single element of ``links.join(ranks)``:
    ``(page, (neighbors, rank))``. Returns a list of ``(neighbor, share)``
    pairs, empty when the page has no outgoing links.
    """
    _page, (neighbors, rank) = record
    if not neighbors:
        return []
    share = rank / len(neighbors)
    return [(neighbor, share) for neighbor in neighbors]


for i in range(num_iterations):
    # Pair every page's adjacency list with its current rank.
    joined = links.join(ranks)

    # The per-record function must not reference any RDD (such as `ranks`):
    # Spark cannot serialize an RDD inside a transformation closure
    # (SPARK-5063), so everything needed is taken from the joined record.
    contributions_rdd = joined.flatMap(_spread_rank)

    # Sum the shares each page received and apply the damping formula.
    # NOTE(review): pages that receive no contributions drop out of `ranks`
    # here and are then dropped by the next join — confirm this is intended.
    ranks = contributions_rdd.reduceByKey(lambda a, b: a + b).mapValues(
        lambda rank: (1 - damping_factor) + damping_factor * rank
    )

## Collect and sort final ranks

In [None]:
# Bring the final (page, rank) pairs back to the driver.
final_ranks = ranks.collect()

# Order a copy by rank, highest first, leaving `final_ranks` untouched.
sorted_ranks = list(final_ranks)
sorted_ranks.sort(key=lambda pair: pair[1], reverse=True)

## Output top and bottom results

In [None]:
# Each section is a heading plus the slice of `sorted_ranks` printed under it.
report_sections = (
    ("\nTop 5 nodes by PageRank:", sorted_ranks[:5]),
    ("\nBottom 5 nodes by PageRank:", sorted_ranks[-5:]),
)

for heading, entries in report_sections:
    print(heading)
    for node, score in entries:
        print(f"Node {node} → Rank: {score:.5f}")

## Print highest and lowest

In [None]:
# `sorted_ranks` is in descending rank order, so the extremes are its two ends.
(highest_node, highest_score), (lowest_node, lowest_score) = (
    sorted_ranks[0],
    sorted_ranks[-1],
)

summary_lines = [
    f"\nHighest Rank: Node {highest_node} with score {highest_score:.5f}",
    f"Lowest Rank: Node {lowest_node} with score {lowest_score:.5f}",
]
for summary in summary_lines:
    print(summary)

## Stop `SparkContext`

In [None]:
# Shut down the SparkContext created earlier in this notebook.
sc.stop()