In [None]:
# K-medias

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("T-systems: sexta sesion. K-means") \
    .getOrCreate()

sc = spark.sparkContext

In [3]:
df = spark.read.csv("C:\\Users\\ruben\\Desktop\\data engineering\\s5-pysparkML\\results.csv", header = True)

In [4]:
import pyspark.sql.functions as sql_f

In [5]:
df = df.select([sql_f.col(c).cast('float') for c in ['Ex01','Ex02','Ex03','Ex04','Project']]).cache()

In [6]:
df.show(5)

+-----+-----+-----+----+-------+
| Ex01| Ex02| Ex03|Ex04|Project|
+-----+-----+-----+----+-------+
|100.0| 85.0| 80.0|70.0|   80.0|
|100.0| 85.0| 80.0|90.0|   93.0|
|100.0|100.0| 85.0|30.0|   70.0|
| 95.0| 95.0|100.0|55.0|   87.0|
| 65.0| 95.0| 65.0|25.0|   70.0|
+-----+-----+-----+----+-------+
only showing top 5 rows



In [7]:
features_col = df.columns

In [9]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols = features_col, outputCol = "features")
df = assembler.transform(df)

In [11]:
df.show(4)

+-----+-----+-----+----+-------+--------------------+
| Ex01| Ex02| Ex03|Ex04|Project|            features|
+-----+-----+-----+----+-------+--------------------+
|100.0| 85.0| 80.0|70.0|   80.0|[100.0,85.0,80.0,...|
|100.0| 85.0| 80.0|90.0|   93.0|[100.0,85.0,80.0,...|
|100.0|100.0| 85.0|30.0|   70.0|[100.0,100.0,85.0...|
| 95.0| 95.0|100.0|55.0|   87.0|[95.0,95.0,100.0,...|
+-----+-----+-----+----+-------+--------------------+
only showing top 4 rows



In [13]:
df = df.select("features")
df.show(5)

+--------------------+
|            features|
+--------------------+
|[100.0,85.0,80.0,...|
|[100.0,85.0,80.0,...|
|[100.0,100.0,85.0...|
|[95.0,95.0,100.0,...|
|[65.0,95.0,65.0,2...|
+--------------------+
only showing top 5 rows



In [24]:
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol = "features", outputCol = "scaled_features")
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)

In [25]:
df.show(5, truncate = False)

+---------------------------------------------------------+---------------------------------------------------------+
|features                                                 |scaled_features                                          |
+---------------------------------------------------------+---------------------------------------------------------+
|[1.0,0.625,0.6363636363636364,0.7000000000000001,0.8]    |[1.0,0.625,0.6363636363636364,0.7000000000000001,0.8]    |
|[1.0,0.625,0.6363636363636364,0.9,0.93]                  |[1.0,0.625,0.6363636363636364,0.9,0.93]                  |
|[1.0,1.0,0.7272727272727273,0.3,0.7000000000000001]      |[1.0,1.0,0.7272727272727273,0.3,0.7000000000000001]      |
|[0.875,0.875,1.0,0.55,0.87]                              |[0.875,0.875,1.0,0.55,0.87]                              |
|[0.125,0.875,0.36363636363636365,0.25,0.7000000000000001]|[0.125,0.875,0.36363636363636365,0.25,0.7000000000000001]|
+-------------------------------------------------------

In [26]:
df = df.select(df["scaled_features"].alias("features")).cache()

In [27]:
df.show(5)

+--------------------+
|            features|
+--------------------+
|[1.0,0.625,0.6363...|
|[1.0,0.625,0.6363...|
|[1.0,1.0,0.727272...|
|[0.875,0.875,1.0,...|
|[0.125,0.875,0.36...|
+--------------------+
only showing top 5 rows



In [None]:
k = 2

In [35]:
centroids = df.sample(fraction = 0.3, seed = 123456789).limit(k).collect()

In [39]:
import numpy as np
centroids = np.array(centroids).squeeze()

In [41]:
def dist(a,b, axis=1):
    return np.sqrt(np.sum((a-b)**2, axis = axis))

In [42]:
centroids_bc = sc.broadcast(centroids)

In [43]:
centroids_bc.value

array([[1.        , 0.625     , 0.63636364, 0.7       , 0.8       ],
       [0.75      , 0.375     , 1.        , 0.9       , 0.74      ]])

In [None]:
## calcular centroide mas cercano

In [44]:
closest_centroid = sql_f.udf(
    lambda x: int(np.argmin(dist(centroids_bc.value, x)))
)

In [45]:
df_closest = df.withColumn("closest", closest_centroid("features"))

In [48]:
df_closest.show(10)

+--------------------+-------+
|            features|closest|
+--------------------+-------+
|[1.0,0.625,0.6363...|      0|
|[1.0,0.625,0.6363...|      0|
|[1.0,1.0,0.727272...|      0|
|[0.875,0.875,1.0,...|      0|
|[0.125,0.875,0.36...|      0|
|[0.75,0.75,0.6363...|      0|
|[0.875,0.625,1.0,...|      1|
|[1.0,0.625,0.6363...|      0|
|[0.5,0.75,0.81818...|      1|
|[0.75,0.375,1.0,0...|      1|
+--------------------+-------+
only showing top 10 rows



In [51]:
from pyspark.ml.stat import Summarizer

In [52]:
new_centroids = df_closest.groupBy("closest")\
                          .agg(Summarizer.mean(sql_f.col("features")))

In [54]:
new_centroids.show(5, truncate = False)

+-------+-----------------------------------------------------------------------------------------------+
|closest|mean(features)                                                                                 |
+-------+-----------------------------------------------------------------------------------------------+
|0      |[0.8499999999999999,0.722222222222222,0.6323232323232324,0.7033333333333335,0.7813333333333333]|
|1      |[0.5937499999999999,0.4609375000000001,0.7670454545454546,0.83125,0.723125]                    |
+-------+-----------------------------------------------------------------------------------------------+



In [57]:
new_centroids = np.array(new_centroids.select("mean(features)").collect()).squeeze()

In [58]:
new_centroids

array([[0.85      , 0.72222222, 0.63232323, 0.70333333, 0.78133333],
       [0.59375   , 0.4609375 , 0.76704545, 0.83125   , 0.723125  ]])

In [59]:
def distributed_kmeans(df, k, fraction, threshold = 0.01, seed = 12345):
    #inicializacion
    centroids = np.array(df.sample(fraction = fraction, seed = seed).limit(k).collect()).squeeze()

    #bucle
    while True:
        
        #broadcast
        centroids_bc = sc.broadcast(centroids)   
        
        #calcular centroide mas cercano
        df_closest = df.withColumn("closest", closest_centroid("features"))

        #recalcular los centroides
        new_centroids = df_closest.groupBy("closest").agg(Summarizer.mean(sql_f.col("features")))

        new_centroids = np.array(new_centroids.select("mean(features)").collect()).squeeze()
        
        if dist(new_centroids, centroids, axis = None) < threshold:
            break
        else:
            centroids = new_centroids
    return centroids

In [60]:
distributed_kmeans(df, 2, 0.3)

array([[0.85      , 0.72222222, 0.63232323, 0.70333333, 0.78133333],
       [0.59375   , 0.4609375 , 0.76704545, 0.83125   , 0.723125  ]])