# Kmeans with PySpark
## author: Konstantinos Nikopoulos

## Import Libraries

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np

## Configure Spark 

In [3]:
# Local
conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)

## Data Preprocessing

### Read csv

In [4]:
sqlContext = SQLContext(sc)
df = sqlContext.read.options(header=False,inferSchema=True).csv("data.csv")

### Drop empty rows

In [5]:
df = df.na.drop()

### Data

In [6]:
df.show(5)

+------+---+
|   _c0|_c1|
+------+---+
|4074.0|928|
| 635.0|935|
|1392.0|562|
|4002.0|149|
|3394.0|777|
+------+---+
only showing top 5 rows



### Scale to 0-1

In [7]:
# Vector to double type
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

for i in ["_c0", "_c1"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])
    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

print("Scale to 0-1 :")
df.show(5)

Scale to 0-1 :
+------+---+----------+----------+
|   _c0|_c1|_c0_Scaled|_c1_Scaled|
+------+---+----------+----------+
|4074.0|928|     0.851|     0.937|
| 635.0|935|     0.133|     0.945|
|1392.0|562|     0.291|     0.517|
|4002.0|149|     0.836|     0.044|
|3394.0|777|     0.709|     0.764|
+------+---+----------+----------+
only showing top 5 rows



### Merge columns to create column "features"

In [8]:
vecAssembler = VectorAssembler(inputCols=["_c0_Scaled", "_c1_Scaled"], outputCol="features")
df = vecAssembler.transform(df)

print("Merge columns :")
df.show(5)

Merge columns :
+------+---+----------+----------+-------------+
|   _c0|_c1|_c0_Scaled|_c1_Scaled|     features|
+------+---+----------+----------+-------------+
|4074.0|928|     0.851|     0.937|[0.851,0.937]|
| 635.0|935|     0.133|     0.945|[0.133,0.945]|
|1392.0|562|     0.291|     0.517|[0.291,0.517]|
|4002.0|149|     0.836|     0.044|[0.836,0.044]|
|3394.0|777|     0.709|     0.764|[0.709,0.764]|
+------+---+----------+----------+-------------+
only showing top 5 rows



## Kmeans

### Choose optimal number of clusters using silhouette method

In [9]:
evaluator = ClusteringEvaluator()
silhouette = np.zeros(10)
for k in range(2,10):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df.sample(False,0.1, seed=42))
    predictions = model.transform(df)
    silhouette[k] = evaluator.evaluate(predictions) 
best_k = np.argmax(silhouette)

print("Best k: ",best_k + 1)
print("Silhouette with squared euclidean distance = ",silhouette[best_k])

Best k:  5
Silhouette with squared euclidean distance =  0.7796536458687179


### Apply Kmeans

In [10]:
kmeans = KMeans(k=best_k, seed=1)  
model = kmeans.fit(df.select('features'))

predictions = model.transform(df)
print("After Clustering :")
predictions.show(5)

After Clustering :
+------+---+----------+----------+-------------+----------+
|   _c0|_c1|_c0_Scaled|_c1_Scaled|     features|prediction|
+------+---+----------+----------+-------------+----------+
|4074.0|928|     0.851|     0.937|[0.851,0.937]|         1|
| 635.0|935|     0.133|     0.945|[0.133,0.945]|         2|
|1392.0|562|     0.291|     0.517|[0.291,0.517]|         2|
|4002.0|149|     0.836|     0.044|[0.836,0.044]|         3|
|3394.0|777|     0.709|     0.764|[0.709,0.764]|         1|
+------+---+----------+----------+-------------+----------+
only showing top 5 rows

