# DFT on Accelerometer Sensor

The purpose of this project is to learn how feature engineering boosts model performance. I will apply Discrete Fourier Transformation on the accelerometer sensor time series and therefore transforming the dataset from the time to the frequency domain. 

After that, I’ll use a classification algorithm to create a model and get predictions.

In [1]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))

In [2]:
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 134kB/s eta 0:00:011     |███████████████████████████     | 183.6MB 502kB/s eta 0:01:09
[?25hCollecting py4j==0.10.7 (from pyspark==2.4.5)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 29.3MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Stored in directory: /home/dsxuser/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [8]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()


So the first thing I need to ensure is that I am on the latest version of SystemML, which is 1.3.0.T


In [9]:
!mkdir -p /home/dsxuser/work/systemml

In [10]:
from systemml import MLContext, dml
ml = MLContext(spark)
ml.setConfigProperty("sysml.localtmpdir", "mkdir /home/dsxuser/work/systemml")
print(ml.version())
    
if not ml.version() == '1.3.0-SNAPSHOT':
    raise ValueError('please upgrade to SystemML 1.3.0, or restart your Kernel (Kernel->Restart & Clear Output)')

1.3.0-SNAPSHOT


In [11]:
!wget https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
!mv shake.parquet?raw=true shake.parquet

--2020-06-21 16:31:10--  https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/blob/master/coursera_ml/shake.parquet?raw=true [following]
--2020-06-21 16:31:10--  https://github.com/IBM/skillsnetwork/blob/master/coursera_ml/shake.parquet?raw=true
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/IBM/skillsnetwork/raw/master/coursera_ml/shake.parquet [following]
--2020-06-21 16:31:10--  https://github.com/IBM/skillsnetwork/raw/master/coursera_ml/shake.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/coursera_ml/shake.parquet [following]
--2

Now it’s time to read the sensor data and create a temporary query table.

In [12]:
df=spark.read.parquet('shake.parquet')

In [13]:
df.show()

+-----+---------+-----+-----+-----+
|CLASS| SENSORID|    X|    Y|    Z|
+-----+---------+-----+-----+-----+
|    2| qqqqqqqq| 0.12| 0.12| 0.12|
|    2|aUniqueID| 0.03| 0.03| 0.03|
|    2| qqqqqqqq|-3.84|-3.84|-3.84|
|    2| 12345678| -0.1| -0.1| -0.1|
|    2| 12345678|-0.15|-0.15|-0.15|
|    2| 12345678| 0.47| 0.47| 0.47|
|    2| 12345678|-0.06|-0.06|-0.06|
|    2| 12345678|-0.09|-0.09|-0.09|
|    2| 12345678| 0.21| 0.21| 0.21|
|    2| 12345678|-0.08|-0.08|-0.08|
|    2| 12345678| 0.44| 0.44| 0.44|
|    2|    gholi| 0.76| 0.76| 0.76|
|    2|    gholi| 1.62| 1.62| 1.62|
|    2|    gholi| 5.81| 5.81| 5.81|
|    2| bcbcbcbc| 0.58| 0.58| 0.58|
|    2| bcbcbcbc|-8.24|-8.24|-8.24|
|    2| bcbcbcbc|-0.45|-0.45|-0.45|
|    2| bcbcbcbc| 1.03| 1.03| 1.03|
|    2|aUniqueID|-0.05|-0.05|-0.05|
|    2| qqqqqqqq|-0.44|-0.44|-0.44|
+-----+---------+-----+-----+-----+
only showing top 20 rows



In [14]:
!pip install pixiedust



In [11]:
import pixiedust
display(df)

Pixiedust database opened successfully
Table VERSION_TRACKER created successfully
Table METRICS_TRACKER created successfully

Share anonymous install statistics? (opt-out instructions)

PixieDust will record metadata on its environment the next time the package is installed or updated. The data is anonymized and aggregated to help plan for future releases, and records only the following values:

{
   "data_sent": currentDate,
   "runtime": "python",
   "application_version": currentPixiedustVersion,
   "space_id": nonIdentifyingUniqueId,
   "config": {
       "repository_id": "https://github.com/ibm-watson-data-lab/pixiedust",
       "target_runtimes": ["Data Science Experience"],
       "event_id": "web",
       "event_organizer": "dev-journeys"
   }
}
You can opt out by calling pixiedust.optOut() in a new cell.


[31mPixiedust runtime updated. Please restart kernel[0m
Table SPARK_PACKAGES created successfully
Table USER_PREFERENCES created successfully
Table service_connections created successfully


DataFrame[CLASS: bigint, SENSORID: string, X: double, Y: double, Z: double]

In [15]:
df.createOrReplaceTempView("df")

I’ll use Apache SystemML to implement Discrete Fourier Transformation. This way all computation continues to happen on the Apache Spark cluster for advanced scalability and performance.

To implement Discrete Fourier Transformation in a linear algebra programming language is simple. Apache SystemML DML is such a language and as we can see the implementation is straightforward and doesn’t differ too much from the mathematical definition (Just note that the sum operator has been swapped with a vector dot product using the %*% syntax borrowed from R
):

<img style="float: left;" src="https://wikimedia.org/api/rest_v1/media/math/render/svg/1af0a78dc50bbf118ab6bd4c4dcc3c4ff8502223">



In [16]:
dml_script = '''
PI = 3.141592654
N = nrow(signal)

n = seq(0, N-1, 1)
k = seq(0, N-1, 1)

M = (n %*% t(k))*(2*PI/N)

Xa = cos(M) %*% signal
Xb = sin(M) %*% signal

DFT = cbind(Xa, Xb)
'''

Now it’s time to create a function which takes a single row Apache Spark data frame as argument (the one containing the accelerometer measurement time series for one axis) and returns the Fourier transformation of it. In addition, I am adding an index column for later joining all axis together and renaming the columns to appropriate names. The result of this function is an Apache Spark DataFrame containing the Fourier Transformation of its input in two columns. 


In [17]:
from pyspark.sql.functions import monotonically_increasing_id

def dft_systemml(signal,name):
    prog = dml(dml_script).input('signal', signal).output('DFT')
    
    return (

    #execute the script inside the SystemML engine running on top of Apache Spark
    ml.execute(prog) 
     
         #read result from SystemML execution back as SystemML Matrix
        .get('DFT') 
     
         #convert SystemML Matrix to ApacheSpark DataFrame 
        .toDF() 
     
         #rename default column names
        .selectExpr('C1 as %sa' % (name), 'C2 as %sb' % (name)) 
     
         #add unique ID per row for later joining
        .withColumn("id", monotonically_increasing_id())
    )
        




Now it’s time to create individual DataFrames containing only a subset of the data. I filtered simultaneously for accelerometer each sensor axis and one for each class. This means I’ll get 6 DataFrames. Implemented this using the relational API of SparkSQL. Used class 1 and 2 and not 0 and 1.


In [18]:
x0 = spark.sql("SELECT X from df where class = 0") # => Please create a DataFrame containing only measurements of class 0 from the x axis
y0 = spark.sql("SELECT Y from df where class = 0")# => Please create a DataFrame containing only measurements of class 0 from the y axis
z0 = spark.sql("SELECT Z from df where class = 0")# => Please create a DataFrame containing only measurements of class 0 from the z axis
x1 = spark.sql("SELECT X from df where class = 1")# => Please create a DataFrame containing only measurements of class 1 from the x axis
y1 = spark.sql("SELECT Y from df where class = 1")# => Please create a DataFrame containing only measurements of class 1 from the y axis
z1 = spark.sql("SELECT Z from df where class = 1")# => Please create a DataFrame containing only measurements of class 1 from the z axis

Since I’ve created this cool DFT function before, I can just call it for each of the 6 DataFrames now. And since the result of this function call is a DataFrame again I can use the pyspark best practice in simply calling methods on it sequentially. So I am going to do is the following:

- Calling DFT for each class and accelerometer sensor axis.
- Joining them together on the ID column. 
- Re-adding a column containing the class index.
- Stacking both Dataframes for each classes together



In [19]:
from pyspark.sql.functions import lit

df_class_0 = dft_systemml(x0,'x') \
    .join(dft_systemml(y0,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z0,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(0))
    
df_class_1 = dft_systemml(x1,'x') \
    .join(dft_systemml(y1,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z1,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(1))

df_dft = df_class_0.union(df_class_1)

df_dft.show()

SystemML Statistics:
Total execution time:		0.020 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.000 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.001 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.516 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.169 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.155 sec.
Number of executed Spark inst:	0.


+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|        id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|8589934592|       

Created a VectorAssembler which consumes the newly created DFT columns and produces a column “features”


In [20]:
from pyspark.ml.feature import VectorAssembler

In [21]:
vectorAssembler = VectorAssembler(
    inputCols=["xa", "xb", "ya", "yb", "za", "zb"],
    outputCol="features")

Insatiated a classifier from the SparkML package and assign it to the classifier variable. Set the “class” column as target.


In [22]:
from pyspark.ml.classification import GBTClassifier

In [23]:
classifier = classifier = GBTClassifier(labelCol="class", featuresCol="features", maxIter=20).setLabelCol("class")

Let’s train and evaluate…


In [24]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, classifier])

In [25]:
model = pipeline.fit(df_dft)

In [26]:
prediction = model.transform(df_dft)

In [27]:
prediction.show()

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|        id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|            features|       rawPrediction|         probability|prediction|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|8589934592|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|           (6,[],[])|[1.54350200272498...|[0.95635347857270...|       0.0|
|         0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|       

In [28]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("class")
    
binEval.evaluate(prediction) 

1.0

I am happy with the result (even if it would be > 0.8). 


In [29]:
!rm -Rf a2_m4.json

In [30]:
prediction = prediction.repartition(1)
prediction.write.json('a2_m4.json')

In [31]:
!rm -f rklib.py
!wget wget https://raw.githubusercontent.com/IBM/coursera/master/rklib.py

--2020-06-21 16:38:20--  http://wget/
Resolving wget (wget)... failed: Name or service not known.
wget: unable to resolve host address ‘wget’
--2020-06-21 16:38:20--  https://raw.githubusercontent.com/IBM/coursera/master/rklib.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 199.232.8.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|199.232.8.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2540 (2.5K) [text/plain]
Saving to: ‘rklib.py’


2020-06-21 16:38:21 (28.4 MB/s) - ‘rklib.py’ saved [2540/2540]

FINISHED --2020-06-21 16:38:21--
Total wall clock time: 0.4s
Downloaded: 1 files, 2.5K in 0s (28.4 MB/s)


In [32]:
from rklib import zipit
zipit('a2_m4.json.zip','a2_m4.json')

In [33]:
!base64 a2_m4.json.zip > a2_m4.json.zip.base64