In [None]:
!pip install pyspark==2.4.5

In [None]:
!pip install https://github.com/IBM/coursera/blob/master/systemml-1.3.0-SNAPSHOT-python.tar.gz?raw=true

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()

In [4]:
!mkdir -p /home/dsxuser/work/systemml


In [5]:
from systemml import MLContext, dml
ml = MLContext(spark)
ml.setConfigProperty("sysml.localtmpdir", "mkdir /home/dsxuser/work/systemml")
print(ml.version())
    
if not ml.version() == '1.3.0-SNAPSHOT':
    raise ValueError('please upgrade to SystemML 1.3.0, or restart your Kernel (Kernel->Restart & Clear Output)')

1.3.0-SNAPSHOT


In [None]:
!wget https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
!mv shake.parquet?raw=true shake.parquet

Now it’s time to read the sensor data and create a temporary query table.

In [7]:
df=spark.read.parquet('shake.parquet')

In [8]:
df.show()

+-----+---------+-----+-----+-----+
|CLASS| SENSORID|    X|    Y|    Z|
+-----+---------+-----+-----+-----+
|    2| qqqqqqqq| 0.12| 0.12| 0.12|
|    2|aUniqueID| 0.03| 0.03| 0.03|
|    2| qqqqqqqq|-3.84|-3.84|-3.84|
|    2| 12345678| -0.1| -0.1| -0.1|
|    2| 12345678|-0.15|-0.15|-0.15|
|    2| 12345678| 0.47| 0.47| 0.47|
|    2| 12345678|-0.06|-0.06|-0.06|
|    2| 12345678|-0.09|-0.09|-0.09|
|    2| 12345678| 0.21| 0.21| 0.21|
|    2| 12345678|-0.08|-0.08|-0.08|
|    2| 12345678| 0.44| 0.44| 0.44|
|    2|    gholi| 0.76| 0.76| 0.76|
|    2|    gholi| 1.62| 1.62| 1.62|
|    2|    gholi| 5.81| 5.81| 5.81|
|    2| bcbcbcbc| 0.58| 0.58| 0.58|
|    2| bcbcbcbc|-8.24|-8.24|-8.24|
|    2| bcbcbcbc|-0.45|-0.45|-0.45|
|    2| bcbcbcbc| 1.03| 1.03| 1.03|
|    2|aUniqueID|-0.05|-0.05|-0.05|
|    2| qqqqqqqq|-0.44|-0.44|-0.44|
+-----+---------+-----+-----+-----+
only showing top 20 rows



In [None]:
!pip install pixiedust

In [10]:
import pixiedust
display(df)

CLASS,SENSORID,X,Y,Z
2,bcbcbcbc,-11.25,-11.25,-11.25
2,Qwertyui,-2.01,-2.01,-2.01
2,pj123456,-0.12,-0.12,-0.12
2,Bbbbbbb,-0.02,-0.02,-0.02
2,qqqqqqqq,-0.34,-0.34,-0.34
2,pj123456,-0.1,-0.1,-0.1
2,bcbcbcbc,-16.08,-16.08,-16.08
2,aaaaaaaa,0.0,0.0,0.0
2,aaaaaaaa,0.0,0.0,0.0
2,aaaaaaaa,0.0,0.0,0.0


In [11]:
df.createOrReplaceTempView("df")

We’ll use Apache SystemML to implement Discrete Fourier Transformation. This way all computation continues to happen on the Apache Spark cluster for advanced scalability and performance.

As you’ve learned from the lecture, implementing Discrete Fourier Transformation in a linear algebra programming language is simple. Apache SystemML DML is such a language and as you can see the implementation is straightforward and doesn’t differ too much from the mathematical definition (Just note that the sum operator has been swapped with a vector dot product using the %*% syntax borrowed from R
):

<img style="float: left;" src="https://wikimedia.org/api/rest_v1/media/math/render/svg/1af0a78dc50bbf118ab6bd4c4dcc3c4ff8502223">



In [12]:
dml_script = '''
PI = 3.141592654
N = nrow(signal)

n = seq(0, N-1, 1)
k = seq(0, N-1, 1)

M = (n %*% t(k))*(2*PI/N)

Xa = cos(M) %*% signal
Xb = sin(M) %*% signal

DFT = cbind(Xa, Xb)
'''

In [13]:
from pyspark.sql.functions import monotonically_increasing_id

def dft_systemml(signal,name):
    prog = dml(dml_script).input('signal', signal).output('DFT')
    
    return (

    #execute the script inside the SystemML engine running on top of Apache Spark
    ml.execute(prog) 
     
         #read result from SystemML execution back as SystemML Matrix
        .get('DFT') 
     
         #convert SystemML Matrix to ApacheSpark DataFrame 
        .toDF() 
     
         #rename default column names
        .selectExpr('C1 as %sa' % (name), 'C2 as %sb' % (name)) 
     
         #add unique ID per row for later joining
        .withColumn("id", monotonically_increasing_id())
    )
        




In [14]:
x0 = spark.sql("Select X from df where class =0")
y0 = spark.sql("Select Y from df where class =0")
z0 = spark.sql("Select Z from df where class =0")
x1 = spark.sql("Select X from df where class =1")
y1 = spark.sql("Select Y from df where class =1")
z1 = spark.sql("Select Z from df where class =1")

Since we’ve created this cool DFT function before, we can just call it for each of the 6 DataFrames now. And since the result of this function call is a DataFrame again we can use the pyspark best practice in simply calling methods on it sequentially. So what we are doing is the following:

- Calling DFT for each class and accelerometer sensor axis.
- Joining them together on the ID column. 
- Re-adding a column containing the class index.
- Stacking both Dataframes for each classes together



In [15]:
from pyspark.sql.functions import lit

df_class_0 = dft_systemml(x0,'x') \
    .join(dft_systemml(y0,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z0,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(0))
    
df_class_1 = dft_systemml(x1,'x') \
    .join(dft_systemml(y1,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z1,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(1))

df_dft = df_class_0.union(df_class_1)

df_dft.show()

SystemML Statistics:
Total execution time:		0.015 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.001 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.000 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.250 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.102 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.117 sec.
Number of executed Spark inst:	0.


+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|        id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|8589934592|       

Please create a VectorAssembler which consumes the newly created DFT columns and produces a column “features”


In [17]:
from pyspark.ml.feature import VectorAssembler

In [19]:
vectorAssembler = VectorAssembler(inputCols = ["xa","xb","ya","yb","za","zb"], outputCol="features")

Please insatiate a classifier from the SparkML package and assign it to the classifier variable. Make sure to set the “class” column as target.


In [20]:
from pyspark.ml.classification import GBTClassifier

In [21]:
classifier = GBTClassifier(labelCol="class",featuresCol="features")

Let’s train and evaluate…


In [22]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, classifier])

In [23]:
model = pipeline.fit(df_dft)

In [24]:
prediction = model.transform(df_dft)

In [25]:
prediction.show()

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|        id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|            features|       rawPrediction|         probability|prediction|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|8589934592|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|    0|           (6,[],[])|[1.54350200272498...|[0.95635347857270...|       0.0|
|         0|                 0.0|                 0.0|                 0.0|                 0.0|                 0.0|       

In [26]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("class")
    
binEval.evaluate(prediction) 

1.0