# Basic operations using Apache SparkML Pipeline

In this notebook I'll use the HMP dataset and perform some basic operations with Apache Spark ML Pipeline components. The dataset is a public collection of labelled accelerometer recordings, intended for creating and validating acceleration models of human motion primitives. Let's start.

## 1. Download Dataset


In [1]:
!git clone https://github.com/wchill/HMP_Dataset.git

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200305085727-0000
KERNEL_ID = bd96fc56-1d60-4444-ab91-399120bcba83
Cloning into 'HMP_Dataset'...
remote: Enumerating objects: 865, done.
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865
Receiving objects: 100% (865/865), 1010.96 KiB | 0 bytes/s, done.
Checking out files: 100% (848/848), done.


In [2]:
!ls HMP_Dataset

Brush_teeth	Drink_glass  Liedown_bed  Sitdown_chair  final.py
Climb_stairs	Eat_meat     MANUAL.txt   Standup_chair  impdata.py
Comb_hair	Eat_soup     Pour_water   Use_telephone
Descend_stairs	Getup_bed    README.txt   Walk


In [4]:
!ls HMP_Dataset/Brush_teeth

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt


## 2. Create the Schema

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType

# Each accelerometer file has three nullable integer columns: the x, y and z readings
schema = StructType([
    StructField("x", IntegerType(), True),
    StructField("y", IntegerType(), True),
    StructField("z", IntegerType(), True),
])

## 3. Import Data
     

In [10]:
import os
file = os.listdir('HMP_Dataset')

In [11]:
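# Keep only the activity folders (their names contain '_'), dropping README.txt, final.py, etc.
# Note: this also drops 'Walk', which has no underscore in its name.
file_filtered = [s for s in file if '_' in s]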

In [12]:
file_filtered

['Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'Pour_water',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone']
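The underscore test works because the activity folders are named with underscores, but it also silently drops `Walk`, which has none; that is why `Walk` is missing from the list above. A minimal sketch of a sturdier filter, assuming the folder layout shown by `ls` in step 1, keeps every non-hidden sub-directory instead. `list_activity_folders` is a hypothetical helper name, not part of the original notebook. Hidden entries are skipped because the clone also contains a `.git` directory.

```python
import os


def list_activity_folders(root):
    """Return the sorted names of the non-hidden sub-directories of `root`.

    Unlike the underscore filter, this keeps 'Walk' and drops plain files
    such as README.txt or final.py. Hidden entries (e.g. '.git') are skipped.
    """
    return sorted(
        name
        for name in os.listdir(root)
        if not name.startswith(".") and os.path.isdir(os.path.join(root, name))
    )
```

Called as `list_activity_folders('HMP_Dataset')`, it should return all 14 activity folders. Keep in mind that including `Walk` adds a 14th class, so the one-hot vectors in step 5 would have 13 slots instead of 12.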

In [1]:
from pyspark.sql.functions import lit
df = None
for category in file_filtered:
    data_files = os.listdir('HMP_Dataset/'+category)
    for data_file in data_files:
        # Each file holds space-separated "x y z" rows with no header line
        temp_df = spark.read.option("header","false").option("delimiter", " ").csv("HMP_Dataset/"+category+"/"+data_file, schema = schema)

        # Tag every row with its activity label and its source file name
        temp_df = temp_df.withColumn('class', lit(category))
        temp_df = temp_df.withColumn('source', lit(data_file))

        # Append this file's rows to the combined DataFrame
        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)
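To make explicit what each row of `df` contains, here is a Spark-free sketch of the same loop in plain Python. It assumes, as the schema and the `df.show()` output suggest, that each file holds one space-separated `x y z` integer triple per line; `load_rows` is a hypothetical helper used only for illustration.

```python
import os


def load_rows(root, categories):
    """Plain-Python mirror of the Spark loading loop (illustration only).

    Every parsed line becomes a dict with the three readings plus the
    activity folder ('class') and the file name ('source'), like the two
    lit() columns added in Spark.
    """
    rows = []
    for category in categories:
        folder = os.path.join(root, category)
        for data_file in sorted(os.listdir(folder)):
            with open(os.path.join(folder, data_file)) as fh:
                for line in fh:
                    parts = line.split()
                    if not parts:
                        continue  # skip blank lines
                    x, y, z = map(int, parts)
                    rows.append({"x": x, "y": y, "z": z,
                                 "class": category, "source": data_file})
    return rows
```

Files are read in sorted order here for reproducibility, whereas `os.listdir` in the Spark loop makes no ordering promise. On the real dataset the Spark loop is the one to use; note that `spark.read.csv` also accepts a list of paths, which avoids chaining one `union` per file.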


In [21]:
df.show()

+---+---+---+-----------+--------------------+
|  x|  y|  z|      class|              source|
+---+---+---+-----------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 20| 51| 35|Brush_teeth|Accelerometer-201...|
| 18| 49| 34|Brush_teeth|Accelerometer-201...|
| 19| 48| 34|Brush_teeth|Accelerometer-201...|
| 16| 53| 34|Brush_teeth|Accelerometer-201...|
| 18| 52| 35|Brush_teeth|Accelerometer-201...|

## 4. Pipeline: Index the Class String

In [22]:
from pyspark.ml.feature import StringIndexer
# Map each class label to a numeric index; by default the most frequent label gets 0.0
Indexer = StringIndexer(inputCol ='class', outputCol = 'classIndex')
indexed = Indexer.fit(df).transform(df)
indexed.show()

+---+---+---+-----------+--------------------+----------+
|  x|  y|  z|      class|              source|classIndex|
+---+---+---+-----------+--------------------+----------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|
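By default `StringIndexer` orders labels by frequency, so the most common class gets `0.0`; `Brush_teeth` receiving `5.0` means it sits at position 5 (sixth place) in that ordering. The plain-Python sketch below reproduces the ordering on a toy label list (`string_index` is a hypothetical name). Tie-breaking is alphabetical here, which is an assumption: Spark's handling of equal counts has not been the same in every version.

```python
from collections import Counter


def string_index(labels):
    """Map each distinct label to a float index, most frequent label first.

    Mirrors StringIndexer's default frequency-descending order. Ties are
    broken alphabetically in this sketch (an assumption, see above).
    Returns the indexed column and the label -> index mapping.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return [mapping[label] for label in labels], mapping
```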

## 5. One-hot Encode

In [23]:
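from pyspark.ml.feature import OneHotEncoder
# In Spark 2.x OneHotEncoder is a Transformer and can be applied directly, as here.
# In Spark 3.0+ it is an Estimator: use encoder.fit(indexed).transform(indexed) instead.
encoder = OneHotEncoder(inputCol = 'classIndex', outputCol = 'Categoryvec')
encoded = encoder.transform(indexed)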

In [24]:
encoded.show()

+---+---+---+-----------+--------------------+----------+--------------+
|  x|  y|  z|      class|              source|classIndex|   Categoryvec|
+---+---+---+-----------+--------------------+----------+--------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
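Each `Categoryvec` entry is printed as a sparse vector `(size,[indices],[values])`: `(12,[5],[1.0])` is a length-12 vector with `1.0` at position 5. There are 12 slots rather than 13 because `OneHotEncoder` drops the last category by default (`dropLast=True`), and that category is encoded as the all-zero vector. A small plain-Python sketch of this encoding (`one_hot` is a hypothetical helper):

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a one-hot vector in Spark's sparse (size, indices, values) form.

    With drop_last=True (OneHotEncoder's default) the vector has
    num_categories - 1 slots and the last category maps to all zeros.
    """
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError("index out of range")
    size = num_categories - 1 if drop_last else num_categories
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])  # the dropped last category: no non-zero entries
```

For example, `one_hot(5.0, 13)` gives `(12, [5], [1.0])`, the same vector shown for `Brush_teeth` above.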

## 6. Transform x, y, z into a Vector

In [31]:
from pyspark.ml.feature import VectorAssembler
# Combine the three numeric accelerometer columns into a single vector column
vectorAssambler = VectorAssembler(inputCols=["x","y","z"], outputCol ='features')

featuresVectorized = vectorAssambler.transform(encoded)

In [33]:
featuresVectorized.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+
|  x|  y|  z|      class|              source|classIndex|   Categoryvec|        features|
+---+---+---+-----------+--------------------+----------+--------------+----------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[20.0,50.0,35.0]|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,34.0]|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,50.0,34.0]|
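`VectorAssembler` concatenates its input columns, in the order given by `inputCols`, into one vector of doubles, which is why the integer triple `22, 49, 35` becomes `[22.0,49.0,35.0]`. A simplified sketch for plain numeric columns (`assemble` is a hypothetical helper; the real transformer also flattens vector-typed input columns):

```python
def assemble(row, input_cols):
    """Concatenate the named numeric columns of `row` into one float vector.

    Simplified stand-in for VectorAssembler: only scalar numeric inputs are
    handled, and the output order follows `input_cols`.
    """
    return [float(row[col]) for col in input_cols]
```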

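## 7. p-norm Normalization

`Normalizer` rescales each feature vector to unit p-norm, i.e. it divides every component by the vector's p-norm. The cell below uses p = 1.

1-norm (taxicab or Manhattan norm): the sum of the absolute values of the vector's components.

$$\lVert x \rVert_1 := \sum_{i=1}^{n} |x_i|$$

2-norm (Euclidean norm):

$$\lVert x \rVert_2 := \sqrt{x_1^2 + \dots + x_n^2}$$

General p-norm, for $p \ge 1$:

$$\lVert x \rVert_p := \left(\sum_{i=1}^{n} |x_i|^p\right)^{1/p}$$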
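As a check against the output of the next cell, take the first row, $x = (22, 49, 35)$, with $p = 1$:

$$\lVert x \rVert_1 = 22 + 49 + 35 = 106, \qquad \frac{x}{\lVert x \rVert_1} = \left(\tfrac{22}{106}, \tfrac{49}{106}, \tfrac{35}{106}\right) \approx (0.2075, 0.4623, 0.3302)$$

The first component, $22/106 = 0.20754716\ldots$, is the value at the start of the `features_norm` column below, and with $p = 1$ the components of a normalized non-negative vector sum to 1.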

In [35]:
from pyspark.ml.feature import Normalizer
# p=1.0: divide each feature vector by its 1-norm, |x| + |y| + |z|
normalized = Normalizer(inputCol = "features", outputCol = 'features_norm', p=1.0)

normalized_data = normalized.transform(featuresVectorized)
normalized_data.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|      class|              source|classIndex|   Categoryvec|        features|       features_norm|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
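| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[20.0,50.0,35.0]|[0.19047619047619...|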
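The same computation in plain Python, as a sketch of what `Normalizer` does to each row (`p_normalize` is a hypothetical helper). It covers finite `p >= 1` and `p = inf` (the max-norm); returning an all-zero vector unchanged is this sketch's own choice to avoid dividing by zero.

```python
import math


def p_normalize(vector, p=2.0):
    """Scale `vector` so that its p-norm equals 1 (per-row Normalizer logic)."""
    if p < 1:
        raise ValueError("p must be >= 1")
    if math.isinf(p):
        norm = max(abs(v) for v in vector)  # max-norm
    else:
        norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    if norm == 0.0:
        return list(vector)  # this sketch leaves all-zero vectors as they are
    return [v / norm for v in vector]
```

`p_normalize([22.0, 49.0, 35.0], p=1.0)` starts with 0.20754716981132..., matching the first row above; the default `p=2.0` mirrors `Normalizer`'s own default.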

## 8. Pipeline

In [42]:
from pyspark.ml import Pipeline
# Chain the four stages defined above; fitting the pipeline runs them in this order
pipeline = Pipeline(stages=[Indexer, encoder, vectorAssambler, normalized])
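`Pipeline.fit` handles each stage according to its type: estimators such as `StringIndexer` are fitted on the current data and replaced by their fitted models, transformers such as `VectorAssembler` and `Normalizer` are used as-is, and each stage's output is the next stage's input. The resulting `PipelineModel` replays the fitted stages in order on new data. Because of this type check, the same stage list works whether `OneHotEncoder` is a Transformer (Spark 2.x) or an Estimator (Spark 3.0+). Below is a toy, Spark-free sketch of that control flow; all class names are hypothetical, the toy indexer orders labels alphabetically (unlike `StringIndexer`), and the sketch transforms after every stage whereas Spark skips work it does not need during `fit`.

```python
class MiniPipeline:
    """Toy illustration of how Pipeline.fit chains its stages."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(data)  # estimator -> fitted model
            data = stage.transform(data)  # output feeds the next stage
            fitted.append(stage)
        return MiniPipelineModel(fitted)


class MiniPipelineModel:
    """Applies the fitted stages in order, like a PipelineModel."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyIndexer:
    """Hypothetical estimator: learns an alphabetical label -> index map."""

    def fit(self, rows):
        labels = sorted({r["class"] for r in rows})
        return ToyIndexerModel({lab: float(i) for i, lab in enumerate(labels)})


class ToyIndexerModel:
    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, rows):
        return [dict(r, classIndex=self.mapping[r["class"]]) for r in rows]


class ToyAssembler:
    """Hypothetical transformer: nothing to fit."""

    def transform(self, rows):
        return [dict(r, features=[float(r["x"]), float(r["y"]), float(r["z"])])
                for r in rows]
```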


In [43]:
model = pipeline.fit(df)


Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/opt/ibm/spark/python/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
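AttributeError: 'Normalizer' object has no attribute '_java_obj'

The `Exception ignored in` message above is raised inside an object destructor (`__del__`), so Python ignores it: the fit itself completes, and the fitted `model` is used in the next cell.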


In [44]:
prediction = model.transform(df)

In [45]:
# Keep only the target vector and the normalized features
df_train = prediction.select('Categoryvec', 'features_norm')


In [46]:
df_train.show()

+--------------+--------------------+
|   Categoryvec|       features_norm|
+--------------+--------------------+
|(12,[5],[1.0])|[0.20754716981132...|
|(12,[5],[1.0])|[0.20754716981132...|
|(12,[5],[1.0])|[0.20183486238532...|
|(12,[5],[1.0])|[0.20183486238532...|
|(12,[5],[1.0])|[0.19626168224299...|
|(12,[5],[1.0])|[0.20560747663551...|
|(12,[5],[1.0])|[0.19047619047619...|
|(12,[5],[1.0])|[0.20370370370370...|
|(12,[5],[1.0])|[0.20754716981132...|
|(12,[5],[1.0])|[0.20370370370370...|
|(12,[5],[1.0])|[0.2,0.4857142857...|
|(12,[5],[1.0])|[0.19230769230769...|
|(12,[5],[1.0])|[0.20388349514563...|
|(12,[5],[1.0])|[0.20388349514563...|
|(12,[5],[1.0])|[0.18867924528301...|
|(12,[5],[1.0])|[0.17821782178217...|
|(12,[5],[1.0])|[0.18811881188118...|
|(12,[5],[1.0])|[0.15533980582524...|
|(12,[5],[1.0])|[0.17142857142857...|
|(12,[5],[1.0])|[0.17821782178217...|
+--------------+--------------------+
only showing top 20 rows



## 9. Results


That's it. We now have the category vector `Categoryvec`, which is the target, and the normalized input features `features_norm`, both stored as Spark ML vectors. In the next notebook we'll see what we can actually do with this data; for now, this is exactly what we wanted to achieve, and it shows how Apache Spark ML pipelines work.