<a href="https://colab.research.google.com/github/momo54/large_scale_data_management/blob/main/PyPageRank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install pyspark



In [7]:
!pip install -q findspark
import findspark
findspark.init()

# SPARK INSTALLED... lets play

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [9]:
# !wget -q https://storage.googleapis.com/public_lddm_data/small_page_links.nt
# !ls

In [10]:
lines = spark.read.text("data/small_page_links.nt").rdd.map(lambda r: r[0])
lines.take(5)

['<http://dbpedia.org/resource/AfghanistanHistory> <http://dbpedia.org/property/wikilink> <http://dbpedia.org/resource/History_of_Afghanistan> .',
 '<http://dbpedia.org/resource/AfghanistanGeography> <http://dbpedia.org/property/wikilink> <http://dbpedia.org/resource/Geography_of_Afghanistan> .',
 '<http://dbpedia.org/resource/AccessibleComputing> <http://dbpedia.org/property/wikilink> <http://dbpedia.org/resource/Computer_accessibility> .',
 '<http://dbpedia.org/resource/AfghanistanPeople> <http://dbpedia.org/property/wikilink> <http://dbpedia.org/resource/Demography_of_Afghanistan> .',
 '<http://dbpedia.org/resource/AfghanistanCommunications> <http://dbpedia.org/property/wikilink> <http://dbpedia.org/resource/Communications_in_Afghanistan> .']

In [11]:
import re
def computeContribs(urls, rank) :
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)


def parseNeighbors(urls) :
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    return parts[0], parts[2]

In [12]:
# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()

import time
start_time = time.time()

# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))


In [13]:
links.take(5)

[('<http://dbpedia.org/resource/AfghanistanHistory>',
  <pyspark.resultiterable.ResultIterable at 0x179ae6ef710>),
 ('<http://dbpedia.org/resource/AfghanistanGeography>',
  <pyspark.resultiterable.ResultIterable at 0x179b013d850>),
 ('<http://dbpedia.org/resource/AccessibleComputing>',
  <pyspark.resultiterable.ResultIterable at 0x179b05ce090>),
 ('<http://dbpedia.org/resource/AfghanistanPeople>',
  <pyspark.resultiterable.ResultIterable at 0x179ad626fd0>),
 ('<http://dbpedia.org/resource/AfghanistanCommunications>',
  <pyspark.resultiterable.ResultIterable at 0x179b013c090>)]

In [14]:
#groupByKey makes lists !!
links.map(lambda x: (x[0],list(x[1]))).take(5)

[('<http://dbpedia.org/resource/AfghanistanHistory>',
  ['<http://dbpedia.org/resource/History_of_Afghanistan>']),
 ('<http://dbpedia.org/resource/AfghanistanGeography>',
  ['<http://dbpedia.org/resource/Geography_of_Afghanistan>']),
 ('<http://dbpedia.org/resource/AccessibleComputing>',
  ['<http://dbpedia.org/resource/Computer_accessibility>']),
 ('<http://dbpedia.org/resource/AfghanistanPeople>',
  ['<http://dbpedia.org/resource/Demography_of_Afghanistan>']),
 ('<http://dbpedia.org/resource/AfghanistanCommunications>',
  ['<http://dbpedia.org/resource/Communications_in_Afghanistan>'])]

In [15]:
#groupByKey makes lists !!
links.map(lambda x: (x[0],len(list(x[1])))).sortBy(lambda x:x[1],ascending=False).take(10)

[('<http://dbpedia.org/resource/Academy_Award_for_Best_Art_Direction>', 1105),
 ('<http://dbpedia.org/resource/August_15>', 678),
 ('<http://dbpedia.org/resource/Afghanistan>', 642),
 ('<http://dbpedia.org/resource/Azerbaijan>', 642),
 ('<http://dbpedia.org/resource/Austria>', 638),
 ('<http://dbpedia.org/resource/Amsterdam>', 632),
 ('<http://dbpedia.org/resource/April_1>', 632),
 ('<http://dbpedia.org/resource/August_24>', 628),
 ('<http://dbpedia.org/resource/August_1>', 627),
 ('<http://dbpedia.org/resource/August_19>', 610)]

In [16]:
ranks.take(5)

[('<http://dbpedia.org/resource/AfghanistanHistory>', 1.0),
 ('<http://dbpedia.org/resource/AfghanistanGeography>', 1.0),
 ('<http://dbpedia.org/resource/AccessibleComputing>', 1.0),
 ('<http://dbpedia.org/resource/AfghanistanPeople>', 1.0),
 ('<http://dbpedia.org/resource/AfghanistanCommunications>', 1.0)]

In [17]:
links.join(ranks).take(5)

[('<http://dbpedia.org/resource/AfghanistanHistory>',
  (<pyspark.resultiterable.ResultIterable at 0x179aee17110>, 1.0)),
 ('<http://dbpedia.org/resource/AmoeboidTaxa>',
  (<pyspark.resultiterable.ResultIterable at 0x179af309b50>, 1.0)),
 ('<http://dbpedia.org/resource/AlbaniaHistory>',
  (<pyspark.resultiterable.ResultIterable at 0x179afb42e50>, 1.0)),
 ('<http://dbpedia.org/resource/AfroAsiaticLanguages>',
  (<pyspark.resultiterable.ResultIterable at 0x179ada94250>, 1.0)),
 ('<http://dbpedia.org/resource/ArtificalLanguages>',
  (<pyspark.resultiterable.ResultIterable at 0x179b0699010>, 1.0))]

In [18]:
links.join(ranks).flatMap(lambda url_urls_rank: computeContribs(
            url_urls_rank[1][0], url_urls_rank[1][1]  # type: ignore[arg-type]
        )).take(5)

[('<http://dbpedia.org/resource/History_of_Afghanistan>', 1.0),
 ('<http://dbpedia.org/resource/Amoeboid>', 1.0),
 ('<http://dbpedia.org/resource/History_of_Albania>', 1.0),
 ('<http://dbpedia.org/resource/Afroasiatic_languages>', 1.0),
 ('<http://dbpedia.org/resource/Constructed_language>', 1.0)]

In [19]:
from operator import add
for iteration in range(1):
  # Calculates URL contributions to the rank of other URLs.
  contribs = links.join(ranks).flatMap(lambda url_urls_rank: computeContribs(
            url_urls_rank[1][0], url_urls_rank[1][1]  # type: ignore[arg-type]
        ))

  # Re-calculates URL ranks based on neighbor contributions.
  ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

    # Collects all URL ranks and dump them to console.

end_time = time.time()
execution_time = end_time - start_time

for (link, rank) in ranks.collect():
  print("%s has rank: %s." % (link, rank))


<http://dbpedia.org/resource/History_of_Afghanistan> has rank: 1.858407320872274.
<http://dbpedia.org/resource/History_of_Albania> has rank: 1.8499999999999999.
<http://dbpedia.org/resource/Afroasiatic_languages> has rank: 1.8499999999999999.
<http://dbpedia.org/resource/Constructed_language> has rank: 2.6999999999999997.
<http://dbpedia.org/resource/Abbadid> has rank: 1.8499999999999999.
<http://dbpedia.org/resource/Abbey> has rank: 1.0263398203592813.
<http://dbpedia.org/resource/Abbot> has rank: 1.0903350323425858.
<http://dbpedia.org/resource/Abbreviation> has rank: 1.0154557672543283.
<http://dbpedia.org/resource/Arthur_Koestler> has rank: 1.0031365313653136.
<http://dbpedia.org/resource/Alexander_the_Great> has rank: 1.380802292616962.
<http://dbpedia.org/resource/Existence_of_God> has rank: 1.0.
<http://dbpedia.org/resource/Andre_Agassi> has rank: 1.001597744360902.
<http://dbpedia.org/resource/Academy_Award> has rank: 1.0306257530637937.
<http://dbpedia.org/resource/American_fo

In [20]:
print(f"Temps d'exécution : {execution_time} secondes")

Temps d'exécution : 31.848017692565918 secondes
