In [0]:
# Import the required packages for the project
import mlflow
mlflow.pyspark.ml.autolog()
from pyspark.sql.types import *
from pyspark.sql.functions import isnan, count, when, col
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier, LogisticRegression, RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [0]:
# create schema
mySchema = StructType ([
    StructField("data1", FloatType()),
    StructField("data2", FloatType()),
    StructField("data3", FloatType()),
    StructField("data4", FloatType()),
    StructField("data5", FloatType()),
    StructField("data6", FloatType()),
    StructField("data7", FloatType()),
    StructField("data8", FloatType()),
    StructField("data9", FloatType()),
    StructField("data10", FloatType()),
    StructField("data11", FloatType()),
    StructField("data12", FloatType()),
    StructField("data13", FloatType()),
    StructField("data14", FloatType()),
    StructField("data15", FloatType()),
    StructField("data16", FloatType()),
    StructField("data17", FloatType()),
    StructField("data18", FloatType()),
    StructField("data19", FloatType()),
    StructField("data20", FloatType()),
    StructField("fault_detected", IntegerType())
])

# create an RDD from the CSV file
myRDD = sc.textFile("/FileStore/tables/FaultDataset.csv")

# remove the header row of the dataset
RDD2 = myRDD.mapPartitionsWithIndex(lambda id_x, iter: list(iter)[1:] if(id_x == 0) else iter)

# map the RDD, split it by comma and assign the datatype to the column
RDD3 = RDD2.map(lambda line : line.split(",")). \
            map(lambda values : [float(values[0]), float(values[1]),float(values[2]),float(values[3]),float(values[4]),float(values[5]),
                                 float(values[6]),float(values[7]),float(values[8]),float(values[9]),float(values[10]),float(values[11]),
                                 float(values[12]),float(values[13]),float(values[14]),float(values[15]),float(values[16]),float(values[17]), 
                                 float(values[18]), float(values[19]), int(values[20])])
# Create a datafram and assign schema
FaultDatasetDF = spark.createDataFrame(RDD3, mySchema)

In [0]:
FaultDatasetDF.show(truncate=False)

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+
|data1    |data2    |data3    |data4    |data5    |data6    |data7    |data8    |data9    |data10   |data11   |data12   |data13   |data14   |data15   |data16   |data17   |data18   |data19   |data20   |fault_detected|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+
|0.3503125|0.3496875|0.35     |0.3459375|0.3475   |0.3459375|0.341875 |0.3434375|0.355    |0.3553125|0.3459375|0.3525   |0.3575   |0.3590625|0.35875  |0.3484375|0.3590625|0.35     |0.3559375|0.3490625|0             |
|0.5090625|0.484375 |0.046875 |0.071875 |0.06     |0.0634375|0.0575   |0.0546875|0.0559375|0.058125 |0.0628125|0.065625 |0.0640625|0

In [0]:
# check and count the number of empty row cells in each column
CountMissingValues = FaultDatasetDF.select([count(when(col(ColumnName).isNull(), ColumnName)).alias(ColumnName) for ColumnName in FaultDatasetDF.columns])

CountMissingValues.show()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+--------------+
|data1|data2|data3|data4|data5|data6|data7|data8|data9|data10|data11|data12|data13|data14|data15|data16|data17|data18|data19|data20|fault_detected|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+--------------+
|    0|    0|    0|    0|    0|    0|    0|    0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     0|     0|     0|             0|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+--------------+



In [0]:
FaultDatasetDF.createOrReplaceTempView("faultDatasetView")

In [0]:
%sql
-- minimum values for each variable for when the room is occupied and for when the room is unoccupied
SELECT 
MIN(data1), MIN(data2), MIN(data3), MIN(data4), MIN(data5), MIN(data6), MIN(data7), MIN(data8), 
MIN(data9), MIN(data10), MIN(data11), MIN(data12), MIN(data13), MIN(data14), MIN(data15), 
MIN(data16), MIN(data17), MIN(data18), MIN(data19), MIN(data20), fault_detected 
FROM faultDatasetView 
GROUP BY fault_detected

min(data1),min(data2),min(data3),min(data4),min(data5),min(data6),min(data7),min(data8),min(data9),min(data10),min(data11),min(data12),min(data13),min(data14),min(data15),min(data16),min(data17),min(data18),min(data19),min(data20),fault_detected
0.025625,0.024375,0.0259375,0.0259375,0.0246875,0.025,0.025,0.0246875,0.024375,0.025,0.02625,0.025625,0.025,0.025,0.025,0.024375,0.024375,0.024375,0.024375,0.0259375,0
0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.025,0.025,0.025,0.025,0.025,0.025,0.025,0.025,1


In [0]:
%sql
SELECT 
AVG(data1), AVG(data2), AVG(data3), AVG(data4), AVG(data5), AVG(data6), AVG(data7), AVG(data8), 
AVG(data9), AVG(data10), AVG(data11), AVG(data12), AVG(data13), AVG(data14), AVG(data15), 
AVG(data16), AVG(data17), AVG(data18), AVG(data19), AVG(data20), fault_detected 
FROM faultDatasetView 
GROUP BY fault_detected

avg(data1),avg(data2),avg(data3),avg(data4),avg(data5),avg(data6),avg(data7),avg(data8),avg(data9),avg(data10),avg(data11),avg(data12),avg(data13),avg(data14),avg(data15),avg(data16),avg(data17),avg(data18),avg(data19),avg(data20),fault_detected
0.148732646307505,0.1500714996302828,0.1488361627928306,0.1483677492488626,0.1493353826457693,0.1493828697514961,0.1484459077328398,0.1482800392111666,0.1478870264336697,0.1487573987182536,0.148715090828463,0.1480762349041535,0.147183464210219,0.1478757263472752,0.1473991739753276,0.1484001023386971,0.1472404352074559,0.1467593090955105,0.1475128470456971,0.1468147330520914,0
0.5345139639521057,0.5351908230786045,0.5354066000993953,0.5359103936895253,0.5363514989288143,0.5362730039932312,0.5369834942214778,0.5378515124176655,0.5384589569876471,0.5390929701135486,0.5394999734440749,0.5398922463561465,0.5410531912527391,0.5418817938353147,0.5424007214084621,0.5432684032652997,0.544029340092646,0.5447457493589554,0.5455314386603946,0.5465267168548803,1


In [0]:
%sql
SELECT 
MAX(data1), MAX(data2), MAX(data3), MAX(data4), MAX(data5), MAX(data6), MAX(data7), 
MAX(data8), MAX(data9), MAX(data10), MAX(data11), MAX(data12), MAX(data13), MAX(data14), 
MAX(data15), MAX(data16), MAX(data17), MAX(data18), MAX(data19), MAX(data20), fault_detected 
FROM faultDatasetView 
GROUP BY fault_detected

max(data1),max(data2),max(data3),max(data4),max(data5),max(data6),max(data7),max(data8),max(data9),max(data10),max(data11),max(data12),max(data13),max(data14),max(data15),max(data16),max(data17),max(data18),max(data19),max(data20),fault_detected
0.7684375,1.2134376,0.82875,0.786875,0.7621875,0.82875,0.82875,0.8203125,1.0709375,0.93125,1.0709375,1.2134376,0.82875,1.2134376,1.0709375,0.93125,0.73125,1.0709375,0.7240625,1.0709375,0
1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1


## Data Pre-processing

In [0]:
# preprocess data into correct format by transform data into vector
preprocess = RFormula(formula="fault_detected ~ .")

FaultDatasetDF = preprocess.fit(FaultDatasetDF).transform(FaultDatasetDF)

FaultDatasetDF.show(truncate=False)

2023/04/23 20:28:00 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'd834fbccf46f45dc8e24741f28a0d4a2', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|data1    |data2    |data3    |data4    |data5    |data6    |data7    |data8    |data9    |data10   |data11   |data12   |data13   |data14   |data15   |data16   |data17   |data18   |data19   |data20   |fault_detected|features                                                                                                                                            

In [0]:
# split data into training and test datasets
(trainingDF, testDF) = FaultDatasetDF.randomSplit([0.7, 0.3], seed=100)

In [0]:
# Define the decision tree algorithm
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# train the model
ModelDT = dt.fit(trainingDF)

# make prediction on the test dataset
PredictionDT = ModelDT.transform(testDF)

# use evaluator to measure accuracy of prediction on test data
EvaluatorDT = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
AccuracyDT = EvaluatorDT.evaluate(PredictionDT)

# Display the accuracy
print("Accuracy = %g " % (AccuracyDT))


2023/04/23 20:28:02 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'd21e395a712e42ce9d4bd99f296c6bf3', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/04/23 20:28:10 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Accuracy = 0.955492 


### Using Mlflow ParamGridBulder & TrainValidationSplit for Grid Search

- Values of 3, 5 and 7 for the maxDepth
- Values of 16, 32 and 64 for the maxBins
- Gini and entropy as the options for impurity

In [0]:
# Create a parameter grid
parameters = ParamGridBuilder().addGrid(dt.impurity, ["gini", "entropy"])\
                                .addGrid(dt.maxDepth, [3,5,7])\
                                .addGrid(dt.maxBins, [16,32,64])\
                                .build()

# Define TrainValidationSplit
tvs = TrainValidationSplit().setSeed(100)\
                            .setTrainRatio(0.75)\
                            .setEstimatorParamMaps(parameters)\
                            .setEstimator(dt)\
                            .setEvaluator(EvaluatorDT)
# Train model using grid search
gridsearchModel = tvs.fit(trainingDF)

# select best model and identify the parameters
bestModel = gridsearchModel.bestModel

2023/04/23 20:29:05 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '416f3b593fa84d9a8a27b01fadd459c3', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/04/23 20:30:11 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
2023/04/23 20:31:14 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
print("Parameters for the best model:")
print("MaxDepth Parameter: %g" %bestModel.getMaxDepth())
print("Impurity Parameter: %s" %bestModel.getImpurity())
print("MaxBins Parameter: %g" %bestModel.getMaxBins())

Parameters for the best model:
MaxDepth Parameter: 7
Impurity Parameter: entropy
MaxBins Parameter: 32


In [0]:
# Evaluate the best model based on its prediction on the test dataset
EvaluatorDT.evaluate(bestModel.transform(testDF))

0.9565685570710696

In [0]:
# Train and evaluate using Linear SVC Model
svc = LinearSVC(labelCol="label", featuresCol="features")
ModelSVC = svc.fit(trainingDF)
PredictionSVC = ModelSVC.transform(testDF)
EvaluatorSVC = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
AccuracySVC = EvaluatorSVC.evaluate(PredictionSVC)

# Specify evaluator algorithm
EvaluatorBCE = BinaryClassificationEvaluator(labelCol="label")

# Train and evaluate using Logistic Regression Model
lr = LogisticRegression(featuresCol="features", labelCol="label")
ModelLR = lr.fit(trainingDF)
PredictionLR = ModelLR.transform(testDF)
AccuracyLR = EvaluatorBCE.evaluate(PredictionLR)

# Train and evaluate using Random Forest Model
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
ModelRF = rf.fit(trainingDF)
PredictionRF = ModelRF.transform(testDF)
AccuracyRF = EvaluatorBCE.evaluate(PredictionRF)

# Train and evaluate using Gradient-Boosted Tree Model
gbt = GBTClassifier(featuresCol="features", labelCol="label")
ModelGBT = gbt.fit(trainingDF)
PredictionGBT = ModelGBT.transform(testDF)
AccuracyGBT = EvaluatorBCE.evaluate(PredictionGBT)

2023/04/23 20:32:09 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '26633369c0c3436ca8115a6c19f31ead', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/04/23 20:32:56 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
2023/04/23 20:33:50 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '1a0782faa4ab4774ae9010f2e5ed2e9f', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/04/23 20:33:59 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or

In [0]:
print("Accuracy Linear SVC = %g " % (AccuracySVC))
print("Accuracy Logistic Regression = %g " % (AccuracyLR))
print("Accuracy Random Forest = %g " % (AccuracyRF))
print("Accuracy Gradient-Boosted Tree = %g " % (AccuracyGBT))

Accuracy Linear SVC = 0.806533 
Accuracy Logistic Regression = 0.927898 
Accuracy Random Forest = 0.985615 
Accuracy Gradient-Boosted Tree = 0.996752 


In [0]:
# Create a parameter grid
parametersGBT = ParamGridBuilder().addGrid(dt.impurity, ["variance", "gini", "entropy"])\
                                  .addGrid(dt.maxDepth, [3,5,10])\
                                  .addGrid(dt.maxBins, [16,32,64])\
                                  .addGrid(dt.minInstancesPerNode, [1,3,5])\
                                  .build()
tvsGBT = TrainValidationSplit().setSeed(100)\
                              .setTrainRatio(0.75)\
                              .setEstimatorParamMaps(parametersGBT)\
                              .setEstimator(gbt)\
                              .setEvaluator(EvaluatorBCE)
# Train model using grid search
gridsearchModelGBT = tvsGBT.fit(trainingDF)
bestModelGBT = gridsearchModelGBT.bestModel

2023/04/23 20:37:32 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '6474b82875fe4f2cb5b43603a0e66aa9', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [0]:
print("Parameters for the best model:")
print("Impurity Parameter: %s" %bestModelGBT.getImpurity())
print("MaxDepth Parameter: %s" %bestModelGBT.getMaxDepth())
print("MaxBins Parameter: %g" %bestModelGBT.getMaxBins())
print("MinInstancesPerNode Parameter: %g" %bestModelGBT.getMinInstancesPerNode())

Parameters for the best model:
Impurity Parameter: variance
MaxDepth Parameter: 5
MaxBins Parameter: 32
MinInstancesPerNode Parameter: 1


In [0]:
# Evaluate the best model based on its prediction on the test dataset
EvaluatorBCE.evaluate(bestModelGBT.transform(testDF))

0.9967522562079215