If you want to learn how the ETL (extract, transform, load) step is done, please run the following notebook first and then update the file name below accordingly:

https://github.com/IBM/coursera/blob/master/coursera_ml/a2_w1_s3_ETL.ipynb

In [1]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20191128124910-0000
KERNEL_ID = d77e6a47-aac5-419c-a127-01ed22c84b9a
--2019-11-28 12:49:13--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet [following]
--2019-11-28 12:49:13--  https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 199.232.8.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|199.232.8.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (911K) [application/octet-stream]
Saving to: 'hmp.parquet'


2019-11-28 12:49:13 (22.2 MB/s) - 'hmp.parquet' saved [932997/932997]



In [2]:
from pyspark.ml.feature import StringIndexer

# Turn the textual activity label in "class" into a numeric label column "classIndex"
# (values come out as doubles, e.g. Brush_teeth -> 6.0 in the output below this cell).
indexer = StringIndexer(outputCol="classIndex", inputCol="class")
indexed = indexer.fit(df).transform(df)

# Preview the indexed rows, then list which index values actually occur.
indexed.show()
indexed.select("classIndex").distinct().show()

+---+---+---+--------------------+-----------+----------+
|  x|  y|  z|              source|      class|classIndex|
+---+---+---+--------------------+-----------+----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|       6.0|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|       6.0|
| 20| 50| 34|Accelerometer-201...|Brush_teeth|       6.0|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|       6.0|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|       6.0|
| 20| 51| 35|A

In [3]:
from pyspark.ml import Estimator
from pyspark.ml.feature import OneHotEncoder, StringIndexer


# One-hot encode the numeric label "classIndex" into the sparse vector column "categoryVec".
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")

# OneHotEncoder was a plain Transformer in Spark 2.x but became an Estimator in Spark 3.0,
# where calling .transform() on the unfitted encoder fails. Fit it when required so this
# cell runs on either version; `encoder` itself stays unfitted for the Pipeline cell below.
encoder_model = encoder.fit(indexed) if isinstance(encoder, Estimator) else encoder
encoded = encoder_model.transform(indexed)
encoded.show()


+---+---+---+--------------------+-----------+----------+--------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|
+---+---+---+--------------------+-----------+----------+--------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|     

In [4]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Pack the three numeric columns x, y and z into one vector column "features",
# which is the input format expected by Spark ML stages such as Normalizer.
# NOTE: VectorAssembler needs numeric inputs. If these columns were strings, cast them
# first, e.g. select pyspark.sql.functions.col(c).cast("Double").alias(c) for each input column.
vectorAssembler = VectorAssembler(
    inputCols=["x", "y", "z"],
    outputCol="features",
)

features_vectorized = vectorAssembler.transform(encoded)
features_vectorized.show()

+---+---+---+--------------------+-----------+----------+--------------+----------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|        features|
+---+---+---+--------------------+-----------+----------+--------------+----------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.0,52.0,34.0]|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,51.0,34.0]|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[20.0,50.0,35.0]|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,34.0]|
| 22| 50| 

In [5]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Normalizer

# Rescale each "features" vector to unit L1 norm (p=1.0): every component is divided by
# the sum of absolute values, e.g. [22, 49, 35] -> [22/106, 49/106, 35/106].
normalizer = Normalizer(p=1.0, inputCol="features", outputCol="features_norm")
l1NormData = normalizer.transform(features_vectorized)
l1NormData.show()



+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
|

In [8]:
# Discard the raw measurements, the label string/index and the un-normalized vector,
# keeping the one-hot label "categoryVec" and the normalized "features_norm" for training.
df_train = l1NormData.drop(
    "source", "class", "classIndex", "features", "x", "y", "z"
)

In [9]:
# Preview the first 20 rows of the training frame (long values are truncated).
df_train.show(n=20, truncate=True)

+--------------+--------------------+
|   categoryVec|       features_norm|
+--------------+--------------------+
|(13,[6],[1.0])|[0.20754716981132...|
|(13,[6],[1.0])|[0.20754716981132...|
|(13,[6],[1.0])|[0.20183486238532...|
|(13,[6],[1.0])|[0.20183486238532...|
|(13,[6],[1.0])|[0.19626168224299...|
|(13,[6],[1.0])|[0.20560747663551...|
|(13,[6],[1.0])|[0.19047619047619...|
|(13,[6],[1.0])|[0.20370370370370...|
|(13,[6],[1.0])|[0.20754716981132...|
|(13,[6],[1.0])|[0.20370370370370...|
|(13,[6],[1.0])|[0.2,0.4857142857...|
|(13,[6],[1.0])|[0.19230769230769...|
|(13,[6],[1.0])|[0.20388349514563...|
|(13,[6],[1.0])|[0.20388349514563...|
|(13,[6],[1.0])|[0.18867924528301...|
|(13,[6],[1.0])|[0.17821782178217...|
|(13,[6],[1.0])|[0.18811881188118...|
|(13,[6],[1.0])|[0.15533980582524...|
|(13,[6],[1.0])|[0.17142857142857...|
|(13,[6],[1.0])|[0.17821782178217...|
+--------------+--------------------+
only showing top 20 rows



In [6]:
from pyspark.ml import Pipeline

# Re-run all four feature stages as a single Pipeline: index the label, one-hot encode it,
# assemble x/y/z into a vector and L1-normalize it. The Pipeline fits any stage that needs
# fitting, so this reproduces the step-by-step cells above in one go.
pipeline = Pipeline(stages=[
    indexer,
    encoder,
    vectorAssembler,
    normalizer,
])
model = pipeline.fit(df)
# Note: despite its name, `prediction` holds engineered features, not model predictions.
prediction = model.transform(df)

In [7]:
# Preview the first 20 rows produced by the fitted pipeline (long values are truncated).
prediction.show(n=20, truncate=True)

+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
|