## SparkML Classifier

In [1]:
# load parquet dataset
!wget https://github.com/rahulsnair/sample_parquet/blob/master/a2.parquet?raw=true
!mv a2.parquet?raw=true a2.parquet

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20191216101154-0001
KERNEL_ID = 98cd4d00-37d4-483a-8170-d80440824ea9
--2019-12-16 10:11:56--  https://github.com/rahulsnair/sample_parquet/blob/master/a2.parquet?raw=true
Resolving github.com (github.com)... 140.82.118.4
Connecting to github.com (github.com)|140.82.118.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/rahulsnair/sample_parquet/raw/master/a2.parquet [following]
--2019-12-16 10:11:57--  https://github.com/rahulsnair/sample_parquet/raw/master/a2.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/rahulsnair/sample_parquet/master/a2.parquet [following]
--2019-12-16 10:11:57--  https://raw.githubusercontent.com/rahulsnair/sample_parquet/master/a2.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 199.232.8.133
Connecting

In [2]:
# Read the downloaded parquet file into a Spark DataFrame.
df = spark.read.load('a2.parquet')

# Register the DataFrame as the temp view "df" so it can be queried with
# Spark SQL, then preview its rows through that view.
df.createOrReplaceTempView("df")
preview = spark.sql("SELECT * from df")
preview.show()

+-----+-----------+-------------------+-------------------+-------------------+
|CLASS|   SENSORID|                  X|                  Y|                  Z|
+-----+-----------+-------------------+-------------------+-------------------+
|    0|         26| 380.66434005495194| -139.3470983812975|-247.93697521077704|
|    0|         29| 104.74324299209692| -32.27421440203938|-25.105013725863852|
|    0| 8589934658| 118.11469236129976| 45.916682927433534| -87.97203782706572|
|    0|34359738398| 246.55394030642543|-0.6122810693132044|-398.18662513951506|
|    0|17179869241|-190.32584900181487|  234.7849657520335|-206.34483804019288|
|    0|25769803830| 178.62396382387422| -47.07529438881511|  84.38310769821979|
|    0|25769803831|  85.03128805189493|-4.3024316644854546|-1.1841857567516714|
|    0|34359738411| 26.786262674736566| -46.33193951911338| 20.880756008396055|
|    0| 8589934592|-16.203752396859194| 51.080957032176954| -96.80526656416971|
|    0|25769803852|   47.2048142440404| 

In [3]:
# Preprocessing stages used by the pipeline.
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Encode the CLASS column into the numeric "label" column.
# NOTE(review): per the Spark docs StringIndexer orders indices by descending
# frequency by default, so "label" holds indices, not the raw CLASS values.
indexer = StringIndexer(
    inputCol="CLASS",
    outputCol="label",
)

# Pack the three coordinate columns into the single "features" vector column.
vectorAssembler = VectorAssembler(
    inputCols=['X','Y','Z'],
    outputCol='features',
)

In [4]:
# Classifier stage: elastic-net regularised logistic regression.
from pyspark.ml.classification import LogisticRegression

# NOTE(review): in the prediction output shown in this notebook every visible
# row has the same rawPrediction/probability despite different features, which
# suggests this regularisation is strong enough to zero out the coefficients
# -- confirm by inspecting the fitted model, and consider a smaller regParam.
classifier = LogisticRegression(
    maxIter=15,
    regParam=0.3,
    elasticNetParam=0.8,
)

In [5]:
# Assemble the pipeline; stages run in list order:
# label indexing -> feature assembly -> classifier.
from pyspark.ml import Pipeline

pipeline_stages = [indexer, vectorAssembler, classifier]
pipeline = Pipeline(stages=pipeline_stages)

In [6]:
# Fit every pipeline stage on the full DataFrame (no hold-out split is made).
model = pipeline.fit(dataset=df)

In [7]:
# Apply the fitted pipeline to the same DataFrame it was trained on.
prediction = model.transform(dataset=df)

In [8]:
# Preview the scored rows (the defaults, spelled out: 20 rows, long cells truncated).
prediction.show(n=20, truncate=True)

+-----+-----------+-------------------+-------------------+-------------------+-----+--------------------+--------------------+--------------------+----------+
|CLASS|   SENSORID|                  X|                  Y|                  Z|label|            features|       rawPrediction|         probability|prediction|
+-----+-----------+-------------------+-------------------+-------------------+-----+--------------------+--------------------+--------------------+----------+
|    0|         26| 380.66434005495194| -139.3470983812975|-247.93697521077704|  0.0|[380.664340054951...|[0.13828701585409...|[0.53451676528599...|       0.0|
|    0|         29| 104.74324299209692| -32.27421440203938|-25.105013725863852|  0.0|[104.743242992096...|[0.13828701585409...|[0.53451676528599...|       0.0|
|    0| 8589934658| 118.11469236129976| 45.916682927433534| -87.97203782706572|  0.0|[118.114692361299...|[0.13828701585409...|[0.53451676528599...|       0.0|
|    0|34359738398| 246.55394030642543|-

In [9]:
# model evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score against the indexed "label" column rather than the raw "CLASS" column:
# the model is trained on "label", so its "prediction" values live in the
# StringIndexer index space. Comparing them with raw CLASS values is only
# correct when the two encodings happen to coincide.
# NOTE: `prediction` was produced from the same DataFrame the model was fitted
# on, so this figure is training accuracy, not a held-out estimate.
binEval = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="accuracy",
)

binEval.evaluate(prediction)

0.534516765285996