<a href="https://colab.research.google.com/github/momo54/large_scale_data_management/blob/main/PageRank_an_PySPARK_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [35]:
!pip install pyspark



In [36]:
!pip install -q findspark
import findspark
findspark.init()

In [37]:
!wget -q https://storage.googleapis.com/public_lddm_data/small_page_links.nt
!ls

sample_data  small_page_links.nt  small_page_links.nt.1


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum

# Créez une session Spark
spark = SparkSession.builder.appName("PageRankExample").getOrCreate()


In [14]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
    StructField("source", StringType(), nullable=True),
    StructField("predicate", StringType(), nullable=True),
    StructField("target", StringType(), nullable=True)
])

In [15]:
# Chargez vos données web en tant que DataFrame
# Supposons que vous ayez un DataFrame avec deux colonnes : 'source' et 'target' représentant les liens entre les pages web
# Par exemple, vous pouvez le charger à partir d'un fichier CSV
data = spark.read.option("delimiter"," ").csv("small_page_links.nt", header=False, schema=schema)
data.show(5,truncate=200)

+-------------------------------------------------------+--------------------------------------+-----------------------------------------------------------+
|                                                 source|                             predicate|                                                     target|
+-------------------------------------------------------+--------------------------------------+-----------------------------------------------------------+
|       <http://dbpedia.org/resource/AfghanistanHistory>|<http://dbpedia.org/property/wikilink>|       <http://dbpedia.org/resource/History_of_Afghanistan>|
|     <http://dbpedia.org/resource/AfghanistanGeography>|<http://dbpedia.org/property/wikilink>|     <http://dbpedia.org/resource/Geography_of_Afghanistan>|
|      <http://dbpedia.org/resource/AccessibleComputing>|<http://dbpedia.org/property/wikilink>|       <http://dbpedia.org/resource/Computer_accessibility>|
|        <http://dbpedia.org/resource/AfghanistanPeople>|<

In [16]:
# So we can really write SQL !!
data.createOrReplaceTempView("SPO")
result=spark.sql("select source from SPO")
result.show(5)

+--------------------+
|              source|
+--------------------+
|<http://dbpedia.o...|
|<http://dbpedia.o...|
|<http://dbpedia.o...|
|<http://dbpedia.o...|
|<http://dbpedia.o...|
+--------------------+
only showing top 5 rows



In [17]:
data.take(1)

[Row(source='<http://dbpedia.org/resource/AfghanistanHistory>', predicate='<http://dbpedia.org/property/wikilink>', target='<http://dbpedia.org/resource/History_of_Afghanistan>')]

In [18]:
# Créez un DataFrame contenant le nombre de liens sortants pour chaque page
outdegrees = data.groupBy("source").count().withColumnRenamed("source", "page").withColumnRenamed("count", "outDegree")

# Définissez le nombre d'itérations pour le calcul du PageRank
max_iterations = 3
damping_factor = 0.85

# Initialisation du PageRank en attribuant à chaque page une valeur de départ
initial_pagerank = 1.0

# Créez un DataFrame contenant les valeurs de PageRank initiales
pagerank = outdegrees.withColumn("pagerank", col("outDegree") / initial_pagerank)

pagerank.show(5,truncate=100)


+--------------------------------------------------------+---------+--------+
|                                                    page|outDegree|pagerank|
+--------------------------------------------------------+---------+--------+
|            <http://dbpedia.org/resource/Actinopterygii>|      197|   197.0|
|   <http://dbpedia.org/resource/AtlasShruggedCharacters>|        1|     1.0|
|                     <http://dbpedia.org/resource/AbboT>|        1|     1.0|
|           <http://dbpedia.org/resource/AcademicElitism>|        1|     1.0|
|<http://dbpedia.org/resource/Atlas_%28disambiguation%29>|      145|   145.0|
+--------------------------------------------------------+---------+--------+
only showing top 5 rows



In [20]:
from pyspark.sql.functions import col, lit, sum as spark_sum, coalesce

# Étape 1: Rejoignez le DataFrame pagerank avec le DataFrame data pour calculer la contribution à partir des liens entrants
contrib = data.join(pagerank, data.target == pagerank.page, "left") \
              .select(data.source.alias("page"), (col("pagerank") / col("outDegree")).alias("contrib"))

# Étape 2: Créez un DataFrame contenant toutes les pages (sources et cibles) distinctes
all_pages = data.select("source").union(data.select("target")).distinct().withColumnRenamed("source", "page")

# Étape 3: Calculez le nouveau PageRank en regroupant les contributions par page
new_pagerank = contrib.groupBy("page").agg(spark_sum("contrib").alias("pagerank"))

# Étape 4: Joignez toutes les pages avec le nouveau PageRank pour inclure les pages sans contributions
pagerank = all_pages.join(new_pagerank, "page", "left") \
                    .withColumn("pagerank", coalesce(col("pagerank"), lit(0.0))) \
                    .withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank"))

# Étape 5: Affichez les 5 premières lignes du DataFrame pagerank
pagerank.show(5)


                                                                                

+--------------------+-------------------+
|                page|           pagerank|
+--------------------+-------------------+
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...|                1.0|
|<http://dbpedia.o...|                1.0|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...|0.15000000000000002|
+--------------------+-------------------+
only showing top 5 rows



In [23]:
# Alias des DataFrames pour éviter l'ambiguïté
data_alias = data.alias("data")
pagerank_alias = pagerank.alias("pagerank")
outdegrees_alias = outdegrees.alias("outdegrees")

# Étape 2 : Joignez le DataFrame pagerank avec le DataFrame data pour calculer les contributions des pages
contrib = data_alias \
    .join(pagerank_alias, col("data.target") == col("pagerank.page"), "left") \
    .join(outdegrees_alias, col("data.source") == col("outdegrees.source"), "left") \
    .select(col("data.source").alias("page"), (col("pagerank.pagerank") / col("outdegrees.outDegree")).alias("contrib"))

# Étape 3 : Calculez le nouveau PageRank en regroupant les contributions par page
new_pagerank = contrib.groupBy("page").agg(spark_sum("contrib").alias("pagerank"))

# Étape 4 : Assurez-vous que toutes les pages reçoivent une valeur, même celles sans liens entrants
all_pages = data_alias.select("source").union(data_alias.select("target")).distinct().withColumnRenamed("source", "page")

pagerank = all_pages.join(new_pagerank, "page", "left") \
                    .withColumn("pagerank", coalesce(col("pagerank"), lit(0.0))) \
                    .withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank"))

# Étape 5 : Affichez les 2 premières lignes pour validation
pagerank.show(2)


                                                                                

+--------------------+-------------------+
|                page|           pagerank|
+--------------------+-------------------+
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...| 18.765000000000004|
+--------------------+-------------------+
only showing top 2 rows



In [25]:
import time
from pyspark.sql.functions import col, lit, sum as spark_sum, coalesce

max_iterations = 10
debut = time.time()

# Effectuez des itérations pour calculer le PageRank
for iteration in range(max_iterations):
    print(f"Iteration {iteration + 1}/{max_iterations}")

    # Alias pour éviter les ambiguïtés
    data_alias = data.alias("data")
    pagerank_alias = pagerank.alias("pagerank")
    outdegrees_alias = outdegrees.alias("outdegrees")

    # Étape 1 : Rejoignez le DataFrame pagerank avec data pour calculer les contributions
    contrib = data_alias \
        .join(pagerank_alias, col("data.target") == col("pagerank.page"), "left") \
        .join(outdegrees_alias, col("data.source") == col("outdegrees.source"), "left") \
        .select(col("data.source").alias("page"), (col("pagerank.pagerank") / col("outdegrees.outDegree")).alias("contrib"))

    # Étape 2 : Calculez le nouveau PageRank
    new_pagerank = contrib.groupBy("page").agg(spark_sum("contrib").alias("pagerank"))

    # Étape 3 : Assurez-vous que toutes les pages sont incluses
    all_pages = data.select("source").union(data.select("target")).distinct().withColumnRenamed("source", "page")

    pagerank = all_pages.join(new_pagerank, "page", "left") \
                        .withColumn("pagerank", coalesce(col("pagerank"), lit(0.0))) \
                        .withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank"))

# Affichez les résultats
pagerank.select("page", "pagerank").show()

fin = time.time()
print(f"Temps d'exécution : {fin - debut:.2f} secondes")

Iteration 1/10
Iteration 2/10
Iteration 3/10
Iteration 4/10
Iteration 5/10
Iteration 6/10
Iteration 7/10
Iteration 8/10
Iteration 9/10
Iteration 10/10


                                                                                

+--------------------+-------------------+
|                page|           pagerank|
+--------------------+-------------------+
|<http://dbpedia.o...|0.27749999999999997|
|<http://dbpedia.o...| 0.3889538184843779|
|<http://dbpedia.o...|0.38790712787573095|
|<http://dbpedia.o...|             0.2775|
|<http://dbpedia.o...| 0.2775000000000001|
|<http://dbpedia.o...| 0.2824131688855989|
|<http://dbpedia.o...| 0.2825131734252726|
|<http://dbpedia.o...| 0.2785609397944201|
|<http://dbpedia.o...|             0.2775|
|<http://dbpedia.o...|             0.2775|
|<http://dbpedia.o...|             0.2775|
|<http://dbpedia.o...| 0.2781131713322693|
|<http://dbpedia.o...|             0.2775|
|<http://dbpedia.o...| 0.2797527239482297|
|<http://dbpedia.o...|0.27898458904109585|
|<http://dbpedia.o...|0.27810516810610625|
|<http://dbpedia.o...|0.28029541098055233|
|<http://dbpedia.o...|0.38616585101824497|
|<http://dbpedia.o...| 0.2785724888668978|
|<http://dbpedia.o...| 0.3872161846153231|
+----------