# 🧠 K-Means Clustering Using Spark MLlib

This guide demonstrates how to apply **K-Means clustering** with **Spark MLlib**: first in Scala on a small structured dataset (`customers.csv`), then in PySpark on randomly generated 2D points.

---

## 📂 Sample Dataset (`customers.csv`)

```text
customer_id,age,annual_income,spending_score
1,19,15,39
2,21,15,81
3,20,16,6
4,23,16,77
5,31,17,40
6,22,17,76
```

---

# ✅ Step 1: Load Dataset into Spark

```scala
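// `spark` is the SparkSession that spark-shell provides;
// in a standalone application, create it with SparkSession.builder.
val df = spark.read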
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("customers.csv")
```

---

# ✅ Step 2: Prepare Data for Clustering

Select the numeric feature columns and assemble them into a single vector column, which is the input format MLlib estimators expect:

```scala
import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("age", "annual_income", "spending_score"))
  .setOutputCol("features")

val featureDF = assembler.transform(df).select("customer_id", "features")
```

---

# ✅ Step 3: Apply K-Means Algorithm

```scala
import org.apache.spark.ml.clustering.KMeans

val kmeans = new KMeans()
  .setK(3)
  .setSeed(1L)
  .setFeaturesCol("features")
  .setPredictionCol("cluster")

val model = kmeans.fit(featureDF)
```
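
To make the fitting step less of a black box, here is a minimal plain-Python sketch of the assign-then-update loop (Lloyd's iterations) that K-Means is built on, run on the six rows of `customers.csv`. It is not Spark's implementation: MLlib runs distributed and defaults to `k-means||` initialization, whereas this sketch seeds the centroids with the first `k` points purely so the result is deterministic. The function name `kmeans` and its parameters are illustrative.

```python
from math import dist  # Euclidean distance, Python 3.8+


def kmeans(points, k, max_iter=20):
    """Plain Lloyd's iterations, seeded with the first k points (an assumption)."""
    centroids = [tuple(p) for p in points[:k]]
    labels = None
    for _ in range(max_iter):
        # Assignment step: each point joins its nearest centroid.
        new_labels = [
            min(range(k), key=lambda c: dist(p, centroids[c])) for p in points
        ]
        if new_labels == labels:
            break  # assignments stopped changing, so the loop has converged
        labels = new_labels
        # Update step: move each centroid to the mean of its members.
        for c in range(k):
            members = [p for p, lab in zip(points, labels) if lab == c]
            if members:  # keep the old centroid if a cluster ends up empty
                centroids[c] = tuple(
                    sum(col) / len(members) for col in zip(*members)
                )
    return labels, centroids


# (age, annual_income, spending_score) rows from customers.csv
customers = [
    (19, 15, 39), (21, 15, 81), (20, 16, 6),
    (23, 16, 77), (31, 17, 40), (22, 17, 76),
]

labels, centroids = kmeans(customers, k=3)
print(labels)     # [0, 1, 2, 1, 0, 1]
print(centroids)
```

The numeric cluster ids depend on which centroid happens to sit at which index, so a different initialization can produce the same grouping under different ids.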

---

# ✅ Step 4: Get Cluster Assignments

```scala
val clusteredDF = model.transform(featureDF)
clusteredDF.show(false) // false disables truncation and left-aligns the cells
```

---

# ✅ Step 5: Save Clustering Results

```scala
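// The CSV writer cannot serialize the vector-typed "features" column,
// so write only the scalar columns.
clusteredDF
  .select("customer_id", "cluster")
  .write.mode("overwrite")
  .option("header", "true")
  .csv("output/kmeans_result")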
```

---

## 📌 Sample Output

```text
+-----------+----------------+-------+
|customer_id|features        |cluster|
+-----------+----------------+-------+
|1          |[19.0,15.0,39.0]|1      |
|2          |[21.0,15.0,81.0]|0      |
|3          |[20.0,16.0,6.0] |2      |
|4          |[23.0,16.0,77.0]|0      |
|5          |[31.0,17.0,40.0]|1      |
|6          |[22.0,17.0,76.0]|0      |
+-----------+----------------+-------+
```
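
A table of assignments is easier to interpret once it is summarized per cluster. The sketch below does that in plain Python for the six rows of the sample output, computing each cluster's size and mean feature values. The variable names are illustrative; on the real DataFrame the same summary would normally be a `groupBy("cluster")` aggregation over the original columns rather than a Python loop.

```python
from collections import defaultdict

# (customer_id, age, annual_income, spending_score, cluster),
# copied from the sample output table
rows = [
    (1, 19, 15, 39, 1),
    (2, 21, 15, 81, 0),
    (3, 20, 16, 6, 2),
    (4, 23, 16, 77, 0),
    (5, 31, 17, 40, 1),
    (6, 22, 17, 76, 0),
]

# Group the feature tuples by their assigned cluster id.
members = defaultdict(list)
for customer_id, age, income, score, cluster in rows:
    members[cluster].append((age, income, score))

# Per-cluster size and column-wise feature means.
profile = {
    cluster: {
        "size": len(feats),
        "mean": tuple(sum(col) / len(feats) for col in zip(*feats)),
    }
    for cluster, feats in sorted(members.items())
}

for cluster, stats in profile.items():
    print(cluster, stats)
```

In this sample, age and income barely differ between clusters, so the grouping is driven almost entirely by `spending_score`: cluster 0 averages 78.0, cluster 1 averages 39.5, and cluster 2 holds the single customer with a score of 6.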


## PySpark

```python
# ---------------------------------------------
# K-Means Clustering Using Spark MLlib
# With Sample Random Data
# ---------------------------------------------

import random

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# ---------------------------------------------
# 1. Create Spark Session
# ---------------------------------------------
spark = (
    SparkSession.builder
    .appName("KMeansExample")
    .getOrCreate()
)

# ---------------------------------------------
# 2. Generate Sample Random Data
# ---------------------------------------------
# Create random 2D points around 3 centers.
# `random` is not seeded, so the points (and the printed output)
# differ from run to run.

data = []

# Cluster 1 around (2, 2)
for _ in range(100):
    data.append((random.gauss(2, 0.5), random.gauss(2, 0.5)))

# Cluster 2 around (8, 8)
for _ in range(100):
    data.append((random.gauss(8, 0.5), random.gauss(8, 0.5)))

# Cluster 3 around (5, 12)
for _ in range(100):
    data.append((random.gauss(5, 0.5), random.gauss(12, 0.5)))

# Create DataFrame
df = spark.createDataFrame(data, ["x", "y"])

print("Sample Data:")
df.show(5)

# ---------------------------------------------
# 3. Convert Columns into Feature Vector
# ---------------------------------------------
assembler = VectorAssembler(
    inputCols=["x", "y"],
    outputCol="features"
)

dataset = assembler.transform(df)

# ---------------------------------------------
# 4. Train K-Means Model
# ---------------------------------------------
kmeans = (
    KMeans()
    .setK(3)
    .setSeed(1)
    .setFeaturesCol("features")
    .setPredictionCol("cluster")
)

model = kmeans.fit(dataset)

# ---------------------------------------------
# 5. Make Predictions
# ---------------------------------------------
predictions = model.transform(dataset)

print("Cluster Assignments:")
predictions.select("x", "y", "cluster").show(10)

# ---------------------------------------------
# 6. Print Cluster Centers
# ---------------------------------------------
centers = model.clusterCenters()

print("Cluster Centers:")
for i, center in enumerate(centers):
    print(f"Cluster {i}: {center}")

# ---------------------------------------------
# 7. Evaluate Using Silhouette Score
# ---------------------------------------------
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="cluster",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean"
)

silhouette = evaluator.evaluate(predictions)

print("Silhouette Score =", silhouette)

# ---------------------------------------------
# 8. Stop Spark Session
# ---------------------------------------------
spark.stop()
```
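
The silhouette score that `ClusteringEvaluator` reports in the script above ranges from -1 to 1, with values near 1 indicating tight, well-separated clusters. The following plain-Python sketch shows the textbook definition using squared Euclidean distance, matching the `distanceMeasure="squaredEuclidean"` setting. It is an illustration only, not Spark's distributed implementation, and the helper names `sq_dist` and `silhouette` are hypothetical.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette(points, labels):
    """Mean silhouette coefficient using squared Euclidean distance."""
    clusters = sorted(set(labels))
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")
    scores = []
    for i, (p, own) in enumerate(zip(points, labels)):
        # Mean distance from p to the members of each cluster, excluding p.
        mean_to = {}
        for c in clusters:
            others = [
                q for j, (q, lab) in enumerate(zip(points, labels))
                if lab == c and j != i
            ]
            if others:
                mean_to[c] = sum(sq_dist(p, q) for q in others) / len(others)
        if own not in mean_to:
            scores.append(0.0)  # singleton cluster: score taken as 0
            continue
        a = mean_to[own]                                     # cohesion
        b = min(v for c, v in mean_to.items() if c != own)   # separation
        scores.append((b - a) / max(a, b) if max(a, b) > 0 else 0.0)
    return sum(scores) / len(scores)


points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
good = silhouette(points, [0, 0, 1, 1])  # groups the nearby points together
bad = silhouette(points, [0, 1, 0, 1])   # pairs each point with a distant one
print(good, bad)
```

Here the sensible labeling scores close to 1, while the labeling that pairs distant points scores below 0, which is the pattern to look for when comparing clusterings.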

Output (values vary between runs because the data is random):

```text
Sample Data:
+------------------+------------------+
|                 x|                 y|
+------------------+------------------+
| 2.142931171001465|1.7553987054858362|
|2.0965233228962723|1.8124620754314489|
|3.1611847805974573|1.5474923426841019|
|2.4627082038244725|1.5553530309118573|
| 2.154144869166751|1.9375302684215185|
+------------------+------------------+
only showing top 5 rows
```
Cluster Assignments:
+------------------+------------------+-------+
|                 x|                 y|cluster|
+------------------+------------------+-------+
| 2.142931171001465|1.7553987054858362|      1|
|2.0965233228962723|1.8124620754314489|      1|
|3.1611847805974573|1.5474923426841019|      1|
|2.4627082038244725|1.5553530309118573|      1|
| 2.154144869166751|1.9375302684215185|      1|
| 1.752965208665309| 2.473069050715892|      1|
|1.7329842153400274| 2.661099857346823|      1|
|2.4267711512894157|1.5080098984766086|      1|
|1.6729817530368574| 1.077658729676557|      1|
|2.300