## Problem 5

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
import matplotlib.ticker as ticker   
import pandas as pd

import seaborn as sns
sns.set_style('whitegrid')

# shortest path code is in this module
from P5_sssp import *

# Spark setup: name the application, hand P5_sssp.py to the context via
# pyFiles, and only log errors so cell output stays readable.
conf = SparkConf()
conf.setAppName('WikiGraph')
sc = SparkContext(pyFiles=['P5_sssp.py'], conf=conf)
sc.setLogLevel('ERROR')

In [57]:
# Small adjacency-list test graph: one (nodeID, [neighbour IDs]) pair per node.
# It consists of two separate groups of nodes, {1, 2, 3, 4} and {5, 6, 7}.
# graph should have no jumps in node IDs! (maybe remap before!)
testgraph = [
    (1, [2, 3]),
    (2, [1, 3]),
    (3, [1, 2, 4]),
    (4, [3]),
    (5, [6, 7]),
    (6, [5]),
    (7, [5]),
]

In [58]:
# distribute the test graph as an RDD of (nodeID, [neighbour IDs]) pairs
rdd = sc.parallelize(testgraph)

In [59]:
# Initial state for the label propagation: every record becomes
# (nodeID, (neighbourList, label)) and each node starts with its own ID as label.
rddA = rdd.map(lambda node: (node[0], (node[1], node[0])))

# smallest / largest node ID; checkForPartitionChange scans this ID range
maxNodeID = rddA.keys().max()
minNodeID = rddA.keys().min()

In [60]:
# sanity check: smallest node ID present in the initial state
rddA.keys().min()

1

In [61]:
# inspect the initial state: (nodeID, (neighbourList, label)) with label == nodeID
rddA.collect()

[(1, ([2, 3], 1)),
 (2, ([1, 3], 2)),
 (3, ([1, 2, 4], 3)),
 (4, ([3], 4)),
 (5, ([6, 7], 5)),
 (6, ([5], 6)),
 (7, ([5], 7))]

In [62]:
# helper function to check whether the partitioning in connected components changed
def checkForPartitionChange(newSizes, oldSizes):
    """Tell whether the component-size table changed between two rounds.

    Both arguments map a component label to the number of nodes currently
    carrying that label (for example the result of ``RDD.countByValue()``).
    A label that is missing from one table counts as a component of size 0.

    Only labels that actually occur in either table are compared, so the
    function
      * does not depend on the notebook globals ``minNodeID`` / ``maxNodeID``,
      * works with plain dicts as well as defaultdicts (indexing every ID of
        an ID range raises ``KeyError`` on a plain dict for each ID that is
        not a component label),
      * never inserts entries into its arguments (indexing a defaultdict
        with a missing key would), and
      * accepts any hashable label, not just integers.

    Parameters
    ----------
    newSizes : mapping
        label -> node count after the current round.
    oldSizes : mapping
        label -> node count before the current round.

    Returns
    -------
    bool
        True if at least one label has a different node count in the two
        tables, False if the tables describe the same partitioning.
    """
    # union of labels, so that appearing and disappearing components are seen too
    labels = set(oldSizes) | set(newSizes)

    # .get() reads without creating the key, even on a defaultdict
    return any(oldSizes.get(label, 0) != newSizes.get(label, 0) for label in labels)

In [75]:

# start from the initial (nodeID, (neighbourList, label)) state
rdd = rddA


# this is the algorithm (PEGASUS-style label propagation): in every round each
# node takes the smallest label among its own and those sent to it
done = False

while not done:
    # component sizes before this round: label -> number of nodes carrying it
    rddComponents = rdd.map(lambda x: x[1][1])
    oldSizes = rddComponents.countByValue()

    # mapper / expander: keep the node's own record and send its current label
    # (with an empty neighbour list) to every neighbour
    rdd = rdd.flatMap(lambda x: [x] + [(y, ([], x[1][1])) for y in x[1][0]])

    # reducer: label messages carry [] as neighbour list, so concatenating the
    # lists keeps the node's own list while min() picks the smallest label
    # (assumes every node occurs once as a key in the input graph)
    rdd = rdd.reduceByKey(lambda a, b: (a[0] + b[0], min(a[1], b[1])))

    # component sizes after this round
    rddComponents = rdd.map(lambda x: x[1][1])
    newSizes = rddComponents.countByValue()

    # if the size table changed, one more round is needed
    # (oldSizes is recomputed at the top of the loop, so it is not carried over)
    done = not checkForPartitionChange(newSizes, oldSizes)

# reconstruct connected components: group node IDs by label, sort each group,
# and sort by label only AFTER grouping -- groupByKey redistributes the records
# by key hash, so an ordering established before it is not preserved
componentList = (rdd.map(lambda x: (x[1][1], x[0]))
                    .groupByKey()
                    .mapValues(sorted)
                    .sortByKey()
                    .values()
                    .collect())


In [77]:
# perform PEGASUS algorithm after ...
def connectedComponents(rddIn):
    """Find connected components by iterative min-label propagation.

    Every node starts with its own ID as label. In each round a node sends
    its label to all neighbours and then keeps the smallest label among its
    own and the received ones. Labels never increase, so as soon as the
    table ``label -> number of nodes`` is identical in two consecutive
    rounds no node switched label and the propagation has converged.

    Parameters
    ----------
    rddIn : RDD of (nodeID, list of neighbour IDs)
        Adjacency-list graph. Node IDs must be hashable and mutually
        orderable; they do not have to be contiguous integers.
        Assumes every node occurs exactly once as a key and that each edge
        is listed in both directions (labels only travel along listed
        edges) -- TODO confirm for real input data.

    Returns
    -------
    list of list
        One sorted list of node IDs per component, ordered by the
        component's final label.
    """
    # initial state: (nodeID, (neighbourList, label)) with label == nodeID
    rdd = rddIn.map(lambda x: (x[0], (x[1], x[0])))

    def componentSizes(state):
        # label -> number of nodes carrying it, as a plain dict so that two
        # tables can be compared with == and are never modified by a lookup
        return dict(state.map(lambda x: x[1][1]).countByValue())

    # sizes before the first round; afterwards each round's result is reused
    # as the next round's "old" table instead of being counted a second time
    oldSizes = componentSizes(rdd)
    done = False

    while not done:
        # mapper / expander: keep the node's own record and send its current
        # label (with an empty neighbour list) to every neighbour
        rdd = rdd.flatMap(lambda x: [x] + [(y, ([], x[1][1])) for y in x[1][0]])

        # reducer: label messages carry [] as neighbour list, so concatenating
        # keeps the node's own list while min() picks the smallest label
        rdd = rdd.reduceByKey(lambda a, b: (a[0] + b[0], min(a[1], b[1])))

        newSizes = componentSizes(rdd)

        # converged once the size table stopped changing
        done = newSizes == oldSizes
        oldSizes = newSizes

    # reconstruct connected components: group node IDs by label, sort each
    # group, and sort by label only AFTER grouping -- groupByKey redistributes
    # records by key hash, so an ordering established before it is lost
    componentList = (rdd.map(lambda x: (x[1][1], x[0]))
                        .groupByKey()
                        .mapValues(sorted)
                        .sortByKey()
                        .values()
                        .collect())

    return componentList

In [79]:
# test run on rdd!
rddTest = sc.parallelize(testgraph)

cList = connectedComponents(rddTest)

# the test graph splits into exactly these two components; compare without
# relying on the order in which the components are returned
assert sorted(cList) == [[1, 2, 3, 4], [5, 6, 7]], cList

In [80]:
# display the components found for the test graph
cList

[[1, 2, 3, 4], [5, 6, 7]]