In [1]:
!git clone https://github.com/wchill/HMP_Dataset.git

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200217072606-0000
KERNEL_ID = 31a583ee-517f-4188-b0c4-27b958f16f59
Cloning into 'HMP_Dataset'...
remote: Enumerating objects: 865, done.[K
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865[K
Receiving objects: 100% (865/865), 1010.96 KiB | 0 bytes/s, done.
Checking out files: 100% (848/848), done.


In [2]:
!ls HMP_Dataset

Brush_teeth	Drink_glass  Liedown_bed  Sitdown_chair  final.py
Climb_stairs	Eat_meat     MANUAL.txt   Standup_chair  impdata.py
Comb_hair	Eat_soup     Pour_water   Use_telephone
Descend_stairs	Getup_bed    README.txt   Walk


In [7]:
!ls HMP_Dataset/Climb_stairs | head -n 10

Accelerometer-2011-03-24-10-24-39-climb_stairs-f1.txt
Accelerometer-2011-03-24-10-25-44-climb_stairs-f1.txt
Accelerometer-2011-03-29-09-55-46-climb_stairs-f1.txt
Accelerometer-2011-04-05-18-21-22-climb_stairs-f1.txt
Accelerometer-2011-04-05-18-32-29-climb_stairs-f1.txt
Accelerometer-2011-04-11-11-44-35-climb_stairs-f1.txt
Accelerometer-2011-04-11-11-57-50-climb_stairs-f1.txt
Accelerometer-2011-04-11-11-58-30-climb_stairs-f1.txt
Accelerometer-2011-05-30-08-21-38-climb_stairs-f1.txt
Accelerometer-2011-05-30-08-30-37-climb_stairs-f1.txt


In [10]:
# define the schema for the dataset

from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
                StructField("x", IntegerType(), True),
                StructField("y", IntegerType(), True),
                StructField("z", IntegerType(), True)                
                    ])

In [14]:
import os
filelist = os.listdir('HMP_Dataset')
filelist

['.git',
 '.idea',
 'Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'MANUAL.txt',
 'Pour_water',
 'README.txt',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone',
 'Walk',
 'final.py',
 'impdata.py']

In [15]:
# select the ones with an "_"

filelist_filtered = [f for f in filelist if '_' in f]
filelist_filtered

['Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'Pour_water',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone']

In [20]:
df = None

from pyspark.sql.functions import lit

for category in filelist_filtered:
    data_files = os.listdir('HMP_Dataset/'+category) # subdirectores
    
    # iterate over the files in the subdirectories and create a temp dataframe
    for data_file in data_files:
        #print(data_file)
        temp_df = spark.read\
                    .option('header', 'false')\ # since there are no headers
                    .option('delimiter', ' ')\ # space separated
                    .csv('HMP_Dataset/'+category+'/'+data_file, schema= schema)
        
        
    # adding literals or strings to the dataframe to identify the source, import the 'lit' function
        
        # here 'class' denotes the folder name where the data comes from
        temp_df = temp_df.withColumn('class', lit(category))
        
        # similary do it for the 'source file'
        temp_df = temp_df.withColumn('source', lit(data_file))
        
        if df is None: # first iteration
            df = temp_df
        else:
            df = df.union(temp_df) # append the data vertically      
        
        

In [21]:
df.show()

+---+---+---+-----------+--------------------+
|  x|  y|  z|      class|              source|
+---+---+---+-----------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 20| 51| 35|Brush_teeth|Accelerometer-201...|
| 18| 49| 34|Brush_teeth|Accelerometer-201...|
| 19| 48| 34|Brush_teeth|Accelerometer-201...|
| 16| 53| 34|Brush_teeth|Accelerometer-201...|
| 18| 52| 35|

In [24]:
# Data transformation
# Create integer represenation of the class
# StringIndexer is an estimator, as it needs to parse through the entire data and remember what integer is assigned to each category string
# Hence StringIndexer can remember the state

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='class', outputCol = 'classIndex')

# use the fit() and transform method

indexed = indexer.fit(df).transform(df)
indexed.show()

+---+---+---+-----------+--------------------+----------+
|  x|  y|  z|      class|              source|classIndex|
+---+---+---+-----------+--------------------+----------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 51| 35|B

In [26]:
# Data tranformation: Step 2
# One Hot encoding of the data
# One hot representation in Apache Spark is not the typical one hot representation
# so here (12,[5],[1.0]), means:
    # total 12 elements
    # at position [5], there is 1.0


from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol='classIndex', outputCol='categoryVec')
#encoded = encoder.fit(indexed).transform(indexed)
# fit() method not available for OnehotEncoder as it is not an 'estimator', only a 'transformer'

encoded = encoder.transform(indexed)
encoded.show()

+---+---+---+-----------+--------------------+----------+--------------+
|  x|  y|  z|      class|              source|classIndex|   categoryVec|
+---+---+---+-----------+--------------------+----------+--------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|     

In [29]:
# Data transformation - Step 3
# Convert the (x, y, z) representation into a vectorized form since Spark ML can work only on vectors

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# the column 'features' will be used by the ML algorithm
vectorAssembler =VectorAssembler(inputCols = ['x', 'y', 'z'], outputCol = 'features')

features_vectorized = vectorAssembler.transform(encoded)

features_vectorized.show()

# the column 'features' created is an 'Apache Spark feature Object'

+---+---+---+-----------+--------------------+----------+--------------+----------------+
|  x|  y|  z|      class|              source|classIndex|   categoryVec|        features|
+---+---+---+-----------+--------------------+----------+--------------+----------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[20.0,50.0,35.0]|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,34.0]|
| 22| 50| 

In [30]:
# Data transformation - Step 4
# Data normalization if required

from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol='features', outputCol='features_norm', p=1.0)
normalized = normalizer.transform(features_vectorized)
normalized.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|      class|              source|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
|

In [32]:
# Create Pipelines

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer])

model = pipeline.fit(df)
prediction = model.transform(df)
prediction.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|      class|              source|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
|

In [36]:
# Now drop unnecessary columns

# Another way
#df_train = prediction.drop('x').drop('y').drop('z').drop('class')...
columns_to_drop = ['x', 'y', 'z', 'class', 'source', 'features']

df_train = prediction.drop(*columns_to_drop)
df_train.show()

+----------+--------------+--------------------+
|classIndex|   categoryVec|       features_norm|
+----------+--------------+--------------------+
|       5.0|(12,[5],[1.0])|[0.20754716981132...|
|       5.0|(12,[5],[1.0])|[0.20754716981132...|
|       5.0|(12,[5],[1.0])|[0.20183486238532...|
|       5.0|(12,[5],[1.0])|[0.20183486238532...|
|       5.0|(12,[5],[1.0])|[0.19626168224299...|
|       5.0|(12,[5],[1.0])|[0.20560747663551...|
|       5.0|(12,[5],[1.0])|[0.19047619047619...|
|       5.0|(12,[5],[1.0])|[0.20370370370370...|
|       5.0|(12,[5],[1.0])|[0.20754716981132...|
|       5.0|(12,[5],[1.0])|[0.20370370370370...|
|       5.0|(12,[5],[1.0])|[0.2,0.4857142857...|
|       5.0|(12,[5],[1.0])|[0.19230769230769...|
|       5.0|(12,[5],[1.0])|[0.20388349514563...|
|       5.0|(12,[5],[1.0])|[0.20388349514563...|
|       5.0|(12,[5],[1.0])|[0.18867924528301...|
|       5.0|(12,[5],[1.0])|[0.17821782178217...|
|       5.0|(12,[5],[1.0])|[0.18811881188118...|
|       5.0|(12,[5],