Preliminary start-up code:

In [1]:
import json

from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() hands back the already-running context when this cell is
# executed again in the same kernel; calling the SparkContext constructor a
# second time raises "ValueError: Cannot run multiple SparkContexts at once".
# Master ('local') and application name ('PageRank') are unchanged.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster('local').setAppName('PageRank')
)

In [2]:
# Partition count used for LinksRDD (and reused for the rank RDDs below)
numPartitions = 12  # Adjust based on available cores

# Build LinksRDD from polblogs_edges.jsonl.
# Every line of the file is a JSON object such as {"1": [3]}; each of its
# entries becomes one (vertex_id, list of target vertex_ids) pair with the
# IDs converted to int. The result is partitioned and persisted because it
# is joined against the ranks on every PageRank iteration.
LinksRDD = (
    sc.textFile('polblogs_edges.jsonl')
    .map(json.loads)
    .flatMap(lambda record: record.items())
    .map(lambda entry: (int(entry[0]), [int(target) for target in entry[1]]))
    .partitionBy(numPartitions)
    .persist()
)

In [3]:
# Load the names from polblogs_nodes.jsonl
# Each line is a JSON object like {"100monkeystyping.com": 1}
# Create namesRDD as an RDD of (vertex_id, URL)
#
# The RDD is persisted because it is used repeatedly in this notebook:
# by count(), by the rank initialisation of every PageRank run, and by the
# join that attaches URLs to the final ranks. Without persist() each of
# those uses re-reads and re-parses the file.
namesRDD = sc.textFile('polblogs_nodes.jsonl') \
    .map(lambda x: json.loads(x)) \
    .flatMap(lambda x: x.items()) \
    .map(lambda x: (int(x[1]), x[0])) \
    .persist()

In [4]:
# N is the number of (vertex_id, URL) records in namesRDD
N = namesRDD.count()

# Give every vertex the same starting rank 1/N and split the result into
# numPartitions partitions. The division stays inside the lambda so it is
# only evaluated when Spark actually processes a record.
RankRDD = (
    namesRDD
    .keys()
    .map(lambda vertex_id: (vertex_id, 1.0 / N))
    .partitionBy(numPartitions)
)

                                                                                

In [5]:
# --------------------- Define getContributions Function ------------------------
def getContributions(linksAndRank):
    """Split a node's rank evenly among the nodes it links to.

    Args:
        linksAndRank: A ``(node_id, (targets, rank))`` tuple, i.e. one record
            of ``LinksRDD.join(RankRDD)``. ``targets`` is the list of target
            node IDs and ``rank`` is the current rank of the node.

    Returns:
        A list with one ``(target_id, rank / out_degree)`` pair per entry of
        ``targets`` (repeated targets yield repeated pairs). The list is
        empty when ``targets`` is empty.
    """
    targets = linksAndRank[1][0]  # List of target node IDs
    rankj = linksAndRank[1][1]  # Rank of the source node
    odegj = len(targets)  # Out-degree of the source node
    if odegj == 0:
        # No outgoing links: nothing to distribute, and nothing to divide by
        return []
    # Every target receives the same share, so compute it once
    share = rankj / float(odegj)
    return [(i, share) for i in targets]

In [6]:
# --------------------- PageRank Iterations ------------------------

# Define the list of iteration counts
iterations_list = [10, 20, 30, 40]

for max_iter in iterations_list:
    # Each run is independent: start again from the uniform rank 1/N
    RankRDD = namesRDD.map(lambda x: (x[0], 1.0 / N))
    RankRDD = RankRDD.partitionBy(numPartitions)

    # The loop counter is deliberately not called `iter`: a module-level
    # variable of that name would shadow the builtin iter() in every cell
    # executed afterwards.
    for iteration in range(1, max_iter + 1):
        # Join LinksRDD with RankRDD to get (node_id, (list of targets, rank))
        LinksRankInfo = LinksRDD.join(RankRDD)

        # Compute contributions from each node to its targets
        flatContributions = LinksRankInfo.flatMap(getContributions)

        # Sum contributions by target node ID to get new ranks.
        # numPartitions is passed explicitly so the new RankRDD always has the
        # same number of partitions as LinksRDD, whatever the default
        # parallelism of the environment is.
        # NOTE(review): only nodes that receive at least one contribution
        # appear in the new RankRDD, and no damping/teleport term is applied,
        # so nodes without inbound links drop out after the first iteration.
        # Confirm this simplified update rule is the intended one.
        RankRDD = flatContributions.reduceByKey(lambda x, y: x + y, numPartitions)

    # After max_iter iterations, get the top 5 pages
    # Join RankRDD with namesRDD to get (node_id, (rank, URL))
    RankWithNames = RankRDD.join(namesRDD)

    # top() returns the 5 records with the largest rank, in descending order,
    # without sorting the whole RDD first
    top5 = RankWithNames.top(5, key=lambda x: x[1][0])

    # Display the results
    print(f"Top 5 pages after {max_iter} iterations:")
    for node_id, (rank, url) in top5:
        print(f"Node ID: {node_id}, URL: {url}, Rank: {rank}")
    print("\n")


                                                                                

Top 5 pages after 10 iterations:
Node ID: 798, URL: andrewsullivan.com, Rank: 0.051298048032891765
Node ID: 990, URL: freerepublic.com, Rank: 0.03403194299683014
Node ID: 1086, URL: kausfiles.com, Rank: 0.03121307385535634
Node ID: 1067, URL: jewishworldreview.com, Rank: 0.030829633782859366
Node ID: 514, URL: politicalwire.com, Rank: 0.03048333432899019




                                                                                

Top 5 pages after 20 iterations:
Node ID: 798, URL: andrewsullivan.com, Rank: 0.07280718174008817
Node ID: 990, URL: freerepublic.com, Rank: 0.04886749550791796
Node ID: 1086, URL: kausfiles.com, Rank: 0.044045937358357344
Node ID: 514, URL: politicalwire.com, Rank: 0.04373376487381114
Node ID: 1067, URL: jewishworldreview.com, Rank: 0.04203850143962231




                                                                                

Top 5 pages after 30 iterations:
Node ID: 798, URL: andrewsullivan.com, Rank: 0.08204721632733827
Node ID: 990, URL: freerepublic.com, Rank: 0.05516691702547013
Node ID: 1086, URL: kausfiles.com, Rank: 0.04954698693272156
Node ID: 514, URL: politicalwire.com, Rank: 0.04950982323038244
Node ID: 1067, URL: jewishworldreview.com, Rank: 0.04681784687420618






Top 5 pages after 40 iterations:
Node ID: 798, URL: andrewsullivan.com, Rank: 0.08602349832670199
Node ID: 990, URL: freerepublic.com, Rank: 0.05786922837051995
Node ID: 514, URL: politicalwire.com, Rank: 0.05200416472793629
Node ID: 1086, URL: kausfiles.com, Rank: 0.05191345030735264
Node ID: 1067, URL: jewishworldreview.com, Rank: 0.04887176255571141




                                                                                