In [0]:

from pyspark.sql import SparkSession

In [0]:
# Obtain the Spark session for this notebook, registered under the app name "cluster".
spark = (
    SparkSession.builder
    .appName("cluster")
    .getOrCreate()
)

In [0]:
from pyspark.ml.clustering import KMeans

In [0]:
# Read the sample k-means data file (LIBSVM format) into a DataFrame.
dataset = spark.read.load(
    "/FileStore/tables/sample_kmeans_data.txt",
    format="libsvm",
)

In [0]:
# Print the loaded rows to inspect the `label` and `features` columns.
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [0]:
# Keep only the `features` column; the `label` column is dropped before clustering.
final_data = dataset.select("features")

In [0]:
# Print the features-only DataFrame that will be passed to the clusterer.
final_data.show()

+--------------------+
|            features|
+--------------------+
|           (3,[],[])|
|(3,[0,1,2],[0.1,0...|
|(3,[0,1,2],[0.2,0...|
|(3,[0,1,2],[9.0,9...|
|(3,[0,1,2],[9.1,9...|
|(3,[0,1,2],[9.2,9...|
+--------------------+



In [0]:
# Configure k-means for 3 clusters; the fixed seed keeps the initialization repeatable.
kmeans = KMeans(k=3, seed=1)

In [0]:
# Fit the configured k-means estimator on the features-only DataFrame.
model = kmeans.fit(final_data)

In [0]:
# Within Set Sum of Squared Errors (WSSSE): the sum, over all rows, of the
# squared Euclidean distance between a point and its assigned cluster center.
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F

# Cluster centers learned by the fitted model.
centers = model.clusterCenters()

# Broadcast the centers so the UDF can look them up.
# Use the context owned by the `spark` session created in this notebook
# instead of relying on an implicit, platform-injected `sc` global.
broadcast_centers = spark.sparkContext.broadcast(centers)


def squared_distance(point, cluster):
    """Return the squared Euclidean distance from `point` to its cluster center.

    Args:
        point: Feature vector of one row (value of the "features" column).
        cluster: Index of the assigned cluster (value of the "prediction"
            column), used to pick the center from the broadcast list.

    Returns:
        The squared distance as a built-in Python float.
    """
    center = broadcast_centers.value[cluster]
    # float(...) guarantees a built-in float, matching the UDF's declared
    # "double" return type.
    return float(point.squared_distance(Vectors.dense(center)))


# Declare the return type explicitly: without it F.udf defaults to a string
# column, and the sum below would only work through an implicit cast.
distance_udf = F.udf(squared_distance, "double")

# Add the predicted cluster index to every row.
predicted = model.transform(final_data)

# Clustering cost = sum of the per-row squared distances.
cost = (
    predicted
    .withColumn("squaredDistance", distance_udf("features", "prediction"))
    .agg(F.sum("squaredDistance").alias("cost"))
    .collect()[0]["cost"]
)

print("Clustering cost: ", cost)

Clustering cost:  0.07499999999999958


In [0]:
# Display the cluster centers obtained from the fitted model.
centers

Out[19]: [array([9.1, 9.1, 9.1]), array([0.05, 0.05, 0.05]), array([0.2, 0.2, 0.2])]

In [0]:
# Print the input features again for comparison with the cluster centers.
final_data.show()

+--------------------+
|            features|
+--------------------+
|           (3,[],[])|
|(3,[0,1,2],[0.1,0...|
|(3,[0,1,2],[0.2,0...|
|(3,[0,1,2],[9.0,9...|
|(3,[0,1,2],[9.1,9...|
|(3,[0,1,2],[9.2,9...|
+--------------------+



In [0]:
# Apply the fitted model to the features; the result gains a `prediction` column.
results = model.transform(final_data)

In [0]:
# Print each feature vector alongside its assigned cluster index.
results.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         1|
|(3,[0,1,2],[0.1,0...|         1|
|(3,[0,1,2],[0.2,0...|         2|
|(3,[0,1,2],[9.0,9...|         0|
|(3,[0,1,2],[9.1,9...|         0|
|(3,[0,1,2],[9.2,9...|         0|
+--------------------+----------+

