## Important
1. Run the cell below once and restart the kernel.

In [2]:
%pip -qqq install pyspark python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf, collect_list, explode, sum as spark_sum
from pyspark.sql.types import ArrayType, FloatType, BooleanType, StructType, StructField, StringType
import os
import json
from dotenv import dotenv_values

In [2]:
config = dotenv_values("../.env")
config.items()

odict_items([('SPLIT_DATASET_DIR', '/dataset/splitupDataset'), ('LOG_DIR', '/Crawler/logs'), ('OUTPUT_DIR', '/Crawler/output'), ('SCRAPED_DATA', '../Crawler/output')])

**Graph Class**


In [3]:
class Graph:
    def __init__(self):
        self.graph = {}

    def add_edge(self, u, v):
        if u in self.graph:
            self.graph[u].append(v)
        else:
            self.graph[u] = [v]
    def __str__(self):
        result = ""
        for node, neighbors in self.graph.items():
            result += f"{node}: {neighbors}\n"
        return result

**Graph Builder**


In [4]:
def build_graph_from_json_folder(folder_path):
    graph = Graph()
    print("Graph building started")
    for filename in os.listdir(folder_path):
        if filename.endswith(".json"):
            file_path = os.path.join(folder_path, filename)
            with open(file_path, 'r') as file:
                data = json.load(file)
                page_name = data.get('url')
                forward_links = data.get('forwardLinks', [])
                if not forward_links:
                    forward_links = [None]
                for link in forward_links:
                    graph.add_edge(page_name, link)
    print("Graph building ended")
    return graph

**Page Rank**


In [5]:
def rank_dist(link_list, rank):
    len_link_list = len(link_list)
    if len_link_list > 0 and link_list[0] is not None:
        rank = rank / len_link_list
        r_list = [(x, rank) for x in link_list]
    else:
        r_list = [("DANGLING", rank)]
    return r_list

inner_schema = StructType([
    StructField("uri_id", StringType(), False),
    StructField("rank", FloatType(), False)
])

ranks_dist_udf = udf(rank_dist, ArrayType(inner_schema))

In [6]:
def run(graph, alpha=0.15, convergence=0.01):
    edge_info = [(src, dst) for src, dst_list in graph.graph.items() for dst in dst_list]
    spark = SparkSession.builder.appName("PageRank").getOrCreate()
    print("Start Spark session")
    edges_df = spark.createDataFrame(edge_info, ['src', 'dst'])

    dataframe_ranked = edges_df\
        .groupby('src')\
        .agg(collect_list('dst').alias('dst_list'))\
        .withColumnRenamed('src', 'uri_id')

    dataframe_ranked = dataframe_ranked.withColumn('rank', lit(1.0 / dataframe_ranked.count()))

    n_nodes = dataframe_ranked.count()
    i = 0
    checkpoint_dir = 'checkpoint_directory'  # Ensure this directory exists
    spark.sparkContext.setCheckpointDir(checkpoint_dir)
    while True:
        dataframe_ranked.cache()

        ranks_one_df = dataframe_ranked.withColumn(
            'link_map_pr', ranks_dist_udf('dst_list', 'rank'))
        ranks_one_df = ranks_one_df.select(
            explode('link_map_pr').alias('exploded'))
        ranks_one_df = ranks_one_df\
            .withColumn('dst_id', ranks_one_df['exploded'].getItem('uri_id'))\
            .withColumn('rank_i', ranks_one_df['exploded'].getItem('rank'))\
            .drop('exploded')

        ranks_one_df = ranks_one_df\
            .groupby('dst_id')\
            .sum('rank_i')\
            .withColumnRenamed('sum(rank_i)', 'rank_i')

        dataframe_ranked = dataframe_ranked\
            .join(ranks_one_df, dataframe_ranked['uri_id'] == ranks_one_df['dst_id'], 'outer')\
            .drop('dst_id')

        dangling_rank = dataframe_ranked\
            .filter(dataframe_ranked.uri_id == "DANGLING")\
            .select(spark_sum('rank_i'))\
            .first()[0]

        dataframe_ranked = dataframe_ranked.filter(dataframe_ranked.uri_id != "DANGLING")

        if dangling_rank:
            dist_alpha = ((dangling_rank / n_nodes) * (1 - alpha)) + alpha
        else:
            dist_alpha = alpha

        sum_alpha_and_pr_udf = udf(lambda x: (x * (1 - alpha)) + dist_alpha, FloatType())
        dataframe_ranked = dataframe_ranked.na.fill(0, ['rank_i'])
        dataframe_ranked = dataframe_ranked.withColumn('rank_i', sum_alpha_and_pr_udf('rank_i'))

        convergence_udf = udf(lambda rank_i, rank: abs(rank_i - rank) <= convergence, BooleanType())
        dataframe_ranked = dataframe_ranked.withColumn('convergence', convergence_udf('rank', 'rank_i'))
        count_not_converged = dataframe_ranked.filter(dataframe_ranked.convergence == False).count()
        dataframe_ranked = dataframe_ranked.drop('convergence').drop('rank').withColumnRenamed('rank_i', 'rank')

        dataframe_ranked = dataframe_ranked.checkpoint()

        if count_not_converged == 0:
            print("All nodes convergend according to the given criteria")
            break
        else:
            print(f'Nodes not yet converged: {count_not_converged}')

        i += 1

    output_graph = {}
    for row in dataframe_ranked.collect():
        output_graph[row['uri_id']] = row['rank']

    return output_graph

**Driver Code**


In [7]:
if __name__ == '__main__':
    spark = SparkSession.builder.appName("PageRank").getOrCreate()
    folder_path = config['SCRAPED_DATA']
    graph = build_graph_from_json_folder(folder_path)
    print("Start ranking")
    ranked_graph = run(graph)
    print("PageRank Results:")
    print(ranked_graph)

24/05/26 18:35:06 WARN Utils: Your hostname, Muhammads-MacBook-Pro-4.local resolves to a loopback address: 127.0.0.1; using 10.7.94.91 instead (on interface en0)
24/05/26 18:35:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/26 18:35:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Graph building started
Graph building ended
Start ranking
Start Spark session


24/05/26 18:35:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Nodes not yet converged: 16




All nodes convergend according to the given criteria
PageRank Results:
{'https://britannica.com': 0.15000000596046448, 'https://wordpress.com': 0.15000000596046448, 'https://instagram.com': 0.15000000596046448, 'https://ebay.com': 0.15000000596046448, 'https://nih.gov': 0.15000000596046448, 'https://wikipedia.org': 0.15000000596046448, 'https://quora.com': 0.15000000596046448, 'https://sciencedirect.com': 0.15000000596046448, 'https://facebook.com': 0.15000000596046448, 'https://researchgate.net': 0.15000000596046448, 'https://indeed.com': 0.15000000596046448, 'https://etsy.com': 0.15000000596046448, 'https://linkedin.com': 0.15000000596046448, 'https://youtube.com': 0.15000000596046448, 'https://pinterest.com': 0.15000000596046448, 'https://medium.com': 0.15000000596046448}


                                                                                