In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Cluster').getOrCreate()

In [2]:
from pyspark.ml.clustering import KMeans

## Documentation Example

In [3]:
dataset = spark.read.format('libsvm').load('sample_kmeans_data.txt')

In [4]:
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [5]:
final_data = dataset.select('features')

In [6]:
final_data.show()

+--------------------+
|            features|
+--------------------+
|           (3,[],[])|
|(3,[0,1,2],[0.1,0...|
|(3,[0,1,2],[0.2,0...|
|(3,[0,1,2],[9.0,9...|
|(3,[0,1,2],[9.1,9...|
|(3,[0,1,2],[9.2,9...|
+--------------------+



In [7]:
kmeans = KMeans().setK(2).setSeed(1)

In [8]:
model = kmeans.fit(final_data)

In [9]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()

In [10]:
predictions = model.transform(final_data)
silhouette = evaluator.evaluate(predictions)

In [11]:
print(f'silhouette score: {silhouette}')

silhouette score: 0.9997530305375207


In [12]:
wssse = model.summary.trainingCost

In [13]:
print(f'wssse: {wssse}')

wssse: 0.11999999999994547


In [14]:
centers = model.clusterCenters()

In [15]:
centers

[array([9.1, 9.1, 9.1]), array([0.1, 0.1, 0.1])]

In [16]:
predictions.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         1|
|(3,[0,1,2],[0.1,0...|         1|
|(3,[0,1,2],[0.2,0...|         1|
|(3,[0,1,2],[9.0,9...|         0|
|(3,[0,1,2],[9.1,9...|         0|
|(3,[0,1,2],[9.2,9...|         0|
+--------------------+----------+



## Example with more real data

In [17]:
dataset = spark.read.csv('seeds_dataset.csv', inferSchema=True, header=True)

In [18]:
dataset.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



In [19]:
dataset.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22)]

In [20]:
dataset.show(5)

+-----+---------+-----------+------------------+------------------+---------------------+----------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|length_of_groove|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|            5.22|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|           4.956|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|           4.825|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|           4.805|
|16.14|    14.99|     0.9034|5.6579999999999995|             3.562|                1.355|           5.175|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+
only showing top 5 rows



In [21]:
from pyspark.ml.feature import VectorAssembler

In [22]:
dataset.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

In [23]:
assembler = VectorAssembler(inputCols=dataset.columns, outputCol='features')

In [24]:
final_data = assembler.transform(dataset)

In [25]:
final_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)



In [26]:
final_data.show(5)

+-----+---------+-----------+------------------+------------------+---------------------+----------------+--------------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|length_of_groove|            features|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+--------------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|            5.22|[15.26,14.84,0.87...|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|           4.956|[14.88,14.57,0.88...|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|           4.825|[14.29,14.09,0.90...|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|           4.805|[13.84,13.94,0.89...|
|16.14|    14.99|     0.9034|5.6579999999999995|             3.562|                1.355|           5.17

In [27]:
from pyspark.ml.feature import StandardScaler

In [28]:
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')

In [29]:
scale_model = scaler.fit(final_data)

In [30]:
final_data = scale_model.transform(final_data)

In [31]:
final_data.show(5)

+-----+---------+-----------+------------------+------------------+---------------------+----------------+--------------------+--------------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|length_of_groove|            features|      scaledFeatures|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+--------------------+--------------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|            5.22|[15.26,14.84,0.87...|[5.24452795332028...|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|           4.956|[14.88,14.57,0.88...|[5.11393027165175...|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|           4.825|[14.29,14.09,0.90...|[4.91116018695588...|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|           4.805|

In [32]:
final_data.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22, features=DenseVector([15.26, 14.84, 0.871, 5.763, 3.312, 2.221, 5.22]), scaledFeatures=DenseVector([5.2445, 11.3633, 36.8608, 13.0072, 8.7685, 1.4772, 10.621]))]

In [33]:
kmeans = KMeans(featuresCol='scaledFeatures', k=3)

In [34]:
model = kmeans.fit(final_data)

In [36]:
print(f'WSSSE:{model.summary.trainingCost}')

WSSSE:429.03171593809003


In [37]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()
predictions = model.transform(final_data)
silhouette = evaluator.evaluate(predictions)
print(f'silhouette score: {silhouette}')

silhouette score: 0.6289524749632378


In [38]:
centers = model.clusterCenters()
print(centers)

[array([ 6.3407095 , 12.39263108, 37.41143125, 13.92892299,  9.77251635,
        2.42396447, 12.28547936]), array([ 4.06818854, 10.13938448, 35.87110297, 11.81191124,  7.52564313,
        3.24585755, 10.40780927]), array([ 4.91589737, 10.9321157 , 37.2641905 , 12.39722305,  8.58688868,
        1.77370154, 10.37323607])]


In [40]:
predictions.show()

+-----+---------+-----------+------------------+------------------+---------------------+------------------+--------------------+--------------------+----------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|  length_of_groove|            features|      scaledFeatures|prediction|
+-----+---------+-----------+------------------+------------------+---------------------+------------------+--------------------+--------------------+----------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|              5.22|[15.26,14.84,0.87...|[5.24452795332028...|         2|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|             4.956|[14.88,14.57,0.88...|[5.11393027165175...|         2|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|             4.825|[14.29,14.09,0.90...|[4.91116018695588...|         2|
|13.84|    13.94|     0.8955

In [41]:
predictions.select('features', 'prediction').show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[15.26,14.84,0.87...|         2|
|[14.88,14.57,0.88...|         2|
|[14.29,14.09,0.90...|         2|
|[13.84,13.94,0.89...|         2|
|[16.14,14.99,0.90...|         2|
|[14.38,14.21,0.89...|         2|
|[14.69,14.49,0.87...|         2|
|[14.11,14.1,0.891...|         2|
|[16.63,15.46,0.87...|         0|
|[16.44,15.25,0.88...|         2|
|[15.26,14.85,0.86...|         2|
|[14.03,14.16,0.87...|         2|
|[13.89,14.02,0.88...|         2|
|[13.78,14.06,0.87...|         2|
|[13.74,14.05,0.87...|         2|
|[14.59,14.28,0.89...|         2|
|[13.99,13.83,0.91...|         2|
|[15.69,14.75,0.90...|         2|
|[14.7,14.21,0.915...|         2|
|[12.72,13.57,0.86...|         1|
+--------------------+----------+
only showing top 20 rows

