<a href="https://colab.research.google.com/github/gzc/spark/blob/main/page_rank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz  
!tar xf /content/spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext()

In [4]:
def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

In [10]:
import re
def parseNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

In [7]:
# Loads in input file. It should be in format of:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...
# text_file = sc.textFile("hdfs://...")
from google.colab import files
files.upload()

text_file = sc.textFile("pagerank_data.txt")

Saving pagerank_data.txt to pagerank_data.txt


In [12]:
# Loads all URLs from input file and initialize their neighbors.
links = text_file.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
print (links.collect())

[('1', <pyspark.resultiterable.ResultIterable object at 0x7f7daec86828>), ('4', <pyspark.resultiterable.ResultIterable object at 0x7f7daec86898>), ('2', <pyspark.resultiterable.ResultIterable object at 0x7f7daec868d0>), ('3', <pyspark.resultiterable.ResultIterable object at 0x7f7daec86908>)]


In [23]:
# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

In [19]:
from operator import add
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(10):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks).flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))

    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

[('1', 0.7539975652935547), ('4', 0.5793357680397785), ('2', 0.5793357680397785), ('3', 0.5793357680397785), ('1', 0.7539975652935547), ('1', 0.7539975652935547)]
[('4', 0.6908979304995214), ('2', 0.6908979304995214), ('3', 0.6908979304995214), ('1', 0.6424354028338117), ('1', 0.6424354028338117), ('1', 0.6424354028338117)]
[('1', 0.7372632409245932), ('1', 0.7372632409245932), ('1', 0.7372632409245932), ('4', 0.59607009240874), ('2', 0.59607009240874), ('3', 0.59607009240874)]
[('1', 0.656659578547429), ('1', 0.656659578547429), ('4', 0.6766737547859042), ('2', 0.6766737547859042), ('3', 0.6766737547859042), ('1', 0.656659578547429)]
[('1', 0.7251726915680186), ('1', 0.7251726915680186), ('1', 0.7251726915680186), ('4', 0.6081606417653146), ('2', 0.6081606417653146), ('3', 0.6081606417653146)]
[('4', 0.6663967878328158), ('2', 0.6663967878328158), ('3', 0.6663967878328158), ('1', 0.6669365455005174), ('1', 0.6669365455005174), ('1', 0.6669365455005174)]
[('1', 0.7164372696578934), ('1

In [20]:
# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))

3 has rank: 0.7055659824943556.
4 has rank: 0.7055659824943556.
2 has rank: 0.7055659824943556.
1 has rank: 1.8833020525169324.
