In [3]:
from pyspark.sql import SparkSession

import re
import sys
from operator import add

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
# Build (or reuse) the session for the "page_rank" application on the
# standalone master. Dynamic allocation is switched on together with the
# external shuffle service, the executor idle timeout is set to 30s, and each
# executor is given 4 cores.
spark_session = (
    SparkSession.builder
    .master("spark://ben-spark-master:7077")
    .appName("page_rank")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

In [2]:
# Based on: https://github.com/apache/spark/blob/master/examples/src/main/python/pagerank.py

def computeContribs(urls, rank):
    """Split ``rank`` evenly across ``urls`` and yield one share per URL.

    Args:
        urls: sized iterable of the URLs that receive a contribution.
        rank: the rank value to be divided among them.

    Yields:
        ``(url, rank / len(urls))`` for every entry of ``urls``; nothing is
        yielded (and no division happens) when ``urls`` is empty.
    """
    out_degree = len(urls)
    for target in urls:
        share = rank / out_degree
        yield target, share

def parseNeighbors(urls):
    """Parses a urls pair string into urls pair.

    Args:
        urls: one input line holding a source URL and a neighbor URL
            separated by whitespace.

    Returns:
        A ``(source, neighbor)`` tuple built from the first two
        whitespace-separated tokens; any further tokens are ignored.

    Raises:
        IndexError: if the line holds fewer than two tokens.
    """
    # Strip first: re.split() on a line that starts with whitespace returns ''
    # as its first element, which would turn the empty string into the source
    # URL and shift the real source into the neighbor slot.
    parts = re.split(r'\s+', urls.strip())
    return parts[0], parts[1]

NUMBER_ITERATIONS = 5


# The input is a collection of edge strings, one "source target" pair each:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...
# A small hard-coded graph is used here in place of an input file.
lines = spark_session.sparkContext.parallelize([
    '1 5',
    '2 5',
    '3 5',
    '4 5',
    '5 1',
    '5 6',
    '2 6',
])

# Alternative: read the edges from a text file given on the command line.
#lines = spark_session.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

# Turn every edge string into a (source, target) tuple.
parsed_lines = lines.map(parseNeighbors)

# Drop duplicate edges, then gather the targets of each source:
# (source, [target, target, target]). The result is cached for reuse.
links = parsed_lines.distinct().groupByKey().cache()

# Every URL that appears on either side of an edge, de-duplicated.
all_urls = parsed_lines.flatMap(lambda edge: (edge[0], edge[1])).distinct()
print("\nall_urls")
print(all_urls.take(10))

# Seed every URL -- link sources and link targets alike -- with a rank of 1.0.
# all_urls holds bare URL strings (not tuples), so the element itself is the
# key. Indexing it with [0] would keep only the first character of the URL and
# collapse every URL sharing that character onto one key; that only happens to
# work for the single-character sample URLs.
ranks = all_urls.map(lambda url: (url, 1.0))
# (url, rank)
print("\ninitial ranks:")
print(ranks.take(10))


for iteration in range(NUMBER_ITERATIONS):
    # links.join(ranks) matches keys and pairs the values:
    # (source, (targets, rank)). Each source then spreads its current rank
    # over its targets via computeContribs(targets, rank).
    contribs = links.join(ranks).flatMap(
        lambda source_entry: computeContribs(*source_entry[1]))
    # contribs: (target, rank_contribution_to_target)

    # Add up everything each target received from its sources.
    new_ranks_from_sources = contribs.reduceByKey(add)

    # The left outer join keeps URLs that received no contribution; their
    # summed value is None and is treated as 0 before the 0.85 / 0.15 update.
    ranks = ranks.leftOuterJoin(new_ranks_from_sources).mapValues(
        lambda old_and_new: 0.15 + 0.85 * (old_and_new[1] or 0))

    print("\nnew ranks:")
    print(ranks.take(10))

print("\n\n\n")

# Bring every (url, rank) pair back to the driver and print it.
for link, rank in ranks.collect():
    print("%s has rank: %s." % (link, rank))




all_urls
['4', '1', '6', '5', '3', '2']

initial ranks:
[('4', 1.0), ('1', 1.0), ('6', 1.0), ('5', 1.0), ('3', 1.0), ('2', 1.0)]

new ranks:
[('4', 0.15), ('3', 0.15), ('2', 0.15), ('1', 0.575), ('5', 3.125), ('6', 1.0)]

new ranks:
[('1', 1.478125), ('5', 0.9575), ('6', 1.5418749999999999), ('4', 0.15), ('2', 0.15), ('3', 0.15)]

new ranks:
[('2', 0.15), ('5', 1.7251562499999995), ('3', 0.15), ('1', 0.5569375), ('6', 0.6206875), ('4', 0.15)]

new ranks:
[('2', 0.15), ('1', 0.8831914062499998), ('3', 0.15), ('6', 0.9469414062499998), ('5', 0.9421468749999999), ('4', 0.15)]

new ranks:
[('5', 1.2194626953124996), ('3', 0.15), ('4', 0.15), ('2', 0.15), ('1', 0.550412421875), ('6', 0.6141624218749999)]




5 has rank: 1.2194626953124996.
3 has rank: 0.15.
4 has rank: 0.15.
2 has rank: 0.15.
1 has rank: 0.550412421875.
6 has rank: 0.6141624218749999.


In [None]:
# Release the cluster resources held by this notebook. The session created
# above is bound to `spark_session`; no name `spark` is defined in this
# notebook, so calling spark.stop() would raise NameError and leave the
# application running.
spark_session.stop()