#Spark ML Clustering

We use Spark ML to perform clustering. The dataset we are using has acceleration recording from wearable devices in x, y and z coordinates. The datapoints are divided into different classes which categorize what the person was doing while the acceleromoter was recording the values (e.g. walking, brushing teeth, pouring water, etc.)

In [2]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget -P /tmp https://github.com/IBM/coursera/raw/master/hmp.parquet

In [3]:
display(dbutils.fs.ls("file:/tmp/hmp.parquet"))

path,name,size
file:/tmp/hmp.parquet,hmp.parquet,932997


In [4]:
# create a dataframe out of it
df = spark.read.parquet('file:/tmp/hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')
display(df.limit(10))

x,y,z,source,class
22,49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
22,49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
22,52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
22,52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
21,52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
22,51,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
20,50,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
22,52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
22,50,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth
22,51,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth


In [5]:
# let's look at the different classes in our dataset
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
display(sqlContext.sql("select class, count(*) as `count` from df group by class"))

class,count
Use_telephone,15225
Standup_chair,25417
Eat_meat,31236
Getup_bed,45801
Drink_glass,42792
Pour_water,41673
Comb_hair,23504
Walk,92254
Climb_stairs,40258
Sitdown_chair,25036


In [6]:
# number of distinct classes in our dataset
display(sqlContext.sql("select count(distinct class) from df"))

count(DISTINCT class)
14


Let's do some preprocessing to data

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")
vecFeatures = vectorAssembler.transform(df)
display(vecFeatures.limit(10))

x,y,z,source,class,features
22,49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 49.0, 35.0))"
22,49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 49.0, 35.0))"
22,52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 52.0, 35.0))"
22,52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 52.0, 35.0))"
21,52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(21.0, 52.0, 34.0))"
22,51,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 51.0, 34.0))"
20,50,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(20.0, 50.0, 35.0))"
22,52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 52.0, 34.0))"
22,50,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 50.0, 34.0))"
22,51,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 51.0, 35.0))"


In [9]:
from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
normFeatures = normalizer.transform(vecFeatures)
display(normFeatures.limit(10))

x,y,z,source,class,features,features_norm
22,49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 49.0, 35.0))","List(1, 3, List(), List(0.20754716981132076, 0.46226415094339623, 0.330188679245283))"
22,49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 49.0, 35.0))","List(1, 3, List(), List(0.20754716981132076, 0.46226415094339623, 0.330188679245283))"
22,52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 52.0, 35.0))","List(1, 3, List(), List(0.2018348623853211, 0.47706422018348627, 0.3211009174311927))"
22,52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 52.0, 35.0))","List(1, 3, List(), List(0.2018348623853211, 0.47706422018348627, 0.3211009174311927))"
21,52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(21.0, 52.0, 34.0))","List(1, 3, List(), List(0.19626168224299065, 0.48598130841121495, 0.3177570093457944))"
22,51,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 51.0, 34.0))","List(1, 3, List(), List(0.205607476635514, 0.4766355140186916, 0.3177570093457944))"
20,50,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(20.0, 50.0, 35.0))","List(1, 3, List(), List(0.19047619047619047, 0.47619047619047616, 0.3333333333333333))"
22,52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 52.0, 34.0))","List(1, 3, List(), List(0.2037037037037037, 0.48148148148148145, 0.3148148148148148))"
22,50,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 50.0, 34.0))","List(1, 3, List(), List(0.20754716981132076, 0.4716981132075472, 0.32075471698113206))"
22,51,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,"List(1, 3, List(), List(22.0, 51.0, 35.0))","List(1, 3, List(), List(0.2037037037037037, 0.4722222222222222, 0.32407407407407407))"


Create a pipeline for Kmeans clustering. 

Also, assuming that we did not have prior knowledge of number of classes, we can use our pipeline to build different variations of our kmeans model and evaluate each one to find the best fit.

We will evaulaute the clustering models using Silhouette analysis. Silhouette analysis can be used to study the separation distance between the resulting clusters. This measure has a range of [-1, 1].

In [11]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

k_values = [6, 8, 10, 12, 14, 16, 20]
silhouette_values = []
for k in k_values:
  
  kmeans = KMeans(featuresCol="features").setK(k).setSeed(1)
  pipeline = Pipeline(stages=[vectorAssembler, kmeans])
  model = pipeline.fit(df)
  predictions = model.transform(df)

  evaluator = ClusteringEvaluator()
  silhouette_values.append(evaluator.evaluate(predictions))
  
  print("With k = " + str(k) + ": Silhouette with squared euclidean distance = " + str(silhouette_values[-1]))


It appears from our results that as we increase the number of clusters k, our silhouette scores go down. 

A reason for this could be that a lot of the classes can be very similar in the type of x,y,z accelerations they produce. For example, 'standup_chair' and 'getup_bed' or 'sitdown_chair' and 'liedown_bed' could be very similar. Also, classes 'walk', 'climb_strairs' and 'descend_stairs' can generate very simialr recordings since all three activities involve swinging the arms back and forward (assuming the wearable device is on the wrist). So in reality we may only have 6-10 highly distinct classes of activities. 

We achieved silhouette score of 0.4124 for k=14. Let's try normalizing the features to see if helps.

In [13]:
kmeans = KMeans(featuresCol="features").setK(14).setSeed(1)
pipeline = Pipeline(stages=[vectorAssembler, normalizer, kmeans])
model = pipeline.fit(df)
predictions = model.transform(df)

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Kmeans with k = " + str(14) + ": Silhouette with squared euclidean distance = " + str(silhouette))

Normalizing didn't make much differnce.

Sometimes, inflating the dataset helps, here we multiply x by 10, let’s see if the performance inceases.

In [15]:
from pyspark.sql.functions import col
df_denormalized = df.select([col('*'),(col('x')*10)]).drop('x').withColumnRenamed('(x * 10)','x')
display(df_denormalized.limit(10))

y,z,source,class,x
49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220
49,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220
52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220
52,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220
52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,210
51,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220
50,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,200
52,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220
50,34,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220
51,35,Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt,Brush_teeth,220


In [16]:
kmeans = KMeans(featuresCol="features").setK(14).setSeed(1)
pipeline = Pipeline(stages=[vectorAssembler, kmeans])
model = pipeline.fit(df_denormalized)
predictions = model.transform(df_denormalized)

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Kmeans with k = " + str(14) + ": Silhouette with squared euclidean distance = " + str(silhouette))

We improved the score for k=14 clustering from 0.41 to 0.57 just by de-normalizing the x values!