

In case you want to learn how ETL is done, please run the following notebook first and update the file name below accordingly

https://github.com/IBM/coursera/blob/master/coursera_ml/a2_w1_s3_ETL.ipynb


In [2]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2019-12-29 00:42:38--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet [following]
--2019-12-29 00:42:38--  https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (911K) [application/octet-stream]
Saving to: 'hmp.parquet'


2019-12-29 00:42:39 (19.1 MB/s) - 'hmp.parquet' saved [932997/932997]



In [3]:
df.show()

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|
| 20| 50| 34|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 20| 51| 35|Accelerometer-201...|Brush_teeth|
| 18| 49| 34|Accelerometer-201...|Brush_teeth|
| 19| 48| 34|Accelerometer-201...|Brush_teeth|
| 16| 53| 34|Accelerometer-201...|Brush_teeth|
| 18| 52| 35|

In [4]:
#Query to discover distinct class values 
test = spark.sql("SELECT DISTINCT class from df")
test.show()

+--------------+
|         class|
+--------------+
| Use_telephone|
| Standup_chair|
|      Eat_meat|
|     Getup_bed|
|   Drink_glass|
|    Pour_water|
|     Comb_hair|
|          Walk|
|  Climb_stairs|
| Sitdown_chair|
|   Liedown_bed|
|Descend_stairs|
|   Brush_teeth|
|      Eat_soup|
+--------------+



In [5]:
df_two_class = spark.sql("select * from df where class in ('Use_telephone','Standup_chair')")

In [6]:
splits = df_two_class.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer


indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

In [8]:
# Evaluating the pipeline steps
# 1 - Indexing the class columm
indexed = indexer.fit(df_train).transform(df_train)
indexed.show()

+---+---+---+--------------------+-------------+-----+
|  x|  y|  z|              source|        class|label|
+---+---+---+--------------------+-------------+-----+
|  0| 30| 24|Accelerometer-201...|Standup_chair|  0.0|
|  0| 31| 17|Accelerometer-201...|Standup_chair|  0.0|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|
|  0| 34| 29|Accelerometer-201...|Standup_chair|  0.0|
|  0| 35| 28|Accelerometer-201...|Standup_chair|  0.0|
|  0| 36| 37|Accelerometer-201...|Standup_chair|  0.0|
|  0| 37| 26|Accelerometer-201...|Standup_chair|  0.0|
|  0| 37| 30|Accelerometer-201...|Standup_chair|  0.0|
|  0| 37| 35|Accelerometer-201...|Standup_chair|  0.0|
|  0| 40| 34|Accelerometer-201...|Standup_chair|  0.0|
|  0| 42| 34|Accelerometer-201...|Standup_chair|  0.0|
|  0| 43| 34|Accelerometer-201...|Standup_chair|  0.0|
|  0| 43| 34|Accelerometer-201...|Standup_chair|  0.0|
|  0| 44| 21|Accelerometer-201...|Standup_chair|  0.0|
|  0| 56| 

In [9]:
# Evaluating the pipeline steps
# 2 - VectorIndexing the class columm
vectorized = vectorAssembler.transform(indexed)
vectorized.show()

+---+---+---+--------------------+-------------+-----+---------------+
|  x|  y|  z|              source|        class|label|       features|
+---+---+---+--------------------+-------------+-----+---------------+
|  0| 30| 24|Accelerometer-201...|Standup_chair|  0.0|[0.0,30.0,24.0]|
|  0| 31| 17|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,17.0]|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,32.0]|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,32.0]|
|  0| 34| 29|Accelerometer-201...|Standup_chair|  0.0|[0.0,34.0,29.0]|
|  0| 35| 28|Accelerometer-201...|Standup_chair|  0.0|[0.0,35.0,28.0]|
|  0| 36| 37|Accelerometer-201...|Standup_chair|  0.0|[0.0,36.0,37.0]|
|  0| 37| 26|Accelerometer-201...|Standup_chair|  0.0|[0.0,37.0,26.0]|
|  0| 37| 30|Accelerometer-201...|Standup_chair|  0.0|[0.0,37.0,30.0]|
|  0| 37| 35|Accelerometer-201...|Standup_chair|  0.0|[0.0,37.0,35.0]|
|  0| 40| 34|Accelerometer-201...|Standup_chair|  0.0|[0.0,40.0,34.0]|
|  0| 

In [10]:
# Evaluating the pipeline steps
# 3 - Normalizing the features columm
normalized = normalizer.transform(vectorized)
normalized.show()


+---+---+---+--------------------+-------------+-----+---------------+--------------------+
|  x|  y|  z|              source|        class|label|       features|       features_norm|
+---+---+---+--------------------+-------------+-----+---------------+--------------------+
|  0| 30| 24|Accelerometer-201...|Standup_chair|  0.0|[0.0,30.0,24.0]|[0.0,0.5555555555...|
|  0| 31| 17|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,17.0]|[0.0,0.6458333333...|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,32.0]|[0.0,0.4920634920...|
|  0| 31| 32|Accelerometer-201...|Standup_chair|  0.0|[0.0,31.0,32.0]|[0.0,0.4920634920...|
|  0| 34| 29|Accelerometer-201...|Standup_chair|  0.0|[0.0,34.0,29.0]|[0.0,0.5396825396...|
|  0| 35| 28|Accelerometer-201...|Standup_chair|  0.0|[0.0,35.0,28.0]|[0.0,0.5555555555...|
|  0| 36| 37|Accelerometer-201...|Standup_chair|  0.0|[0.0,36.0,37.0]|[0.0,0.4931506849...|
|  0| 37| 26|Accelerometer-201...|Standup_chair|  0.0|[0.0,37.0,26.0]|[0.0,0.587

In [11]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="label", featuresCol="features_norm", maxIter=10)

In [12]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer,gbt])

In [13]:
model = pipeline.fit(df_train)

In [14]:
prediction = model.transform(df_train)

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("label")
    
binEval.evaluate(prediction) 

0.910097840132915

In [16]:
prediction = model.transform(df_test)

In [17]:
binEval.evaluate(prediction) 

0.9074938574938575

In [18]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [19]:
#paramGrid = ParamGridBuilder() \
#    .addGrid(normalizer.p, [1.0, 2.0, 10.0]) \
#    .addGrid(gbt.maxBins, [2,4,8,16]) \
#    .addGrid(gbt.maxDepth, [2,4,8,16]) \
#    .build()

paramGrid = ParamGridBuilder() \
    .addGrid(normalizer.p, [1.0, 2.0]) \
    .addGrid(gbt.maxBins, [2,16]) \
    .addGrid(gbt.maxDepth, [2,16]) \
    .build()

In [20]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2)

In [21]:
cvModel = crossval.fit(df_train)

In [22]:
prediction = cvModel.transform(df_test)

In [23]:
binEval.evaluate(prediction)

0.9142506142506143

In [24]:
cvModel.bestModel.stages[-1].explainParams()

"cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (undefined)\nfeaturesCol: features column name (default: features, current: features_norm)\nimpurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (undefined)\nlabelCol: label column name (default: label, current: label)\nlossType: Loss function which GBT tries to minimize (case-insensitive). Suppor