Welcome to exercise one of week four of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise we’ll work on classification.

Let’s create our DataFrame again:


This notebook is designed to run in a IBM Watson Studio Apache Spark runtime. In case you are running it in an IBM Watson Studio standard runtime or outside Watson Studio, we install Apache Spark in local mode for test purposes only. Please don't use it in production.

In [None]:
!pip install --upgrade pip

In [None]:
if not ('sc' in locals() or 'sc' in globals()):
    print('It seems you are note running in a IBM Watson Studio Apache Spark Notebook. You might be running in a IBM Watson Studio Default Runtime or outside IBM Waston Studio. Therefore installing local Apache Spark environment for you. Please do not use in Production')
    
    from pip import main
    main(['install', 'pyspark==2.4.5'])
    
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession

    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
    
    spark = SparkSession \
        .builder \
        .getOrCreate()

In [1]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200412144937-0000
KERNEL_ID = 2991d032-bc7a-4882-b47f-8dc620227c80
--2020-04-12 14:49:40--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet [following]
--2020-04-12 14:49:40--  https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet [following]
--2020-04-12 14:49:40--  https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.gi

Since this is supervised learning, let’s split our data into train (80%) and test (20%) set.

In [2]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

Again, we can re-use our feature engineering pipeline

In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer


indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

Now we use LogisticRegression, a simple and basic linear classifier to obtain a classification performance baseline.

In [5]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)

If we look at the schema of the prediction dataframe we see that there is an additional column called prediction which contains the best guess for the class our model predicts.

In [6]:
prediction.printSchema()

root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- class: string (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_norm: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



Let’s evaluate performance by using a build-in functionality of Apache SparkML.

In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
MulticlassClassificationEvaluator().setMetricName("accuracy").evaluate(prediction) 

0.20695915616889987

So we get 20% right. This is not bad for a baseline. Note that random guessing would give us only 7%. Of course we need to improve. You might have notices that we’re dealing with a time series here. And we’re not making use of that fact right now as we look at each training example only individually. But this is ok for now. More advanced courses like “Advanced Machine Learning and Signal Processing” (https://www.coursera.org/learn/advanced-machine-learning-signal-processing/) will teach you how to improve accuracy to the nearly 100% by using algorithms like Fourier transformation or wavelet transformation. But let’s skip this for now. In the following cell, please use the RandomForest classifier (you might need to play with the “numTrees” parameter) in the code cell below. You should get an accuracy of around 44%. More on RandomForest can be found here:

https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier


In [31]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import IndexToString

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
indexer = StringIndexer(inputCol="class",outputCol="label").fit(df)

# Create vector assembler then normalize vector values
vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm")

featureIndexer = VectorIndexer(inputCol="eatures_norm", outputCol="indexedFeatures")

# Split the dataset 
# Split the data into training and test sets (30% held out for testing)
splits = df.randomSplit([0.8,0.2])
train_data = splits[0]
test_data = splits[1]

# Train the data 
rf = RandomForestClassifier(labelCol="label", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
indexToString = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=indexer.labels)



In [33]:
#piapline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf, indexToString])

pipline = Pipeline(stages=[indexer, vectorAssembler, normalizer, featureIndexer , rf, indexToString])


# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

In [34]:
predictions.show()

+---+---+---+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  x|  y|  z|              source|         class|label|       features|       features_norm|       rawPrediction|         probability|prediction|
+---+---+---+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  0| 23| 36|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,23.0,36.0]|[0.0,0.3898305084...|[1.25477692694303...|[0.20665460802587...|       0.0|
|  0| 24| 35|Accelerometer-201...| Sitdown_chair|  8.0|[0.0,24.0,35.0]|[0.0,0.4067796610...|[1.25477692694303...|[0.20665460802587...|       0.0|
|  0| 25| 40|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,25.0,40.0]|[0.0,0.3846153846...|[1.25477692694303...|[0.20665460802587...|       0.0|
|  0| 27| 37|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,27.0,37.0]|[0.0,0.421875,0.5...|[1.25477692694303...|[0.20665460

In [35]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[3]
print(rfModel)  # summary only

Test Error = 0.793607
LogisticRegression_49198ced00b3145ec995


In [36]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="class", outputCol="indexedLabel").fit(df)

vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")

pip = Pipeline(stages=[labelIndexer, vectorAssembler])
mod = pip.fit(df)
pred = mod.transform(df)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(pred)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = pred.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)


AnalysisException: "cannot resolve '`label`' given input columns: [x, indexedLabel, features, predictedLabel, rawPrediction, z, source, class, probability, y, prediction, indexedFeatures];;\n'Project [predictedLabel#1607, 'label, features#1517]\n+- Project [x#0, y#1, z#2, source#3, class#4, indexedLabel#1507, features#1517, indexedFeatures#1554, rawPrediction#1563, probability#1573, prediction#1584, if (isnull(cast(prediction#1584 as double))) null else UDF(knownotnull(cast(prediction#1584 as double))) AS predictedLabel#1607]\n   +- Project [x#0, y#1, z#2, source#3, class#4, indexedLabel#1507, features#1517, indexedFeatures#1554, rawPrediction#1563, probability#1573, UDF(rawPrediction#1563) AS prediction#1584]\n      +- Project [x#0, y#1, z#2, source#3, class#4, indexedLabel#1507, features#1517, indexedFeatures#1554, rawPrediction#1563, UDF(rawPrediction#1563) AS probability#1573]\n         +- Project [x#0, y#1, z#2, source#3, class#4, indexedLabel#1507, features#1517, indexedFeatures#1554, UDF(indexedFeatures#1554) AS rawPrediction#1563]\n            +- Project [x#0, y#1, z#2, source#3, class#4, indexedLabel#1507, features#1517, UDF(features#1517) AS indexedFeatures#1554]\n               +- Sample 0.7, 1.0, false, 7589409949849962762\n                  +- Sort [x#0 ASC NULLS FIRST, y#1 ASC NULLS FIRST, z#2 ASC NULLS FIRST, source#3 ASC NULLS FIRST, class#4 ASC NULLS FIRST, indexedLabel#1507 ASC NULLS FIRST, features#1517 ASC NULLS FIRST], false\n                     +- Project [x#0, y#1, z#2, source#3, class#4, indexedLabel#1507, UDF(named_struct(x_double_VectorAssembler_44d9970a1513d68ea99b, cast(x#0 as double), y_double_VectorAssembler_44d9970a1513d68ea99b, cast(y#1 as double), z_double_VectorAssembler_44d9970a1513d68ea99b, cast(z#2 as double))) AS features#1517]\n                        +- Project [x#0, y#1, z#2, source#3, class#4, UDF(cast(class#4 as string)) AS indexedLabel#1507]\n                           +- Relation[x#0,y#1,z#2,source#3,class#4] parquet\n"

In [37]:
# Select example rows to display.
predictions.show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

+---+---+---+--------------------+-------------+------------+---------------+---------------+--------------------+--------------------+----------+--------------+
|  x|  y|  z|              source|        class|indexedLabel|       features|indexedFeatures|       rawPrediction|         probability|prediction|predictedLabel|
+---+---+---+--------------------+-------------+------------+---------------+---------------+--------------------+--------------------+----------+--------------+
|  0| 10| 28|Accelerometer-201...|    Getup_bed|         1.0|[0.0,10.0,28.0]|[0.0,10.0,28.0]|[2.06820818114962...|[0.20682081811496...|       1.0|     Getup_bed|
|  0| 12| 39|Accelerometer-201...|Sitdown_chair|         8.0|[0.0,12.0,39.0]|[0.0,12.0,39.0]|[2.44930119655347...|[0.24493011965534...|       1.0|     Getup_bed|
|  0| 26| 15|Accelerometer-201...| Climb_stairs|         4.0|[0.0,26.0,15.0]|[0.0,26.0,15.0]|[2.06820818114962...|[0.20682081811496...|       1.0|     Getup_bed|
|  0| 26| 42|Accelerometer-2