# Movement Monitoring Model

### 1. Initial Setups

Install PySpark 

In [1]:
# Install PySpark Python package 
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 34kB/s  eta 0:00:01                              | 1.3MB 2.6MB/s eta 0:01:23�                               | 5.0MB 2.6MB/s eta 0:01:22     |█▎                              | 8.6MB 2.6MB/s eta 0:01:22   |█▊                              | 11.4MB 5.5MB/s eta 0:00:38     |█▊                              | 11.7MB 5.5MB/s eta 0:00:38     |███                             | 21.0MB 5.3MB/s eta 0:00:37     |█████▌                          | 37.1MB 5.3MB/s eta 0:00:35     |█████▋                          | 37.9MB 5.3MB/s eta 0:00:3504     |███████████▏                    | 76.0MB 5.1MB/s eta 0:00:28     |███████████▎                    | 76.7MB 5.1MB/s eta 0:00:28     |███████████▌                    | 78.2MB 5.1MB/s eta 0:00:28████████████                    | 81.

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# SparkContext: Main entry point for Spark functionality
# SparkConf: For configuring Spark
# SparkSession: Allows programming Spark with DataFrame and Dataset APIs

In [3]:
# create a SparkContext object
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
# getOrCreate(): get a new entity or an existing entity from the database, if such entity exists
# setMaster: Set master URL to connect to, if local set as above

spark = SparkSession \
    .builder \
    .getOrCreate()

Load the datasets

In [6]:
# create dataframe 
df = spark.read.parquet('accelerometer.parquet')
# register a corresponding query table 
df.createOrReplaceTempView('df')

Check the unfiltered datasets

In [7]:
# display top 20 rows and the df scema
df.show()
df.printSchema()

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|
| 20| 50| 34|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 20| 51| 35|Accelerometer-201...|Brush_teeth|
| 18| 49| 34|Accelerometer-201...|Brush_teeth|
| 19| 48| 34|Accelerometer-201...|Brush_teeth|
| 16| 53| 34|Accelerometer-201...|Brush_teeth|
| 18| 52| 35|

### 2. Pipeline Processing

Import the ML packages 

In [8]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Normalizer, MinMaxScaler
from pyspark.ml.linalg import Vectors

Create the processing objects 

In [9]:
# converts the movement classes from strings to indexes, and one-hot encode them
indexer = StringIndexer(inputCol="class", outputCol="classIndex")
encoder = OneHotEncoder(inputCol="classIndex", outputCol="catVec")

# combine the coordinates into vectors 
vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")

# normalize the vector coordinates from 0 to 1 and standardize the scale
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
minmaxscaler = MinMaxScaler(inputCol="features_norm", outputCol="scaled_features")

Create the pipeline object

In [10]:
from pyspark.ml import Pipeline

# pipeline uses all the tranform object above
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer, minmaxscaler])

lets process the dataset by fitting it into the pipeline object

In [11]:
# transform the df and remove the original columns
processed_df = pipeline.fit(df).transform(df)

processed_df.show()

+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+--------------------+
|  x|  y|  z|              source|      class|classIndex|        catVec|        features|       features_norm|     scaled_features|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+--------------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|[0.26684636118598...|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|[0.26684636118598...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|[0.25950196592398...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|[0.25950196592398...|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.

### 3. Predictive Modelling

For ML algorithms are suitable for the objective - https://spark.apache.org/docs/latest/ml-clustering.html#gaussian-mixture-model-gmm

#### a. Clustering - K-Means and GMM (also returns the probabilty)

In [12]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import GaussianMixture

In [13]:
# create a K-Means/GMM object with K= # of movement types in the dataset
kmeans = KMeans(featuresCol="features").setK(14).setSeed(1)
gmm = GaussianMixture(featuresCol="features").setK(14)

Implement the objects in the pipeline

In [14]:
# pipeline uses all the tranform object above
pipeline_kmeans = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer, minmaxscaler, kmeans])
pipeline_gmm = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer, minmaxscaler, gmm])

# transform the df and remove the original columns
model_kmeans = pipeline_kmeans.fit(df)
model_gmm =  pipeline_gmm.fit(df)
pred_kmeans = model_kmeans.transform(df)
pred_gmm = model_gmm.transform(df)

pred_kmeans.show()
pred_gmm.show()

+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+--------------------+----------+
|  x|  y|  z|              source|      class|classIndex|        catVec|        features|       features_norm|     scaled_features|prediction|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+--------------------+----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|[0.26684636118598...|        11|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|[0.26684636118598...|        11|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|[0.25950196592398...|        11|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|[0.25950196592398...|        11|

Check accuracy of the models with Silhouette scores (closest to 1 is the most defined/accurate)

In [15]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()

In [16]:
silhouette_kmeans = evaluator.evaluate(pred_kmeans)
silhouette_gmm = evaluator.evaluate(pred_gmm)

print("Silhouette with squared euclidean distance = " + str(silhouette_kmeans))
print("Silhouette with squared euclidean distance = " + str(silhouette_gmm))

Silhouette with squared euclidean distance = 0.41244594513295846
Silhouette with squared euclidean distance = 0.1323356399641611


#### b. Classification - Logistics Regression and Random Forest

Split the data into train and test sets 

In [17]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

Logistic Regression

In [18]:
from pyspark.ml.classification import LogisticRegression

In [19]:
# Create the Pipeline Processing Objects
indexer = StringIndexer(inputCol="class", outputCol="label")
vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

In [20]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline_lr = Pipeline(stages=[indexer, vectorAssembler, normalizer,lr])

In [21]:
model_lr = pipeline_lr.fit(df_train)
pred_lr = model_lr.transform(df_test)

pred_lr.show()

+---+---+---+--------------------+-------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  x|  y|  z|              source|        class|label|       features|       features_norm|       rawPrediction|         probability|prediction|
+---+---+---+--------------------+-------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  0| 16| 31|Accelerometer-201...|    Getup_bed|  1.0|[0.0,16.0,31.0]|[0.0,0.3404255319...|[1.25605454984587...|[0.20672602283027...|       0.0|
|  0| 25| 40|Accelerometer-201...|  Brush_teeth|  6.0|[0.0,25.0,40.0]|[0.0,0.3846153846...|[1.25605454984587...|[0.20672602283027...|       0.0|
|  0| 29| 17|Accelerometer-201...|    Getup_bed|  1.0|[0.0,29.0,17.0]|[0.0,0.6304347826...|[1.25605454984587...|[0.20672602283027...|       0.0|
|  0| 29| 34|Accelerometer-201...|         Walk|  0.0|[0.0,29.0,34.0]|[0.0,0.4603174603...|[1.25605454984587...|[0.20672602283027.

Check accuracy of LR prediction

In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

eval = MulticlassClassificationEvaluator().setMetricName('accuracy').setLabelCol('label').setPredictionCol('prediction')
eval.evaluate(pred_lr)

0.20610695595534892

Random Forest

In [23]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

In [24]:
# Create the Pipeline Processing Objects
labelIndexer = StringIndexer(inputCol="class", outputCol="indexedLabel")
vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)

In [25]:
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=20)
pipeline_rf = Pipeline(stages=[labelIndexer,vectorAssembler, featureIndexer, rf])

In [26]:
model_rf = pipeline_rf.fit(df_train)
pred_rf = model_rf.transform(df_test)

pred_rf.show()

+---+---+---+--------------------+-------------+------------+---------------+---------------+--------------------+--------------------+----------+
|  x|  y|  z|              source|        class|indexedLabel|       features|indexedFeatures|       rawPrediction|         probability|prediction|
+---+---+---+--------------------+-------------+------------+---------------+---------------+--------------------+--------------------+----------+
|  0| 16| 31|Accelerometer-201...|    Getup_bed|         1.0|[0.0,16.0,31.0]|[0.0,16.0,31.0]|[7.29276286764421...|[0.36463814338221...|       0.0|
|  0| 25| 40|Accelerometer-201...|  Brush_teeth|         6.0|[0.0,25.0,40.0]|[0.0,25.0,40.0]|[4.92198623961629...|[0.24609931198081...|       1.0|
|  0| 29| 17|Accelerometer-201...|    Getup_bed|         1.0|[0.0,29.0,17.0]|[0.0,29.0,17.0]|[7.21487333949433...|[0.36074366697471...|       0.0|
|  0| 29| 34|Accelerometer-201...|         Walk|         0.0|[0.0,29.0,34.0]|[0.0,29.0,34.0]|[8.11502316287000...|[0.4

Check for accuracy of RF prediction

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred_rf)

print(accuracy)

0.44028928868225414
