## K-Means with PySpark

```python
spark.version
```

```
'3.0.2'
```

### Step 0 - Create example data

```python
df = spark.createDataFrame([[0, 33.3, -17.5],
                            [1, 40.4, -20.5],
                            [2, 28.0, -23.9],
                            [3, 29.5, -19.0],
                            [4, 32.8, -18.84]],
                           ["other", "lat", "long"])

df.show()
```

```
+-----+----+------+
|other| lat|  long|
+-----+----+------+
|    0|33.3| -17.5|
|    1|40.4| -20.5|
|    2|28.0| -23.9|
|    3|29.5| -19.0|
|    4|32.8|-18.84|
+-----+----+------+
```

### Step 1 - Assemble your features

Unlike most ML packages, Spark ML requires all of your input features to be gathered into a ***single column*** of vector type in your dataframe, by default named `features`; it provides a dedicated transformer for doing this, `VectorAssembler`:

```python
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
```

```
+-----+----+------+-------------+
|other| lat|  long|     features|
+-----+----+------+-------------+
|    0|33.3| -17.5| [33.3,-17.5]|
|    1|40.4| -20.5| [40.4,-20.5]|
|    2|28.0| -23.9| [28.0,-23.9]|
|    3|29.5| -19.0| [29.5,-19.0]|
|    4|32.8|-18.84|[32.8,-18.84]|
+-----+----+------+-------------+
```

As you may have guessed, the `inputCols` argument tells `VectorAssembler` which columns of the dataframe to combine into the feature vector (in the order listed), while `outputCol` names the new vector column.
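
To make the idea concrete without a Spark session, here is a plain-Python sketch of what `VectorAssembler.transform` does conceptually: every row keeps its original columns and gains one new column that packs the `inputCols` values in the given order. The helper `assemble_features` and the dict-based rows are hypothetical illustrations; Spark itself stores an ML `Vector` in the new column rather than a Python list.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def assemble_features(rows: Sequence[Row], input_cols: Sequence[str],
                      output_col: str = "features") -> List[Row]:
    """Hypothetical helper: copy each row and pack `input_cols` into one list.

    Only mimics the idea behind VectorAssembler on plain dicts.
    """
    assembled = []
    for row in rows:
        new_row = dict(row)  # keep the original columns, as transform() does
        new_row[output_col] = [float(row[c]) for c in input_cols]
        assembled.append(new_row)
    return assembled


rows = [
    {"other": 0, "lat": 33.3, "long": -17.5},
    {"other": 1, "lat": 40.4, "long": -20.5},
    {"other": 2, "lat": 28.0, "long": -23.9},
    {"other": 3, "lat": 29.5, "long": -19.0},
    {"other": 4, "lat": 32.8, "long": -18.84},
]

# "other" is not listed in input_cols, so it stays out of the feature vector
for r in assemble_features(rows, input_cols=["lat", "long"]):
    print(r["other"], r["features"])
```

Note that the column order in `input_cols` fixes the position of each value inside the vector, which is why the same order must be used whenever new data is assembled for an already fitted model.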

### Step 2 - Fit your KMeans model

```python
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))
```

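`select('features')` here simply hands the algorithm a dataframe containing only the assembled vector column. `KMeans` actually reads its input from the column named by its `featuresCol` parameter, which defaults to `features`, so `kmeans.fit(new_df)` would work just as well. Either way, after Step 1 above your original `lat` and `long` columns are no longer used directly.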
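
Conceptually, `fit` runs Lloyd's algorithm: assign each point to its nearest center, move each center to the mean of its assigned points, and repeat until the centers stop moving. The sketch below is a plain-Python illustration of that loop on the five example points, not Spark's implementation. In particular, Spark's `KMeans` picks its starting centers with the k-means|| scheme by default (the `initMode` parameter), whereas here the starting centers are passed in explicitly; `lloyd_kmeans`, `nearest` and `squared_distance` are hypothetical helpers, and the `max_iter`/`tol` defaults are only chosen to loosely mirror Spark's `maxIter` and `tol` parameters.

```python
from typing import List, Sequence, Tuple

Point = Tuple[float, ...]


def squared_distance(a: Sequence[float], b: Sequence[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


def nearest(point: Sequence[float], centers: Sequence[Sequence[float]]) -> int:
    # Index of the closest center (ties go to the lowest index)
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))


def lloyd_kmeans(points: Sequence[Point], init_centers: Sequence[Point],
                 max_iter: int = 20, tol: float = 1e-4) -> Tuple[List[Point], List[int]]:
    """Hypothetical sketch of Lloyd's k-means with explicitly given start centers."""
    centers = [tuple(c) for c in init_centers]
    for _ in range(max_iter):
        # Assignment step: label every point with its nearest center
        labels = [nearest(p, centers) for p in points]
        # Update step: move each center to the mean of its members
        new_centers = []
        for k, old in enumerate(centers):
            members = [p for p, lab in zip(points, labels) if lab == k]
            if members:
                new_centers.append(tuple(sum(v) / len(members) for v in zip(*members)))
            else:
                new_centers.append(old)  # leave an empty cluster's center in place
        shift = max(squared_distance(a, b) ** 0.5 for a, b in zip(centers, new_centers))
        centers = new_centers
        if shift < tol:  # centers have (almost) stopped moving
            break
    # Final labels with respect to the final centers
    labels = [nearest(p, centers) for p in points]
    return centers, labels


points = [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]

# Two hand-picked starting pairs (illustrative only, not Spark's initialization)
for init_idx in ([0, 2], [0, 1]):
    centers, labels = lloyd_kmeans(points, [points[i] for i in init_idx])
    print(init_idx, labels, [tuple(round(x, 2) for x in c) for c in centers])
```

Starting from records 0 and 2, the sketch settles on the same grouping as in Step 3 (`[0, 0, 1, 0, 0]`), whereas starting from records 0 and 1 it settles on `[0, 1, 0, 0, 0]`, isolating record 1 instead. Both are stable solutions of the loop, which is why k-means results can depend on the initialization and hence on `seed`.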

### Step 3 - Transform your initial dataframe to include cluster assignments

```python
transformed = model.transform(new_df)
transformed.show()
```

```
+-----+----+------+-------------+----------+
|other| lat|  long|     features|prediction|
+-----+----+------+-------------+----------+
|    0|33.3| -17.5| [33.3,-17.5]|         0|
|    1|40.4| -20.5| [40.4,-20.5]|         0|
|    2|28.0| -23.9| [28.0,-23.9]|         1|
|    3|29.5| -19.0| [29.5,-19.0]|         0|
|    4|32.8|-18.84|[32.8,-18.84]|         0|
+-----+----+------+-------------+----------+
```

The last column of the `transformed` dataframe, `prediction`, holds the cluster assignment: in my example, 4 records ended up in cluster 0 and 1 record in cluster 1.
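
To summarize each cluster (the second resource below covers this in Spark), the plain-Python sketch here computes every cluster's size and the mean `lat`/`long` of its members, using the values copied from the `transformed` output above. In PySpark itself, the sizes would typically come from `transformed.groupBy('prediction').count()` and the fitted centers from `model.clusterCenters()`; for a converged run those centers should be close to the member means computed here. `cluster_stats` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

# (lat, long, prediction) triples copied from the `transformed` output above
transformed_rows = [
    (33.3, -17.5, 0),
    (40.4, -20.5, 0),
    (28.0, -23.9, 1),
    (29.5, -19.0, 0),
    (32.8, -18.84, 0),
]


def cluster_stats(rows: Sequence[Tuple[float, float, int]]) -> Dict[int, Tuple[int, Tuple[float, float]]]:
    """Hypothetical helper: map each cluster id to (size, (mean lat, mean long))."""
    members: Dict[int, List[Tuple[float, float]]] = defaultdict(list)
    for lat, lon, pred in rows:
        members[pred].append((lat, lon))
    stats = {}
    for pred in sorted(members):
        pts = members[pred]
        mean = (sum(p[0] for p in pts) / len(pts), sum(p[1] for p in pts) / len(pts))
        stats[pred] = (len(pts), mean)
    return stats


for cluster, (size, (mlat, mlon)) in cluster_stats(transformed_rows).items():
    print(f"cluster {cluster}: {size} records, mean lat={mlat:.2f}, mean long={mlon:.2f}")
```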

### Resources
- [KMeans clustering in PySpark](https://stackoverflow.com/a/47593712/9500955)
- [PySpark ML: Get KMeans cluster statistics](https://stackoverflow.com/a/47156822/9500955)