In [1]:
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=ee983dae9ea7212823af655a3f5e4a69b5b39f0118f62eea30eac5fad6f8d665
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


#Clustering Code Along

We'll be working with a real data set about seeds, from UCI repository: https://archive.ics.uci.edu/ml/datasets/seeds.

The examined group comprised kernels belonging to three different varieties of wheat: Kama, Rosa and Canadian, 70 elements each, randomly selected for the experiment. High quality visualization of the internal kernel structure was detected using a soft X-ray technique. It is non-destructive and considerably cheaper than other more sophisticated imaging techniques like scanning microscopy or laser technology. The images were recorded on 13x18 cm X-ray KODAK plates. Studies were conducted using combine harvested wheat grain originating from experimental fields, explored at the Institute of Agrophysics of the Polish Academy of Sciences in Lublin.

The data set can be used for the tasks of classification and cluster analysis.

Attribute Information:

To construct the data, seven geometric parameters of wheat kernels were measured:

1. area A,
2. perimeter P,
3. compactness C = 4piA/P^2,
4. length of kernel,
5. width of kernel,
6. asymmetry coefficient
7. length of kernel groove.

All of these parameters were real-valued continuous.

Let's see if we can cluster them in to 3 groups with K-means!

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cluster').getOrCreate()

In [3]:
# Loads data.
dataset = spark.read.csv("/content/sample_data/seeds_dataset.csv",header=True,inferSchema=True)

In [4]:
dataset.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



In [5]:
dataset.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22)]

## Format the Data

In [24]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

In [25]:
dataset.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

In [26]:
assembler=VectorAssembler(inputCols=dataset.columns,outputCol='features')

In [27]:
final_data=assembler.transform(dataset)

In [28]:
final_data.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)



## Scale the Data

It is a good idea to scale our data to deal with the curse of dimensionality: https://en.wikipedia.org/wiki/Curse_of_dimensionality

In [29]:
from pyspark.ml.feature import StandardScaler

In [30]:
scaler=StandardScaler(inputCol='features',outputCol='scaledFeatures',withStd=True, withMean=False)

In [31]:
# Compute summary statistics by fitting the StandardScaler
scaler_model=scaler.fit(final_data)

In [32]:
# Normalize each feature to have unit standard deviation.
final_data=scaler_model.transform(final_data)

In [33]:
final_data.head(1)

[Row(area=15.26, perimeter=14.84, compactness=0.871, length_of_kernel=5.763, width_of_kernel=3.312, asymmetry_coefficient=2.221, length_of_groove=5.22, features=DenseVector([15.26, 14.84, 0.871, 5.763, 3.312, 2.221, 5.22]), scaledFeatures=DenseVector([5.2445, 11.3633, 36.8608, 13.0072, 8.7685, 1.4772, 10.621]))]

## Train the Model and Evaluate

In [34]:
# Trains a k-means model.
kmeans=KMeans(featuresCol='scaledFeatures',k=3)

In [17]:
model=kmeans.fit(final_data)

In [35]:
# Evaluate clustering by computing Within Set Sum of Squared Errors.
print("WSSSE: {}".format(model.summary.trainingCost))

WSSSE: 429.07559671507244


In [36]:
# Shows the result.
centers=model.clusterCenters()
print("Cluster Centers :")
for i in centers:
  print(i)

Cluster Centers :
[ 4.87257659 10.88120146 37.27692543 12.3410157   8.55443412  1.81649011
 10.32998598]
[ 6.31670546 12.37109759 37.39491396 13.91155062  9.748067    2.39849968
 12.2661748 ]
[ 4.06105916 10.13979506 35.80536984 11.82133095  7.50395937  3.27184732
 10.42126018]


In [37]:
model.transform(final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         1|
|         1|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         2|
+----------+
only showing top 20 rows

