# Elaborated by Arij Bougda et Chayma Belgaied 

# BIG DATA Project: CCF

This code is the implementation of the CCF algorithm proposed in the article **CCF: Fast and Scalable Connected Component
Computation in MapReduce**.

We used the Google web graph dataset: http://snap.stanford.edu/data/web-Google.html


# About Data:
Nodes represent web pages and directed edges represent hyperlinks between them. The data was released in 2002 by Google as a part of Google Programming Contest.



# Import librairies

In [None]:
import pyspark
import time

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1612297847829_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Loading data

In [None]:
# Import graph file and cleaning input data
lines = sc.textFile("s3://aws-emr-resources-486249035020-us-east-1/web-Google.txt")
graph_rdd = lines.filter(lambda l: l[0] != "#").map(lambda x : (x.split("\t")[0],x.split("\t")[1]))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Here is a sample of cleaned data
graph_rdd.collect()[:5]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('0', '11342'), ('0', '824020'), ('0', '867923'), ('0', '891835'), ('11342', '0')]

In [None]:
print(" The size of our graph is :{}".format(len(graph_rdd.collect())))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 The size of our graph is :5105039

# CCF ALGORITHM IMPLEMENTATION

# 1.   CCF MAPPER

In [None]:
# CCF algorithm mapper
def mapper_ccf(input_data):
  """ CCF mapper function """

  output_mapper = input_data.flatMap(lambda x: (x,(x[1],x[0]))).\
                              groupByKey().map(lambda x : (x[0], list(x[1])))
  return output_mapper

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 2.    Reducer CCF-Iterate 

In [None]:
#Define CCF-Iterate Reduce phase
def reducer_ccf_iterate(output_mapper):
  """ reducer CCF-iterate function"""

  key = output_mapper[0]
  values = output_mapper[1]
  global Counter
  results = []
  min = key
  for value in values:
    if value < min:
      min = value
  if min < key:
    results.append((key,min))
    for value in values:
      if min != value:
        Counter.add(1)
        results.append((value,min))
  return results

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 3. Reducer CCF-Iterate with sorting

In [None]:
def reducer_ccf_iterate_sorting(output_mapper):
  """ reducer CCF-iterate with sorting"""

  key = output_mapper[0]
  values = output_mapper[1]
  global Counter
  results = []
  min_value = min(values)
  if min_value < key:
    results.append((key, min_value))
    for value in values:
      if min_value != value:
        Counter.add(1)
        results.append((value, min_value))
  return results

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 4. CCF algorithm implementation

# 4.1.1. CCF-Iterate without Dedup

In [None]:
# CCF Algorithm execution
iteration_number = 1
Counter = sc.accumulator(1)
execution_time = 0
results = []
input_rdd = graph_rdd
while (Counter.value != 0):
  starting_iteration = time.time()
  print("Iteration number: {}".format(iteration_number))
  # initialize accumulator
  Counter = sc.accumulator(0)
  # Calling Mapper function
  mapper_output = mapper_ccf(input_rdd)
  # Calling Reducer function
  results = mapper_output.flatMap(reducer_ccf_iterate)
  results.collect()
  print("Number of pairs: {}".format(Counter.value))
  iteration_number += 1
  input_rdd = results
  ending_iteration = time.time()  
  print("Iteration execution time : {} \n".format(ending_iteration - starting_iteration))
  execution_time += (ending_iteration - starting_iteration)
print("Algorithm Total execution time : {}".format(execution_time))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Iteration number: 1
Number of pairs: 8546673
Iteration execution time : 42.16120171546936 

Iteration number: 2
Number of pairs: 23047041
Iteration execution time : 72.70266079902649 

Iteration number: 3
Number of pairs: 38079608
Iteration execution time : 97.62867593765259 

Iteration number: 4
Number of pairs: 62337433
Iteration execution time : 135.44587326049805 

Iteration number: 5
Number of pairs: 52186653
Iteration execution time : 182.4420657157898 

Iteration number: 6
Number of pairs: 13668685
Iteration execution time : 106.65830087661743 

Iteration number: 7
Number of pairs: 243337
Iteration execution time : 22.49075675010681 

Iteration number: 8
Number of pairs: 3112
Iteration execution time : 7.247816562652588 

Iteration number: 9
Number of pairs: 0
Iteration execution time : 6.804609298706055 

Algorithm Total execution time : 673.5819609165192

# 4.1.2. CCF-Iterate with Dedup

In [None]:
# CCF Algorithm execution
iteration_number = 1
Counter = sc.accumulator(1)
execution_time = 0
results = []
input_rdd = graph_rdd
while (Counter.value != 0):
  starting_iteration = time.time()
  print("Iteration number:".format(iteration_number))
  # initialize accumulator
  Counter = sc.accumulator(0)
  # Calling Mapper function
  mapper_output = mapper_ccf(input_rdd)
  # Calling Reducer function
  results = mapper_output.flatMap(reducer_ccf_iterate).distinct()
  results.collect()
  print("Number of pairs: {}".format(Counter.value))
  iteration_number += 1
  input_rdd = results
  ending_iteration = time.time()  
  print("Iteration execution time : {}\n".format(ending_iteration - starting_iteration))
  execution_time += (ending_iteration - starting_iteration)
print("Algorithm Total execution time : {}".format(execution_time))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Iteration number:
Number of pairs: 8546673
Iteration execution time : 126.28204917907715

Iteration number:
Number of pairs: 4774134
Iteration execution time : 69.4680233001709

Iteration number:
Number of pairs: 3235857
Iteration execution time : 45.77058553695679

Iteration number:
Number of pairs: 3852842
Iteration execution time : 52.91629958152771

Iteration number:
Number of pairs: 2014247
Iteration execution time : 17.922995805740356

Iteration number:
Number of pairs: 94614
Iteration execution time : 11.052850008010864

Iteration number:
Number of pairs: 1548
Iteration execution time : 10.35062551498413

Iteration number:
Number of pairs: 0
Iteration execution time : 10.151488780975342

Algorithm Total execution time : 343.91491770744324

# 4.2.1. CCF-Iterate with sorting without Dedup

In [None]:
# CCF Algorithm execution
iteration_number = 1
Counter = sc.accumulator(1)
execution_time = 0
results = []
input_rdd = graph_rdd
while (Counter.value != 0):
  starting_iteration = time.time()
  print("Iteration number: {}".format(iteration_number))
  # initialize accumulator
  Counter = sc.accumulator(0)
  # Calling Mapper function
  mapper_output = mapper_ccf(input_rdd)
  # Calling Reducer function
  results = mapper_output.flatMap(reducer_ccf_iterate_sorting)
  results.collect()
  print("Number of pairs: {}".format(Counter.value))
  iteration_number += 1
  input_rdd = results
  ending_iteration = time.time()  
  print("Iteration execution time : {}\n".format(ending_iteration - starting_iteration))
  execution_time += (ending_iteration - starting_iteration)
print("Algorithm Total execution time : {}".format(execution_time))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Iteration number: 1
Number of pairs: 8546673
Iteration execution time : 53.22439980506897

Iteration number: 2
Number of pairs: 23047041
Iteration execution time : 81.84360551834106

Iteration number: 3
Number of pairs: 38079608
Iteration execution time : 98.78018689155579

Iteration number: 4
Number of pairs: 62337433
Iteration execution time : 130.9786307811737

Iteration number: 5
Number of pairs: 52186653
Iteration execution time : 205.7621796131134

Iteration number: 6
Number of pairs: 13668685
Iteration execution time : 108.45494794845581

Iteration number: 7
Number of pairs: 243337
Iteration execution time : 22.112065315246582

Iteration number: 8
Number of pairs: 3112
Iteration execution time : 7.425825357437134

Iteration number: 9
Number of pairs: 0
Iteration execution time : 6.957655906677246

Algorithm Total execution time : 715.5394971370697

# 4.2.2. CCF-Iterate with sorting with Dedup

In [None]:
# CCF Algorithm execution
iteration_number = 1
Counter = sc.accumulator(1)
execution_time = 0
results = []
input_rdd = graph_rdd
while (Counter.value != 0):
  starting_iteration = time.time()
  print("Iteration number: {} \n".format(iteration_number))
  # initialize accumulator
  Counter = sc.accumulator(0)
  # Calling Mapper function
  mapper_output = mapper_ccf(input_rdd)
  # Calling Reducer function
  results = mapper_output.flatMap(reducer_ccf_iterate_sorting).distinct()
  results.collect()
  print("Number of pairs: {}".format(Counter.value))
  iteration_number += 1
  input_rdd = results
  ending_iteration = time.time()  
  print("Iteration execution time : {}".format(ending_iteration - starting_iteration))
  execution_time += (ending_iteration - starting_iteration)
print("Algorithm Total execution time : {}".format(execution_time))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Iteration number: 1 

Number of pairs: 8546673
Iteration execution time : 76.22124814987183
Iteration number: 2 

Number of pairs: 4774134
Iteration execution time : 56.383868932724
Iteration number: 3 

Number of pairs: 3235857
Iteration execution time : 26.637752056121826
Iteration number: 4 

Number of pairs: 3852842
Iteration execution time : 25.20892596244812
Iteration number: 5 

Number of pairs: 2014247
Iteration execution time : 17.705175161361694
Iteration number: 6 

Number of pairs: 94614
Iteration execution time : 11.350874662399292
Iteration number: 7 

Number of pairs: 1548
Iteration execution time : 10.56390380859375
Iteration number: 8 

Number of pairs: 0
Iteration execution time : 10.332489967346191
Algorithm Total execution time : 234.4042387008667

# 5. Conclusion

Algorithm | number of iteration | execution time
--- | --- | ---
CCF-Iterate without Dedup | 9| 673.58
CCF-Iterate with Dedup | 8| 343.91
CCF-Iterate with sorting without Dedup | 9| 715.53
CCF-Iterate with sorting with Dedup | 8| 234.40