In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, DoubleType,IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import array, lit,udf
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
import time

import warnings
from pyspark.sql.functions import col
import pandas as pd


## MiniBatch K-means Algorithm

### Mean cost function

In [2]:
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

def cost(data, centroids):
    
    
    feature_array = data.collect()
    n_samples = len(feature_array)
    n_clusters = len(centroids)

    
    distances = np.zeros(n_samples)
    for i in range(n_samples):
        point = feature_array[i]
        squared_distances = np.array([Vectors.squared_distance(Vectors.dense(point), Vectors.dense(centroid)) for centroid in centroids])
        min_distance = np.min(squared_distances)
        distances[i] = min_distance

    
    total_cost = np.sum(distances)

    return total_cost / n_samples



### Compute Distances and Find Nearest Cluster

In [3]:
def compute_distance(point, centroids):
    return np.array([np.linalg.norm(np.array(point) - np.array(c)) for c in centroids])


def find_nearest_cluster(x):
    idx, distances = x
    return idx, np.argmin(distances)

### MiniBatch K-means

In [2]:
def mini_kmeans_rdd(X, sc, num_clusters=5, iteration=10, batch_size=200, n_partitions=10):
    import numpy as np
    import time

    k = num_clusters
    b = batch_size
    t = iteration
    N = X.count()  
    C = X.sample(False, k/N).collect()  
    C = [list(row) for row in C]  
    v = np.zeros(len(C))
    times = []
    mse = []

    for i in range(t):
        start = time.time()
        # Estrai un mini-batch in modo casuale
        mini_batch = X.sample(False, b/N).collect()
        mini_batch = [list(row) for row in mini_batch]

        if n_partitions is not None:
            dist_mini_batch = sc.parallelize(mini_batch, n_partitions)
        else:
            dist_mini_batch = sc.parallelize(mini_batch)

        distances_rdd = dist_mini_batch.map(lambda x: compute_distance(x, C))

        # Mean squared error
        mse_value = distances_rdd.map(lambda x: min(x)).reduce(lambda x, y: x + y) / (len(mini_batch) * len(C))

        mse.append(mse_value)

        nearest_cluster_indices = distances_rdd.map(lambda x: np.argmin(x)).collect()

        
        for idx, cluster_idx in enumerate(nearest_cluster_indices):
            v[cluster_idx] += 1
            learning_rate = 1 / v[cluster_idx]
            C[cluster_idx] = (1 - learning_rate) * np.array(C[cluster_idx]) + learning_rate * np.array(mini_batch[idx])

        end = time.time()
        times.append(end - start)

    return C, times, mse


# Data
* Data has been provided from scikit learn. The choosen subset is target. Each sample has a value of 1 in its categories, and 0 in others. The array has 3.15% of non zero values.
* In this specific case Pandas Data Frame has been converted in Spark Data Frame and in a second moment in Spark RDD, in order to observe the different behaviour with Spark Data Frame.
* Dataset dimensions are 3000 rows and 103 columns, this was the maximum allowed size from spark in order to run the algorithm with only one core. This allows to do a consistent comparison at the end of the process.

In [5]:
from sklearn.datasets import fetch_rcv1

rcv1 = fetch_rcv1()

In [6]:
target_df = pd.DataFrame(rcv1.target[0:3000].toarray(), columns=rcv1.target_names)
target_df


Unnamed: 0,C11,C12,C13,C14,C15,C151,C1511,C152,C16,C17,...,M11,M12,M13,M131,M132,M14,M141,M142,M143,MCAT
0,0,0,0,0,0,0,0,0,0,0,...,1,1,0,0,0,0,0,0,0,1
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,0,0,0,0,1,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,0,0,0,0,1,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2995,0,0,0,0,0,0,0,0,0,1,...,0,0,0,0,0,0,0,0,0,0
2996,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2997,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2998,0,0,0,0,0,0,0,0,0,0,...,0,0,1,0,1,0,0,0,0,1


# Spark Configuration
The cluster was composed by three virtual machines provided by CloudVeneto. The machines used for this project had the following charachteristics:

    | VM | IP Address | Memory  | RAM | Number of Cores |
    |----|------------|---------|-----|-----------------|-------|
    | VM2|10.67.22.233| 25 GB   | 8GB | 4               |Master |
    | VM1|10.67.22.219| 25 GB   | 8GB | 4               |Slave01|
    | VM3|10.67.22.157| 25 GB   | 8GB | 4               |Slave02|
    
* Spark configuration in order to select how many cores are effectively used to do the calculations is `.config("spark.cores.max","1")` because using `config("spark.executors.cores","1")` the application setted 1 core for the slave01 and 1 core for slave02, which prevents testing on the desired number of core.
* Memory is set for 6GB because was the maximum memory that application allowed to run the cluster.



## Experiments

* The following cells are varying  the number of partition in order to test the time execution and mean cost function of the algorithm.
* For each number of cores a spark configuration has been initialized and the other parameter remained fixed.
* In the experiments each procedure is repeated 3 times in order to estimate the mean value and stadard deviation for each quantity to measure.
* A the end of the notebook it has been run a measure of mean squared error between clusterized points and centroids in order to compare them with other methods.

In [10]:
part=np.array([1,2,4,6,8,10,12,14,16,18,20,22,34,36,38,64])

### 1 core

In [11]:
sc.stop()

In [12]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max", "1")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext
# print its status
sc



In [13]:
# Crea un DataFrame Spark dalla colonna "target"
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [18]:
ex_time_1 = []
err_1 = []
std_time_1 = []
std_cost_1 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_1.append(np.mean(repeat_times))
    err_1.append(np.mean(repeat_cost))
    std_time_1.append(np.std(repeat_times))
    std_cost_1.append(np.std(repeat_cost))


Exception in thread "serve RDD 1409" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
                                                                                

In [19]:
data_1 = {
    'Partition': part,
    'Execution time (s)': ex_time_1,
    'Time standard deviation (s)':std_time_1,
    'Mean Cost Function':err_1,
    'Mean Cost Function standard deviation':std_cost_1,
    'Number of cores': '1'
}

results_df_1=pd.DataFrame(data_1)

results_df_1.to_csv('results_rdd_spark_1.csv', index=False)

results_df_1

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,3.968951,0.009834,2.29764,0.212432,1
1,2,5.158405,0.065499,1.914712,0.285047,1
2,4,7.322456,0.027262,1.752413,0.097835,1
3,6,9.333595,0.061498,1.860435,0.164515,1
4,8,11.356936,0.045722,2.04248,0.086429,1
5,10,13.497903,0.12833,2.036022,0.165832,1
6,12,15.50274,0.047469,2.125599,0.02696,1
7,14,17.664972,0.094206,2.028448,0.187076,1
8,16,19.462629,0.057475,2.243635,0.066452,1
9,18,21.57675,0.054866,2.245855,0.342164,1


### 2 cores

In [20]:
sc.stop()

In [21]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","2")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [22]:
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [23]:
ex_time_2 = []
err_2 = []
std_time_2 = []
std_cost_2 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_2.append(np.mean(repeat_times))
    err_2.append(np.mean(repeat_cost))
    std_time_2.append(np.std(repeat_times))
    std_cost_2.append(np.std(repeat_cost))



                                                                                

In [24]:
data_2 = {
    'Partition': part,
    'Execution time (s)': ex_time_2,
    'Time standard deviation (s)':std_time_2,
    'Mean Cost Function':err_2,
    'Mean Cost Function standard deviation':std_cost_2,
    'Number of cores': '2'
}

results_df_2=pd.DataFrame(data_2)

results_df_2.to_csv('results_rdd_spark_2.csv', index=False)

results_df_2

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,4.854596,1.362113,1.744241,0.42032,2
1,2,3.378118,0.071651,1.749511,0.019418,2
2,4,4.750339,0.066744,1.737017,0.123622,2
3,6,5.73759,0.101321,2.176912,0.344583,2
4,8,6.94928,0.156274,1.664359,0.156743,2
5,10,8.09171,0.020831,1.950074,0.32648,2
6,12,9.036916,0.034054,1.862069,0.177978,2
7,14,10.021469,0.180931,2.037904,0.170082,2
8,16,10.877542,0.029506,1.776663,0.196678,2
9,18,12.230358,0.22002,2.045437,0.525545,2


### 3 cores

In [25]:
sc.stop()

In [26]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","3")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [27]:
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [29]:
ex_time_3 = []
err_3 = []
std_time_3 = []
std_cost_3 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_3.append(np.mean(repeat_times))
    err_3.append(np.mean(repeat_cost))
    std_time_3.append(np.std(repeat_times))
    std_cost_3.append(np.std(repeat_cost))

                                                                                

In [30]:
data_3 = {
    'Partition': part,
    'Execution time (s)': ex_time_3,
    'Time standard deviation (s)':std_time_3,
    'Mean Cost Function':err_3,
    'Mean Cost Function standard deviation':std_cost_3,
    'Number of cores': '3'
}

results_df_3=pd.DataFrame(data_3)

results_df_3.to_csv('results_rdd_spark_3.csv', index=False)

results_df_3

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,18.128933,2.292384,2.140258,0.295782,3
1,2,9.936727,9.212011,1.771426,0.050031,3
2,4,4.323851,0.075289,1.791046,0.051741,3
3,6,4.35069,0.013235,1.865241,0.253665,3
4,8,5.35447,0.031024,1.882225,0.006604,3
5,10,6.311841,0.027993,2.372213,0.265984,3
6,12,6.482027,0.071749,1.881149,0.118846,3
7,14,7.410503,0.032315,2.021557,0.107376,3
8,16,8.41033,0.061832,1.79888,0.164869,3
9,18,8.532001,0.038169,2.052637,0.231297,3


### 4 cores

In [31]:
sc.stop()

In [32]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","4")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [33]:
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [34]:
ex_time_4 = []
err_4 = []
std_time_4 = []
std_cost_4 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_4.append(np.mean(repeat_times))
    err_4.append(np.mean(repeat_cost))
    std_time_4.append(np.std(repeat_times))
    std_cost_4.append(np.std(repeat_cost))

                                                                                

In [35]:
data_4 = {
    'Partition': part,
    'Execution time (s)': ex_time_4,
    'Time standard deviation (s)':std_time_4,
    'Mean Cost Function':err_4,
    'Mean Cost Function standard deviation':std_cost_4,
    'Number of cores': '4'
}

results_df_4=pd.DataFrame(data_4)

results_df_4.to_csv('results_rdd_spark_4.csv', index=False)

results_df_4

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,4.974789,1.848996,2.273604,0.323665,4
1,2,3.432355,0.115825,2.070156,0.227939,4
2,4,3.458523,0.02889,1.896625,0.084891,4
3,6,4.398292,0.096996,2.154705,0.060664,4
4,8,4.469412,0.027575,1.745797,0.199918,4
5,10,5.493494,0.037873,1.913843,0.176985,4
6,12,5.723449,0.124145,1.898103,0.33844,4
7,14,6.507785,0.091175,1.675647,0.130902,4
8,16,6.696549,0.073888,2.178095,0.072191,4
9,18,7.594627,0.048088,1.732944,0.328493,4


### 5 cores

In [36]:
sc.stop()

In [37]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","5")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [38]:
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [40]:
ex_time_5 = []
err_5 = []
std_time_5 = []
std_cost_5 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_5.append(np.mean(repeat_times))
    err_5.append(np.mean(repeat_cost))
    std_time_5.append(np.std(repeat_times))
    std_cost_5.append(np.std(repeat_cost))

                                                                                

In [41]:
data_5 = {
    'Partition': part,
    'Execution time (s)': ex_time_5,
    'Time standard deviation (s)':std_time_5,
    'Mean Cost Function':err_5,
    'Mean Cost Function standard deviation':std_cost_5,
    'Number of cores': '5'
}

results_df_5=pd.DataFrame(data_5)

results_df_5.to_csv('results_rdd_spark_5.csv', index=False)

results_df_5

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,3.444424,0.071923,1.843765,0.202721,5
1,2,3.337472,0.008492,1.936714,0.12939,5
2,4,3.450104,0.050381,2.036209,0.074255,5
3,6,4.365196,0.073175,2.152772,0.279102,5
4,8,4.384993,0.061585,1.803554,0.095482,5
5,10,4.573216,0.069316,2.22522,0.188977,5
6,12,5.422552,0.060621,1.955768,0.280713,5
7,14,5.575207,0.121265,1.981041,0.390038,5
8,16,6.312083,0.016792,2.060136,0.474181,5
9,18,6.466928,0.040265,2.356809,0.250717,5


### 6 cores

In [42]:
sc.stop()

In [43]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","6")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [44]:
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [45]:
ex_time_6 = []
err_6 = []
std_time_6 = []
std_cost_6 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_6.append(np.mean(repeat_times))
    err_6.append(np.mean(repeat_cost))
    std_time_6.append(np.std(repeat_times))
    std_cost_6.append(np.std(repeat_cost))

                                                                                

In [46]:
data_6 = {
    'Partition': part,
    'Execution time (s)': ex_time_6,
    'Time standard deviation (s)':std_time_6,
    'Mean Cost Function':err_6,
    'Mean Cost Function standard deviation':std_cost_6,
    'Number of cores': '6'
}

results_df_6=pd.DataFrame(data_6)

results_df_6.to_csv('results_rdd_spark_6.csv', index=False)

results_df_6

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,5.07207,1.720987,1.823038,0.095317,6
1,2,3.598853,0.228669,1.739748,0.244682,6
2,4,3.379048,0.013575,1.994075,0.269203,6
3,6,3.645153,0.049075,2.065892,0.404474,6
4,8,4.525915,0.091086,2.044547,0.229675,6
5,10,4.709007,0.020547,1.997559,0.198866,6
6,12,4.955681,0.097202,1.921096,0.149821,6
7,14,5.742763,0.054013,1.689867,0.167046,6
8,16,5.837466,0.124019,1.996693,0.041662,6
9,18,6.100311,0.084692,1.819226,0.077747,6


### 7 cores

In [47]:
sc.stop()

In [48]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","7")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [49]:
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [50]:
ex_time_7 = []
err_7 = []
std_time_7 = []
std_cost_7 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_7.append(np.mean(repeat_times))
    err_7.append(np.mean(repeat_cost))
    std_time_7.append(np.std(repeat_times))
    std_cost_7.append(np.std(repeat_cost))

                                                                                

In [51]:
data_7 = {
    'Partition': part,
    'Execution time (s)': ex_time_7,
    'Time standard deviation (s)':std_time_7,
    'Mean Cost Function':err_7,
    'Mean Cost Function standard deviation':std_cost_7,
    'Number of cores': '7'
}

results_df_7=pd.DataFrame(data_7)

results_df_7.to_csv('results_rdd_spark_7.csv', index=False)

results_df_7

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,4.934366,1.866268,2.176206,0.093026,7
1,2,3.420053,0.152207,2.277235,0.346076,7
2,4,3.505914,0.086506,1.964918,0.288276,7
3,6,3.660648,0.036763,1.951775,0.116537,7
4,8,4.392998,0.038552,1.850335,0.122519,7
5,10,4.508564,0.00539,2.143572,0.168422,7
6,12,4.667114,0.06882,1.843009,0.066809,7
7,14,4.958194,0.137484,1.999468,0.224502,7
8,16,5.601955,0.045235,1.788068,0.335417,7
9,18,5.90809,0.077897,2.116047,0.24366,7


### 8 cores

In [52]:
sc.stop()

In [53]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","8")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [54]:
# Crea un DataFrame Spark dalla colonna "target"
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [55]:
ex_time_8 = []
err_8 = []
std_time_8 = []
std_cost_8 = []
num_repeats = 3 

for i in part:
    repeat_times = []
    repeat_cost = []
    
    for _ in range(num_repeats):
        start = time.time()
        C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=i, num_clusters=5)
        end = time.time()
        repeat_times.append(end - start)
        repeat_cost.append(cost(rcv1_rdd, C_k))
    
    ex_time_8.append(np.mean(repeat_times))
    err_8.append(np.mean(repeat_cost))
    std_time_8.append(np.std(repeat_times))
    std_cost_8.append(np.std(repeat_cost))

                                                                                

In [56]:
data_8 = {
    'Partition': part,
    'Execution time (s)': ex_time_8,
    'Time standard deviation (s)':std_time_8,
    'Mean Cost Function':err_8,
    'Mean Cost Function standard deviation':std_cost_8,
    'Number of cores': '8'
}

results_df_8=pd.DataFrame(data_8)

results_df_8.to_csv('results_rdd_spark_8.csv', index=False)

results_df_8

Unnamed: 0,Partition,Execution time (s),Time standard deviation (s),Mean Cost Function,Mean Cost Function standard deviation,Number of cores
0,1,5.201589,1.896728,1.972666,0.305071,8
1,2,4.248449,0.891695,1.858444,0.239561,8
2,4,3.502304,0.05291,1.870112,0.249239,8
3,6,3.776816,0.066824,2.204077,0.35457,8
4,8,4.24135,0.015684,1.960088,0.047356,8
5,10,4.727524,0.119167,1.876791,0.172093,8
6,12,4.740318,0.060575,2.137103,0.353982,8
7,14,4.968492,0.079355,1.86734,0.210254,8
8,16,5.259715,0.068946,2.0863,0.175821,8
9,18,5.769415,0.058253,2.170218,0.110494,8


### Last Experiment
Choosing 8 cores and 4 partitions as the quantity that minimizes time execution, results od mean squared error has been registered in order to compare them with other mothods.

In [9]:
sc.stop()

In [10]:
spark = SparkSession.builder \
    .appName("Mini Batch K-means")\
    .config("spark.cores.max","8")\
    .config("spark.executor.memory", "6g")\
    .config("spark.sql.debug.maxToStringFields", 100)\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

sc

In [11]:
rcv1_df = spark.createDataFrame(target_df)
rcv1_rdd= rcv1_df.rdd

In [15]:
mse_rdd=[]
num_repeats=3
for _ in range(num_repeats):
    C_k, timeS, mse = mini_kmeans_rdd(rcv1_rdd, sc, n_partitions=4, num_clusters=5)
    mse_rdd.append(mse)


In [16]:
mse_rdd_mean = [np.mean([mse_rdd[i][j] for i in range(3)]) for j in range(10)]
mse_rdd_std=[np.std([mse_rdd[i][j] for i in range(3)]) for j in range(10)]
print(mse_rdd_mean)
print(mse_rdd_std)

[0.27587418304512584, 0.21817708262713972, 0.2170829222586129, 0.21524588222911722, 0.2148754911779633, 0.2182385324958408, 0.21492520556602437, 0.20443840563439072, 0.2126499111734249, 0.2078996185796722]
[0.09103500070504006, 0.04856002399090181, 0.05026793643060061, 0.049969932201146594, 0.05141911087026129, 0.04962057216279861, 0.04939599949240323, 0.04055946240878119, 0.047502208296862164, 0.04204955843634416]


In [17]:
sc.stop()