In [None]:
import os
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Number of PageRank iterations passed to PageRank_DataFrame further down.
NUMBER_ITERATION = 10

In [44]:
# Schema of the input file: two nullable string columns, in file order.
# "urlchildren" is kept as a raw string here; PageRank_DataFrame splits it.
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in ("urlid", "urlchildren")]
)

In [72]:
def PageRank_DataFrame(nombre_iteration:int, data_path:str, damping:float=0.85):
    """Compute PageRank over a link file using the Spark DataFrame API.

    The input is read as a header-less CSV with ';' as delimiter and '"' as
    quote character, using the module-level ``schema`` (columns ``urlid`` and
    ``urlchildren``). ``urlchildren`` is split on single spaces to obtain the
    outgoing links of each page.

    Every page that appears as a source or as a target keeps a rank at every
    iteration: a page that receives no contribution gets the base rank
    ``1 - damping`` instead of silently disappearing from the rank table
    (which would also drop its contributions to its children in all later
    iterations).

    Args:
        nombre_iteration: Number of rank-update iterations to run. With 0 the
            initial ranks (1.0 for every page) are returned.
        data_path: Path of the CSV file to read.
        damping: Damping factor applied to the summed contributions; the base
            rank added to every page is ``1 - damping``. Defaults to 0.85.

    Returns:
        A DataFrame with columns ``urlid`` and ``rank``, sorted by descending
        rank. The DataFrame is lazy; nothing is collected here.

    Raises:
        ValueError: If ``damping`` is not within [0, 1].
    """
    if not 0.0 <= damping <= 1.0:
        raise ValueError(f"damping must be within [0, 1], got {damping!r}")

    spark = SparkSession.builder.appName("pagerank_df").getOrCreate()
    sc = spark.sparkContext

    # Read the CSV file with the module-level schema, then turn the
    # space-separated children string into an array column.
    df = spark.read.option("header", False).option("delimiter", ";").option("quote", '"').schema(schema).csv(data_path)
    df = df.withColumn("urlchildren", f.split(df["urlchildren"], ' '))

    # One row per (page, child) edge; "count" is the page's number of children.
    # This table is loop-invariant, so it is cached to avoid re-reading and
    # re-shuffling the file on every iteration. The cache is not released here
    # because the returned (lazy) DataFrame still depends on it.
    exploded_df = df.select("urlid", f.array_size(df.urlchildren).alias("count"), f.explode("urlchildren").alias("urlchildren"))
    exploded_df = exploded_df.repartition(sc.defaultParallelism, "urlid").cache()

    # Every page seen either as a source or as a target of a link.
    vertices = exploded_df.select(f.col("urlid")).union(exploded_df.select(f.col("urlchildren").alias("urlid"))).distinct()
    vertices = vertices.repartition(sc.defaultParallelism, "urlid").cache()

    # Initial ranks: 1.0 for every page.
    ranks = vertices.withColumn("rank", f.lit(1.0))

    base_rank = 1.0 - damping

    # PageRank iterations.
    for _ in range(nombre_iteration):
        # Each page sends rank / out-degree to each of its children.
        contrib = exploded_df.join(ranks, "urlid")\
                    .select("urlchildren", (f.col("rank") / f.col("count")).alias("contrib"))

        received = contrib.groupBy("urlchildren").agg(f.sum("contrib").alias("rankCount"))\
                    .withColumnRenamed("urlchildren", "urlid")

        # Left join from the full vertex set so that pages without any inbound
        # contribution keep the base rank rather than vanishing.
        ranks = vertices.join(received, "urlid", "left")\
                    .select("urlid", (f.coalesce(f.col("rankCount"), f.lit(0.0)) * damping + base_rank).alias("rank"))
    return ranks.sort(f.desc("rank"))

In [73]:
# Build the PageRank result for the wikilinks dataset located in ../data.
result = PageRank_DataFrame(
    nombre_iteration=NUMBER_ITERATION,
    data_path=os.path.join('..', 'data', 'wikilinks.csv'),
)