In [1]:
import findspark
findspark.init('/opt/spark')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('./MOCK_DATA.csv', inferSchema=True, header=True)

21/09/10 00:04:41 WARN Utils: Your hostname, ARK resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlp5s0)
21/09/10 00:04:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/09/10 00:04:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/09/10 00:04:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/09/10 00:04:43 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
data.select(['time']).show()

+--------+
|    time|
+--------+
| 6:20:13|
|15:51:59|
|15:21:04|
|12:33:55|
| 8:07:46|
|23:54:55|
|11:10:14|
|16:23:18|
| 5:52:29|
| 0:42:18|
| 7:20:09|
|19:04:09|
| 0:08:57|
|19:46:00|
|21:45:05|
|17:51:44|
| 4:28:56|
|18:15:14|
| 6:03:34|
| 2:16:50|
+--------+
only showing top 20 rows



In [3]:
data.summary()

                                                                                

DataFrame[summary: string, id: string, time: string, doppler_frequency: string, weight: string, IR_Sensor: string, label: string]

In [9]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['doppler_frequency', 'weight', 'IR_Sensor'], outputCol='features')
data = assembler.transform(data)

In [19]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# The standard scaler can take two additional parameters withStd (True by default. Scales the data to unit SD) 
# and withMean (False by default. It centers the data with mean before scaling,, Fslse by default).
scaler_model = scaler.fit(data)

In [20]:
scaled_data = scaler_model.transform(data)
scaled_data.show()

+--------------------+--------+-----------------+------+---------+-----+--------------------+--------------------+
|                  id|    time|doppler_frequency|weight|IR_Sensor|label|            features|     scaled_features|
+--------------------+--------+-----------------+------+---------+-----+--------------------+--------------------+
|87e1edb0-f1ce-48a...| 6:20:13|            102.7| 75.24|      101|    0| [102.7,75.24,101.0]|[16.2372914302466...|
|5b174bee-0f6f-476...|15:51:59|           107.61| 70.26|       89|    0| [107.61,70.26,89.0]|[17.0135825784697...|
|923ea99e-74b8-4ad...|15:21:04|           100.99| 73.38|      223|    0|[100.99,73.38,223.0]|[15.9669334132484...|
|c49fe2e1-c20c-445...|12:33:55|           105.95| 77.89|       64|    0| [105.95,77.89,64.0]|[16.7511297666468...|
|49171217-57a9-48e...| 8:07:46|            89.35| 71.47|      164|    0| [89.35,71.47,164.0]|[14.1266016484181...|
|5cfd855c-86e9-4e6...|23:54:55|           105.38| 74.72|       13|    1| [105.38

In [21]:
# Using a min max scaler to normalize the sensor data
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(min=0, max=1, inputCol='features', outputCol='features_minmax')
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)
data.show()

+--------------------+--------+-----------------+------+---------+-----+--------------------+--------------------+
|                  id|    time|doppler_frequency|weight|IR_Sensor|label|            features|     features_minmax|
+--------------------+--------+-----------------+------+---------+-----+--------------------+--------------------+
|87e1edb0-f1ce-48a...| 6:20:13|            102.7| 75.24|      101|    0| [102.7,75.24,101.0]|[0.56041611096292...|
|5b174bee-0f6f-476...|15:51:59|           107.61| 70.26|       89|    0| [107.61,70.26,89.0]|[0.69138436916511...|
|923ea99e-74b8-4ad...|15:21:04|           100.99| 73.38|      223|    0|[100.99,73.38,223.0]|[0.51480394771939...|
|c49fe2e1-c20c-445...|12:33:55|           105.95| 77.89|       64|    0| [105.95,77.89,64.0]|[0.64710589490530...|
|49171217-57a9-48e...| 8:07:46|            89.35| 71.47|      164|    0| [89.35,71.47,164.0]|[0.20432115230728...|
|5cfd855c-86e9-4e6...|23:54:55|           105.38| 74.72|       13|    1| [105.38

In [23]:
from pyspark.ml.classification import RandomForestClassifier
algo = RandomForestClassifier(featuresCol='features_minmax', labelCol='label')
model_features = algo.fit(data)

In [24]:
predictions = model.transform(data)

In [25]:
predictions.select(['label', 'prediction', 'probability']).show()

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.54831819958797...|
|    0|       0.0|[0.54881404687007...|
|    0|       0.0|[0.54508387612033...|
|    0|       0.0|[0.62579006707527...|
|    0|       0.0|[0.50580277203545...|
|    1|       1.0|[0.42224816937221...|
|    0|       1.0|[0.44334677688311...|
|    1|       1.0|[0.29950673915376...|
|    0|       0.0|[0.58597507069939...|
|    0|       0.0|[0.50524503064516...|
|    1|       0.0|[0.56397055279667...|
|    0|       1.0|[0.41504013379647...|
|    0|       0.0|[0.53855179099179...|
|    0|       0.0|[0.56988497203167...|
|    1|       0.0|[0.51093850095774...|
|    1|       1.0|[0.44422170977655...|
|    1|       1.0|[0.33607539366079...|
|    1|       1.0|[0.46895939788258...|
|    0|       0.0|[0.54355591946092...|
|    0|       1.0|[0.47384827044263...|
+-----+----------+--------------------+
only showing top 20 rows



In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)

0.7392129568518278

In [27]:
# Evaluation using sci-kit learn
y_true = predictions.select(['label']).collect()
y_pred = predictions.select(['prediction']).collect()

In [28]:
from sklearn.metrics import classification_report, confusion_matrix

In [29]:
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.66      0.69      0.67       499
           1       0.68      0.65      0.66       501

    accuracy                           0.67      1000
   macro avg       0.67      0.67      0.67      1000
weighted avg       0.67      0.67      0.67      1000

