In [23]:
!pip install pyspark



In [24]:
!pip install -q findspark
import findspark
findspark.init()

In [25]:
!wget -q https://storage.googleapis.com/public_lddm_data/small_page_links.nt
!ls

sample_data  small_page_links.nt  small_page_links.nt.1


In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

# Créez une session Spark
spark = SparkSession.builder.appName("PageRankExample").getOrCreate()


In [27]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
    StructField("source", StringType(), nullable=True),
    StructField("predicate", StringType(), nullable=True),
    StructField("target", StringType(), nullable=True)
])

In [28]:
# Chargez vos données web en tant que DataFrame
# Supposons que vous ayez un DataFrame avec deux colonnes : 'source' et 'target' représentant les liens entre les pages web
# Par exemple, vous pouvez le charger à partir d'un fichier CSV
data = spark.read.option("delimiter"," ").csv("small_page_links.nt", header=False, schema=schema)
data.show(5,truncate=200)

+-------------------------------------------------------+--------------------------------------+-----------------------------------------------------------+
|                                                 source|                             predicate|                                                     target|
+-------------------------------------------------------+--------------------------------------+-----------------------------------------------------------+
|       <http://dbpedia.org/resource/AfghanistanHistory>|<http://dbpedia.org/property/wikilink>|       <http://dbpedia.org/resource/History_of_Afghanistan>|
|     <http://dbpedia.org/resource/AfghanistanGeography>|<http://dbpedia.org/property/wikilink>|     <http://dbpedia.org/resource/Geography_of_Afghanistan>|
|      <http://dbpedia.org/resource/AccessibleComputing>|<http://dbpedia.org/property/wikilink>|       <http://dbpedia.org/resource/Computer_accessibility>|
|        <http://dbpedia.org/resource/AfghanistanPeople>|<

In [29]:
# So we can really write SQL !!
data.createOrReplaceTempView("SPO")
result=spark.sql("select source from SPO")
result.show(5, truncate=200)

+-------------------------------------------------------+
|                                                 source|
+-------------------------------------------------------+
|       <http://dbpedia.org/resource/AfghanistanHistory>|
|     <http://dbpedia.org/resource/AfghanistanGeography>|
|      <http://dbpedia.org/resource/AccessibleComputing>|
|        <http://dbpedia.org/resource/AfghanistanPeople>|
|<http://dbpedia.org/resource/AfghanistanCommunications>|
+-------------------------------------------------------+
only showing top 5 rows



In [30]:
data.take(1)

[Row(source='<http://dbpedia.org/resource/AfghanistanHistory>', predicate='<http://dbpedia.org/property/wikilink>', target='<http://dbpedia.org/resource/History_of_Afghanistan>')]

In [31]:
# Créez un DataFrame contenant le nombre de liens sortants pour chaque page
outdegrees = data.groupBy("source").count().withColumnRenamed("source", "page").withColumnRenamed("count", "outDegree")

# Définissez le nombre d'itérations pour le calcul du PageRank
max_iterations = 5
damping_factor = 0.85

# Initialisation du PageRank en attribuant à chaque page une valeur de départ
initial_pagerank = 1.0

# Créez un DataFrame contenant les valeurs de PageRank initiales
pagerank = outdegrees.withColumn("pagerank", lit(initial_pagerank))

pagerank.show(5,truncate=100)


+-----------------------------------------------------+---------+--------+
|                                                 page|outDegree|pagerank|
+-----------------------------------------------------+---------+--------+
|         <http://dbpedia.org/resource/Actinopterygii>|      197|     1.0|
|<http://dbpedia.org/resource/AtlasShruggedCharacters>|        1|     1.0|
|             <http://dbpedia.org/resource/Allegiance>|       35|     1.0|
|                  <http://dbpedia.org/resource/AbboT>|        1|     1.0|
|        <http://dbpedia.org/resource/AcademicElitism>|        1|     1.0|
+-----------------------------------------------------+---------+--------+
only showing top 5 rows



In [35]:
# Rejoignez le DataFrame pagerank avec le DataFrame data pour calculer la contribution à partir des liens entrants
contrib = data.join(pagerank, data.target == pagerank.page, "left").select("source", "pagerank")
contrib = contrib.withColumn("pagerank", when(col("pagerank").isNull(), 0.0).otherwise(col("pagerank")))

new_pagerank = contrib.groupBy("source").sum("pagerank").withColumnRenamed("source", "page").withColumnRenamed("sum(pagerank)", "pagerank")

# Joignez le DataFrame "new_pagerank" avec le DataFrame "outdegrees" pour obtenir les "outDegree" appropriés
pagerank = new_pagerank.join(outdegrees, new_pagerank.page == outdegrees.page, "left").select(new_pagerank.page, new_pagerank.pagerank, outdegrees.outDegree)

# Appliquez la formule du PageRank
pagerank = pagerank.withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank") / when(col("outDegree") > 0, col("outDegree")).otherwise(1))
pagerank.show(5)

+--------------------+-------------------+---------+
|                page|           pagerank|outDegree|
+--------------------+-------------------+---------+
|<http://dbpedia.o...|0.15000000000000002|      197|
|<http://dbpedia.o...|0.28111708845019734|        1|
|<http://dbpedia.o...|0.15000000000000002|       35|
|<http://dbpedia.o...|0.27989073867732917|        1|
|<http://dbpedia.o...|0.15000000000000002|        1|
+--------------------+-------------------+---------+
only showing top 5 rows



In [36]:
import time
max_iterations = 10
debut = time.time()

# Effectuez des itérations pour calculer le PageRank

for iteration in range(max_iterations):
  contrib = data.join(pagerank, data.target == pagerank.page, "left").select("source", "pagerank")
  contrib = contrib.withColumn("pagerank", when(col("pagerank").isNull(), 0.0).otherwise(col("pagerank")))

  new_pagerank = contrib.groupBy("source").sum("pagerank").withColumnRenamed("source", "page").withColumnRenamed("sum(pagerank)", "pagerank")

  # Joignez le DataFrame "new_pagerank" avec le DataFrame "outdegrees" pour obtenir les "outDegree" appropriés
  pagerank = new_pagerank.join(outdegrees, new_pagerank.page == outdegrees.page, "left").select(new_pagerank.page, new_pagerank.pagerank, outdegrees.outDegree)

  # Appliquez la formule du PageRank
  pagerank = pagerank.withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank") / when(col("outDegree") > 0, col("outDegree")).otherwise(1))


# Affichez les résultats
pagerank.select("page", "pagerank").show()
fin = time.time()
print(f"Temps d'exécution : {fin-debut} secondes")
max_pagerank_page = pagerank.orderBy(col("pagerank"), ascending=False).limit(1)
# Show the result
max_pagerank_page.show(truncate=200)
# Arrêtez la session Spark
# spark.stop()

+--------------------+-------------------+
|                page|           pagerank|
+--------------------+-------------------+
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...| 0.2811221393932113|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...| 0.2798907386773296|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...| 0.1557801986733062|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...|0.15589785108855592|
|<http://dbpedia.o...|0.15595563047687902|
|<http://dbpedia.o...|0.15521243414110686|
|<http://dbpedia.o...| 0.1512481644640235|
|<http://dbpedia.o...| 0.1528830682341113|
|<http://dbpedia.o...|0.15363241990176196|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...| 0.1507107618114224|
|<http://dbpedia.o...|0.15159133486957974|
|<http://dbpedia.o...|0.15000000000000002|
|<http://dbpedia.o...|0.15000000000000002|
+----------