In [0]:
# Import MLflow and enable its autologging integration for pyspark.ml, so the
# estimator fits performed later in this notebook are recorded without explicit
# logging calls.
import mlflow

mlflow.pyspark.ml.autolog()

In [0]:
# Load the fault dataset from /FileStore into a Spark DataFrame. The file has no
# header row, so Spark assigns positional column names (_c0, _c1, ...); column
# types are inferred from the data.
FaultDataset = (
    spark.read
    .options(header=False, inferSchema=True)
    .csv("/FileStore/tables/FaultDataset.csv")
)

# Render the DataFrame in the cell output.
FaultDataset.display()

In [0]:
# The CSV has no header, so give the positional columns meaningful names.
# The mapping is applied in order: 20 "vsr" measurement columns followed by the
# target column "fault_detected".
_column_renames = {
    "_c0": "vsr",
    "_c1": "vsr_i",
    "_c2": "vsr_ii",
    "_c3": "vsr_iii",
    "_c4": "vsr_iv",
    "_c5": "vsr_v",
    "_c6": "vsr_vi",
    "_c7": "vsr_vii",
    "_c8": "vsr_viii",
    "_c9": "vsr_ix",
    "_c10": "vsr_x",
    "_c11": "vsr_xi",
    "_c12": "vsr_xii",
    "_c13": "vsr_xiii",
    "_c14": "vsr_xiv",
    "_c15": "vsr_xv",
    "_c16": "vsr_xvi",
    "_c17": "vsr_xvii",
    "_c18": "vsr_xviii",
    "_c19": "vsr_xix",
    "_c20": "fault_detected",
}
for _old_name, _new_name in _column_renames.items():
    FaultDataset = FaultDataset.withColumnRenamed(_old_name, _new_name)

# Preview the first rows without truncating cell contents.
FaultDataset.show(3, truncate=False)

In [0]:
# Using Databricks built-in visualisations to explore some variables as a scatter plot.
# The chart itself is configured in the cell output, not in this code.
FaultDataset.display()

In [0]:
# Using Databricks built-in visualisations to explore some variables as a histogram.
# The chart itself is configured in the cell output, not in this code.
display(FaultDataset)

In [0]:
# Using Databricks built-in visualisations to explore some variables as a histogram.
# NOTE(review): same comment and code as the previous cell — presumably a
# different variable is plotted here; confirm.
display(FaultDataset)

In [0]:
# Render the DataFrame once more for the built-in visualisations.
# NOTE(review): the purpose of this extra display is not stated — confirm which chart it backs.
display(FaultDataset)

In [0]:
# Register the DataFrame as a temporary view named "fault_view" so that the
# %sql cells can query it by that name.
FaultDataset.createOrReplaceTempView("fault_view")

In [0]:
%sql

-- Average of a subset of the vsr columns, one result row per fault_detected value.

SELECT
  fault_detected,
  AVG(vsr),
  AVG(vsr_iii),
  AVG(vsr_v),
  AVG(vsr_vii),
  AVG(vsr_ix),
  AVG(vsr_xi),
  AVG(vsr_xiv),
  AVG(vsr_xvi),
  AVG(vsr_xviii),
  AVG(vsr_xix)
FROM fault_view
GROUP BY fault_detected

In [0]:
%sql

-- Maximum and minimum of every vsr column, one result row per fault_detected value.
-- All MAX columns come first, followed by all MIN columns.

SELECT
  fault_detected,
  MAX(vsr),
  MAX(vsr_i),
  MAX(vsr_ii),
  MAX(vsr_iii),
  MAX(vsr_iv),
  MAX(vsr_v),
  MAX(vsr_vi),
  MAX(vsr_vii),
  MAX(vsr_viii),
  MAX(vsr_ix),
  MAX(vsr_x),
  MAX(vsr_xi),
  MAX(vsr_xii),
  MAX(vsr_xiii),
  MAX(vsr_xiv),
  MAX(vsr_xv),
  MAX(vsr_xvi),
  MAX(vsr_xvii),
  MAX(vsr_xviii),
  MAX(vsr_xix),
  MIN(vsr),
  MIN(vsr_i),
  MIN(vsr_ii),
  MIN(vsr_iii),
  MIN(vsr_iv),
  MIN(vsr_v),
  MIN(vsr_vi),
  MIN(vsr_vii),
  MIN(vsr_viii),
  MIN(vsr_ix),
  MIN(vsr_x),
  MIN(vsr_xi),
  MIN(vsr_xii),
  MIN(vsr_xiii),
  MIN(vsr_xiv),
  MIN(vsr_xv),
  MIN(vsr_xvi),
  MIN(vsr_xvii),
  MIN(vsr_xviii),
  MIN(vsr_xix)
FROM fault_view
GROUP BY fault_detected

In [0]:
# Print the column names and data types of the DataFrame.
FaultDataset.printSchema()

In [0]:
# Disabled clean-up, kept for reference: if uncommented, these two lines remove
# the "features" and "label" columns from FaultDataset.
#FaultDataset = FaultDataset.drop(FaultDataset.features)
#FaultDataset = FaultDataset.drop(FaultDataset.label)

In [0]:
# Data preprocessing: assemble the predictors into a single vector column and
# expose the target as the label column used by the classifier.
from pyspark.ml.feature import RFormula

# "fault_detected ~ ." means: predict fault_detected from every other column.
# The output column names are spelled out because the classifier and evaluator
# cells refer to "features" and "label" by name.
preprocess = RFormula(formula="fault_detected ~ .", featuresCol="features", labelCol="label")

# FaultDataset is overwritten with the transformed frame, so a second run of
# this cell would find the output columns already present and RFormula would
# reject the input. Dropping them first makes the cell safe to re-run;
# DataFrame.drop ignores names that are not present, so the first run is
# unaffected.
_fault_inputs = FaultDataset.drop("features", "label")

FaultDataset = preprocess.fit(_fault_inputs).transform(_fault_inputs)

FaultDataset.show(2)

In [0]:
# Seeded random split: roughly 70% of the rows for training, 30% for testing.
training_df, test_df = FaultDataset.randomSplit(weights=[0.7, 0.3], seed=100)

In [0]:
# Baseline model: a decision tree with default hyperparameters, fitted on the
# training split.
from pyspark.ml.classification import DecisionTreeClassifier

decision_tree = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
)
model = decision_tree.fit(training_df)

In [0]:
# Model evaluation, step 1: apply the fitted baseline tree to the held-out test
# split and preview the first three rows of the result.
predictions = model.transform(test_df)
predictions.show(3)

In [0]:
# Accuracy of the baseline predictions on the test split.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)

In [0]:
#training the model with different hyperparameters to see which ones produce the best result

#when trying different hyperparameters,
#- maxDepth values of 3, 4, and 6 (all different from the default of 5) will be used;
#- maxBins values of 16, 24, and 64 (all different from the default of 32) will be used;
#- gini and entropy will be used as the options for impurity.

In [0]:
# Hyperparameter search 1: candidate settings for the decision tree.
from pyspark.ml.tuning import ParamGridBuilder

# 2 impurity measures x 3 depths x 3 bin counts = 18 combinations.
parameters = (
    ParamGridBuilder()
    .addGrid(decision_tree.impurity, ["gini", "entropy"])
    .addGrid(decision_tree.maxDepth, [3, 4, 6])
    .addGrid(decision_tree.maxBins, [16, 24, 64])
    .build()
)

In [0]:
from pyspark.ml.tuning import TrainValidationSplit

# Grid-search driver: each parameter combination is trained on 75% of the data
# passed to fit() and scored on the remaining 25% with the accuracy evaluator.
tvs = TrainValidationSplit(
    estimator=decision_tree,
    estimatorParamMaps=parameters,
    evaluator=evaluator,
    trainRatio=0.75,
    seed=100,
)

In [0]:
# Train the model using grid search: fit the TrainValidationSplit on the
# training split, using the parameter grid and evaluator configured on `tvs`.
gsm = tvs.fit(training_df)

In [0]:
# Pick the winning model from the grid search and report the hyperparameters
# it was trained with.
bestModel = gsm.bestModel

print("Parameters for the best model:")
for _template, _getter in (
    ("MaxDepth Parameter: %g", bestModel.getMaxDepth),
    ("Impurity Parameter: %s", bestModel.getImpurity),
    ("MaxBins Parameter: %g", bestModel.getMaxBins),
):
    print(_template % _getter())

In [0]:
# Use bestModel to predict the test split and score the result with the
# evaluator; the value is shown as the cell output.
evaluator.evaluate(bestModel.transform(test_df))

In [0]:
# Reload a model that was logged to MLflow and use it for inference.
# (mlflow is already imported in the first cell; re-importing here is harmless.)
import mlflow
# NOTE(review): this URI pins one specific MLflow run ID, presumably the run
# autologged by an earlier grid-search fit. A fresh Restart & Run All produces
# new run IDs, so confirm this run still exists and is the intended one.
logged_model = 'runs:/577e476b3b6a46a0aa72fcc3754ef14d/best_model'

# Load model
loaded_model = mlflow.spark.load_model(logged_model)

# Perform inference via model.transform()
loaded_predictions = loaded_model.transform(test_df)

# Preview the first three rows produced by the reloaded model.
loaded_predictions.show(3)

In [0]:
# Second experiment: seeded random split with roughly 80% of the rows for
# training and 20% for testing. This replaces training_df / test_df.
training_df, test_df = FaultDataset.randomSplit(weights=[0.8, 0.2], seed=100)

In [0]:
# Baseline model for the second split: a decision tree with default
# hyperparameters, fitted on the new training split.
from pyspark.ml.classification import DecisionTreeClassifier

decision_tree = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
)
model = decision_tree.fit(training_df)

In [0]:
# Make predictions on the test split with the fitted baseline tree and preview
# the first three rows of the result.
predictions = model.transform(test_df)
predictions.show(3)

In [0]:
# Accuracy of the baseline predictions on the test split.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)

In [0]:
#training the model with a second set of hyperparameters to see which ones produce the best result

#when trying different hyperparameters,
#- maxDepth values of 5, 7, and 8 (5 is the default; 7 and 8 are deeper) will be used;
#- maxBins values of 20, 35, and 55 (all different from the default of 32) will be used;
#- gini and entropy will be used as the options for impurity.

In [0]:
# Hyperparameter search 2: candidate settings for the decision tree.
from pyspark.ml.tuning import ParamGridBuilder

# 2 impurity measures x 3 depths x 3 bin counts = 18 combinations.
parameters = (
    ParamGridBuilder()
    .addGrid(decision_tree.impurity, ["gini", "entropy"])
    .addGrid(decision_tree.maxDepth, [5, 7, 8])
    .addGrid(decision_tree.maxBins, [20, 35, 55])
    .build()
)

In [0]:
from pyspark.ml.tuning import TrainValidationSplit

# Grid-search driver: each parameter combination is trained on 75% of the data
# passed to fit() and scored on the remaining 25% with the accuracy evaluator.
tvs = TrainValidationSplit(
    estimator=decision_tree,
    estimatorParamMaps=parameters,
    evaluator=evaluator,
    trainRatio=0.75,
    seed=100,
)

In [0]:
# Train the model using grid search: fit the TrainValidationSplit on the
# training split, using the parameter grid and evaluator configured on `tvs`.
gsm = tvs.fit(training_df)

In [0]:
# Pick the winning model from the grid search and report the hyperparameters
# it was trained with.
bestModel = gsm.bestModel

print("Parameters for the best model:")
for _template, _getter in (
    ("MaxDepth Parameter: %g", bestModel.getMaxDepth),
    ("Impurity Parameter: %s", bestModel.getImpurity),
    ("MaxBins Parameter: %g", bestModel.getMaxBins),
):
    print(_template % _getter())

In [0]:
# Use bestModel to predict the test split and score the result with the
# evaluator; the value is shown as the cell output.
evaluator.evaluate(bestModel.transform(test_df))