In [1]:
! git clone https://github.com/wchill/HMP_dataset.git

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200405172126-0000
KERNEL_ID = 3465d6be-ca08-41d9-9ef1-91f5c270cae5
Cloning into 'HMP_dataset'...
remote: Enumerating objects: 865, done.[K
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865[K
Receiving objects: 100% (865/865), 1010.96 KiB | 0 bytes/s, done.
Checking out files: 100% (848/848), done.


In [4]:
! ls HMP_dataset

Brush_teeth	Drink_glass  Liedown_bed  Sitdown_chair  final.py
Climb_stairs	Eat_meat     MANUAL.txt   Standup_chair  impdata.py
Comb_hair	Eat_soup     Pour_water   Use_telephone
Descend_stairs	Getup_bed    README.txt   Walk


In this work, the pipeline for preprocessing of the accelerometer for various tasks is done

In [5]:
! ls HMP_dataset/Brush_teeth

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt


### So, now, iteratively progressing  through these folders to create the spark dataframes

1. Create Schema

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType

In [8]:
schema=StructType([
        StructField('x', IntegerType(), True),
        StructField('y', IntegerType(), True),
        StructField('z', IntegerType(), True)
])

In [9]:
import os

In [13]:
file_list=os.listdir('HMP_dataset')
file_list

['.git',
 '.idea',
 'Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'MANUAL.txt',
 'Pour_water',
 'README.txt',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone',
 'Walk',
 'final.py',
 'impdata.py']

In [19]:
#lets get rid of unwanted datas
file_list_filtered=[s for s in file_list if '_' in s]
file_list_filtered

['Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'Pour_water',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone']

In [22]:
#initiate empty df
df=None

# now iterate over these file categories and inside categories, the datafiles
from pyspark.sql.functions import lit

for category in file_list_filtered:
    data_files=os.listdir('HMP_dataset/'+category)
        #for each file we have a temperory data frame
    for data_file in data_files:    
        temp_df=spark.read.option('header', 'false').option('delimiter', " ").csv('HMP_dataset/'+category+'/'+data_file, schema = schema)
        #now add the source file (file name) and the y values i.e. categories too to the dataframes here using lit function.
        temp_df= temp_df.withColumn('class', lit(category)) #added the dependent values here
        temp_df= temp_df.withColumn('source', lit(data_file)) #added the source file

        if df is None:
            df=temp_df
        else:
            df=df.union(temp_df)

In [23]:
# LAZY spark
df.show()

+---+---+---+-----------+--------------------+
|  x|  y|  z|      class|              source|
+---+---+---+-----------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 20| 51| 35|Brush_teeth|Accelerometer-201...|
| 18| 49| 34|Brush_teeth|Accelerometer-201...|
| 19| 48| 34|Brush_teeth|Accelerometer-201...|
| 16| 53| 34|Brush_teeth|Accelerometer-201...|
| 18| 52| 35|

### The next step is to transform the  data. 

2. Integer representation of string. I.e Label Encoding of the class values

In [25]:
from pyspark.ml.feature import StringIndexer #for label encoding

indexer=StringIndexer(inputCol='class', outputCol='ClassIndex')
indexed=indexer.fit(df).transform(df)

indexed.show()

+---+---+---+-----------+--------------------+----------+
|  x|  y|  z|      class|              source|ClassIndex|
+---+---+---+-----------+--------------------+----------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 51| 35|B

3. One Hot encoding of the class Labels obtained in previous step. Here, the sparse matrix of the labels are done. 

They are in form of example: [12, 3, 1]- meaning: 12 digits, on 3rd position, there is a 1

PS: One hot encoder of pyspark has only transform function

In [26]:
from pyspark.ml.feature import OneHotEncoder

encoder=OneHotEncoder(inputCol='ClassIndex', outputCol='CategoryVec')
encoded=encoder.transform(indexed)

encoded.show()

+---+---+---+-----------+--------------------+----------+--------------+
|  x|  y|  z|      class|              source|ClassIndex|   CategoryVec|
+---+---+---+-----------+--------------------+----------+--------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|     

PS: Indexer is a estimator whereas the one hot encoder is a pure transformer as the indexer has to go through all the strings to label index them

4. To convert the values x, y, z (independent variables) into the vectors as the pyspark can only work on vector objects

In [31]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

vectorAssembler=VectorAssembler(inputCols=['x', 'y', 'z'],
                               outputCol='features')
features_vectorized=vectorAssembler.transform(encoded)

features_vectorized.show()

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/opt/ibm/spark/python/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'
Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/opt/ibm/spark/python/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


+---+---+---+-----------+--------------------+----------+--------------+----------------+
|  x|  y|  z|      class|              source|ClassIndex|   CategoryVec|        features|
+---+---+---+-----------+--------------------+----------+--------------+----------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[20.0,50.0,35.0]|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,34.0]|
| 22| 50| 

5. Normalizing the data (optional)

In [33]:
from pyspark.ml.feature import Normalizer
normalizer=Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
normalized_data=normalizer.transform(features_vectorized)

normalized_data.show()####LLLLLAAAAZZZZYYYYYYY!!!!!!!

+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|      class|              source|ClassIndex|   CategoryVec|        features|       features_norm|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
|

### CREATING PIPELINE FOR ALL THE  ABOVE IN ONE MOVE:

In [35]:
from pyspark.ml import Pipeline
pipeline=Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer])#put in all the stages here.. 
model=pipeline.fit(df)

prediction=model.transform(df)
prediction.show()

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/opt/ibm/spark/python/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
AttributeError: 'Normalizer' object has no attribute '_java_obj'


+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|      class|              source|ClassIndex|   CategoryVec|        features|       features_norm|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
|

### Hence, all the above 5 stages' final output matched with this output