In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 66.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=d54cfa3dc623034e9e0d3ddbf0f322c81157b682fcfb897292cc46a4f43838f5
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [3]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.accumulators import AddingAccumulatorParam
import numpy as np
import networkx as nx
import time

In [None]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
acc = sc.accumulator(0)

## **CCF CODEBASE**

In [None]:
class CCF:
    def __init__(self, sc, graph_data, nodes=None):
        self.sc = sc
        if nodes != None:
            self.raw_data = sc.parallelize(graph_data, nodes)
        else:
            self.raw_data = sc.parallelize(graph_data)
        self.spark_nodes = nodes
        self.components = None
        self.vertices_length = len(graph_data[0][0])
        self.step_time = None

    def reset(self):
        """
        Resets the self.components variable
        """
        self.components = None

    def find_connected_components(
        self, secondary_sorting=False, debug_time=False, debug=False
    ):
        """
        Finds connected components using CCF
        """
        if self.components != None:
            # avoid recalculating but uses space:
            return self.components
        else:
            # keep raw_data untouched for future use, proceed with rdd variable:
            rdd = self.raw_data
            PAIR_CREATED = False
            start = time.perf_counter()
            iteration = 0
            while True:
                # new pair counter:
                acc.value = 0
                # iterating...
                rdd = self.iterate(rdd, secondary_sorting, debug_time)
                iteration += 1
                PAIR_CREATED = bool(acc.value)
                if debug:
                    print("iterate {}:".format(iteration), rdd.collect())
                # testing terminating condition:
                if PAIR_CREATED == False:
                    break
            end = time.perf_counter()
            print("ccf exec. time: {}".format(end - start))
            # saving results:
            self.components = rdd
            return self.components

    def iterate(self, rdd, secondary_sorting, debug_time):
        """
        Iterates RDD according to the CCF process
        """
        self.step_time = time.perf_counter()
        # mapping job:
        rdd = self.map(rdd)
        self.print_time(debug_time, "map")
        # shuffle & sort:
        rdd = self.shuffle_and_sort(rdd, secondary_sorting)
        self.print_time(debug_time, "s&s")
        # reducing job:
        rdd = self.reduce(rdd, secondary_sorting)
        self.print_time(debug_time, "reduce")
        # deduplicating job:
        rdd = self.dedup(rdd)
        self.print_time(debug_time, "dedup")
        if debug_time:
            print("---")
        return rdd

    def map(self, rdd):
        """
        CCF Map Job
        """
        # flattening the newly created pairs (edges):
        return rdd.flatMap(lambda x: [(x[1], x[0]), (x[0], x[1])])

    def shuffle_and_sort(self, rdd, secondary_sorting):
        """
        CCF Shuffle & Sort
        """
        if secondary_sorting:
            rdd = self.secondary_sort(rdd)
        # grouping by key for the reducing job:
        rdd = rdd.groupByKey()
        return rdd

    def reduce(self, rdd, secondary_sorting):
        """
        CCF Reduce Job
        """

        def reduce_pair(key_value, ss):
            """
            Reduces (key, values) pair
            """
            key = key_value[0]
            values = key_value[1]
            if ss:
                it = values.__iter__()
                minimum = next(it)
                if minimum < key:
                    yield (key, minimum)
                    for value in it:
                        acc.add(1)
                        yield (value, minimum)
            else:
                value_list = []
                minimum = key
                for value in values:
                    if value < minimum:
                        minimum = value
                    value_list.append(value)
                if minimum < key:
                    yield (key, minimum)
                    for value in value_list:
                        if value != minimum:
                            acc.add(1)
                            yield (value, minimum)

        # flattening emitted pairs (edges):
        rdd = rdd.flatMap(lambda x: list(reduce_pair(x, secondary_sorting)))
        # (implementation purpose: triggers new pair counter collection)
        rdd.foreach(lambda x: None)
        return rdd

    def secondary_sort(self, rdd):
        """
        Secondary Sort
        """
        # creating composite keys: (key+value, None):
        rdd = rdd.map(lambda x: (x[0] + x[1], None))
        # sorting pairs:
        rdd = rdd.sortByKey()
        # (avoid pyspark error)
        length = self.vertices_length
        # retransforming pairs: (key, value):
        rdd = rdd.map(lambda x: (x[0][0 : 0 + length], x[0][length:]))
        # collecting unique keys: (USELESS)
        # keys = np.unique(rdd.keys().collect()).tolist()
        # partitioning for future grouping: (USELESS)
        # rdd = rdd.partitionBy(len(keys), lambda x: keys.index(x))
        return rdd

    def dedup(self, rdd):
        """
        Removes pair's duplicates
        """
        # transform (key,value) into (key+value, None) --> group by key --> keep only one occurence:
        rdd = rdd.map(lambda x: (x, None)).groupByKey().map(lambda x: x[0])
        return rdd

    def print_time(self, debug_time, job):
        """
        Displays jobs execution time
        """
        if debug_time:
            now = time.perf_counter()
            job_time = now - self.step_time
            print("- {}: {}".format(job, job_time))
            self.step_time = now

## **PAPER EXAMPLE**

In [None]:
edges = [("A", "B"), ("B", "D"), ("B", "C"), ("D", "E"), ("F", "G"), ("G", "H")]

instance = CCF(sc, edges)
res = instance.find_connected_components(debug_time=False, debug=False)
print(" - (Paper example) connected  comp.:")
print(res.collect())

ccf exec. time: 6.626811511999904
 - (Paper example) connected  comp.:
[('B', 'A'), ('E', 'A'), ('D', 'A'), ('G', 'F'), ('H', 'F'), ('C', 'A')]


In [None]:
instance.reset()

res = instance.find_connected_components(
    debug_time=False, debug=False, secondary_sorting=True
)
print("- (Paper example) connected  comp. (secondary sorting):")
print(res.collect())

ccf exec. time: 5.257217989999845
- (Paper example) connected  comp. (secondary sorting):
[('B', 'A'), ('D', 'A'), ('E', 'A'), ('G', 'F'), ('H', 'F'), ('C', 'A')]


## **TESTING: GOOGLE WEB GRAPH**

In [5]:
class GoogleGraph:
    def __init__(self):
        self.edges = None
        self.node_digits = 6

    def import_graph(self):
        edges = []
        with open("drive/MyDrive/web-Google.txt", "r") as reader:
            line = reader.readline()
            c = 0  # skip first lines
            while line != "" and c < 10:
                if c > 3:
                    nodes = line.split("\t")
                    edge = (
                        nodes[0].zfill(self.node_digits),
                        nodes[1].split("\n")[0].zfill(self.node_digits),
                    )
                    edges.append(edge)
                else:
                    c += 1
                line = reader.readline()
        self.edges = edges

    def get_edges(self):
        if self.edges != None:
            return self.edges
        else:
            self.import_graph()
            return

In [6]:
g = GoogleGraph()
g.import_graph()
print(g.get_edges()[0:2])
print(g.get_edges()[-2:])

[('000000', '011342'), ('000000', '824020')]
[('916425', '637936'), ('916425', '837379')]


In [None]:
google_ccf = CCF(sc, g.get_edges())
google_cc_std = google_ccf.find_connected_components(debug_time=True)
print("#components:", google_cc_std.map(lambda x: x[1]).distinct().count())

- map: 0.0003094399999099551
- s&s: 0.046900127000071734
- reduce: 79.34880173900001
- dedup: 0.031098041999939596
---
- map: 1.8279999949299963e-05
- s&s: 0.026213134000045102
- reduce: 248.49539604300003
- dedup: 0.03327128699993409
---
- map: 1.8245999854116235e-05
- s&s: 0.026581432000057248
- reduce: 131.3225960090001
- dedup: 0.024229388999856383
---
- map: 2.5254999854951166e-05
- s&s: 0.027879599000016242
- reduce: 127.35053314699985
- dedup: 0.02704241900028137
---
- map: 1.584699975865078e-05
- s&s: 0.03419299300003331
- reduce: 89.5147570830004
- dedup: 0.03063217499993698
---
- map: 1.7675999970379053e-05
- s&s: 0.020618737999939185
- reduce: 56.73327731900008
- dedup: 0.018731141999978718
---
- map: 1.5192999853752553e-05
- s&s: 0.016867442000148003
- reduce: 23.37400726499982
- dedup: 0.020140273999913916
---
- map: 1.0429999747429974e-05
- s&s: 0.02366459900031259
- reduce: 22.987762718999875
- dedup: 0.02287721099992268
---
- map: 1.005100011752802e-05
- s&s: 0.02104109

In [None]:
google_ccf.reset()
google_cc_ss = google_ccf.find_connected_components(
    debug_time=True, secondary_sorting=True
)
print("#components:", google_cc_ss.map(lambda x: x[1]).distinct().count())

- map: 0.0007850850001887011
- s&s: 18.579826112999854
- reduce: 110.98940198599985
- dedup: 0.041792764000092575
---
- map: 1.9083000097452896e-05
- s&s: 253.22066570599964
- reduce: 122.19920986500028
- dedup: 0.020465346000037243
---
- map: 2.3858000076870667e-05
- s&s: 153.74350883800025
- reduce: 90.79504688499992
- dedup: 0.02158926199990674
---
- map: 1.0443000064697117e-05
- s&s: 155.09256084799972
- reduce: 110.19486207599994
- dedup: 0.021932164000190824
---
- map: 1.088699991669273e-05
- s&s: 80.08770742200022
- reduce: 45.427534124999966
- dedup: 0.018323257999782072
---
- map: 1.2782999874616507e-05
- s&s: 52.45926239900018
- reduce: 32.423533187999965
- dedup: 0.02185456699999122
---
- map: 1.4957000075810356e-05
- s&s: 21.69100784500006
- reduce: 28.219643306000307
- dedup: 0.02079953599968576
---
- map: 1.0288999874319416e-05
- s&s: 21.15454496600023
- reduce: 16.860583767999742
- dedup: 0.019400208000206476
---
- map: 1.1297000128251966e-05
- s&s: 19.755967261000023
- 

In [15]:
graph = nx.Graph()
for edge in g.get_edges():
    graph.add_edge(edge[0], edge[1])
print("#components (by networkx):", nx.number_connected_components(graph))

#components (by networkx): 2746
