<a href="https://colab.research.google.com/github/momo54/large_scale_data_management/blob/main/PageRank_an_PySPARK_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=90ff72d2b6721060708bf1be500ada2cba1b0821ee438ff7a248465e719195be
  Stored in directory: /home/amin3hajji/.cache/pip/wheels/07/a0/a3/d24c94bf043ab5c7e38c30491199a2a1

In [2]:
!pip install -q findspark
import findspark
findspark.init()

In [3]:
!wget -q https://storage.googleapis.com/public_lddm_data/small_page_links.nt
!ls

CharSets.ipynb			  flatten2.pig
GraphFramesPageRank.ipynb	  map_reduce_with_py_sparl.ipynb
PageRank_an_PySPARK_SQL.ipynb	  people.csv
PyPageRank.ipynb		  pyspark
README.md			  run.sh
Spark_Structured_Streaming.ipynb  small_page_links.nt
Streaming_CSV.ipynb		  small_page_links.nt.1
VoiD_with_PySpark_RDD.ipynb	  stream.py
Wikipedia-Stream		  vianu.pig
Wikipedia_Events.ipynb		  watdiv-100k.nt
authors.pig			  webdam-books.txt
data				  webdam-publishers.txt
dataproc.py			  wordcount.sh
exam2022.ipynb


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Créez une session Spark
spark = SparkSession.builder.appName("PageRankExample").getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 14:55:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
    StructField("source", StringType(), nullable=True),
    StructField("predicate", StringType(), nullable=True),
    StructField("target", StringType(), nullable=True)
])

In [6]:
# Chargez vos données web en tant que DataFrame
# Supposons que vous ayez un DataFrame avec deux colonnes : 'source' et 'target' représentant les liens entre les pages web
# Par exemple, vous pouvez le charger à partir d'un fichier CSV
data = spark.read.option("delimiter"," ").csv("small_page_links.nt", header=False, schema=schema)
data.show(5,truncate=200)

                                                                                

+-------------------------------------------------------+--------------------------------------+-----------------------------------------------------------+
|                                                 source|                             predicate|                                                     target|
+-------------------------------------------------------+--------------------------------------+-----------------------------------------------------------+
|       <http://dbpedia.org/resource/AfghanistanHistory>|<http://dbpedia.org/property/wikilink>|       <http://dbpedia.org/resource/History_of_Afghanistan>|
|     <http://dbpedia.org/resource/AfghanistanGeography>|<http://dbpedia.org/property/wikilink>|     <http://dbpedia.org/resource/Geography_of_Afghanistan>|
|      <http://dbpedia.org/resource/AccessibleComputing>|<http://dbpedia.org/property/wikilink>|       <http://dbpedia.org/resource/Computer_accessibility>|
|        <http://dbpedia.org/resource/AfghanistanPeople>|<

In [7]:
# So we can really write SQL !!
data.createOrReplaceTempView("SPO")
result=spark.sql("select source from SPO")
result.show(5)

+--------------------+
|              source|
+--------------------+
|<http://dbpedia.o...|
|<http://dbpedia.o...|
|<http://dbpedia.o...|
|<http://dbpedia.o...|
|<http://dbpedia.o...|
+--------------------+
only showing top 5 rows



In [8]:
data.take(1)

[Row(source='<http://dbpedia.org/resource/AfghanistanHistory>', predicate='<http://dbpedia.org/property/wikilink>', target='<http://dbpedia.org/resource/History_of_Afghanistan>')]

In [9]:
# Créez un DataFrame contenant le nombre de liens sortants pour chaque page
outdegrees = data.groupBy("source").count().withColumnRenamed("source", "page").withColumnRenamed("count", "outDegree")

# Définissez le nombre d'itérations pour le calcul du PageRank
max_iterations = 3
damping_factor = 0.85

# Initialisation du PageRank en attribuant à chaque page une valeur de départ
initial_pagerank = 1.0

# Créez un DataFrame contenant les valeurs de PageRank initiales
pagerank = outdegrees.withColumn("pagerank", col("outDegree") / initial_pagerank)

pagerank.show(5,truncate=100)




+--------------------------------------------------------+---------+--------+
|                                                    page|outDegree|pagerank|
+--------------------------------------------------------+---------+--------+
|            <http://dbpedia.org/resource/Actinopterygii>|      197|   197.0|
|   <http://dbpedia.org/resource/AtlasShruggedCharacters>|        1|     1.0|
|                     <http://dbpedia.org/resource/AbboT>|        1|     1.0|
|           <http://dbpedia.org/resource/AcademicElitism>|        1|     1.0|
|<http://dbpedia.org/resource/Atlas_%28disambiguation%29>|      145|   145.0|
+--------------------------------------------------------+---------+--------+
only showing top 5 rows



                                                                                

In [10]:
# Rejoignez le DataFrame pagerank avec le DataFrame data pour calculer la contribution à partir des liens entrants
contrib = data.join(pagerank, data.target == pagerank.page, "left").select("source", "pagerank")

new_pagerank = contrib.groupBy("source").sum("pagerank").withColumnRenamed("source", "page").withColumnRenamed("sum(pagerank)", "pagerank")

# Joignez le DataFrame "new_pagerank" avec le DataFrame "outdegrees" pour obtenir les "outDegree" appropriés
pagerank = new_pagerank.join(outdegrees, new_pagerank.page == outdegrees.page, "left").select(new_pagerank.page, new_pagerank.pagerank, outdegrees.outDegree)

# Appliquez la formule du PageRank
pagerank = pagerank.withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank") / col("outDegree"))
pagerank.show(5)

                                                                                

+--------------------+--------+---------+
|                page|pagerank|outDegree|
+--------------------+--------+---------+
|<http://dbpedia.o...|    NULL|      197|
|<http://dbpedia.o...|    52.0|        1|
|<http://dbpedia.o...|  158.25|        1|
|<http://dbpedia.o...|    NULL|        1|
|<http://dbpedia.o...|    NULL|      145|
+--------------------+--------+---------+
only showing top 5 rows



In [11]:
# quand le chat dit des bétises...
 # Rejoignez le DataFrame pagerank avec le DataFrame data pour calculer la contribution à partir des liens entrants
contrib = data.join(pagerank, data.target == pagerank.page, "left").select("source", "pagerank")
contrib.show(2,truncate=100)

# Calculez le nouveau PageRank
pagerank = contrib.groupBy("source").sum("pagerank").withColumnRenamed("source", "page").withColumnRenamed("sum(pagerank)", "pagerank")
pagerank.show(2)

# Appliquez la formule du PageRank
pagerank = pagerank.withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank") /  pagerank["outDegree"])
pagerank.show(2)

+--------------------------------------------------+--------+
|                                            source|pagerank|
+--------------------------------------------------+--------+
|  <http://dbpedia.org/resource/AfghanistanHistory>|    NULL|
|<http://dbpedia.org/resource/AfghanistanGeography>|    NULL|
+--------------------------------------------------+--------+
only showing top 2 rows

+--------------------+-----------------+
|                page|         pagerank|
+--------------------+-----------------+
|<http://dbpedia.o...|             NULL|
|<http://dbpedia.o...|9.513934426229508|
+--------------------+-----------------+
only showing top 2 rows



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `outDegree` cannot be resolved. Did you mean one of the following? [`page`, `pagerank`].

In [None]:
import time
max_iterations = 10
debut = time.time()

# Effectuez des itérations pour calculer le PageRank

for iteration in range(max_iterations):
  # Rejoignez le DataFrame pagerank avec le DataFrame data pour calculer la contribution à partir des liens entrants
  contrib = data.join(pagerank, data.target == pagerank.page, "left").select("source", "pagerank")

  new_pagerank = contrib.groupBy("source").sum("pagerank").withColumnRenamed("source", "page").withColumnRenamed("sum(pagerank)", "pagerank")

  # Joignez le DataFrame "new_pagerank" avec le DataFrame "outdegrees" pour obtenir les "outDegree" appropriés
  pagerank = new_pagerank.join(outdegrees, new_pagerank.page == outdegrees.page, "left").select(new_pagerank.page, new_pagerank.pagerank, outdegrees.outDegree)

  # Appliquez la formule du PageRank
  pagerank = pagerank.withColumn("pagerank", (1 - damping_factor) + damping_factor * col("pagerank") / col("outDegree"))


# Affichez les résultats
pagerank.select("page", "pagerank").show()
fin = time.time()
print(f"Temps d'exécution : {fin-debut} secondes")
# Arrêtez la session Spark
# spark.stop()

+--------------------+-------------------+
|                page|           pagerank|
+--------------------+-------------------+
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|0.28111708845019723|
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|0.27989073867732917|
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|0.15577653203305658|
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|0.15589785108855592|
|<http://dbpedia.o...|0.15595562103322655|
|<http://dbpedia.o...|0.15521238381856947|
|<http://dbpedia.o...| 0.1512481644640235|
|<http://dbpedia.o...|0.15288306779854458|
|<http://dbpedia.o...|0.15363072921406307|
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|0.15071076181142143|
|<http://dbpedia.o...|0.15159132688549384|
|<http://dbpedia.o...|               NULL|
|<http://dbpedia.o...|               NULL|
+----------