
# Spark implementation of Page Rank

In [1]:
from __future__ import print_function

import sys
from operator import add
import pyspark
import os
import glob
from time import time

In [2]:
# Small validation datasets; each one is loaded with getPages() in the
# validation section of this notebook.
test_linked_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/test_linked.csv'
test_nolinked_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/test_nolinked.csv'
test_unlinked_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/test_unlinked.csv'
test_extralinks_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/test_extralinks.csv'
test_mostly_unlinked_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/test_mostly_unlinked.csv'

# Wiki dump inputs. The '*' entries are glob patterns (getPages() expands them
# with glob.glob). input_path is not referenced by any other visible cell.
input_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/wiki_test.csv'
multi_input = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/enwiki*.csv'
nowiki_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/clean_data/nowiki*.csv'

In [3]:
# Output locations for the validation runs. None of these four names is
# referenced by another visible cell.
output_test_linked_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/validation_linked_01'
output_test_unlinked_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/validation_unlinked_01'
output_test_extralinks_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/validation_extralinks_01'
output_test_mostly_unlinked_path = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/validation_mostly_unlinked_01'

# Output locations for the full wiki runs. output_path_nowiki is the target of
# the (commented-out) saveAsTextFile call in the final implementation; the
# "Highest Ranked Pages" section reads the nowiki and enwiki directories back.
output_path_enwiki_subset = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/page_ranks_enwiki_subset_06'
output_path_enwiki = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/page_ranks_enwiki_01'
output_path_nowiki = '/mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/page_ranks_nowiki_final'

# General functions

In [4]:
def getPages(path):
    """
    Reads one or more text files into a spark RDD if the path is valid

    The path may be a glob pattern. It is accepted only when it matches at
    least one entry and every match is a regular file whose path contains
    '/mnt/'. Relies on the notebook-global SparkContext ``sc``.

    :param path: path (or glob pattern) of the text file(s)
    :return: RDD containing the lines of the text file(s)
    :raises SystemExit: with exit code -1 when the path is not valid
    """
    matches = glob.glob(path)
    # A pattern that matches nothing would skip the per-file check below and
    # be handed to Spark unchecked, so reject it explicitly.
    if not matches:
        print('Not a valid path')
        sys.exit(-1)
    # Validity check of every matched path
    for file in matches:
        if not os.path.isfile(file) or '/mnt/' not in file:
            print('Not a valid path')
            sys.exit(-1)
    return sc.textFile('file://'+path)

In [5]:
def getLinks(page):
    """
    Extracts the outgoing links of a single page record

    The record is stripped and split on tabs. The field at index 3 holds the
    number of links, and that many trailing fields are the links themselves.

    :param page: a page represented as a tab separated string
    :return: list of links belonging to page (empty when the count is 0)
    """
    fields = page.strip().split('\t')
    link_count = int(fields[3])
    return fields[-link_count:] if link_count != 0 else []

In [6]:
def filterFile(links):
    """
    Drops every link whose text begins with 'File:' or 'Wikipedia:'.

    :param links: list of links
    :return: list of the remaining links, in their original order
    """
    excluded_prefixes = ('File:', 'Wikipedia:')
    kept = []
    for target in links:
        if target.startswith(excluded_prefixes):
            continue
        kept.append(target)
    return kept

In [7]:
def computeContribs(links, rank):
    """
    Generates the rank contribution a page passes on to each page it links to.

    The page's rank is divided evenly over its links. Nothing is yielded when
    the page has no links.

    Adapted from the computeContribs function from:
        https://github.com/apache/spark/blob/master/examples/src/main/python/pagerank.py

    :param links: list of links belonging to a page
    :param rank: the rank of the page the list of links belongs to
    :return: generator of (link, rank contribution) tuples
    """
    num_links = len(links)
    if num_links == 0:
        return
    share = rank / num_links
    for target in links:
        yield (target, share)

### Isolated functions for testing

In [33]:
def filterLinks(all_pages, links):
    """
    Keeps only the links that point to a known page.

    :param all_pages: container of page titles (anything supporting ``in``)
    :param links: list of links to filter
    :return: list of the links found in all_pages, in their original order
    """
    return [target for target in links if target in all_pages]

### Removing dead links and sink pages

Dead links are links that link to pages outside the dataset.
Sink pages are pages without outgoing links.

Removing these links must be done in three steps.
1. Remove dead links
  1. Get list of all pages in dataset
  2. Filter out all links that do not link to a page in the list
2. Remove sink pages
  1. Filter out pages with no outgoing links
3. Remove links to sink pages
  1. Get updated list of all pages in dataset
  2. Filter out all links that linked to a sink page

It is important to do it in this order, because removing dead links can create more sink pages. Therefore you must first remove the dead links, then the sink pages, and finally the links to the sink pages.

In [66]:
def getPagePairs(pages):
    """
    Function that retrieves a list of each page with its outgoing links

    Lines that are empty, or whose title (tab-separated field 1) starts with
    'File:' or 'Wikipedia:', are dropped. The remaining links are cleaned in
    three steps: dead links are removed, pages left without links (sink
    pages) are removed, and links to those removed pages are removed.

    The two title sets are kept under different names on purpose. RDD
    transformations are evaluated lazily, so a lambda that closes over a
    variable which is reassigned later would see the later value when the
    returned RDD is finally computed.

    :param pages: spark RDD containing the input file
    :return: RDD of tuples (page, [list of links])
    """
    # The filter removes empty lines that may occur in the csv file
    # The map function maps the title of the page to all the internal links in the page
    raw_pairs = (pages
                 .filter(lambda line: len(line.strip()) > 0 and
                         not (line.strip().split('\t')[1].find('File:') == 0) and
                         not (line.strip().split('\t')[1].find('Wikipedia:') == 0))
                 .map(lambda line: (line.strip().split('\t')[1],
                                    filterFile(getLinks(line)))))
    # Titles are collected into a set (constant-time membership test) and
    # broadcast so the collection is not embedded in every task closure.
    known_pages = pages.context.broadcast(
        set(raw_pairs.map(lambda pair: pair[0]).collect()))
    # Removing links that link outside the dataset
    # and then removing all sink pages (pages with no outgoing links)
    linked_pairs = (raw_pairs
                    .map(lambda pair: (pair[0], filterLinks(known_pages.value, pair[1])))
                    .filter(lambda pair: len(pair[1]) > 0))
    # Removing all links that linked to sink pages
    surviving_pages = pages.context.broadcast(
        set(linked_pairs.map(lambda pair: pair[0]).collect()))
    # NOTE(review): this last step can leave pages with an empty link list;
    # they are kept, as in the original implementation.
    return linked_pairs.map(
        lambda pair: (pair[0], filterLinks(surviving_pages.value, pair[1])))

In [9]:
def getN(page_pairs):
    """
    Counts the elements of the given collection of page pairs.

    :param page_pairs: object exposing count(), e.g. an RDD of page pairs
    :return: number of pages
    """
    n_pages = page_pairs.count()
    return n_pages

In [10]:
def initializeRanks(page_pairs, N):
    """
    Gives every page in the dataset the same starting rank of 1/N.

    :param page_pairs: tuples of page pairs (page, [list of links])
    :param N: number of pages
    :return: tuples of (page, initial rank)
    """
    def with_uniform_rank(pair):
        # Only the page title is kept; the link list is discarded.
        return (pair[0], 1.0/N)

    return page_pairs.map(with_uniform_rank)

In [12]:
def calculatePageRanks(page_pairs, ranks, N, npart=12, eps=1e-4):
    """
    A function that iteratively calculates the page ranks until convergence

    Each iteration spreads every page's rank over its links, sums the
    contributions per page and applies ``0.85 * contribution + 0.15 / N``.
    While iterating, a page whose summed contribution is missing (or zero)
    gets ``0.85 * (own rank / N) + 0.15 / N`` instead. The loop stops when
    the summed absolute rank change is below ``eps``; the ranks are then
    recomputed once from the previous iteration, this time assigning 0 to
    pages without a contribution.

    ``npart`` is used for every join and every procRDD call so that all
    intermediate RDDs share the same number of partitions.

    :param page_pairs: tuples of page pairs
    :param ranks: tuples of (page, initial rank)
    :param N: number of pages
    :param npart: number of partitions used for joins and repartitioning
    :param eps: convergence threshold on the summed absolute rank change
    :return: tuples of page ranks (page, final rank)
    """
    while True:
        contribs = procRDD(page_pairs.join(ranks, npart).flatMap(
            lambda page_links_rank:
                computeContribs(page_links_rank[1][0], page_links_rank[1][1])),
            npart=npart)
        ranks_previous = procRDD(ranks, npart=npart)
        ranks = procRDD(ranks
                        .leftOuterJoin(contribs.reduceByKey(add), npart)
                        .mapValues(lambda rank:
                                   rank[1] * .85 + .15/N if rank[1]
                                   else (rank[0]/N) * .85 + .15/N),
                        npart=npart)
        # Summed absolute difference between the previous and the new ranks
        convergence = (ranks_previous
                       .join(ranks, npart)
                       .map(lambda rank: abs(rank[1][0]-rank[1][1]))
                       .sum())
        if convergence < eps:
            # Final pass: pages without any contribution end up with rank 0
            ranks = procRDD(ranks_previous
                            .leftOuterJoin(contribs.reduceByKey(add), npart)
                            .mapValues(lambda rank:
                                       rank[1] * .85 + .15/N if rank[1]
                                       else 0),
                            npart=npart)
            break

    return page_pairs.join(ranks, npart).map(
        lambda page_rank: (page_rank[0], page_rank[1][1]))

In [13]:
def getRanksSum(page_ranks):
    """
    Adds up the rank of all pages. In a perfect system the total is 1.
    When links lead to pages outside the dataset, or when some pages have no
    incoming links, the total will differ from 1 but should stay close to it.

    :param page_ranks: tuples of (page, rank)
    :return: sum of all ranks
    """
    def rank_of(page_rank):
        return page_rank[1]

    only_ranks = page_ranks.map(rank_of)
    return only_ranks.sum()

# Functions to handle partitioning for iterative joins

In [14]:
def procRDD(rdd, cache=True, part=True, hashp=True, npart=12):
    """
    Helper to handle caching/partitioning

    Function adapted from:
    https://stackoverflow.com/questions/31659404/spark-iteration-time-increasing-exponentially-when-using-join

    When ``hashp`` is set the RDD is partitioned by key with ``partitionBy``.
    A preceding ``repartition`` would only add a second shuffle and discard
    any partitioner the RDD already has, so ``repartition`` is used only
    when ``part`` is set and ``hashp`` is not.

    :param rdd: pyspark RDD (must hold key/value pairs when hashp is True)
    :param cache: boolean (default=True)
    :param part: boolean (default=True)
    :param hashp: boolean (default=True)
    :param npart: number of partitions (default=12 suggested to be 2*(number of cores))
    :return: rdd or rdd.cache()
    """
    if hashp:
        rdd = rdd.partitionBy(npart)
    elif part:
        rdd = rdd.repartition(npart)
    return rdd.cache() if cache else rdd

In [15]:
# Configure Spark through system properties, then create the notebook-global
# SparkContext `sc` that getPages() uses.
# NOTE(review): spark.cleaner.ttl is believed to have been removed in
# Spark 2.0 — confirm it has an effect on the Spark version in use.
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.driver.cores', '2')
pyspark.SparkContext.setSystemProperty('spark.driver.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.cleaner.ttl', '180')
sc = pyspark.SparkContext(appName='pagerank')

In [16]:
# Load the English and Norwegian dumps; both paths are glob patterns.
enwiki_pages = getPages(multi_input)
nowiki_pages = getPages(nowiki_path)

In [17]:
# Build (page, [links]) pairs with dead links and sink pages removed.
page_pairs_enwiki = getPagePairs(enwiki_pages)
page_pairs_nowiki = getPagePairs(nowiki_pages)

In [20]:
# First tab-separated field of every raw line. Unlike getPagePairs(), no
# lines are filtered out here and field 0 (not the title in field 1) is used.
enwiki = enwiki_pages.map(lambda line: line.strip().split('\t')[0])
nowiki = nowiki_pages.map(lambda line: line.strip().split('\t')[0])

In [21]:
# Number of raw lines in each dump (every line is counted, see the cell above).
N_enwiki = getN(enwiki)
N_nowiki = getN(nowiki)

In [22]:
# Display both line counts.
N_enwiki, N_nowiki

(11132071, 698181)

## Test of the validity of the algorithm under various circumstances

page_pairs_linked:

All pages have incoming links and all outgoing links link to pages in the set

PAGE | LINKS
-----|-------
A    | B C D
B    | A C
C    | A E
D    | A B C
E    | A D

page_pairs_unlinked:

All outgoing links link to pages in the set, but not all pages in the set have incoming links

PAGE | LINKS
-----|-------
A    | B C D
B    | A C
C    | A
D    | A B C
E    | A D

page_pairs_extralinks:

All pages have incoming links, but some links point to pages outside of the set

PAGE | LINKS
-----|--------
A    | B C D
B    | A C H J
C    | A E K L
D    | A B C
E    | A D

page_pairs_mostly_unlinked:

All outgoing links link to pages in the set, but most pages in the set do not have incoming links

PAGE | LINKS
-----|-------
A    | B C D
B    | A C
C    | A
D    | A B C
E    | A D
F    | B D
G    | A
H    | A B C
I    | C D

In [15]:
# Same Spark configuration as the setup cell earlier in the notebook; creates
# the global SparkContext `sc` used by the validation cells below.
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.driver.cores', '2')
pyspark.SparkContext.setSystemProperty('spark.driver.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.cleaner.ttl', '180')
sc = pyspark.SparkContext(appName='pagerank')

In [67]:
# Load the five small validation datasets.
test_linked_pages = getPages(test_linked_path)
test_nolinked_pages = getPages(test_nolinked_path)
test_unlinked_pages = getPages(test_unlinked_path)
test_extralinks_pages = getPages(test_extralinks_path)
test_mostly_unlinked_pages = getPages(test_mostly_unlinked_path)

In [68]:
# Build the cleaned (page, [links]) pairs for each validation dataset.
page_pairs_linked = getPagePairs(test_linked_pages)
page_pairs_nolinked = getPagePairs(test_nolinked_pages)
page_pairs_unlinked = getPagePairs(test_unlinked_pages)
page_pairs_extralinks = getPagePairs(test_extralinks_pages)
page_pairs_mostly_unlinked = getPagePairs(test_mostly_unlinked_pages)

In [52]:
# Check that collecting the titles gives a plain Python list on the driver.
page_pairs_linked.map(lambda pair: pair[0]).collect(), type(page_pairs_linked.map(lambda pair: pair[0]).collect())

(['A', 'B', 'C', 'D', 'E'], list)

In [69]:
# Show the cleaned pairs of every validation dataset (collect() brings all of
# them to the driver, which is fine for these tiny graphs).
'linked', page_pairs_linked.collect(), 'nolinked', page_pairs_nolinked.collect(), 'unlinked', page_pairs_unlinked.collect(), 'extra', page_pairs_extralinks.collect(), 'many unlinked', page_pairs_mostly_unlinked.collect()

('linked',
 [('A', ['B', 'C', 'D']),
  ('B', ['A', 'C']),
  ('C', ['A', 'E']),
  ('D', ['A', 'B', 'C']),
  ('E', ['A', 'D'])],
 'nolinked',
 [('A', ['B', 'C', 'D']),
  ('B', ['A', 'C']),
  ('C', ['A']),
  ('D', ['A', 'B', 'C'])],
 'unlinked',
 [('A', ['B', 'C', 'D']),
  ('B', ['A', 'C']),
  ('C', ['A']),
  ('D', ['A', 'B', 'C']),
  ('E', ['A', 'D'])],
 'extra',
 [('A', ['B', 'C', 'D']),
  ('B', ['A', 'C']),
  ('C', ['A', 'E']),
  ('D', ['A', 'B', 'C']),
  ('E', ['A', 'D'])],
 'many unlinked',
 [('A', ['B', 'C', 'D']),
  ('B', ['A', 'C']),
  ('C', ['A']),
  ('D', ['A', 'B', 'C']),
  ('E', ['A', 'D']),
  ('F', ['B', 'D']),
  ('G', ['A']),
  ('H', ['A', 'B', 'C']),
  ('I', ['C', 'D'])])

In [70]:
# Number of pages left in each validation dataset after cleaning.
N_linked = getN(page_pairs_linked)
N_nolinked = getN(page_pairs_nolinked)
N_unlinked = getN(page_pairs_unlinked)
N_extra = getN(page_pairs_extralinks)
N_mostly_unlinked = getN(page_pairs_mostly_unlinked)

In [71]:
# Display the page counts.
N_linked, N_nolinked, N_unlinked, N_extra, N_mostly_unlinked

(5, 4, 5, 5, 9)

In [72]:
# Start every page at rank 1/N for its own dataset.
ranks_linked = initializeRanks(page_pairs_linked, N_linked)
ranks_nolinked = initializeRanks(page_pairs_nolinked, N_nolinked)
ranks_unlinked = initializeRanks(page_pairs_unlinked, N_unlinked)
ranks_extra = initializeRanks(page_pairs_extralinks, N_extra)
ranks_mostly_unlinked = initializeRanks(page_pairs_mostly_unlinked, N_mostly_unlinked)

In [73]:
# Iterate to convergence with the default npart (12) and eps (1e-4).
page_ranks_linked = calculatePageRanks(page_pairs_linked, ranks_linked, N_linked)
page_ranks_nolinked = calculatePageRanks(page_pairs_nolinked, ranks_nolinked, N_nolinked)
page_ranks_unlinked = calculatePageRanks(page_pairs_unlinked, ranks_unlinked, N_unlinked)
page_ranks_extra = calculatePageRanks(page_pairs_extralinks, ranks_extra, N_extra)
page_ranks_mostly_unlinked = calculatePageRanks(page_pairs_mostly_unlinked, ranks_mostly_unlinked, N_mostly_unlinked)

In [74]:
# Show the final ranks of every validation dataset.
'linked', page_ranks_linked.collect(), 'nolinked', page_ranks_nolinked.collect(), 'unlinked', page_ranks_unlinked.collect(), 'extra', page_ranks_extra.collect(), 'many unlinked', page_ranks_mostly_unlinked.collect()

('linked',
 [('D', 0.17062566477346572),
  ('A', 0.30233451660549926),
  ('C', 0.23370635742716941),
  ('E', 0.12933060638071578),
  ('B', 0.16400285481314963)],
 'nolinked',
 [('D', 0.1481819841284264),
  ('A', 0.39066040762631127),
  ('C', 0.27099187470561176),
  ('B', 0.19016573353965016)],
 'unlinked',
 [('D', 0.15731831479952635),
  ('A', 0.3951524436171989),
  ('C', 0.26581327875089417),
  ('B', 0.18653523581166995),
  ('E', 0)],
 'extra',
 [('D', 0.17062566477346572),
  ('A', 0.30233451660549926),
  ('C', 0.23370635742716941),
  ('B', 0.16400285481314963),
  ('E', 0.12933060638071578)],
 'many unlinked',
 [('I', 0),
  ('D', 0.14824504749182213),
  ('A', 0.3815948584312732),
  ('G', 0),
  ('C', 0.2562529047416004),
  ('F', 0),
  ('B', 0.17982402638834158),
  ('E', 0),
  ('H', 0)])

In [75]:
# Sum of all ranks per dataset; expected to be close to 1.
ranks_sum_linked = getRanksSum(page_ranks_linked)
ranks_sum_nolinked = getRanksSum(page_ranks_nolinked)
ranks_sum_unlinked = getRanksSum(page_ranks_unlinked)
ranks_sum_extra = getRanksSum(page_ranks_extra)
ranks_sum_mostly_unlinked = getRanksSum(page_ranks_mostly_unlinked)

In [76]:
# Display the rank totals.
ranks_sum_linked, ranks_sum_nolinked, ranks_sum_unlinked, ranks_sum_extra, ranks_sum_mostly_unlinked

(0.9999999999999998,
 0.9999999999999997,
 1.0048192729792893,
 0.9999999999999998,
 0.9659168370530374)

In [34]:
# Stop the context; the next section creates a fresh SparkContext.
sc.stop()

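### Results

*Note: the tables below do not match the outputs of cells In [74] and In [76] above (for example page E in `page_pairs_unlinked` and the total for `page_pairs_extralinks`); they appear to come from an earlier run, made before dead links and sink pages were removed.*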

page_pairs_linked:

PAGE | LINKS | PAGE RANK
-----|-------|--------------------
A    | B C D | 0.30233451660549926
B    | A C   | 0.16400285481314963
C    | A E   | 0.23370635742716941
D    | A B C | 0.17062566477346572
E    | A D   | 0.12933060638071578
TOTAL| RANK  | 0.9999999999999999

page_pairs_unlinked:

PAGE | LINKS | PAGE RANK
-----|-------|--------------------
A    | B C D | 0.39515244361719903
B    | A C   | 0.18653523581166997
C    | A     | 0.26581327875089417
D    | A B C | 0.15731831479952638
E    | A D   | 0.03614458161656832
TOTAL| RANK  | 1.040963854595858

page_pairs_extralinks:

PAGE | LINKS   | PAGE RANK
-----|---------|--------------------
A    | B C D   | 0.12030966188618383
B    | A C H J | 0.0894076291857128
C    | A E K L | 0.10859879786388368
D    | A B C   | 0.08728132127189267
E    | A D     | 0.05333157300527242
TOTAL| RANK    | 0.45892898321294545

page_pairs_mostly_unlinked:

PAGE | LINKS | PAGE RANK
-----|-------|---------------------
A    | B C D | 0.38159485843127317
B    | A C   | 0.17982402638834158
C    | A     | 0.2562529047416004
D    | A B C | 0.14824504749182213
E    | A D   | 0.018404907980694597
F    | B D   | 0.018404907980694597
G    | A     | 0.018404907980694597
H    | A B C | 0.018404907980694597
I    | C D   | 0.018404907980694597
TOTAL| RANK  | 1.0579413769565098

# PageRank of Norwegian wiki dump

## Initial attempt
Attempt without manual control of the partitioning of join operations

In [36]:
# Experiment 1: joins are called without an explicit partition count.
# The whole run is timed, from context creation to the final sum.
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.driver.cores', '2')
pyspark.SparkContext.setSystemProperty('spark.driver.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.cleaner.ttl', '180')
sc = pyspark.SparkContext(appName='pagerank')
start = time()
eps = 1e-2
# npart is defined but not passed to any call in this cell; the procRDD()
# calls below therefore use their default of 12 partitions.
npart = 48
pages = getPages(nowiki_path)
# (title, [links]) pairs; dead links and sink pages are NOT removed here.
page_pairs = pages \
    .filter(lambda line: len(line.strip()) > 0 and \
           not (line.strip().split('\t')[1].find('File:') == 0) and \
           not (line.strip().split('\t')[1].find('Wikipedia:') == 0)) \
    .map(lambda line: (line.strip().split('\t')[1], \
                       filterFile(getLinks(line))))
N = page_pairs.count()
ranks = page_pairs.map(lambda pair: (pair[0], 1.0/N))
num_iter = 0

while True:
    num_iter += 1
    contribs = page_pairs.join(ranks).flatMap(
        lambda page_links_rank:
            computeContribs(page_links_rank[1][0], page_links_rank[1][1]))
    ranks_previous = procRDD(ranks)
    ranks = procRDD(ranks \
        .leftOuterJoin(contribs.reduceByKey(add)) \
        .mapValues(lambda rank: 
                   rank[1] * .85 + .15/N if rank[1] 
                   else (rank[0]/N) * .85 + .15/N))
    # Summed absolute rank change between two consecutive iterations
    convergence = ranks_previous \
        .join(ranks) \
        .map(lambda rank: abs(rank[1][0]-rank[1][1])) \
        .sum()
    print('Convergence at iteration %s: %s' % (num_iter, round(convergence, 10)))
    # Stops after the sixth iteration at the latest, converged or not
    if convergence < eps or num_iter > 5:
        break

page_ranks = page_pairs.join(ranks).map(
    lambda page_rank: (page_rank[0], page_rank[1][1]))

ranks_sum = page_ranks.map(lambda rank: rank[1]).sum()
print(ranks_sum)
end = time()
print('Total execution time was:', round(end-start), 'seconds')
sc.stop()

Convergence at iteration 1: 1.0168933004
Convergence at iteration 2: 0.3969783753
Convergence at iteration 3: 0.2025033943
Convergence at iteration 4: 0.1226482849
Convergence at iteration 5: 0.0762758779
Convergence at iteration 6: 0.0474383168
0.5282880031918347
Total execution time was: 488 seconds


## With partitioning
Attempt with manual partitioning of join operations

In [14]:
# Experiment 2: every join is given an explicit partition count (npart).
# procRDD() is not used, so nothing is cached or repartitioned between
# iterations.
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.driver.cores', '2')
pyspark.SparkContext.setSystemProperty('spark.driver.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.cleaner.ttl', '180')
sc = pyspark.SparkContext(appName='pagerank')
start = time()
eps = 1e-2
npart = 48
# The filter removes empty lines that may occur in the csv file
# The map function maps the title of the page to all the internal links in the page
pages = getPages(nowiki_path)
page_pairs = pages \
    .filter(lambda line: len(line.strip()) > 0 and \
           not (line.strip().split('\t')[1].find('File:') == 0) and \
           not (line.strip().split('\t')[1].find('Wikipedia:') == 0)) \
    .map(lambda line: (line.strip().split('\t')[1], \
                       filterFile(getLinks(line))))
N = page_pairs.count()
ranks = page_pairs.map(lambda pair: (pair[0], 1.0/N))
num_iter = 0

while True:
    num_iter += 1
    contribs = page_pairs.join(ranks, npart).flatMap(
        lambda page_links_rank:
            computeContribs(page_links_rank[1][0], page_links_rank[1][1]))
    ranks_previous = ranks
    ranks = ranks \
        .leftOuterJoin(contribs.reduceByKey(add), npart) \
        .mapValues(lambda rank: 
                   rank[1] * .85 + .15/N if rank[1] 
                   else (rank[0]/N) * .85 + .15/N)
    # Summed absolute rank change between two consecutive iterations
    convergence = ranks_previous \
        .join(ranks, npart) \
        .map(lambda rank: abs(rank[1][0]-rank[1][1])) \
        .sum()
    print('Convergence at iteration %s: %s' % (num_iter, round(convergence, 10)))
    # No iteration cap here: the loop runs until the change drops below eps
    if convergence < eps:
        break

page_ranks = page_pairs.join(ranks, npart).map(
    lambda page_rank: (page_rank[0], page_rank[1][1]))

ranks_sum = page_ranks.map(lambda rank: rank[1]).sum()
print(ranks_sum)
end = time()
print('Total execution time was:', round(end-start), 'seconds')
sc.stop()

Convergence at iteration 1: 1.0168933004
Convergence at iteration 2: 0.3969783753
Convergence at iteration 3: 0.2025033943
Convergence at iteration 4: 0.1226482849
Convergence at iteration 5: 0.0762758779
Convergence at iteration 6: 0.0474383168
Convergence at iteration 7: 0.0314807382
Convergence at iteration 8: 0.0209964926
Convergence at iteration 9: 0.0136085296
Convergence at iteration 10: 0.0090386682
0.4935836470026831
Total execution time was: 1557 seconds


# Final implementation with partitioning and caching

Best performance w.r.t. execution time, but caching may cause "no space left on device" errors when working with limited resources.

The system property cleaner.ttl defines the time to live of Spark objects. This is a vital component when working with limited resources. It needs to be set to a value greater than the execution time of one iteration of the while loop to have the desired effect, preferably 1.5 times the execution time of one iteration.
    
Documentation for spark.cleaner.ttl:
>Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgetten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications).  Note that any RDD that persists in memory for more than this duration will be cleared as well.

In [17]:
# Make sure no context is active before the final implementation creates one.
sc.stop()

In [18]:
if __name__ == '__main__':
    # Final implementation: explicit partition counts on every join, plus
    # caching and key partitioning of the iterated RDDs through procRDD().
    pyspark.SparkContext.setSystemProperty('spark.executor.memory', '7g')
    pyspark.SparkContext.setSystemProperty('spark.driver.cores', '2')
    pyspark.SparkContext.setSystemProperty('spark.driver.memory', '7g')
    pyspark.SparkContext.setSystemProperty('spark.cleaner.ttl', '180')
    sc = pyspark.SparkContext(appName='pagerank')
    start = time()
    eps = 1e-2
    npart = 48
    # The filter removes empty lines that may occur in the csv file
    # The map function maps the title of the page to all the internal links in the page
    pages = getPages(nowiki_path)
    page_pairs = procRDD(pages \
        .filter(lambda line: len(line.strip()) > 0 and \
               not (line.strip().split('\t')[1].find('File:') == 0) and \
               not (line.strip().split('\t')[1].find('Wikipedia:') == 0)) \
        .map(lambda line: (line.strip().split('\t')[1], \
                           filterFile(getLinks(line)))), npart=npart)
    N = page_pairs.count()
    ranks = page_pairs.map(lambda pair: (pair[0], 1.0/N))
    num_iter = 0

    while True:
        num_iter += 1
        contribs = page_pairs.join(ranks, npart).flatMap(
            lambda page_links_rank:
                computeContribs(page_links_rank[1][0], page_links_rank[1][1]))
        ranks_previous = procRDD(ranks, npart=npart)
        ranks = procRDD(ranks \
            .leftOuterJoin(contribs.reduceByKey(add), npart) \
            .mapValues(lambda rank:
                       rank[1] * .85 + .15/N if rank[1]
                       else (rank[0]/N) * .85 + .15/N), npart=npart)
        # Summed absolute rank change between two consecutive iterations
        convergence = ranks_previous \
            .join(ranks, npart) \
            .map(lambda rank: abs(rank[1][0]-rank[1][1])) \
            .sum()
        print('Convergence at iteration %s: %s' % (num_iter, round(convergence, 10)))
        if convergence < eps:
            # Final pass: pages without any contribution get rank 0.
            # npart is passed on so the result has the same number of
            # partitions as every other RDD in this run (the helper's
            # default would be 12).
            ranks = procRDD(ranks_previous
                            .leftOuterJoin(contribs.reduceByKey(add), npart)
                            .mapValues(lambda rank:
                                       rank[1] * .85 + .15/N if rank[1]
                                       else 0),
                            npart=npart)
            break
        else:
            # page_pairs already went through procRDD() with cache=True above
            page_pairs = page_pairs.cache()


    page_ranks = page_pairs.join(ranks, npart).map(
        lambda page_rank: (page_rank[0], page_rank[1][1]))

    ranks_sum = page_ranks.map(lambda rank: rank[1]).sum()
    print(ranks_sum)
    end = time()
    print('Total execution time was:', round(end-start), 'seconds')
    print('Total number of pages in dataset:', N)

    # Uncomment if results to be saved to file
#     page_ranks.saveAsTextFile(output_path_nowiki)

Convergence at iteration 1: 1.0168933004
Convergence at iteration 2: 0.3969783753
Convergence at iteration 3: 0.2025033943
Convergence at iteration 4: 0.1226482849
Convergence at iteration 5: 0.0762758779
Convergence at iteration 6: 0.0474383168
Convergence at iteration 7: 0.0314807382
Convergence at iteration 8: 0.0209964926
Convergence at iteration 9: 0.0136085296
Convergence at iteration 10: 0.0090386682
0.49358364700269325
Total execution time was: 613 seconds
Total number of pages in dataset: 679871


# Highest Ranked Pages

In [121]:
def getPageRank(line):
    """
    Parses one line of a saved page-rank file back into (title, rank)

    A line is expected to look like the text form of a two-element tuple,
    e.g. ``('Knole House', 2.71e-07)``. The line is split at the LAST comma
    so that titles containing commas are handled the same way as other
    titles: the leading ``('`` and the closing quote are removed from the
    title, and the trailing ``)`` is removed from the rank.

    :param line: one text line of the saved page ranks
    :return: tuple (title, rank as float)
    """
    title_part, rank_part = line.rsplit(',', 1)
    return title_part[2:-1], float(rank_part.strip()[:-1])

In [134]:
# New context for inspecting the saved results; spark.cleaner.ttl is 600 here
# instead of the 180 used in the earlier cells.
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.driver.cores', '2')
pyspark.SparkContext.setSystemProperty('spark.driver.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.cleaner.ttl', '600')
sc = pyspark.SparkContext(appName='pagerank')

In [135]:
# Read the part files under the directory named by output_path_nowiki.
pages_ranks = sc.textFile('file:///mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/page_ranks_nowiki_final/part*')

In [138]:
# Parse every saved line back into a (title, rank) tuple.
page_ranks = pages_ranks.map(lambda line: getPageRank(line))

In [139]:
# Peek at a few parsed records.
page_ranks.take(5)

[('Knole House', 2.714370680994686e-07),
 ('Kapitel', 2.2063036596032346e-07),
 ('Vorspiel', 2.637562177457272e-07),
 ('Nyhetsbyrå', 3.6482482045256115e-07),
 ('Rostrevor', 2.7067263659980717e-07)]

In [140]:
# Sort by rank, highest first (second argument is the `ascending` flag),
# then show the number of pages and the ten highest ranked pages.
sorted_ranking = page_ranks.sortBy(lambda a: a[1], False)
sorted_ranking.count(), sorted_ranking.take(10)

(679871,
 [('Kategori:Wikipedia-administrativa', 0.0031242702323758718),
  ('Kategori:Kategorier', 0.0024946961753485814),
  ('Kategori:Samfunn', 0.002282169055405368),
  ('Kategori:Kategoriinnhold', 0.0017586968871229995),
  ('Kategori:Geografi', 0.0016417855977165956),
  ('Kategori:Struktur', 0.0016032405898556326),
  ('Kategori:Tid', 0.0014818645713440173),
  ('Kategori:Århundrer', 0.0013188511020142159),
  ('Kategori:Kategorier etter område', 0.00131415971468974),
  ('Kategori:Sport', 0.0011078784725969942)])

In [141]:
# Stop the context; the next cell creates a new one for the enwiki results.
sc.stop()

In [142]:
# Same configuration as the previous ranking section (spark.cleaner.ttl = 600).
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.driver.cores', '2')
pyspark.SparkContext.setSystemProperty('spark.driver.memory', '7g')
pyspark.SparkContext.setSystemProperty('spark.cleaner.ttl', '600')
sc = pyspark.SparkContext(appName='pagerank')

In [143]:
# Read the part files under the directory named by output_path_enwiki.
pages_ranks = sc.textFile('file:///mnt/wiktorskit-danielb-ns0000k/home/notebook/group04/PR/page_ranks_enwiki_01/part*')

In [144]:
# Parse every saved line back into a (title, rank) tuple.
page_ranks = pages_ranks.map(lambda line: getPageRank(line))

In [145]:
# Peek at a few parsed records.
page_ranks.take(5)

[('Jamie Campbell (Scottish footballer)', 1.917980528360071e-08),
 ('Marouane Mrabet', 1.6227994845750827e-08),
 ('Carry cot', 1.6227994845750827e-08),
 ('Draft:Barry Twomlow', 1.6227994845750827e-08),
 ('Category:File-Class The Amazing Race articles', 1.6227994845750827e-08)]

In [146]:
# Sort by rank, highest first (second argument is the `ascending` flag),
# then show the number of pages and the ten highest ranked pages.
sorted_ranking = page_ranks.sortBy(lambda a: a[1], False)
sorted_ranking.count(), sorted_ranking.take(10)

(9243471,
 [('Category:Categories by parameter', 0.0013003267398329944),
  ('Category:Wikipedia categories', 0.0012249346062321908),
  ('Category:Categories by location', 0.000962433864150088),
  ('Category:Living people', 0.0008139243993796682),
  ('Category:Main topic classifications', 0.0007176716849691154),
  ('United States', 0.0006170628175589761),
  ('Category:Continents', 0.0005736776471412021),
  ('Category:Wikipedia administration', 0.0005318544402534897),
  ('Category:Categories by country', 0.0005265113883494863),
  ('Category:Categories by geographical categorization',
   0.0005091010298417353)])

In [147]:
# Release the Spark resources at the end of the notebook.
sc.stop()