### Import necessarie

In [None]:
!pip install pyspark

In [2]:
import pyspark
import numpy as np
from pyspark.sql import SparkSession

### Inizializzare la sessione di PySpark

In [3]:
spark = SparkSession.builder.appName('kmeans').getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/08 18:48:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/08 18:48:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
sc = spark.sparkContext
sc

### Caricare il dataset (ristretto)

In [20]:
df_prova = spark.read.csv('PirncipalDf.csv', header=True, inferSchema=True)#.limit(100000)
df_prova = df_prova.select(['principal component 1', 'principal component 2'])
df_prova = df_prova.withColumnRenamed('principal component 1', 'PC1')
df_prova = df_prova.withColumnRenamed('principal component 2', 'PC2')
df_prova.printSchema()
df_prova.show(5)
df_prova.describe().show()

root
 |-- PC1: double (nullable = true)
 |-- PC2: double (nullable = true)

+-------------------+--------------------+
|                PC1|                 PC2|
+-------------------+--------------------+
|-0.7122313963840295|-0.27939170644584194|
| 12.984452034038485| -0.2386403389332261|
|-0.5401379856491022| -0.3221764947636554|
| 0.3009650926574159|   1.147140349844949|
| 0.0754529923373697| -0.2332551171850431|
+-------------------+--------------------+
only showing top 5 rows

+-------+--------------------+--------------------+
|summary|                 PC1|                 PC2|
+-------+--------------------+--------------------+
|  count|              839081|              839081|
|   mean|1.246505292145654...|-1.28376496120435...|
| stddev|   2.874785048573882|  1.4056057672352482|
|    min| -0.7407601156132129|  -33.28030770177722|
|    max|   693.0086959499822|  202.35776635060017|
+-------+--------------------+--------------------+



### Funzione per il mapping

In [6]:
def assign_cluster(x, centroids):
    distances = []
    for centroid in centroids:
        distances.append(np.linalg.norm(np.array(x) - np.array(centroid)))
    return (np.argmin(distances), np.array(x))

### Funzione per il reducing

In [7]:
def new_centroid(old_cluster):
    out = sum(old_cluster) / len(old_cluster)
    return out

### Funzione per la stopping condition

In [8]:
def compute_loss(y, centroid):
    loss = np.linalg.norm(np.array(y[1])-np.array(centroid[y[0]]))
    return loss

### Kmeans

In [21]:
def kmeansPS(X, k):
    # maximum number of iterations allowed
    max_iter = 100
    # if the loss is less than epsilon we arrive at convergence
    eps = 0.1
    # loss_history initialization
    loss_history = []
    # randomly chosing the initial centroids
    centroids = X.rdd.takeSample(False, k)
    # repeating the mapping and reducing until convergence or until reaching the max_iter
    for iteration in range(max_iter):
        # creating the cluster
        cluster = X.rdd.map(lambda x: assign_cluster(x, centroids)).collect()
        # creating rdd object
        cluster_rdd = sc.parallelize(cluster)
        # computing the new centroids
        centroids = cluster_rdd.groupByKey().mapValues(lambda x: new_centroid(x)).map(lambda a: a[1]).collect()
        # evaluating the distances of every element from their centroid
        loss = cluster_rdd.map(lambda x: (x[0], compute_loss(x, centroids))).groupByKey().mapValues(sum).values().collect()
        # summing all the distances in order the use that value as loss
        loss_history.append(np.sum(loss))
        # condition for convergence
        if iteration > 0:
            if loss_history[iteration-1]-loss_history[iteration] < eps:
                break
    return cluster_rdd.keys().collect(), iteration, loss_history[:-1]

In [22]:
labels, iteration, loss_history = kmeansPS(df_prova, 2)

                                                                                

22/12/08 18:51:35 WARN TaskSetManager: Stage 91 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:51:38 WARN TaskSetManager: Stage 93 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:52:04 WARN TaskSetManager: Stage 96 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:52:08 WARN TaskSetManager: Stage 98 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:52:32 WARN TaskSetManager: Stage 101 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:52:37 WARN TaskSetManager: Stage 103 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:53:01 WARN TaskSetManager: Stage 106 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:53:05 WARN TaskSetManager: Stage 108 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:53:30 WARN TaskSetManager: Stage 111 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:53:34 WARN TaskSetManager: Stage 113 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/08 18:53:38 WARN TaskSetManager: Stage 115 contains a task of very large size (6351 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [23]:
loss_history

[746901.4620140418, 630024.8711262993, 587022.5032690049, 580227.7570619785]

In [24]:
iteration

4

In [25]:
labels

[0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
