# MAP543 - Database Management
Students: Khouloud EL ALAMI, Aya ERRAJRAJI, Ali EL ABBASSY
***
CCF: Fast and Scalable Connected Component Computation in MapReduce
https://www.cse.unr.edu/~hkardes/pdfs/ccf.pdf

In [0]:
import sys
import csv
import time
import pandas as pd

import pyspark
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, collect_list, col, sort_array, array_min, coalesce, size, sum, explode 

In [0]:
# Session-level tuning, applied in insertion order before any job runs.
_spark_settings = {
    # Number of partitions used by shuffles (groupBy/aggregations, distinct);
    # kept small to limit memory pressure on this cluster.
    "spark.sql.shuffle.partitions": 50,
    # Compress cached columnar data in memory.
    # NOTE(review): this may already be Spark's default — confirm.
    "spark.sql.inMemoryColumnarStorage.compressed": True,
    # Rows per columnar batch when caching; larger batches compress better.
    # NOTE(review): 10000 may already be Spark's default — confirm.
    "spark.sql.inMemoryColumnarStorage.batchSize": 10000,
    # Let adaptive execution merge small shuffle partitions into fewer ones.
    "spark.sql.adaptive.coalescePartitions.enabled": True,
}
for _conf_key, _conf_value in _spark_settings.items():
    spark.conf.set(_conf_key, _conf_value)

# Silence Spark's INFO logging in the notebook output.
spark.sparkContext.setLogLevel("OFF")

In [0]:
def clean(data):
    """Parse an edge-list RDD of text lines into a (key, value) DataFrame.

    Header/comment lines (starting with '#') and blank lines are skipped.
    Every other line is split on whitespace, and its first two fields are
    parsed as integer node ids.

    Args:
        data: RDD of raw text lines, e.g. the result of ``sc.textFile``.

    Returns:
        DataFrame with columns ``key`` (first node of the edge) and
        ``value`` (second node of the edge).
    """
    edges = (
        data.map(lambda line: line.strip())
        # Drop header/comment lines, and blank lines, which would crash int().
        .filter(lambda line: bool(line) and not line.startswith('#'))
        # Split on any whitespace so tab- and space-separated files both parse.
        .map(lambda line: line.split())
        .map(lambda fields: (int(fields[0]), int(fields[1])))
    )
    # Naming the columns at creation avoids the default _1/_2 names.
    return spark.createDataFrame(edges, ['key', 'value'])

In [0]:
# Toy graph from the paper: 8 nodes (A-H) and 6 edges, forming two
# components, {A, B, C, D, E} and {F, G, H}.
_toy_edges = [
    ('A', 'B'),
    ('B', 'C'),
    ('B', 'D'),
    ('D', 'E'),
    ('F', 'G'),
    ('G', 'H'),
]
toy_graph = spark.createDataFrame(_toy_edges, ['key', 'value']).cache()

# Google web graph from http://snap.stanford.edu/data/web-Google.html
# 875K nodes and 5.1M edges; parsed into (key, value) pairs by clean().
_google_lines = sc.textFile('/FileStore/tables/web_Google.txt')
google = clean(_google_lines).cache()

In [0]:
def CCF(ccf_iterate_map, secondary_sorting, map_partitions=5):
    """Run CCF-Iterate + CCF-Dedup until an iteration emits no new pairs.

    Args:
        ccf_iterate_map: edge DataFrame with columns ``key`` and ``value``.
        secondary_sorting: if True, each key's minimum neighbour is found by
            sorting its value list (``sort_array``) and taking the first
            element; otherwise ``array_min`` is used directly.
        map_partitions: number of partitions the map-phase output is
            coalesced to on every iteration (default 5).

    Returns:
        (DataFrame, int): the final ``(key, value)`` pairs, where ``value``
        is the minimum node found for ``key`` by the algorithm, and the
        number of iterations executed. The returned DataFrame is cached and
        already materialized; call ``unpersist()`` on it when done.
    """
    iteration = 0
    new_pair_counter = 1  # > 0 so the loop body runs at least once
    previous_reduce = None
    ccf_iterate_reduce = None

    while new_pair_counter > 0:
        # CCF-Iterate Map phase: emit every pair in both directions.
        ccf_iterate_map = ccf_iterate_map.union(
            ccf_iterate_map.select('value', 'key')).coalesce(map_partitions)

        # CCF-Iterate Reduce phase: gather the neighbour list of each key and
        # find its minimum, either by sorting (secondary sorting) or directly.
        grouped = ccf_iterate_map.groupBy('key').agg(
            collect_list('value').alias('value'))
        if secondary_sorting:
            grouped = grouped.select(
                'key', sort_array(col('value')).alias('value'))
            min_value = col('value').getItem(0)
        else:
            min_value = array_min(col('value'))
        # Only keys whose minimum is smaller than themselves emit anything.
        # Cache the filtered result itself: it feeds both the counter and the
        # emitted pairs below.
        ccf_iterate_reduce = (grouped.withColumn('min_val', min_value)
                              .filter(col('min_val') < col('key'))
                              .cache())

        # Pairs (value, min) for every neighbour different from the minimum.
        new_pairs = (ccf_iterate_reduce
                     .select(explode(col('value')).alias('key'),
                             col('min_val').alias('value'))
                     .filter(col('key') != col('value')))
        # NewPair counter, as in the paper: one per emitted (value, min) pair.
        # count() returns 0 on empty input (sum() would return None), and it
        # also materializes the cached ccf_iterate_reduce.
        new_pair_counter = new_pairs.count()

        # The new reduce is cached now, so the previous one can be released.
        if previous_reduce is not None:
            previous_reduce.unpersist()
        previous_reduce = ccf_iterate_reduce

        # Emit (key, min) and (value, min); distinct() performs CCF-Dedup.
        emit_key_min = ccf_iterate_reduce.select(
            'key', col('min_val').alias('value'))
        ccf_iterate_map = emit_key_min.union(new_pairs).distinct()

        iteration += 1

    # Materialize the result before releasing the last cached reduce, so that
    # callers do not recompute the whole iteration lineage on every action.
    ccf_iterate_map = ccf_iterate_map.cache()
    ccf_iterate_map.count()
    ccf_iterate_reduce.unpersist()
    return ccf_iterate_map, iteration

#### Experiments

##### I) Toy Graph

In [0]:
# Toy graph, CCF without secondary sorting.
start = time.perf_counter()  # monotonic clock for elapsed-time measurement
iterate_output, nb_iterations_1 = CCF(toy_graph, secondary_sorting=False)
timing_1 = time.perf_counter() - start
print(f'The total number of iterations is: {nb_iterations_1}')
print(f'Run-time: {timing_1:.2f} (sec) \n')

# Each distinct `value` is a component's minimum node.
nb_cc = iterate_output.select('value').distinct().count()
# Largest component: the biggest group of keys sharing a minimum, plus the
# minimum node itself (assumes each non-minimum node appears once as a key).
# first() fetches only the top row instead of collecting every group.
largest = (iterate_output.groupBy('value').count()
           .orderBy(col('count').desc()).first())
nodes_largest = largest['count'] + 1 if largest is not None else 0
print(f"There are {nb_cc} connected components in this graph")
print(f"There are {nodes_largest} nodes in the largest connected component")

In [0]:
start = time.time()
iterate_output, nb_iterations_2 = CCF(toy_graph, secondary_sorting=True)
timing_2 = time.time() - start
print(f'The total number of iterations is: {nb_iterations_2}')
print(f'Run-time: {timing_2:.2f} (sec) \n')

nb_cc = iterate_output.select('value').distinct().count()
nodes_largest = iterate_output.groupBy('value').count().sort(('count')).collect()[-1][1] + 1
print(f"There are {nb_cc} connected components in this graph")
print(f"There are {nodes_largest} nodes in the largest connected component")

##### II) Google Graph

In [0]:
# Google graph, CCF without secondary sorting.
start = time.perf_counter()  # monotonic clock for elapsed-time measurement
iterate_output, nb_iterations_3 = CCF(google, secondary_sorting=False)
timing_3 = time.perf_counter() - start
print(f'The total number of iterations is: {nb_iterations_3}')
print(f'Run-time: {timing_3:.2f} (sec) \n')

# Each distinct `value` is a component's minimum node.
nb_cc = iterate_output.select('value').distinct().count()
# Largest component: the biggest group of keys sharing a minimum, plus the
# minimum node itself (assumes each non-minimum node appears once as a key).
# first() fetches only the top row instead of collecting every group.
largest = (iterate_output.groupBy('value').count()
           .orderBy(col('count').desc()).first())
nodes_largest = largest['count'] + 1 if largest is not None else 0
print(f"There are {nb_cc} connected components in this graph")
print(f"There are {nodes_largest} nodes in the largest connected component")

In [0]:
# Google graph, CCF with secondary sorting.
start = time.perf_counter()  # monotonic clock for elapsed-time measurement
iterate_output, nb_iterations_4 = CCF(google, secondary_sorting=True)
timing_4 = time.perf_counter() - start
print(f'The total number of iterations is: {nb_iterations_4}')
print(f'Run-time: {timing_4:.2f} (sec) \n')

# Each distinct `value` is a component's minimum node.
nb_cc = iterate_output.select('value').distinct().count()
# Largest component: the biggest group of keys sharing a minimum, plus the
# minimum node itself (assumes each non-minimum node appears once as a key).
# first() fetches only the top row instead of collecting every group.
largest = (iterate_output.groupBy('value').count()
           .orderBy(col('count').desc()).first())
nodes_largest = largest['count'] + 1 if largest is not None else 0
print(f"There are {nb_cc} connected components in this graph")
print(f"There are {nodes_largest} nodes in the largest connected component")

#### Results

In [0]:
# Side-by-side comparison of the two CCF variants on the toy graph.
_toy_variants = ['CCF w/o sec. sorting', 'CCF w. sec. sorting']
perf_toy = pd.DataFrame(
    {
        'Number of iterations': [nb_iterations_1, nb_iterations_2],
        'Run-time (sec)': [timing_1, timing_2],
    },
    index=_toy_variants,
)
perf_toy

Unnamed: 0,Number of iterations,Run-time (sec)
CCF w/o sec. sorting,4,17.186287
CCF w. sec. sorting,4,10.4696


In [0]:
# Side-by-side comparison of the two CCF variants on the Google web graph.
_google_variants = ['CCF w/o sec. sorting', 'CCF w. sec. sorting']
perf_google = pd.DataFrame(
    {
        'Number of iterations': [nb_iterations_3, nb_iterations_4],
        'Run-time (sec)': [timing_3, timing_4],
    },
    index=_google_variants,
)
perf_google

Unnamed: 0,Number of iterations,Run-time (sec)
CCF w/o sec. sorting,8,394.455973
CCF w. sec. sorting,8,369.90501
