In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameReader, SQLContext
from sqlalchemy import create_engine
import pandas as pd
spark= SparkSession.builder.getOrCreate()
spark
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [5]:
!pip install systemds




So the first thing we need to ensure is that we are on the latest version of SystemML, which is 1.3.0 (as of 20th March'19) Please use the code block below to check if you are already on 1.3.0 or higher. 1.3 contains a necessary fix, that's we are running against the SNAPSHOT


In [6]:
import urllib.request
url = 'https://github.com/LeonardoPalaciosPando1996/Database/blob/Master/shake.parquet?raw=true'
filename = 'shake.parquet'
urllib.request.urlretrieve(url, filename)

('shake.parquet', <http.client.HTTPMessage at 0x24c89a5a160>)

Now it’s time to read the sensor data and create a temporary query table.

In [7]:
df=spark.read.parquet('shake.parquet')

In [8]:
df.show()

+-----+---------+-----+-----+-----+
|CLASS| SENSORID|    X|    Y|    Z|
+-----+---------+-----+-----+-----+
|    2| qqqqqqqq| 0.12| 0.12| 0.12|
|    2|aUniqueID| 0.03| 0.03| 0.03|
|    2| qqqqqqqq|-3.84|-3.84|-3.84|
|    2| 12345678| -0.1| -0.1| -0.1|
|    2| 12345678|-0.15|-0.15|-0.15|
|    2| 12345678| 0.47| 0.47| 0.47|
|    2| 12345678|-0.06|-0.06|-0.06|
|    2| 12345678|-0.09|-0.09|-0.09|
|    2| 12345678| 0.21| 0.21| 0.21|
|    2| 12345678|-0.08|-0.08|-0.08|
|    2| 12345678| 0.44| 0.44| 0.44|
|    2|    gholi| 0.76| 0.76| 0.76|
|    2|    gholi| 1.62| 1.62| 1.62|
|    2|    gholi| 5.81| 5.81| 5.81|
|    2| bcbcbcbc| 0.58| 0.58| 0.58|
|    2| bcbcbcbc|-8.24|-8.24|-8.24|
|    2| bcbcbcbc|-0.45|-0.45|-0.45|
|    2| bcbcbcbc| 1.03| 1.03| 1.03|
|    2|aUniqueID|-0.05|-0.05|-0.05|
|    2| qqqqqqqq|-0.44|-0.44|-0.44|
+-----+---------+-----+-----+-----+
only showing top 20 rows



In [9]:
df.createOrReplaceTempView("df")

We’ll use Apache SystemML to implement Discrete Fourier Transformation. This way all computation continues to happen on the Apache Spark cluster for advanced scalability and performance.

As you’ve learned from the lecture, implementing Discrete Fourier Transformation in a linear algebra programming language is simple. Apache SystemML DML is such a language and as you can see the implementation is straightforward and doesn’t differ too much from the mathematical definition (Just note that the sum operator has been swapped with a vector dot product using the %*% syntax borrowed from R
):

<img style="float: left;" src="https://wikimedia.org/api/rest_v1/media/math/render/svg/1af0a78dc50bbf118ab6bd4c4dcc3c4ff8502223">



In [10]:
from pyspark.sql.functions import monotonically_increasing_id
from systemds.context import SystemDSContext
import numpy as np
import pandas as pd

def dft_systemds(signal,name):


    with SystemDSContext(spark) as sds:
        size = signal.count()  
        signal = sds.from_numpy(signal.toPandas().to_numpy())
        pi = sds.scalar(3.141592654)

        n = sds.seq(0,size-1)
        k = sds.seq(0,size-1)

        M = (n @ (k.t())) * (2*pi/size)
        
        Xa = M.cos() @ signal
        Xb = M.sin() @ signal

        index = (list(map(lambda x: [x],np.array(range(0, size, 1)))))
        DFT = np.hstack((index,Xa.cbind(Xb).compute()))
        DFT_pdf = pd.DataFrame(DFT, columns=list(["id",name+'_sin',name+'_cos']))
        DFT_df = spark.createDataFrame(DFT_pdf)
        return DFT_df

Now it’s time to create a function which takes a single row Apache Spark data frame as argument (the one containing the accelerometer measurement time series for one axis) and returns the Fourier transformation of it. In addition, we are adding an index column for later joining all axis together and renaming the columns to appropriate names. The result of this function is an Apache Spark DataFrame containing the Fourier Transformation of its input in two columns. 


Now it’s time to create individual DataFrames containing only a subset of the data. We filter simultaneously for accelerometer each sensor axis and one for each class. This means you’ll get 6 DataFrames. Please implement this using the relational API of DataFrames or SparkSQL. Please use class 1 and 2 and not 0 and 1. <h1><span style="color:red">Please make sure that each DataFrame has only ONE colum (only the measurement, eg. not CLASS column)</span></h1>


In [11]:
x0 = spark.sql("SELECT X from df where class = 0")
y0 = spark.sql("SELECT Y from df where class = 0")
z0 = spark.sql("SELECT Z from df where class = 0")
x1 = spark.sql("SELECT X from df where class = 1")
y1 = spark.sql("SELECT Y from df where class = 1")
z1 = spark.sql("SELECT Z from df where class = 1")

Since we’ve created this cool DFT function before, we can just call it for each of the 6 DataFrames now. And since the result of this function call is a DataFrame again we can use the pyspark best practice in simply calling methods on it sequentially. So what we are doing is the following:

- Calling DFT for each class and accelerometer sensor axis.
- Joining them together on the ID column. 
- Re-adding a column containing the class index.
- Stacking both Dataframes for each classes together



In [12]:
from pyspark.sql.functions import lit

df_class_0 = dft_systemds(x0,'x') \
    .join(dft_systemds(y0,'y'), on=['id'], how='inner') \
    .join(dft_systemds(z0,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(0))
    
df_class_1 = dft_systemds(x1,'x') \
    .join(dft_systemds(y1,'y'), on=['id'], how='inner') \
    .join(dft_systemds(z1,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(1))

df_dft = df_class_0.union(df_class_1)

df_dft.show()

+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|  id|               x_sin|               x_cos|               y_sin|               y_cos|               z_sin|               z_cos|class|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
| 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|
| 1.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|
| 2.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|
| 3.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|
| 4.0|                 0.0|

Please create a VectorAssembler which consumes the newly created DFT columns and produces a column “features”


In [13]:
from pyspark.ml.feature import VectorAssembler

In [14]:
vectorAssembler = VectorAssembler(inputCols=["x_sin", "x_cos", "y_sin", "y_cos", "z_sin", "z_cos"],outputCol="features")

Please insatiate a classifier from the SparkML package and assign it to the classifier variable. Make sure to set the “class” column as target.


In [15]:
from pyspark.ml.classification import GBTClassifier

In [16]:
classifier = GBTClassifier(labelCol="class", featuresCol="features", maxIter=10)

Let’s train and evaluate…


In [17]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, classifier])

In [18]:
model = pipeline.fit(df_dft)

In [19]:
prediction = model.transform(df_dft)

In [20]:
prediction.show()

+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|  id|               x_sin|               x_cos|               y_sin|               y_cos|               z_sin|               z_cos|class|            features|       rawPrediction|         probability|prediction|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
| 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|           (6,[],[])|[1.32590267922033...|[0.93412217565278...|       0.0|
| 1.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|          

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("class")
binEval.evaluate(prediction) 

1.0

Are you happy with the result? I’m happy with > 0.8.

In [None]:
#prediction = prediction.repartition(1)
#prediction.write.json('a2_m4.json')