In [1]:
from pyspark import SparkContext, SparkConf
import random
import time
import numpy as np
import networkx as nx
from hard_test import parallel_bronKerbosch

### Pi calculation

In [2]:
def is_point_inside_unit_circle(_):
    """Draw one random point in the unit square and test it against the unit circle.

    The argument is ignored; it only exists so the function can be used as a
    predicate over a range of sample indices.

    Returns:
        True when the drawn point lies strictly inside the circle of radius 1
        centred on the origin, False otherwise.
    """
    x_coord = random.random()
    y_coord = random.random()
    squared_distance = x_coord * x_coord + y_coord * y_coord
    return squared_distance < 1

In [3]:
def main_local(num_samples):
    """Estimate Pi with a sequential Monte Carlo loop and print the result.

    Args:
        num_samples: Number of random points to draw. Must be non-zero,
            because the hit count is divided by it.
    """
    # Count the samples for which the random point fell inside the circle.
    hits = sum(
        1
        for sample_index in range(num_samples)
        if is_point_inside_unit_circle(sample_index)
    )

    # The fraction of hits approximates Pi / 4 (quarter circle over unit square).
    print("Approximate value of Pi is:", 4 * hits / num_samples)

In [4]:
def main_cluster(num_samples, sc, stop_context=True):
    """Estimate Pi by Monte Carlo sampling distributed through a Spark context.

    Args:
        num_samples: Number of random points to draw. Must be non-zero,
            because the hit count is divided by it.
        sc: Spark context (any object exposing ``parallelize`` and ``stop``).
        stop_context: When True (the default, matching the historical
            behaviour) ``sc`` is stopped once the estimate has been printed
            and cannot be reused afterwards. Pass False to keep a shared
            context alive for later jobs.
    """
    # One element per sample: the predicate ignores the element value and
    # performs an independent random draw for each of them.
    count = (
        sc.parallelize(range(num_samples))
        .filter(is_point_inside_unit_circle)
        .count()
    )

    # The fraction of hits approximates Pi / 4.
    pi_estimate = 4 * count / num_samples
    print("Approximate value of Pi is:", pi_estimate)

    # The context is owned by the caller; only tear it down when asked to.
    if stop_context:
        sc.stop()

In [5]:
# Shared benchmark configuration: sample count and the Spark context reused
# by the cells below.
num_samples = int(1e9)
conf = SparkConf().setAppName("Benchmark")
# getOrCreate() reuses an already-active context instead of raising when this
# cell is executed again in the same kernel.
sc = SparkContext.getOrCreate(conf=conf)

23/12/03 19:39:15 WARN Utils: Your hostname, coartix-ubuntu resolves to a loopback address: 127.0.1.1; using 10.41.175.200 instead (on interface wlp0s20f3)
23/12/03 19:39:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/03 19:39:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# benchmark local
# perf_counter() is a monotonic, high-resolution clock, so the measured
# duration cannot be skewed by system clock adjustments during the run.
start_time = time.perf_counter()
main_local(num_samples)
end_time = time.perf_counter()
print("Time elapsed: ", end_time - start_time)

Approximate value of Pi is: 3.141511264
Time elapsed:  244.77220606803894


In [7]:
# benchmark spark local
# perf_counter() is a monotonic, high-resolution clock, so the measured
# duration cannot be skewed by system clock adjustments during the run.
start_time = time.perf_counter()
main_cluster(num_samples, sc)
end_time = time.perf_counter()
print("Time elapsed: ", end_time - start_time)

                                                                                

Approximate value of Pi is: 3.141539956
Time elapsed:  98.69377899169922


In [8]:
# benchmark spark cluster
# Submits simple_test.py to the Spark master at 10.41.175.200:7077 in client
# deploy mode. Both stdout and stderr are discarded (> /dev/null 2>&1), so
# nothing is shown here even if the submission fails.
# Presumably the script logs its results to spark_app.log - verify in simple_test.py.
# NOTE(review): --num-executors is listed by spark-submit as a YARN/Kubernetes
# option; on a spark:// standalone master it may have no effect
# (--total-executor-cores is the standalone equivalent) - confirm.
!/opt/spark/bin/spark-submit --master spark://10.41.175.200:7077 --deploy-mode client --num-executors 4 --executor-cores 4 simple_test.py > /dev/null 2>&1

In [9]:
# Load the whole log file, then display it once the file has been closed.
with open('spark_app.log') as log_file:
    log_content = log_file.read()
print(log_content)

INFO:root:Approximate value of Pi is: 3.14165072
INFO:root:Time elapsed: 15.578842401504517
INFO:py4j.clientserver:Closing down clientserver connection
INFO:root:Approximate value of Pi is: 3.141582504
INFO:root:Time elapsed: 124.8446033000946
INFO:py4j.clientserver:Closing down clientserver connection



### BronKerbosch algorithm

In [6]:
def bronKerbosch1(R, P, X, G, res=None):
    """Bron-Kerbosch algorithm (without pivoting) to find the cliques of a graph.

    Args:
        R: Set of vertices forming the clique currently being built.
        P: Set of candidate vertices that may still extend ``R``. It is
            modified in place: every vertex is removed as it is processed.
        X: Set of already processed vertices. It is modified in place:
            every processed vertex is added to it.
        G: Graph object exposing ``neighbors(v)``.
        res: Optional list receiving the cliques. A new list is created when
            it is omitted.

    Returns:
        The list ``res``, holding one vertex set per clique reported.
    """
    # A mutable default list would be shared by every call and would keep
    # accumulating cliques from previous invocations.
    if res is None:
        res = []
    if len(P) == 0 and len(X) == 0:
        res.append(R)
    # Iterate over a snapshot because P is modified inside the loop.
    for v in P.copy():
        neighbors = set(G.neighbors(v))
        bronKerbosch1(R.union({v}), P.intersection(neighbors), X.intersection(neighbors), G, res=res)
        P.remove(v)
        X.add(v)
    return res

def bronKerbosch2(R, P, X, G):
    """Bron-Kerbosch algorithm with pivoting; prints each clique it finds.

    Args:
        R: Set of vertices forming the clique currently being built.
        P: Set of candidate vertices that may still extend ``R``. It is
            modified in place: processed vertices are removed from it.
        X: Set of already processed vertices. It is modified in place:
            processed vertices are added to it.
        G: Graph object exposing ``neighbors(v)``.
    """
    if len(P) == 0 and len(X) == 0:
        print(R)
    else:
        # Pivot: an arbitrary vertex of P | X. Only candidates that are not
        # neighbours of the pivot are expanded.
        u = P.union(X).pop()
        # difference() builds a new set, so P can be modified inside the loop.
        for v in P.difference(set(G.neighbors(u))):
            neighbors = set(G.neighbors(v))
            bronKerbosch2(R.union({v}), P.intersection(neighbors), X.intersection(neighbors), G)
            P.remove(v)
            X.add(v)

def bronKerbosch3(G):
    """Bron-Kerbosch driver: vertex-ordered outer loop over the pivoting variant.

    The vertices of ``G`` are visited by decreasing degree and, for each of
    them, ``bronKerbosch2`` is run on its neighbourhood.

    Note: the order used here is a plain sort on the degree, not a true
    degeneracy ordering (which would repeatedly remove a minimum-degree
    vertex from the remaining graph).

    Args:
        G: Graph object exposing ``nodes()``, ``degree(v)`` and
            ``neighbors(v)``.
    """
    P = set(G.nodes())
    X = set()
    R = set()
    for v in sorted(G.nodes(), key=G.degree, reverse=True):
        neighbors = set(G.neighbors(v))
        bronKerbosch2(R.union({v}), P.intersection(neighbors), X.intersection(neighbors), G)
        # v is now fully processed: move it from the candidates to the excluded set.
        P.remove(v)
        X.add(v)

In [7]:
# Fixed seed so that every run of the notebook benchmarks the same graph.
G1 = nx.gnp_random_graph(100, 0.5, seed=42, directed=False)

# benchmark local
R = set()
P = set(G1.nodes())
X = set()
# perf_counter() is a monotonic, high-resolution clock suited to timing.
start = time.perf_counter()
bronKerbosch1(R, P, X, G1)
end = time.perf_counter()
print("Time elapsed:", end - start)

Time elapsed: 0.8703324794769287


In [8]:
# benchmark spark local
start = time.time()
parallel_bronKerbosch(sc, G1)
end = time.time()
print("Time elapsed:", end - start)

                                                                                

Time elapsed: 19.248689651489258


In [9]:
# benchmark spark cluster
# Submits hard_test.py to the Spark master at 10.41.175.200:7077 in client
# deploy mode. Both stdout and stderr are discarded (> /dev/null 2>&1), so
# nothing is shown here even if the submission fails.
# Presumably the script logs its results to spark_app.log - verify in hard_test.py.
# NOTE(review): --num-executors is listed by spark-submit as a YARN/Kubernetes
# option; on a spark:// standalone master it may have no effect
# (--total-executor-cores is the standalone equivalent) - confirm.
!/opt/spark/bin/spark-submit --master spark://10.41.175.200:7077 --deploy-mode client --num-executors 4 --executor-cores 4 hard_test.py > /dev/null 2>&1

In [10]:
# Load the whole log file, then display it once the file has been closed.
with open('spark_app.log') as log_file:
    log_content = log_file.read()
print(log_content)

INFO:root:Results: {2, 98, 44, 19, 86, 57, 58, 93, 30}
INFO:root:Time elapsed: 40.636481523513794



### Analysis

![Task example on cluster](./cluster_task.png)  
Pi calculation task on the cluster.  

Our cluster is composed of 2 workers.  

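We can see that PySpark is faster on some tasks (e.g. the Pi calculation: about 98.7 s with Spark in local mode versus 244.8 s for the plain Python loop), but on others, such as Bron–Kerbosch, the original sequential implementation is much faster (about 0.87 s versus 19.2 s).  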


In any case, our cluster is very slow to compute solutions. We noticed that it was not using the full potential of our 2 workers. There may also be an issue in our code that explains these results (e.g. we may be using the wrong functions to parallelize it).  

We also noticed that the computation time does not scale linearly with the number of workers. We think that this is due to the fact that we are using shared memory.

### Conclusion

Our cluster should be expanded with more workers, which we were not able to do. It should also compute these solutions faster, so we will have to review our implementation.