In [1]:
from pyspark.sql import SparkSession

In [2]:
# Reuse the active Spark session if there is one; otherwise start one
# under the application name "cluster".
spark = (
    SparkSession.builder
    .appName("cluster")
    .getOrCreate()
)

In [3]:
from pyspark.ml.clustering import KMeans

In [4]:
# Load the sample points through the libsvm reader.
dataset = (
    spark.read
    .format("libsvm")
    .load('sample_kmeans_data.txt')
)

In [5]:
# Preview the loaded rows ('label' and sparse 'features' columns).
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [6]:
# Keep only the 'features' column; the 'label' column is not handed to the
# clustering step.
final_data = dataset.select('features')

In [7]:
# Two clusters; a fixed seed makes the random centroid initialization
# repeatable between runs.
kmeans = KMeans(k=2, seed=1)

In [8]:
# Fit the estimator on the features-only DataFrame to obtain the trained model.
model = kmeans.fit(final_data)

In [9]:
# Within-set sum of squared errors on the data the model was trained on.
# Read from the training summary: KMeansModel.computeCost() was deprecated
# in Spark 2.4 and removed in Spark 3.0. Requires Spark >= 2.4.
wsse = model.summary.trainingCost

In [10]:
# Report the clustering cost computed in the previous cell.
print(wsse)

0.11999999999994547


In [11]:
# Fitted cluster centers, one array per cluster.
centers = model.clusterCenters()

In [12]:
# Display the fitted centers as the cell output.
centers

[array([ 0.1,  0.1,  0.1]), array([ 9.1,  9.1,  9.1])]

In [13]:
# Append a 'prediction' column holding each row's assigned cluster index.
results = model.transform(final_data)

In [14]:
# Show each feature vector next to its assigned cluster.
results.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         0|
|(3,[0,1,2],[0.1,0...|         0|
|(3,[0,1,2],[0.2,0...|         0|
|(3,[0,1,2],[9.0,9...|         1|
|(3,[0,1,2],[9.1,9...|         1|
|(3,[0,1,2],[9.2,9...|         1|
+--------------------+----------+



In [15]:
# Three clusters this time, again with a fixed seed for repeatable
# centroid initialization.
kmeans2 = KMeans(k=3, seed=1)

In [16]:
# Fit the k=3 estimator on the same features-only DataFrame.
model2 = kmeans2.fit(final_data)

In [17]:
# Within-set sum of squared errors of the k=3 model on its training data.
# Read from the training summary: KMeansModel.computeCost() was deprecated
# in Spark 2.4 and removed in Spark 3.0. Requires Spark >= 2.4.
wsse2 = model2.summary.trainingCost

In [18]:
# Display the cost of the k=3 model as the cell output.
wsse2

0.07499999999994544

In [19]:
# Fitted cluster centers of the k=3 model, one array per cluster.
centers2 = model2.clusterCenters()

In [20]:
# Display the three fitted centers as the cell output.
centers2

[array([ 9.1,  9.1,  9.1]),
 array([ 0.05,  0.05,  0.05]),
 array([ 0.2,  0.2,  0.2])]

In [21]:
# Append a 'prediction' column with the k=3 model's cluster assignment per row.
results2 = model2.transform(final_data)

In [22]:
# Show each feature vector next to the cluster assigned by the k=3 model.
results2.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         1|
|(3,[0,1,2],[0.1,0...|         1|
|(3,[0,1,2],[0.2,0...|         2|
|(3,[0,1,2],[9.0,9...|         0|
|(3,[0,1,2],[9.1,9...|         0|
|(3,[0,1,2],[9.2,9...|         0|
+--------------------+----------+



## Code Along: K-Means Clustering on the Seeds Dataset

In [3]:
# Load the seeds CSV: the first line supplies the column names and the
# column types are inferred from the data.
dataset = spark.read.csv(
    'seeds_dataset.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Inspect the column names and the types inferred from the CSV.
dataset.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



In [5]:
# Look at the first row as a quick check on the parsed values.
dataset.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22)]

In [6]:
from pyspark.ml.clustering import KMeans

In [7]:
from pyspark.ml.feature import VectorAssembler

In [8]:
# Pack every column of the dataset into a single 'features' vector column.
assembler = VectorAssembler(
    inputCols=dataset.columns,
    outputCol='features',
)

In [9]:
# Add the assembled 'features' vector column alongside the original columns.
final_data = assembler.transform(dataset)

In [10]:
# Confirm that the 'features' vector column is now part of the schema.
final_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)



In [11]:
from pyspark.ml.feature import StandardScaler

In [12]:
# Scale each feature to unit standard deviation without centering.
# withStd=True and withMean=False are StandardScaler's defaults, spelled
# out here so the choice is visible.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaledFeatures',
    withStd=True,
    withMean=False,
)

In [13]:
# Fit the scaler on the assembled 'features' vectors to obtain the scaling model.
scaler_model = scaler.fit(final_data)

In [14]:
# Append the standardized 'scaledFeatures' column.
# This cell reassigns final_data from itself, so a plain transform would fail
# on a second run because the output column already exists. Dropping it first
# keeps the cell re-runnable; DataFrame.drop() is a no-op when the column is
# absent, so the first run is unaffected.
final_data = scaler_model.transform(final_data.drop('scaledFeatures'))

In [15]:
# Compare the raw 'features' vector with its 'scaledFeatures' counterpart
# for the first row.
final_data.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22, features=DenseVector([15.26, 14.84, 0.871, 5.763, 3.312, 2.221, 5.22]), scaledFeatures=DenseVector([5.2445, 11.3633, 36.8608, 13.0072, 8.7685, 1.4772, 10.621]))]

In [18]:
# Cluster on the standardized vectors with three clusters.
# The seed is fixed so the random centroid initialization, and therefore the
# resulting clusters and cost, are reproducible across runs.
kmeans = KMeans(featuresCol='scaledFeatures', k=3, seed=1)

In [19]:
# Fit k-means on final_data to obtain the trained model.
model = kmeans.fit(final_data)

In [20]:
# Report the within-set sum of squared errors for the data the model was
# fitted on. It is read from the training summary because
# KMeansModel.computeCost() was deprecated in Spark 2.4 and removed in
# Spark 3.0. Requires Spark >= 2.4.
print('WSSE')
print(model.summary.trainingCost)

WSSE
429.0076196545483


In [21]:
# Fitted cluster centers, one array per cluster.
centers = model.clusterCenters()

In [22]:
# Print the list of fitted cluster centers.
print(centers)

[array([  4.90455443,  10.919579  ,  37.26051182,  12.3885095 ,
         8.57467662,   1.81659031,  10.38074771]), array([  6.35645488,  12.40730852,  37.41990178,  13.93860446,
         9.7892399 ,   2.41585013,  12.29286107]), array([  4.06133795,  10.13721767,  35.82681204,  11.81771972,
         7.5087187 ,   3.25852121,  10.4215732 ])]


In [24]:
# Assign every row to a cluster and preview only the assigned cluster index.
(
    model.transform(final_data)
    .select('prediction')
    .show()
)

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         1|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         2|
+----------+
only showing top 20 rows

