REQUIREMENTS 

The purpose of this assignment is to learn how feature engineering boosts model performance. You will apply the Discrete Fourier Transform (DFT) to the accelerometer sensor time series, transforming the dataset from the time domain to the frequency domain.

After that, you’ll use a classification algorithm of your choice to classify the sensor data into two categories: brushing teeth and climbing stairs.
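To illustrate why frequency-domain features can help a classifier, here is a toy sketch in plain Python. The two synthetic sine waves are hypothetical data, not the accelerometer recordings, and the helper names (`dft`, `dominant_bin`, `mean`, `variance`) are illustrative only. Both waves have the same mean and variance, so those time-domain summaries cannot tell them apart, while the index of the strongest DFT bin can.

```python
import cmath
import math


def dft(signal):
    """Naive O(N^2) Discrete Fourier Transform, computed from the definition."""
    n_samples = len(signal)
    return [
        sum(x * cmath.exp(-2j * math.pi * k * n / n_samples) for n, x in enumerate(signal))
        for k in range(n_samples)
    ]


def dominant_bin(signal):
    """Index of the strongest non-zero frequency bin in the first half of the spectrum."""
    magnitudes = [abs(c) for c in dft(signal)]
    half = len(signal) // 2
    return max(range(1, half), key=lambda k: magnitudes[k])


def mean(values):
    return sum(values) / len(values)


def variance(values):
    m = mean(values)
    return sum((v - m) ** 2 for v in values) / len(values)


N = 32
slow = [math.sin(2 * math.pi * 2 * n / N) for n in range(N)]  # hypothetical: 2 cycles per window
fast = [math.sin(2 * math.pi * 5 * n / N) for n in range(N)]  # hypothetical: 5 cycles per window

print(round(variance(slow), 6), round(variance(fast), 6))  # both approximately 0.5
print(dominant_bin(slow), dominant_bin(fast))              # 2 5
```

In the notebook, the per-axis DFT columns play the role of `dft(...)` here: the classifier sees spectral content instead of raw samples.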

REFERENCE   
https://en.wikipedia.org/wiki/Discrete_Fourier_transform   
https://spark.apache.org/docs/3.1.1/ml-classification-regression.html#gradient-boosted-trees-gbts 

This notebook is designed to run in an IBM Watson Studio default runtime (NOT the Watson Studio Apache Spark runtime), because the default runtime with 1 vCPU is free of charge. Therefore, we install Apache Spark in local mode, for test purposes only. Don't use this setup in production.

The notebook should also work outside Watson Studio. If you are already running in an Apache Spark context outside Watson Studio, remove the Apache Spark setup in the first notebook cells.



In [1]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')
    


In [2]:
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
  Downloading pyspark-2.4.5.tar.gz (217.8 MB)
Collecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=834cc163a5784b65ad4534f052d8e0a034ea9cef7d68fa69c6c1e46233608ac8
  Stored in directory: /tmp/wsuser/.cache/pip/wheels/01/c0/03/1c241c9c482b647d4d99412a98a5c7f87472728ad41ae55e1e
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [3]:
!pip install https://github.com/IBM/coursera/blob/master/systemml-1.3.0-SNAPSHOT-python.tar.gz?raw=true

Collecting https://github.com/IBM/coursera/blob/master/systemml-1.3.0-SNAPSHOT-python.tar.gz?raw=true
  Downloading https://github.com/IBM/coursera/blob/master/systemml-1.3.0-SNAPSHOT-python.tar.gz?raw=true (9.9 MB)
Building wheels for collected packages: systemml
  Building wheel for systemml (setup.py) ... done
  Created wheel for systemml: filename=systemml-1.3.0-py3-none-any.whl size=9882972 sha256=4a81a27a113cbc0e25a53b9fa9e68614d9bfdedd3b0a2a0a9d6bb4ba519e8b4e
  Stored in directory: /tmp/wsuser/.cache/pip/wheels/ed/96/15/1042ed0087d53c21a17788d99d5581169482cfe683f1f6e60a
Successfully built systemml
Installing collected packages: systemml
Successfully installed systemml-1.3.0


In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark context that uses all available CPU cores
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Create (or reuse) the SparkSession on top of that context
spark = SparkSession \
    .builder \
    .getOrCreate()


First, make sure that you are running SystemML version 1.3.0 or higher; the code below prints and checks the version.
Note: SystemML 1.3 contains an important and necessary fix.


In [5]:
!mkdir -p /tmp/wsuser/systemml

In [6]:
from systemml import MLContext, dml
ml = MLContext(spark)
# sysml.localtmpdir expects a directory path (created in the previous cell), not a shell command
ml.setConfigProperty("sysml.localtmpdir", "/tmp/wsuser/systemml")
print(ml.version())

if ml.version() != '1.3.0-SNAPSHOT':
    raise ValueError('please upgrade to SystemML 1.3.0, or restart your Kernel (Kernel->Restart & Clear Output)')

1.3.0-SNAPSHOT
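The text above asks for version 1.3.0 "or higher", but the cell only accepts the exact string '1.3.0-SNAPSHOT'. Below is a minimal sketch of an "at least" comparison; `version_at_least` is a hypothetical helper, not part of SystemML, and treating '1.3.0-SNAPSHOT' as equal to 1.3.0 (ignoring the pre-release suffix) is an assumption.

```python
import re


def version_at_least(version, minimum):
    """Return True if `version` (e.g. '1.3.0-SNAPSHOT') is at least `minimum` (e.g. '1.3.0').

    Only the leading dotted numeric part is compared; suffixes such as '-SNAPSHOT'
    are ignored, so '1.3.0-SNAPSHOT' counts as 1.3.0.
    """
    def numeric_parts(v):
        match = re.match(r"\d+(?:\.\d+)*", v)
        if match is None:
            raise ValueError("unrecognised version string: %r" % v)
        return [int(part) for part in match.group(0).split(".")]

    a, b = numeric_parts(version), numeric_parts(minimum)
    width = max(len(a), len(b))
    # Pad with zeros so that '1.3' and '1.3.0' compare as equal
    a += [0] * (width - len(a))
    b += [0] * (width - len(b))
    return a >= b  # lists compare element by element, left to right


print(version_at_least("1.3.0-SNAPSHOT", "1.3.0"))  # True
print(version_at_least("1.2.9", "1.3.0"))           # False
```

In the notebook, the check could then read `if not version_at_least(ml.version(), '1.3.0'): raise ValueError(...)`.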


In [7]:
!wget https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
!mv shake.parquet?raw=true shake.parquet

--2021-03-18 15:55:38--  https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/blob/master/coursera_ml/shake.parquet?raw=true [following]
--2021-03-18 15:55:39--  https://github.com/IBM/skillsnetwork/blob/master/coursera_ml/shake.parquet?raw=true
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/IBM/skillsnetwork/raw/master/coursera_ml/shake.parquet [following]
--2021-03-18 15:55:39--  https://github.com/IBM/skillsnetwork/raw/master/coursera_ml/shake.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/coursera_ml/shake.parquet [following]
--2

Now it’s time to read the sensor data and create a temporary query table.

In [8]:
df = spark.read.parquet('shake.parquet')

In [9]:
df.show()

+-----+---------+-----+-----+-----+
|CLASS| SENSORID|    X|    Y|    Z|
+-----+---------+-----+-----+-----+
|    2| qqqqqqqq| 0.12| 0.12| 0.12|
|    2|aUniqueID| 0.03| 0.03| 0.03|
|    2| qqqqqqqq|-3.84|-3.84|-3.84|
|    2| 12345678| -0.1| -0.1| -0.1|
|    2| 12345678|-0.15|-0.15|-0.15|
|    2| 12345678| 0.47| 0.47| 0.47|
|    2| 12345678|-0.06|-0.06|-0.06|
|    2| 12345678|-0.09|-0.09|-0.09|
|    2| 12345678| 0.21| 0.21| 0.21|
|    2| 12345678|-0.08|-0.08|-0.08|
|    2| 12345678| 0.44| 0.44| 0.44|
|    2|    gholi| 0.76| 0.76| 0.76|
|    2|    gholi| 1.62| 1.62| 1.62|
|    2|    gholi| 5.81| 5.81| 5.81|
|    2| bcbcbcbc| 0.58| 0.58| 0.58|
|    2| bcbcbcbc|-8.24|-8.24|-8.24|
|    2| bcbcbcbc|-0.45|-0.45|-0.45|
|    2| bcbcbcbc| 1.03| 1.03| 1.03|
|    2|aUniqueID|-0.05|-0.05|-0.05|
|    2| qqqqqqqq|-0.44|-0.44|-0.44|
+-----+---------+-----+-----+-----+
only showing top 20 rows



In [10]:
!pip install pixiedust

Collecting pixiedust
  Downloading pixiedust-1.1.19.tar.gz (197 kB)
Collecting geojson
  Downloading geojson-2.5.0-py2.py3-none-any.whl (14 kB)
Collecting astunparse
  Downloading astunparse-1.6.3-py2.py3-none-any.whl (12 kB)
Collecting colour
  Downloading colour-0.1.5-py2.py3-none-any.whl (23 kB)
Building wheels for collected packages: pixiedust
  Building wheel for pixiedust (setup.py) ... done
  Created wheel for pixiedust: filename=pixiedust-1.1.19-py3-none-any.whl size=321803 sha256=4d3ba45c4607bdbb94444dc31b45c427d3bf7febb2bd0cfd856b014082f724cb
  Stored in directory: /tmp/wsuser/.cache/pip/wheels/05/07/e7/8aca0e820027a63157a916424fd748fb2a2a3e71de5e08eeb8
Successfully built pixiedust
Installing collected packages: geojson, astunparse, colour, pixiedust
Successfully installed astunparse-1.6.3 colour-0.1.5 geojson-2.5.0 pixiedust-1.1.19


In [11]:
# REFERENCE: https://pixiedust.github.io/pixiedust/

import pixiedust
display(df)

Pixiedust database opened successfully
Table VERSION_TRACKER created successfully
Table METRICS_TRACKER created successfully

Pixiedust runtime updated. Please restart kernel
Table SPARK_PACKAGES created successfully
Table USER_PREFERENCES created successfully
Table service_connections created successfully


DataFrame[CLASS: bigint, SENSORID: string, X: double, Y: double, Z: double]

In [12]:
df.createOrReplaceTempView("df")

We’ll use Apache SystemML to implement the Discrete Fourier Transform. This way, all computation continues to happen on the Apache Spark cluster, for better scalability and performance.

Implementing the Discrete Fourier Transform in a linear algebra programming language is simple. Apache SystemML DML is such a language, and as you can see, the implementation is straightforward and doesn’t differ much from the mathematical definition. Just note that the sum operator has been replaced by a matrix-vector product (one dot product per frequency k) using the %*% syntax borrowed from R:

$$X_k = \sum_{n=0}^{N-1} x_n \cdot e^{-\frac{i 2\pi}{N} k n} = \sum_{n=0}^{N-1} x_n \left[ \cos\left(\frac{2\pi}{N} k n\right) - i \cdot \sin\left(\frac{2\pi}{N} k n\right) \right], \qquad k = 0, \dots, N-1$$
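As a plain-Python cross-check of the definition X_k = sum over n of x_n * exp(-i*2*pi*k*n/N), here is a minimal sketch that evaluates the sum literally. `dft_direct` is a hypothetical name; this is an illustration, not what SystemML executes.

```python
import cmath
import math


def dft_direct(signal):
    """X_k = sum_n x_n * exp(-i*2*pi*k*n/N), evaluated literally for k = 0..N-1 (O(N^2))."""
    N = len(signal)
    return [
        sum(x_n * cmath.exp(-2j * math.pi * k * n / N) for n, x_n in enumerate(signal))
        for k in range(N)
    ]


X = dft_direct([1.0, 0.0, -1.0, 0.0])
# Up to floating-point rounding, X is [0, 2, 0, 2]: a signal that repeats
# every 4 samples puts its energy into bins k = 1 and k = N - 1 = 3.
print([round(abs(c), 6) for c in X])  # [0.0, 2.0, 0.0, 2.0]
```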



In [13]:
dml_script = '''
PI = 3.141592654
N = nrow(signal)

n = seq(0, N-1, 1)
k = seq(0, N-1, 1)

M = (n %*% t(k))*(2*PI/N)

Xa = cos(M) %*% signal
Xb = sin(M) %*% signal

DFT = cbind(Xa, Xb)
'''

GUIDELINES for understanding the script above:

dml_script = '''
PI = 3.141592654
N = nrow(signal) # number of rows in the signal vector; signal is the input vector passed to dml_script

n = seq(0, N-1, 1) # column vector n with the values 0 to N-1, increment 1
k = seq(0, N-1, 1) # column vector k with the same values 0 to N-1

M = (n %*% t(k))*(2*PI/N) # outer product of n and the transpose of k (an N x N matrix), scaled by 2*PI/N

Xa = cos(M) %*% signal # element-wise cos(M) times the signal vector: an N x 1 vector (real part of the DFT)
Xb = sin(M) %*% signal # element-wise sin(M) times the signal vector: an N x 1 vector (negated imaginary part of the DFT)

DFT = cbind(Xa, Xb) # column-wise concatenation: appends Xb as a second column, giving an N x 2 matrix
'''

REFERENCE: http://apache.github.io/systemds/site/dml-language-reference
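To make the matrix formulation concrete, here is a pure-Python sketch that mirrors the DML script step by step (`seq`, outer product, `cos`/`sin`, `%*%`, `cbind`) and compares the result with the complex definition. The function name `dft_like_dml` and the test signal are hypothetical, and it uses `math.pi` instead of the DML constant 3.141592654.

```python
import cmath
import math


def dft_like_dml(signal):
    """Pure-Python mirror of the DML script; returns the N x 2 matrix cbind(Xa, Xb)."""
    N = len(signal)
    n = list(range(N))  # seq(0, N-1, 1)
    k = list(range(N))  # seq(0, N-1, 1)
    # M = (n %*% t(k)) * (2*PI/N): an N x N outer product; symmetric, so row i is frequency i
    M = [[n_i * k_j * 2 * math.pi / N for k_j in k] for n_i in n]

    def matvec(A, v):
        # %*% of an N x N matrix with an N x 1 vector: one dot product per row
        return [sum(a * x for a, x in zip(row, v)) for row in A]

    Xa = matvec([[math.cos(m) for m in row] for row in M], signal)
    Xb = matvec([[math.sin(m) for m in row] for row in M], signal)
    return [[a, b] for a, b in zip(Xa, Xb)]  # cbind(Xa, Xb)


signal = [0.12, 0.03, -3.84, -0.1, -0.15]  # arbitrary short test signal
dft_matrix = dft_like_dml(signal)

# Compare with the complex definition: Xa is the real part, Xb the negated imaginary part
for k, (xa, xb) in enumerate(dft_matrix):
    X_k = sum(x * cmath.exp(-2j * math.pi * k * n / len(signal)) for n, x in enumerate(signal))
    print(k, abs(xa - X_k.real) < 1e-9, abs(xb + X_k.imag) < 1e-9)
```

The sign of Xb does not matter for magnitude-based features, since sqrt(Xa^2 + Xb^2) equals |X_k| either way.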

Now it’s time to create a function that takes a single-column Apache Spark DataFrame as its argument (one containing the accelerometer measurement time series for one axis) and returns its Fourier transform. In addition, we add an index column for later joining all axes together, and rename the columns appropriately. The result of this function is an Apache Spark DataFrame containing the Fourier transform of its input in two columns (named after the axis, with the suffixes a and b), plus the id column.

In [14]:
from pyspark.sql.functions import monotonically_increasing_id

def dft_systemml(signal, name):
    prog = dml(dml_script).input('signal', signal).output('DFT')

    return (
        # execute the script inside the SystemML engine running on top of Apache Spark
        ml.execute(prog)
        # read the result from the SystemML execution back as a SystemML Matrix
        .get('DFT')
        # convert the SystemML Matrix to an Apache Spark DataFrame
        .toDF()
        # rename the default column names, e.g. C1 -> xa and C2 -> xb for name='x'
        .selectExpr('C1 as %sa' % (name), 'C2 as %sb' % (name))
        # add a unique ID per row for later joining
        .withColumn("id", monotonically_increasing_id())
    )

Now it’s time to create individual DataFrames containing only a subset of the data. For a given accelerometer, we filter each sensor axis for CLASS 1, then each sensor axis for CLASS 2. This means that we’ll get 6 DataFrames. Implement this using the relational API of DataFrames or SparkSQL. Make sure that each DataFrame has only ONE column (only the measurement, without the CLASS column).

In [20]:

x0 = df.filter(df.CLASS==1).select('X') # a DataFrame containing only measurements of class 1 from the x axis
y0 = df.filter(df.CLASS==1).select('Y') # a DataFrame containing only measurements of class 1 from the y axis
z0 = df.filter(df.CLASS==1).select('Z') # a DataFrame containing only measurements of class 1 from the z axis
x1 = df.filter(df.CLASS==2).select('X') # a DataFrame containing only measurements of class 2 from the x axis
y1 = df.filter(df.CLASS==2).select('Y') # a DataFrame containing only measurements of class 2 from the y axis
z1 = df.filter(df.CLASS==2).select('Z') # a DataFrame containing only measurements of class 2 from the z axis

# Use the following commands to check the content of df and newly created DataFrames:

df.show()

x0.show()
y0.show()
z0.show()
x1.show()
y1.show()
z1.show()

+-----+---------+-----+-----+-----+
|CLASS| SENSORID|    X|    Y|    Z|
+-----+---------+-----+-----+-----+
|    2| qqqqqqqq| 0.12| 0.12| 0.12|
|    2|aUniqueID| 0.03| 0.03| 0.03|
|    2| qqqqqqqq|-3.84|-3.84|-3.84|
|    2| 12345678| -0.1| -0.1| -0.1|
|    2| 12345678|-0.15|-0.15|-0.15|
|    2| 12345678| 0.47| 0.47| 0.47|
|    2| 12345678|-0.06|-0.06|-0.06|
|    2| 12345678|-0.09|-0.09|-0.09|
|    2| 12345678| 0.21| 0.21| 0.21|
|    2| 12345678|-0.08|-0.08|-0.08|
|    2| 12345678| 0.44| 0.44| 0.44|
|    2|    gholi| 0.76| 0.76| 0.76|
|    2|    gholi| 1.62| 1.62| 1.62|
|    2|    gholi| 5.81| 5.81| 5.81|
|    2| bcbcbcbc| 0.58| 0.58| 0.58|
|    2| bcbcbcbc|-8.24|-8.24|-8.24|
|    2| bcbcbcbc|-0.45|-0.45|-0.45|
|    2| bcbcbcbc| 1.03| 1.03| 1.03|
|    2|aUniqueID|-0.05|-0.05|-0.05|
|    2| qqqqqqqq|-0.44|-0.44|-0.44|
+-----+---------+-----+-----+-----+
only showing top 20 rows

+-----+
|    X|
+-----+
|  0.0|
|  0.0|
|  0.0|
|  0.0|
|  0.0|
| 0.01|
|-0.01|
|-0.01|
|  0.0|
|-0.01|
|-0.01

Since we’ve already defined the DFT function, we can simply call it for each of the 6 DataFrames. Because each call also returns a DataFrame, we can follow the PySpark practice of chaining method calls on it. This is what we are going to do:

- Call DFT for each class and accelerometer sensor axis.
- Join the DataFrames on the ID column. 
- Re-add a column containing the class index.
- Stack the DataFrames of both classes together.
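The steps above can be sketched in plain Python with lists of row dictionaries standing in for DataFrames, which makes the shape of the final table easy to see. The helpers `inner_join_on_id` and `with_class` and the toy values are hypothetical; they mimic `join(..., on=['id'], how='inner')`, `withColumn('class', lit(...))` and `union`, not the Spark implementation.

```python
def inner_join_on_id(left, right):
    """Inner join of two lists of row dicts on the 'id' key (stand-in for DataFrame.join)."""
    right_by_id = {row["id"]: row for row in right}
    return [{**row, **right_by_id[row["id"]]} for row in left if row["id"] in right_by_id]


def with_class(rows, label):
    """Add a constant 'class' column, like .withColumn('class', lit(label))."""
    return [{**row, "class": label} for row in rows]


# Hypothetical per-axis outputs shaped like dft_systemml(...): columns <axis>a, <axis>b and id
x_dft = [{"xa": 1.0, "xb": 0.0, "id": 0}, {"xa": 0.2, "xb": -0.1, "id": 1}]
y_dft = [{"ya": 0.9, "yb": 0.1, "id": 0}, {"ya": 0.3, "yb": 0.0, "id": 1}]
z_dft = [{"za": 1.1, "zb": 0.0, "id": 0}, {"za": 0.1, "zb": 0.2, "id": 1}]

class_0 = with_class(inner_join_on_id(inner_join_on_id(x_dft, y_dft), z_dft), 0)
class_1 = with_class(inner_join_on_id(inner_join_on_id(x_dft, y_dft), z_dft), 1)  # toy data reused

# union: stack the rows of both classes
df_dft_rows = class_0 + class_1
print(len(df_dft_rows))        # 4
print(sorted(df_dft_rows[0]))  # ['class', 'id', 'xa', 'xb', 'ya', 'yb', 'za', 'zb']
```

One design point carries over to Spark: `DataFrame.union` matches columns by position, not by name, so both class DataFrames must have the same column order (they do here, because both are built by the same chain); `unionByName` matches by name instead.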



In [21]:
from pyspark.sql.functions import lit

df_class_0 = dft_systemml(x0,'x') \
    .join(dft_systemml(y0,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z0,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(0))
    
df_class_1 = dft_systemml(x1,'x') \
    .join(dft_systemml(y1,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z1,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(1))

df_dft = df_class_0.union(df_class_1)

df_dft.show()

SystemML Statistics:
Total execution time:		0.678 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.259 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.261 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		19.940 sec.
Number of executed Spark inst:	6.

                                                                                

SystemML Statistics:
Total execution time:		17.989 sec.
Number of ex

Create a VectorAssembler that consumes the newly created DFT columns (xa, xb, ya, yb, za, zb) and produces a column called “features”.
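Conceptually, a VectorAssembler concatenates the chosen numeric columns of each row into one feature vector. The plain-Python sketch below (the helper `assemble_features` and the row values are hypothetical, not the Spark API) also shows why the label column stays out of the input list: if "class" were a feature, the model could read the answer directly (label leakage).

```python
FEATURE_COLS = ["xa", "xb", "ya", "yb", "za", "zb"]  # the six DFT columns; "class" is the label


def assemble_features(row, input_cols=FEATURE_COLS, output_col="features"):
    """Return a copy of `row` with the input columns concatenated into one feature vector."""
    return {**row, output_col: [float(row[c]) for c in input_cols]}


row = {"id": 26, "xa": 0.04, "xb": -0.09, "ya": 0.04, "yb": -0.09, "za": 0.04, "zb": -0.09, "class": 0}
print(assemble_features(row)["features"])  # [0.04, -0.09, 0.04, -0.09, 0.04, -0.09]
```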


In [22]:
from pyspark.ml.feature import VectorAssembler

In [23]:
# Only the six DFT columns are features; "class" is the label and must not be part of the feature vector
vectorAssembler = VectorAssembler().setInputCols(["xa", "xb", "ya", "yb", "za", "zb"]).setOutputCol("features")

Instantiate a classifier from the SparkML package and assign it to the classifier variable. Make sure to set the “class” column as the target (label) column.


In [24]:
from pyspark.ml.classification import GBTClassifier # Gradient-boosted tree classifier

In [25]:
classifier = GBTClassifier(labelCol="class", featuresCol="features")  # or another classifier of your choice

Let’s train and evaluate…


In [26]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, classifier])

In [27]:
model = pipeline.fit(df_dft)

In [28]:
prediction = model.transform(df_dft)

In [29]:
prediction.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
| id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|            features|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
| 26| 0.03912775214058598|-0.09049016952668232| 0.03912775214058598|-0.09049016952668232| 0.03912775214058598|-0.09049016952668232|    0|[0.03912775214058...|[1.32590267922033...|[0.93412217565278...|       0.0|
| 29|0.006260524476137005|-0.05765058448048809|0.006260524476137005|-0.05765058448048809|0.006260524476137005|-0.05765058448048809|    0|[0.006260524476

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("class")
    
binEval.evaluate(prediction) 

1.0

An accuracy above 0.8 is considered a good result.
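With metricName "accuracy", the evaluator reports the fraction of rows whose prediction equals the label. A minimal plain-Python sketch of that metric follows (the `accuracy` helper and its inputs are hypothetical). Note also that the model above is scored on the same df_dft it was fitted on, so this number measures training fit; a fairer estimate would fit on one part and evaluate on another, e.g. after `train, test = df_dft.randomSplit([0.8, 0.2], seed=42)`.

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted class equals the true class."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("cannot compute accuracy of an empty set")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Labels are ints in the "class" column; Spark's "prediction" column holds doubles
print(accuracy([0, 0, 1, 1], [0.0, 1.0, 1.0, 1.0]))  # 0.75
```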