# PLEASE NOTE: Run this notebook OUTSIDE a Spark notebook, in a plain Default Python 3.6 Free Environment

This is the last assignment for the Coursera course "Advanced Machine Learning and Signal Processing"

Just execute all cells one after the other and you are done. Note that in the last cell you should update the email address (the one you use for Coursera) and paste a submission token, which you obtain from the programming assignment page on Coursera.

Please fill in the sections labelled with "###YOUR_CODE_GOES_HERE###"

The purpose of this assignment is to learn how feature engineering boosts model performance. You will apply the Discrete Fourier Transformation to the accelerometer sensor time series, thereby transforming the dataset from the time domain to the frequency domain.
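To make the time-to-frequency idea concrete, here is a minimal pure-Python sketch (a naive O(N²) DFT on a synthetic sine wave, not the SystemML code used below). A signal that completes exactly 5 cycles in its window shows up as a single peak at frequency bin 5.

```python
import cmath
import math


def dft(signal):
    """Naive O(N^2) DFT: X_k = sum_n x_n * exp(-2j*pi*k*n/N)."""
    N = len(signal)
    return [sum(x * cmath.exp(-2j * math.pi * k * n / N) for n, x in enumerate(signal))
            for k in range(N)]


# 64 samples of a sine wave that completes exactly 5 cycles within the window
N = 64
signal = [math.sin(2 * math.pi * 5 * n / N) for n in range(N)]
spectrum = dft(signal)

# For a real-valued signal, bin k mirrors bin N-k, so the first half is enough
magnitudes = [abs(c) for c in spectrum[: N // 2]]
peak_bin = max(range(len(magnitudes)), key=lambda k: magnitudes[k])
print(peak_bin)  # 5
```

The accelerometer series are far less clean than this, but the principle is the same: periodic motion concentrates energy in a few frequency bins, which is what the classifier later picks up on.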

After that, you’ll use a classification algorithm of your choice to create a model and submit the new predictions to the grader. Done.



In [2]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')
    


In [3]:
!pip install pyspark==2.4.5



In [4]:
!pip install https://github.com/IBM/coursera/blob/master/systemml-1.3.0-SNAPSHOT-python.tar.gz?raw=true

Collecting https://github.com/IBM/coursera/blob/master/systemml-1.3.0-SNAPSHOT-python.tar.gz?raw=true
  Using cached https://github.com/IBM/coursera/blob/master/systemml-1.3.0-SNAPSHOT-python.tar.gz?raw=true
Building wheels for collected packages: systemml
  Building wheel for systemml (setup.py) ... done
  Stored in directory: /home/dsxuser/.cache/pip/wheels/aa/bf/28/4344dd13abd8b9b6cbd4032baf4b851873d2e2288a65631fd2
Successfully built systemml


In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

# Start (or reuse) a local Spark context that uses all available cores
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession \
    .builder \
    .getOrCreate()


So the first thing we need to ensure is that we are on the latest version of SystemML, which is 1.3.0 (as of 20 March 2019). Please use the code block below to check whether you are already on 1.3.0 or higher. Version 1.3 contains a necessary fix, which is why we are running against the SNAPSHOT.


In [6]:
!mkdir -p /home/dsxuser/work/systemml


In [7]:
from systemml import MLContext
ml = MLContext(spark)
# Point SystemML's local scratch space at the directory created above (a path, not a shell command)
ml.setConfigProperty("sysml.localtmpdir", "/home/dsxuser/work/systemml")
print(ml.version())

if ml.version() != '1.3.0-SNAPSHOT':
    raise ValueError('please upgrade to SystemML 1.3.0, or restart your Kernel (Kernel->Restart & Clear Output)')

1.3.0-SNAPSHOT
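The text above asks for 1.3.0 or higher, while the check only accepts the exact string '1.3.0-SNAPSHOT'. If you want an "at least" comparison instead, a minimal sketch could look like this (`parse_version` and `is_at_least` are hypothetical helpers; this version treats '1.3.0-SNAPSHOT' as equal to 1.3.0 and ignores the pre-release suffix).

```python
import re


def parse_version(version):
    """Turn a string such as '1.3.0-SNAPSHOT' into a comparable tuple (1, 3, 0)."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError("unrecognized version string: %r" % version)
    return tuple(int(part) for part in match.groups())


def is_at_least(version, minimum="1.3.0"):
    """Compare numerically, so that '1.10.0' correctly counts as newer than '1.3.0'."""
    return parse_version(version) >= parse_version(minimum)


print(is_at_least("1.3.0-SNAPSHOT"))  # True
print(is_at_least("1.2.0"))           # False
```

In the notebook this would be used as `if not is_at_least(ml.version()): raise ValueError(...)`.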


In [8]:
!wget https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
!mv shake.parquet?raw=true shake.parquet

--2020-04-22 01:24:54--  https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/blob/master/coursera_ml/shake.parquet?raw=true [following]
--2020-04-22 01:24:55--  https://github.com/IBM/skillsnetwork/blob/master/coursera_ml/shake.parquet?raw=true
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/IBM/skillsnetwork/raw/master/coursera_ml/shake.parquet [following]
--2020-04-22 01:24:55--  https://github.com/IBM/skillsnetwork/raw/master/coursera_ml/shake.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/coursera_ml/shake.parquet [following]

Now it’s time to read the sensor data and create a temporary query table.

In [9]:
df=spark.read.parquet('shake.parquet')

In [10]:
df.show()

+-----+---------+-----+-----+-----+
|CLASS| SENSORID|    X|    Y|    Z|
+-----+---------+-----+-----+-----+
|    2| qqqqqqqq| 0.12| 0.12| 0.12|
|    2|aUniqueID| 0.03| 0.03| 0.03|
|    2| qqqqqqqq|-3.84|-3.84|-3.84|
|    2| 12345678| -0.1| -0.1| -0.1|
|    2| 12345678|-0.15|-0.15|-0.15|
|    2| 12345678| 0.47| 0.47| 0.47|
|    2| 12345678|-0.06|-0.06|-0.06|
|    2| 12345678|-0.09|-0.09|-0.09|
|    2| 12345678| 0.21| 0.21| 0.21|
|    2| 12345678|-0.08|-0.08|-0.08|
|    2| 12345678| 0.44| 0.44| 0.44|
|    2|    gholi| 0.76| 0.76| 0.76|
|    2|    gholi| 1.62| 1.62| 1.62|
|    2|    gholi| 5.81| 5.81| 5.81|
|    2| bcbcbcbc| 0.58| 0.58| 0.58|
|    2| bcbcbcbc|-8.24|-8.24|-8.24|
|    2| bcbcbcbc|-0.45|-0.45|-0.45|
|    2| bcbcbcbc| 1.03| 1.03| 1.03|
|    2|aUniqueID|-0.05|-0.05|-0.05|
|    2| qqqqqqqq|-0.44|-0.44|-0.44|
+-----+---------+-----+-----+-----+
only showing top 20 rows



In [11]:
!pip install pixiedust



In [12]:
import pixiedust
display(df)

Pixiedust database opened successfully


DataFrame[CLASS: bigint, SENSORID: string, X: double, Y: double, Z: double]

In [13]:
df.createOrReplaceTempView("df")

We’ll use Apache SystemML to implement Discrete Fourier Transformation. This way all computation continues to happen on the Apache Spark cluster for advanced scalability and performance.

In [14]:
from systemml import MLContext, dml
ml = MLContext(spark)

As you’ve learned from the lecture, implementing the Discrete Fourier Transformation in a linear algebra programming language is simple. Apache SystemML DML is such a language, and, as you can see, the implementation is straightforward and doesn’t differ much from the mathematical definition (just note that the sum operator has been replaced by a matrix-vector product using the %*% syntax borrowed from R):

<img style="float: left;" src="https://wikimedia.org/api/rest_v1/media/math/render/svg/1af0a78dc50bbf118ab6bd4c4dcc3c4ff8502223">
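Written out, the standard DFT definition can be split with Euler's formula into a cosine sum and a sine sum. These are exactly the two matrix-vector products the DML script below computes, with the angle matrix $M$ built as an outer product of the index vectors. Because the script multiplies by $+\sin(M)$, its second column is the negated imaginary part; magnitudes are unaffected.

```latex
\begin{aligned}
X_k &= \sum_{n=0}^{N-1} x_n \, e^{-i 2\pi k n / N}
     = \sum_{n=0}^{N-1} x_n \cos\frac{2\pi k n}{N} \;-\; i \sum_{n=0}^{N-1} x_n \sin\frac{2\pi k n}{N},
     \qquad k = 0, \dots, N-1 \\
M_{kn} &= \frac{2\pi}{N}\, k n, \qquad
\mathbf{X}_a = \cos(M)\,\mathbf{x}, \qquad
\mathbf{X}_b = \sin(M)\,\mathbf{x} \\
\operatorname{Re} X_k &= (\mathbf{X}_a)_k, \qquad \operatorname{Im} X_k = -(\mathbf{X}_b)_k
\end{aligned}
```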



In [15]:
dml_script = '''
PI = 3.141592654
N = nrow(signal)

n = seq(0, N-1, 1)
k = seq(0, N-1, 1)

M = (n %*% t(k))*(2*PI/N)

Xa = cos(M) %*% signal
Xb = sin(M) %*% signal

DFT = cbind(Xa, Xb)
'''
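To see what the DML script computes, here is a minimal pure-Python transcription (plain lists instead of SystemML matrices, and `math.pi` instead of the hard-coded PI; `dft_like_dml` is a hypothetical name). It checks the result against the complex definition: `Xa` is the real part of the DFT, while `Xb`, because the script uses `+sin(M)`, is the negated imaginary part. The sample input is the first eight X values printed by `df.show()` above, used purely as illustration.

```python
import cmath
import math


def dft_like_dml(signal):
    """Mirror the DML script: M = (n k^T) * 2*pi/N, then cos(M) @ x and sin(M) @ x."""
    N = len(signal)
    # M[k][n] = 2*pi*k*n/N; the matrix is symmetric, so row/column order does not matter
    M = [[2 * math.pi * k * n / N for n in range(N)] for k in range(N)]
    Xa = [sum(math.cos(M[k][n]) * signal[n] for n in range(N)) for k in range(N)]
    Xb = [sum(math.sin(M[k][n]) * signal[n] for n in range(N)) for k in range(N)]
    return Xa, Xb


signal = [0.12, 0.03, -3.84, -0.1, -0.15, 0.47, -0.06, -0.09]
Xa, Xb = dft_like_dml(signal)

# Compare against the complex definition X_k = sum_n x_n * exp(-2j*pi*k*n/N)
N = len(signal)
X = [sum(x * cmath.exp(-2j * math.pi * k * n / N) for n, x in enumerate(signal)) for k in range(N)]
for k in range(N):
    assert abs(Xa[k] - X[k].real) < 1e-9  # Xa is the real part
    assert abs(Xb[k] + X[k].imag) < 1e-9  # Xb is the negated imaginary part
print("matches")
```

The O(N²) angle matrix is fine for short windows; for long series an FFT would be the usual choice, but the matrix form is what maps naturally onto DML's linear algebra.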

Now it’s time to create a function which takes a single-column Apache Spark DataFrame as its argument (the one containing the accelerometer measurement time series for one axis) and returns its Fourier transformation. In addition, we add an index column for later joining all axes together and rename the columns appropriately. The result of this function is an Apache Spark DataFrame containing the Fourier transformation of its input in two columns, plus the index column.


In [16]:
from pyspark.sql.functions import monotonically_increasing_id

def dft_systemml(signal, name):
    prog = dml(dml_script).input('signal', signal).output('DFT')

    return (
        # execute the script inside the SystemML engine running on top of Apache Spark
        ml.execute(prog)
        # read the result of the SystemML execution back as a SystemML Matrix
        .get('DFT')
        # convert the SystemML Matrix to an Apache Spark DataFrame
        .toDF()
        # rename the default column names, e.g. C1 -> xa and C2 -> xb for name='x'
        .selectExpr('C1 as %sa' % name, 'C2 as %sb' % name)
        # add a unique ID per row for later joining
        .withColumn("id", monotonically_increasing_id())
    )
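PySpark documents `monotonically_increasing_id` as unique and increasing but not consecutive (the partition ID goes into the upper bits), so the later join on `id` relies on the x, y and z results being partitioned the same way. The sketch below is a hypothetical pure-Python stand-in (`dft_records` is not part of the notebook) that shows the shape of the output: one record per frequency bin, columns named after the axis, and a consecutive index like the one the join needs.

```python
import math


def dft_records(signal, name):
    """One record per frequency bin k with columns '<name>a', '<name>b' and a sequential 'id'."""
    N = len(signal)
    records = []
    for k in range(N):
        a = sum(x * math.cos(2 * math.pi * k * n / N) for n, x in enumerate(signal))
        b = sum(x * math.sin(2 * math.pi * k * n / N) for n, x in enumerate(signal))
        records.append({"id": k, name + "a": a, name + "b": b})
    return records


rows = dft_records([1.0, 0.0, -1.0, 0.0], "x")
print(sorted(rows[0]))  # ['id', 'xa', 'xb']
print(len(rows))        # 4
```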
        




Now it’s time to create individual DataFrames containing only a subset of the data. We filter simultaneously for each accelerometer sensor axis and for each class, which gives you 6 DataFrames. Please implement this using the relational API of DataFrames or SparkSQL. Please use classes 1 and 2, not 0 and 1. <h1><span style="color:red">Please make sure that each DataFrame has only ONE column (only the measurement, e.g. not the CLASS column)</span></h1>


In [17]:
df.filter(df.CLASS == 1).select('X').collect()

[Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.01),
 Row(X=-0.01),
 Row(X=-0.01),
 Row(X=0.0),
 Row(X=-0.01),
 Row(X=-0.01),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=-0.01),
 Row(X=0.0),
 Row(X=0.01),
 Row(X=-0.01),
 Row(X=-0.01),
 Row(X=0.01),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=-0.02),
 Row(X=0.01),
 Row(X=0.01),
 Row(X=-0.01),
 Row(X=-0.01),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.01),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.01),
 Row(X=-0.01),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.01),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=-0.01),
 Row(X=0.01),
 Row(X=0.0),
 Row(X=0.0),
 Row(X=-0.01),
 Row(X=0.0),
 Row(X=-0.02),
 Row(X=0.0),
 

In [32]:
x0 = df[df.CLASS == 1].select('X')  # X-axis measurements of CLASS 1 (labeled class 0 below)
y0 = df[df.CLASS == 1].select('Y')  # Y-axis measurements of CLASS 1 (labeled class 0 below)
z0 = df[df.CLASS == 1].select('Z')  # Z-axis measurements of CLASS 1 (labeled class 0 below)
x1 = df[df.CLASS == 2].select('X')  # X-axis measurements of CLASS 2 (labeled class 1 below)
y1 = df[df.CLASS == 2].select('Y')  # Y-axis measurements of CLASS 2 (labeled class 1 below)
z1 = df[df.CLASS == 2].select('Z')  # Z-axis measurements of CLASS 2 (labeled class 1 below)
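The DFT treats each of these single-column DataFrames as a time series, so each subset should contain only the measurement and keep its samples in their original order. The pure-Python sketch below shows the same class-by-axis split on a plain list of tuples (toy rows; the class-2 values come from `df.show()` above, the class-1 values are made up for illustration).

```python
from collections import defaultdict

# (CLASS, SENSORID, X, Y, Z) tuples in the shape of df
rows = [
    (2, "qqqqqqqq", 0.12, 0.12, 0.12),
    (1, "12345678", 0.00, 0.01, 0.02),
    (2, "aUniqueID", 0.03, 0.03, 0.03),
    (1, "12345678", -0.01, 0.00, 0.01),
]

axes = {"X": 2, "Y": 3, "Z": 4}  # axis name -> tuple position
series = defaultdict(list)       # (class, axis) -> measurements in original row order
for row in rows:
    for axis, col in axes.items():
        series[(row[0], axis)].append(row[col])

print(series[(1, "X")])  # [0.0, -0.01]
print(len(series))       # 6
```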

Since we’ve created this DFT function before, we can simply call it for each of the 6 DataFrames now. And since the result of this function call is again a DataFrame, we can follow the PySpark practice of chaining method calls on it. So what we are doing is the following:

- Calling the DFT function for each class and accelerometer sensor axis.
- Joining the three axes together on the ID column.
- Re-adding a column containing the class index.
- Stacking the DataFrames of both classes together.
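The four steps above can be sketched in pure Python with lists of dicts standing in for DataFrames (`fake_dft`, `inner_join_on_id` and `build_class_frame` are hypothetical helpers with made-up numbers). One detail worth knowing: Spark's `DataFrame.union` matches columns by position rather than by name (`unionByName` matches by name), which is safe here because both class DataFrames are built by the same chain of calls.

```python
def fake_dft(name, values):
    """Stand-in for dft_systemml: (a, b) pairs -> records with 'id', '<name>a', '<name>b'."""
    return [{"id": i, name + "a": a, name + "b": b} for i, (a, b) in enumerate(values)]


def inner_join_on_id(left, right):
    """Inner join two record lists on 'id', merging the remaining columns."""
    right_by_id = {r["id"]: r for r in right}
    return [dict(left_row, **right_by_id[left_row["id"]])
            for left_row in left if left_row["id"] in right_by_id]


def build_class_frame(x, y, z, label):
    joined = inner_join_on_id(inner_join_on_id(x, y), z)
    return [dict(r, **{"class": label}) for r in joined]  # like .withColumn('class', lit(label))


class_0 = build_class_frame(fake_dft("x", [(1, 0), (2, 1)]), fake_dft("y", [(3, 0), (4, 1)]),
                            fake_dft("z", [(5, 0), (6, 1)]), 0)
class_1 = build_class_frame(fake_dft("x", [(7, 0)]), fake_dft("y", [(8, 0)]),
                            fake_dft("z", [(9, 0)]), 1)
df_dft = class_0 + class_1  # like union: rows stacked, same set of columns

print(len(df_dft))        # 3
print(sorted(df_dft[0]))  # ['class', 'id', 'xa', 'xb', 'ya', 'yb', 'za', 'zb']
```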



In [33]:
from pyspark.sql.functions import lit

df_class_0 = dft_systemml(x0,'x') \
    .join(dft_systemml(y0,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z0,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(0))
    
df_class_1 = dft_systemml(x1,'x') \
    .join(dft_systemml(y1,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z1,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(1))

df_dft = df_class_0.union(df_class_1)

df_dft.show()

SystemML Statistics:
Total execution time:		0.110 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.087 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.099 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		10.036 sec.
Number of executed Spark inst:	6.

SystemML Statistics:
Total execution time:		9.972 sec.
Number of exe

Please create a VectorAssembler which consumes the newly created DFT columns and produces a column “features”.
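Conceptually, VectorAssembler concatenates the listed numeric columns, in the order given, into a single vector column. A pure-Python sketch (`assemble` and the sample row are hypothetical; the real transformer produces Spark ML vectors, possibly sparse) also shows why `id` and `class` stay out of `inputCols`: including the label would leak the answer into the features.

```python
FEATURE_COLS = ["xa", "xb", "ya", "yb", "za", "zb"]  # same order as inputCols in the cell below


def assemble(row, input_cols=FEATURE_COLS):
    """Concatenate the listed numeric columns into one feature vector, in the given order."""
    return [float(row[c]) for c in input_cols]


row = {"id": 26, "xa": 0.5, "xb": -0.25, "ya": 1.0, "yb": 0.0, "za": 2.0, "zb": -1.0, "class": 0}
print(assemble(row))  # [0.5, -0.25, 1.0, 0.0, 2.0, -1.0]
```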


In [34]:
from pyspark.ml.feature import VectorAssembler

In [40]:
vectorAssembler = VectorAssembler(inputCols=['xa', 'xb', 'ya', 'yb', 'za', 'zb'], outputCol='features')

Please instantiate a classifier from the SparkML package and assign it to the classifier variable. Make sure to set the “class” column as the label column.


In [41]:
from pyspark.ml.classification import GBTClassifier

In [42]:
classifier = GBTClassifier(labelCol='class')

Let’s train and evaluate…


In [43]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, classifier])
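`Pipeline.fit` runs its stages in order: transformers such as VectorAssembler just transform the data, while estimators such as GBTClassifier are fitted and the resulting model transforms the data for later stages. The pure-Python sketch below mimics that flow under simplifying assumptions (all class names are hypothetical; `ThresholdClassifier` is a toy threshold rule, not gradient-boosted trees, and unlike Spark it also transforms the data after the final stage).

```python
class Assembler:
    """Transformer: copies the listed columns into a 'features' list (like VectorAssembler)."""
    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [dict(r, features=[r[c] for c in self.input_cols]) for r in rows]


class ThresholdModel:
    """Fitted model: predicts 1.0 when feature 0 lies on class 1's side of the threshold."""
    def __init__(self, threshold, one_is_above):
        self.threshold = threshold
        self.one_is_above = one_is_above

    def transform(self, rows):
        return [dict(r, prediction=1.0 if (r["features"][0] > self.threshold) == self.one_is_above else 0.0)
                for r in rows]


class ThresholdClassifier:
    """Toy estimator: threshold halfway between the class means of feature 0."""
    def __init__(self, label_col):
        self.label_col = label_col

    def fit(self, rows):
        means = {}
        for label in (0, 1):
            values = [r["features"][0] for r in rows if r[self.label_col] == label]
            means[label] = sum(values) / len(values)
        return ThresholdModel((means[0] + means[1]) / 2, means[1] > means[0])


def fit_pipeline(stages, rows):
    """Estimators (objects with fit) are fitted on the current rows, transformers are used
    as-is, and each fitted stage transforms the rows for the next one."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted, rows


train = [{"f": 0.1, "class": 0}, {"f": 0.2, "class": 0}, {"f": 0.9, "class": 1}, {"f": 1.1, "class": 1}]
fitted, predicted = fit_pipeline([Assembler(["f"]), ThresholdClassifier(label_col="class")], train)
print([r["prediction"] for r in predicted])  # [0.0, 0.0, 1.0, 1.0]
```

Applying every stage in `fitted` in order is roughly what `model.transform(df_dft)` does with the fitted PipelineModel.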

In [44]:
model = pipeline.fit(df_dft)

In [45]:
prediction = model.transform(df_dft)

In [46]:
prediction.show()

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|        id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|            features|       rawPrediction|         probability|prediction|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|        26|0.007432298669951747|-0.00394663166701...|0.007432298669951747|-0.00394663166701...|0.007432298669951747|-0.00394663166701...|    0|[0.00743229866995...|[0.93281819477188...|[0.86595256350908...|       0.0|
|        29| 0.02589077158423112|-0.02414578651495463| 0.02589077158423112|-0.02414578651495463| 0.02589077158423112|-0.0241

In [47]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator() \
    .setMetricName("accuracy") \
    .setPredictionCol("prediction") \
    .setLabelCol("class")

binEval.evaluate(prediction)

0.9981761070017225
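The “accuracy” metric of MulticlassClassificationEvaluator is the fraction of rows whose prediction matches the label. A minimal pure-Python version (the `accuracy` helper and the toy rows are hypothetical) makes that explicit. Because `prediction` was produced from the same `df_dft` the pipeline was fitted on, the value above is training accuracy; holding out part of the data, for example with `df_dft.randomSplit([0.8, 0.2], seed=42)`, would give a fairer estimate.

```python
def accuracy(rows, label_col="class", prediction_col="prediction"):
    """Fraction of rows whose prediction equals the label."""
    if not rows:
        raise ValueError("no rows to evaluate")
    correct = sum(1 for r in rows if float(r[prediction_col]) == float(r[label_col]))
    return correct / len(rows)


rows = [{"class": 0, "prediction": 0.0}, {"class": 1, "prediction": 1.0},
        {"class": 1, "prediction": 0.0}, {"class": 0, "prediction": 0.0}]
print(accuracy(rows))  # 0.75
```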

If you are happy with the result (I’m happy with > 0.8), please submit your solution to the grader by executing the following cells. Note that this accuracy is measured on the same data the model was trained on. Don’t forget to obtain an assignment submission token (secret) from Coursera’s grader web page and paste it into the “submission_token” variable below, together with the email address you use for Coursera.


In [48]:
!rm -Rf a2_m4.json

In [49]:
prediction = prediction.repartition(1)
prediction.write.json('a2_m4.json')

In [50]:
!rm -f rklib.py
!wget https://raw.githubusercontent.com/IBM/coursera/master/rklib.py

--2020-04-22 01:38:35--  https://raw.githubusercontent.com/IBM/coursera/master/rklib.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2540 (2.5K) [text/plain]
Saving to: ‘rklib.py’


2020-04-22 01:38:35 (29.9 MB/s) - ‘rklib.py’ saved [2540/2540]

FINISHED --2020-04-22 01:38:35--
Total wall clock time: 0.3s
Downloaded: 1 files, 2.5K in 0s (29.9 MB/s)


In [51]:
from rklib import zipit
zipit('a2_m4.json.zip','a2_m4.json')

In [52]:
!base64 a2_m4.json.zip > a2_m4.json.zip.base64
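`prediction.write.json('a2_m4.json')` creates a directory of part files, not a single file. Assuming `rklib.zipit` simply zips that directory (its source is not shown here), the zip and base64 steps can be sketched with the standard library. `zip_directory` and `base64_text` are hypothetical helpers, and the demo runs on a throwaway directory. `base64.encodebytes` wraps its output at 76 characters, as the GNU `base64` command does by default.

```python
import base64
import os
import tempfile
import zipfile


def zip_directory(zip_path, src_dir):
    """Zip every file under src_dir, storing paths relative to src_dir's parent."""
    parent = os.path.dirname(os.path.abspath(src_dir))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(src_dir):
            for fname in files:
                full = os.path.join(root, fname)
                zf.write(full, os.path.relpath(full, parent))


def base64_text(path):
    """Base64-encode a file's bytes with 76-character lines."""
    with open(path, "rb") as f:
        return base64.encodebytes(f.read()).decode("ascii")


with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "a2_m4.json")  # shaped like Spark's JSON output directory
    os.makedirs(out_dir)
    with open(os.path.join(out_dir, "part-00000.json"), "w") as f:
        f.write('{"class":0,"prediction":0.0}\n')
    zip_path = os.path.join(tmp, "a2_m4.json.zip")
    zip_directory(zip_path, out_dir)
    encoded = base64_text(zip_path)
    with open(zip_path, "rb") as f:
        zip_bytes = f.read()
    with zipfile.ZipFile(zip_path) as zf:
        names = zf.namelist()

print(names)  # ['a2_m4.json/part-00000.json']
```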

In [53]:
from rklib import submit
key = "-fBiYHYDEeiR4QqiFhAvkA"
part = "IjtJk"
email = 'andy.lightcap@gmail.com'
submission_token = 'xRvM7qujQjVo1iHz' # (have a look here if you need more information on how to obtain the token https://youtu.be/GcDo0Rwe06U?t=276)

with open('a2_m4.json.zip.base64', 'r') as myfile:
    data=myfile.read()
submit(email, submission_token, key, part, [part], data)

Submission successful, please check on the coursera grader page for the status
-------------------------
{"elements":[{"itemId":"B8wXV","id":"f_F-qCtuEei_fRLwaVDk3g~B8wXV~JgYtdoQ6Eeq_9gr0kej7HQ","courseId":"f_F-qCtuEei_fRLwaVDk3g"}],"paging":{},"linked":{}}
-------------------------
