# PageRank RDD with joins
A join-based implementation adapted from the PageRank example in the Apache Spark examples repository:   
https://github.com/apache/spark/blob/master/examples/src/main/python/pagerank.py

In [53]:
# imports
import re
import ast
import time
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt

In [16]:
%%writefile data/pagerank_data.txt
1 2
1 3
1 4
2 1
3 1
4 1

Overwriting data/pagerank_data.txt


In [73]:
%%writefile data/test_graph.txt
2	{'3': 1}
3	{'2': 2}
4	{'1': 1, '2': 1}
5	{'4': 3, '2': 1, '6': 1}
6	{'2': 1, '5': 2}
7	{'2': 1, '5': 1}
8	{'2': 1, '5': 1}
9	{'2': 1, '5': 1}
10	{'5': 1}
11	{'5': 2}

Overwriting data/test_graph.txt


In [141]:
%%writefile data/pagerank_data2.txt
1	{'2': 1,'3': 1,'4': 1}
2	{'1': 1}
3	{'1': 1}
4	{'1': 1}

Overwriting data/pagerank_data2.txt


In [144]:
%%writefile pagerank-spark-example.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This is an example implementation of PageRank. For more conventional use,
Please refer to PageRank implementation provided by graphx
Example Usage:
bin/spark-submit examples/src/main/python/pagerank.py data/mllib/pagerank_data.txt 10
"""
from __future__ import print_function

import re
import sys
import ast
from operator import add

from pyspark.sql import SparkSession

def quiet_logs(sc):
    """Reduce Spark's JVM-side log noise.

    Reaches log4j through the JVM view at ``sc._jvm`` and raises the
    threshold of the ``org`` and ``akka`` loggers to ``ERROR``.

    :param sc: the SparkContext (anything exposing ``_jvm.org.apache.log4j``).
    """
    log4j = sc._jvm.org.apache.log4j
    for logger_name in ("org", "akka"):
        log4j.LogManager.getLogger(logger_name).setLevel(log4j.Level.ERROR)


def computeContribs(row):
    """
    Emit the rank contributions produced by a single node.

    input:  one joined record ``(node, (edges, rank))``, e.g. (1, ([2, 3], 1.0))
    yields: ``(target, rank / len(edges))`` once per entry in ``edges``
            (repeated targets receive repeated shares), then ``(node, 0)``.
    """
    source, (targets, source_rank) = row
    fan_out = len(targets)
    if fan_out:
        share = source_rank / fan_out
        for target in targets:
            yield (target, share)
    # A zero-valued self entry guarantees the node keeps a key after
    # reduceByKey, even if nothing links to it. A node with no edges
    # emits only this entry, so its rank is not passed on to anyone.
    yield (source, 0)


def parseNeighbors(line):
    """
    Parse one adjacency-list line into ``(node, edge_list)`` pairs.

    Every neighbour is first emitted with an empty edge list, so nodes that
    only ever appear as targets (potential danglers) still get a key. The
    source node is then emitted with its full edge list; merging by key with
    list concatenation reassembles the graph. A neighbour with weight ``k``
    is repeated ``k`` times in the edge list.

    input:  4	{'1': 1, '2': 2}
    output: (1, []), (2, []), (4, [1, 2, 2])

    Blank or whitespace-only lines yield nothing.
    """
    stripped = line.strip()
    if not stripped:
        # Nothing to parse (e.g. an empty line in the input file).
        return
    node, edges = stripped.split('\t')
    edge_list = []
    # literal_eval only accepts Python literals, so the dict text is parsed
    # without executing arbitrary code.
    for edge, count in ast.literal_eval(edges).items():
        target = int(edge)
        # emit potential danglers w/ an empty edge list
        yield (target, [])
        # repeat this edge once per unit of weight
        edge_list.extend([target] * int(count))
    # finally yield this node w/ its complete edge list
    yield (int(node), edge_list)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: pagerank <file> <iterations>", file=sys.stderr)
        sys.exit(-1)

    print("WARN: This is a naive implementation of PageRank and is given as an example!\n" +
          "Please refer to PageRank implementation provided by graphx",
          file=sys.stderr)

    # Initialize the spark context.
    spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()

    try:
        quiet_logs(spark.sparkContext)

        # Loads in input file. Each line should be in the format:
        # 4	{'1': 1, '2': 2}
        lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

        # (node, [neighbour, ...]) for every node; nodes that only appear as
        # targets end up with an empty neighbour list.
        links = lines.flatMap(parseNeighbors).reduceByKey(add).cache()

        # Initialize the rank of every node to one.
        ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

        # Calculates and updates URL ranks iteratively using PageRank.
        for _ in range(int(sys.argv[2])):
            # Each node splits its rank evenly over its out-edges. A node with
            # an empty edge list only contributes 0 to itself, so its rank mass
            # is dropped rather than redistributed (naive PageRank).
            contribs = links.join(ranks).flatMap(computeContribs)

            # Re-calculates ranks: damping factor 0.85 plus a 0.15 constant term.
            ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

        # Collects all URL ranks and dumps them to console, highest rank first.
        for (link, rank) in sorted(ranks.collect(), key=lambda x: -x[1]):
            print("%s has rank: %s." % (link, rank))
    finally:
        # Release the Spark session even if the job above fails.
        spark.stop()

Overwriting pagerank-spark-example.py


In [146]:
!spark-submit pagerank-spark-example.py data/test_graph.txt 20

2019-11-15 12:09:25 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN: This is a naive implementation of PageRank and is given as an example!
Please refer to PageRank implementation provided by graphx
2019-11-15 12:09:26 INFO  SparkContext:54 - Running Spark version 2.3.1
2019-11-15 12:09:26 INFO  SparkContext:54 - Submitted application: PythonPageRank
2019-11-15 12:09:26 INFO  SecurityManager:54 - Changing view acls to: root
2019-11-15 12:09:26 INFO  SecurityManager:54 - Changing modify acls to: root
2019-11-15 12:09:26 INFO  SecurityManager:54 - Changing view acls groups to: 
2019-11-15 12:09:26 INFO  SecurityManager:54 - Changing modify acls groups to: 
2019-11-15 12:09:26 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify per

In [47]:
[(2, 0.3620640495978871),
 (3, 0.333992700474142),
 (5, 0.08506399429624555),
 (4, 0.06030963508473455),
 (1, 0.04255740809817991),
 (6, 0.03138662354831139),
 (8, 0.01692511778009981),
 (10, 0.01692511778009981),
 (7, 0.01692511778009981),
 (9, 0.01692511778009981),
 (11, 0.01692511778009981)]

In [143]:
!spark-submit pagerank-spark-example.py data/pagerank_data2.txt 10

2019-11-15 12:08:08 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN: This is a naive implementation of PageRank and is given as an example!
Please refer to PageRank implementation provided by graphx
2019-11-15 12:08:08 INFO  SparkContext:54 - Running Spark version 2.3.1
2019-11-15 12:08:08 INFO  SparkContext:54 - Submitted application: PythonPageRank
2019-11-15 12:08:09 INFO  SecurityManager:54 - Changing view acls to: root
2019-11-15 12:08:09 INFO  SecurityManager:54 - Changing modify acls to: root
2019-11-15 12:08:09 INFO  SecurityManager:54 - Changing view acls groups to: 
2019-11-15 12:08:09 INFO  SecurityManager:54 - Changing modify acls groups to: 
2019-11-15 12:08:09 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify per

In [9]:
2 has rank: 0.7539975652935547.
3 has rank: 0.7539975652935547.
1 has rank: 1.7380073041193354.
4 has rank: 0.7539975652935547.

3.9999999999999996

In [137]:
%%writefile pagerank-spark-example-org.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This is an example implementation of PageRank. For more conventional use,
Please refer to PageRank implementation provided by graphx
Example Usage:
bin/spark-submit examples/src/main/python/pagerank.py data/mllib/pagerank_data.txt 10
"""
from __future__ import print_function

import re
import sys
from operator import add

from pyspark.sql import SparkSession

def quiet_logs(sc):
    """Set the ``org`` and ``akka`` log4j loggers to ``ERROR``.

    log4j is reached through the JVM view exposed at ``sc._jvm``.

    :param sc: the SparkContext (anything exposing ``_jvm.org.apache.log4j``).
    """
    log4j = sc._jvm.org.apache.log4j
    for name in ["org", "akka"]:
        target_logger = log4j.LogManager.getLogger(name)
        target_logger.setLevel(log4j.Level.ERROR)

def computeContribs(urls, rank):
    """Spread ``rank`` evenly across the neighbours in ``urls``.

    Yields one ``(url, rank / len(urls))`` pair per neighbour; an empty
    ``urls`` yields nothing.
    """
    fan_out = len(urls)
    for neighbour in urls:
        share = rank / fan_out
        yield (neighbour, share)


def parseNeighbors(urls):
    """Parse an edge line ``"<url> <neighbour url>"`` into a ``(url, neighbour)`` pair.

    Tokens may be separated by any run of whitespace. Leading and trailing
    whitespace (including a newline) is ignored, so a leading space no
    longer produces an empty source URL. Tokens beyond the second are
    dropped. A line with fewer than two tokens raises IndexError.
    """
    parts = urls.split()
    return parts[0], parts[1]


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: pagerank <file> <iterations>", file=sys.stderr)
        sys.exit(-1)

    print("WARN: This is a naive implementation of PageRank and is given as an example!\n" +
          "Please refer to PageRank implementation provided by graphx",
          file=sys.stderr)

    # Initialize the spark context.
    spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()

    try:
        quiet_logs(spark.sparkContext)

        # Loads in input file. It should be in format of:
        #     URL         neighbor URL
        #     URL         neighbor URL
        #     URL         neighbor URL
        #     ...
        lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

        # Loads all URLs from input file and initialize their neighbors.
        links = lines.map(parseNeighbors).distinct().groupByKey().cache()

        # Debug dump of the adjacency lists (collects the whole graph to the driver).
        for url, neighbors in links.collect():
            print(url, list(neighbors))

        # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
        ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

        # Calculates and updates URL ranks continuously using PageRank algorithm.
        for _ in range(int(sys.argv[2])):
            # Calculates URL contributions to the rank of other URLs.
            contribs = links.join(ranks).flatMap(
                lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))

            # Re-calculates URL ranks based on neighbor contributions.
            ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

        # Collects all URL ranks and dump them to console.
        for (link, rank) in ranks.collect():
            print("%s has rank: %s." % (link, rank))
    finally:
        # Release the Spark session even if the job above fails.
        spark.stop()


Overwriting pagerank-spark-example-org.py


In [138]:
!spark-submit pagerank-spark-example-org.py data/pagerank_data.txt 10

2019-11-15 12:05:21 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN: This is a naive implementation of PageRank and is given as an example!
Please refer to PageRank implementation provided by graphx
2019-11-15 12:05:22 INFO  SparkContext:54 - Running Spark version 2.3.1
2019-11-15 12:05:22 INFO  SparkContext:54 - Submitted application: PythonPageRank
2019-11-15 12:05:22 INFO  SecurityManager:54 - Changing view acls to: root
2019-11-15 12:05:22 INFO  SecurityManager:54 - Changing modify acls to: root
2019-11-15 12:05:22 INFO  SecurityManager:54 - Changing view acls groups to: 
2019-11-15 12:05:22 INFO  SecurityManager:54 - Changing modify acls groups to: 
2019-11-15 12:05:22 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify per