In [0]:
#Enabling automatic logging with MLflow
import mlflow
mlflow.pyspark.ml.autolog()

In [0]:
#Listing files in a directory using Databricks Utilities (dbutils)
dbutils.fs.ls("FileStore/tables/")

Out[48]: [FileInfo(path='dbfs:/FileStore/tables/FaultDataset.csv', name='FaultDataset.csv', size=1703184, modificationTime=1682161219000),
 FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1677682877000),
 FileInfo(path='dbfs:/FileStore/tables/TS021_2021_2.csv', name='TS021_2021_2.csv', size=497239, modificationTime=1679510757000),
 FileInfo(path='dbfs:/FileStore/tables/activations/', name='activations/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations.zip', name='activations.zip', size=8411369, modificationTime=1675268719000),
 FileInfo(path='dbfs:/FileStore/tables/archive__1_-1.zip', name='archive__1_-1.zip', size=35601528, modificationTime=1675526853000),
 FileInfo(path='dbfs:/FileStore/tables/archive__1_-2.zip', name='archive__1_-2.zip', size=35601528, modificationTime=1675527260000),
 FileInfo(path='dbfs:/FileStore/tables/archive__1_.zip', name='archive__1_.zip', s

In [0]:
# loading my data set FaultDataset.csv

FaultDatasetDF = spark.read.csv("/FileStore/tables/FaultDataset.csv", header= "true", inferSchema="true")

FaultDatasetDF.show(truncate=False)

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+
|1        |2        |3        |4        |5        |6        |7        |8        |9        |10       |11       |12       |13       |14       |15       |16       |17       |18       |19       |20       |fault_detected|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+
|0.3503125|0.3496875|0.35     |0.3459375|0.3475   |0.3459375|0.341875 |0.3434375|0.355    |0.3553125|0.3459375|0.3525   |0.3575   |0.3590625|0.35875  |0.3484375|0.3590625|0.35     |0.3559375|0.3490625|0             |
|0.5090625|0.484375 |0.046875 |0.071875 |0.06     |0.0634375|0.0575   |0.0546875|0.0559375|0.058125 |0.0628125|0.065625 |0.0640625|0

In [0]:

# Checking the DataFrame Schema
FaultDatasetDF.printSchema()



root
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- 10: double (nullable = true)
 |-- 11: double (nullable = true)
 |-- 12: double (nullable = true)
 |-- 13: double (nullable = true)
 |-- 14: double (nullable = true)
 |-- 15: double (nullable = true)
 |-- 16: double (nullable = true)
 |-- 17: double (nullable = true)
 |-- 18: double (nullable = true)
 |-- 19: double (nullable = true)
 |-- 20: double (nullable = true)
 |-- fault_detected: integer (nullable = true)



In [0]:
#Reading the contents of  CSV file named "FaultDataset.csv

dbutils.fs.head("/FileStore/tables/FaultDataset.csv")

[Truncated to first 65536 bytes]
Out[51]: '1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,fault_detected\r\n0.3503125,0.3496875,0.35,0.3459375,0.3475,0.3459375,0.341875,0.3434375,0.355,0.3553125,0.3459375,0.3525,0.3575,0.3590625,0.35875,0.3484375,0.3590625,0.35,0.3559375,0.3490625,0\r\n0.5090625,0.484375,0.046875,0.071875,0.06,0.0634375,0.0575,0.0546875,0.0559375,0.058125,0.0628125,0.065625,0.0640625,0.0634375,0.0534375,0.084375,0.0615625,0.05375,0.076875,0.056875,0\r\n0.0928125,0.0975,0.1096875,0.1025,0.09625,0.1053125,0.09875,0.098125,0.091875,0.0909375,0.09875,0.103125,0.1,0.1034375,0.1015625,0.0978125,0.0990625,0.10375,0.098125,0.1040625,0\r\n0.09375,0.089375,0.091875,0.0996875,0.0909375,0.096875,0.0940625,0.096875,0.096875,0.099375,0.099375,0.0959375,0.0959375,0.0940625,0.09125,0.0996875,0.09375,0.0934375,0.0971875,0.094375,0\r\n0.036875,0.0440625,0.038125,0.0428125,0.0353125,0.0340625,0.033125,0.0403125,0.0346875,0.036875,0.035625,0.03625,0.0409375,0.039375,0.035,0.040625,0.0

In [0]:
#Checking the count of rows and columns in DataFrame

print((FaultDatasetDF.count(), len(FaultDatasetDF.columns)))

(9292, 21)


In [0]:
# Checking the Summary Statistics of DataFrame
display(FaultDatasetDF.describe())

summary,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,fault_detected
count,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0,9292.0
mean,0.3416233049935422,0.3426311612139467,0.3421213812957383,0.3421390712440796,0.3428434405940584,0.3428279366659492,0.3427147008179088,0.3430657756672401,0.3431729915518722,0.3439251842983209,0.3441075320167876,0.3439842404756772,0.3441183275936292,0.3448787599547991,0.3448999475355137,0.3458342525828662,0.3456348875376668,0.3457525290572536,0.3465221427034015,0.346670724817048,0.5
stddev,0.2891948948626078,0.2890875372793958,0.2891642249061693,0.2891635633310729,0.2889646554403878,0.2890889899729543,0.2891948159883224,0.2891918560806545,0.2893401858067147,0.289011538534877,0.2892001448749587,0.2890708129465896,0.289118047014631,0.2889821392646809,0.2891314011350137,0.2888285654988746,0.2889204033670731,0.2891502814843134,0.2887705775702368,0.2890013554393105,0.5000269070362092
min,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.024375,0.025,0.025,0.025,0.024375,0.024375,0.024375,0.024375,0.025,0.0
max,1.0809375,1.2134375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.2134375,1.0809375,1.2134375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0809375,1.0


In [0]:
#Displaying all rows of the "FaultDatasetDF" DataFrame without truncation

FaultDatasetDF.show(truncate = False)

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+
|1        |2        |3        |4        |5        |6        |7        |8        |9        |10       |11       |12       |13       |14       |15       |16       |17       |18       |19       |20       |fault_detected|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+
|0.3503125|0.3496875|0.35     |0.3459375|0.3475   |0.3459375|0.341875 |0.3434375|0.355    |0.3553125|0.3459375|0.3525   |0.3575   |0.3590625|0.35875  |0.3484375|0.3590625|0.35     |0.3559375|0.3490625|0             |
|0.5090625|0.484375 |0.046875 |0.071875 |0.06     |0.0634375|0.0575   |0.0546875|0.0559375|0.058125 |0.0628125|0.065625 |0.0640625|0

In [0]:
# Check the distribution of the target variable
display(FaultDatasetDF.groupBy('fault_detected').count())

fault_detected,count
1,4646
0,4646


Databricks visualization. Run in Databricks to view.

In [0]:
# Getting count of null values in each column

from pyspark.sql.functions import sum, col
null_counts = FaultDatasetDF.select([sum(col('{}'.format(c)).isNull().cast("int")). alias(c) for c in ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', 'fault_detected']])
display(null_counts)

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,fault_detected
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
#Using RFormula to preprocess the "FaultDatasetDF" DataFrame
from pyspark.ml.feature import RFormula

preprocess = RFormula(formula ="fault_detected ~ .")

FaultDatasetDF = preprocess.fit(FaultDatasetDF).transform(FaultDatasetDF)

FaultDatasetDF.show(5)

2023/05/04 13:51:26 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'ddf3aad2bea04c589e2dd4e7864eeef6', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+
|        1|        2|        3|        4|        5|        6|        7|        8|        9|       10|       11|       12|       13|       14|       15|       16|       17|       18|       19|       20|fault_detected|            features|label|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+
|0.3

In [0]:
#Splitting the preprocessed DataFrame into training and testing DataFram

(trainingDF, testDF) = FaultDatasetDF.randomSplit([0.7, 0.3], seed=100)

In [0]:
#Displaying the training DataFrame

trainingDF.show()

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+
|        1|        2|        3|        4|        5|        6|        7|        8|        9|       10|       11|       12|       13|       14|       15|       16|       17|       18|       19|       20|fault_detected|            features|label|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+
| 0.024375|     0.03|  0.03375|  0.02625|0.0334375|0.0271875|   0.0275|  0.02625| 0.034375|0.0321875| 0.031875| 0.031875|0.0346875|0.0340625|0.0303125|  0.03375|  0.03125|0.0290625|   0.0325|0.0328125|             1|[0.024375,0.03,0....|  1.0|
|    0.025|0.0334375| 0.

In [0]:
#Display the test DataFrame
testDF.show()

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+
|        1|        2|        3|        4|        5|        6|        7|        8|        9|       10|       11|       12|       13|       14|       15|       16|       17|       18|       19|       20|fault_detected|            features|label|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+
|0.0253125| 0.039375|0.0528125| 0.056875|   0.0725| 0.041875|0.5109375|0.5628125|    0.605|   0.6025|0.6259375|      0.6|0.6090625|0.6034375| 0.594375|0.6021875| 0.580625|0.5703125|0.0503125| 0.069375|             1|[0.0253125,0.0393...|  1.0|
|  0.02625|   0.0325|0.0

In [0]:
#Creating a DecisionTreeClassifier model and fitting it to the training DataFrame

from pyspark.ml.classification import DecisionTreeClassifier

DecisionTree =  DecisionTreeClassifier(labelCol ="label", featuresCol= "features")
model =DecisionTree.fit(trainingDF)






2023/05/04 13:51:38 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '5666d1de9b7c478181c855f345ed9ace', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/04 13:51:57 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
#Using the trained model to make predictions on the testing DataFrame

predictions = model.transform(testDF)

predictions.show()

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+--------------+--------------------+----------+
|        1|        2|        3|        4|        5|        6|        7|        8|        9|       10|       11|       12|       13|       14|       15|       16|       17|       18|       19|       20|fault_detected|            features|label| rawPrediction|         probability|prediction|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+--------------+--------------------+----------+
|0.0253125| 0.039375|0.0528125| 0.056875|   0.0725| 0.041875|0.5109375|0.5628125|    0.605|   0.6025|0.6259375|      0.6|0.6090

In [0]:
#Evaluating the accuracy of the model's predictions

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName ="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Accuracy = %g " % (accuracy))

Accuracy = 0.952432 


In [0]:
#Creating a grid of hyperparameters to tune the DecisionTreeClassifier model

from pyspark.ml.tuning import ParamGridBuilder

paramGrid = ParamGridBuilder()\
             .addGrid(DecisionTree.maxDepth, [5, 10, 15])\
             .addGrid(DecisionTree.minInstancesPerNode, [1, 5, 10])\
             .addGrid(DecisionTree.impurity, ["gini", "entropy"])\
             .addGrid(DecisionTree.maxBins, [16, 32, 64])\
             .build()

In [0]:
#Performing a train-validation split and tuning the hyperparameters using the grid search

from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit()\
    .setSeed(100)\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(paramGrid)\
    .setEstimator(DecisionTree)\
    .setEvaluator(evaluator)

In [0]:
#Fitting the training data to the grid search model
gridsearchModel = tvs.fit(trainingDF)

2023/05/03 18:46:54 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '43adff764f934152bb466022e761e1b3', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 18:46:55 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'c97d66ec9ef1466694e07ed2b4889996', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 18:46:58 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'e3acc020d3f8486c968e16974d95b999', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 18:47:00 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'cc9056f758bc4650884f19ca597d1800', which will track hyperparameters, performance metrics, model artifacts, and 

In [0]:
#Printing the best hyperparameters of the grid search model
bestModel = gridsearchModel.bestModel

print("parameters for the best model:")
print("MaxDepth parameter:%g" %bestModel.getMaxDepth())
print("MaxBins parameter: %g" %bestModel.getMaxBins())
print("MinInstancesPerNode parameter: %g" %bestModel.getMinInstancesPerNode())
print("Impurity parameter: %s" %bestModel.getImpurity())

parameters for the best model:
MaxDepth parameter:15
MaxBins parameter: 64
MinInstancesPerNode parameter: 10
Impurity parameter: gini


In [0]:
#Calculating accuracy of a model using evaluator and bestModel.transform() method.

accuracy1=evaluator.evaluate(bestModel.transform(testDF))

print("accuracy1 =%g" % (accuracy1))

accuracy1 =0.964685


In [0]:
#Creating a RandomForestClassifier model for classification.

from pyspark.ml.classification import RandomForestClassifier

RandomForest = RandomForestClassifier(labelCol="label", featuresCol="features")

RandomModel = RandomForest.fit(trainingDF)

2023/05/03 18:47:59 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '8d21aa079cea49b4a9eb04c64c80915c', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 18:48:07 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
#Making predictions using the Random Forest Classifier model.

Randompredictions = RandomModel.transform(testDF)
Randompredictions.show()


+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+--------------------+--------------------+----------+
|        1|        2|        3|        4|        5|        6|        7|        8|        9|       10|       11|       12|       13|       14|       15|       16|       17|       18|       19|       20|fault_detected|            features|label|       rawPrediction|         probability|prediction|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+--------------------+--------------------+----------+
|0.0253125| 0.039375|0.0528125| 0.056875|   0.0725| 0.041875|0.5109375|0.5628125|    0.605|   0.6025|0.625937

In [0]:

# Evaluating the model using BinaryClassificationEvaluator

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")

auc = evaluator.evaluate(Randompredictions)

print("Area under ROC: {:.2f}".format(auc))

Area under ROC: 0.98


In [0]:
#Creating a parameter grid for Random Forest Classifier model hyperparameter tuning.

from pyspark.ml.tuning import ParamGridBuilder

paramGrid1 = ParamGridBuilder()\
             .addGrid(RandomForest.maxDepth, [5, 10, 20])\
             .addGrid(RandomForest.impurity, ["gini", "entropy"])\
             .addGrid(RandomForest.maxBins, [16, 32, 64])\
             .addGrid(RandomForest.numTrees, [10, 20, 60])\
             .build()

In [0]:
#Creating a TrainValidationSplit object for hyperparameter tuning using the parameter grid.

from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit()\
    .setSeed(100)\
    .setTrainRatio(0.80)\
    .setEstimatorParamMaps(paramGrid1)\
    .setEstimator(RandomForest)\
    .setEvaluator(evaluator)

In [0]:
#Fitting the TrainValidationSplit object on training data.

gridsearchmodel1 = tvs.fit(trainingDF)

2023/05/03 18:48:56 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '0d76523c592b4c83933c29b2aa60cb13', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 18:54:36 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
2023/05/03 18:55:46 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
#Getting the best model from the hyperparameter tuning.

bestModel1 = gridsearchmodel1.bestModel

print("parameters for the best model:")                        
print("MaxDepth parameter: %g" %bestModel1.getMaxDepth())
print("MaxBins parameter: %g" %bestModel1.getMaxBins())
print("NumTrees parameter: %g" %bestModel1.getNumTrees)
print("Impurity parameter: %s" %bestModel1.getImpurity())

parameters for the best model:
MaxDepth parameter: 20
MaxBins parameter: 64
NumTrees parameter: 60
Impurity parameter: entropy


In [0]:
#Calculating the area under ROC curve for the best model using evaluator and bestModel1.transform() method.

auc1 = evaluator.evaluate(bestModel1.transform(testDF))

print("Area under Roc = %g" % (auc1) )

Area under Roc = 0.997676


In [0]:
#TRAINING LOGISTICREGRESSION

In [0]:
from pyspark.ml.classification import LogisticRegression

# Defining  the Logistic Regression model
lr = LogisticRegression(labelCol="label", featuresCol="features")



In [0]:
# training a logistic regression model using the fit() method on a training dataset (trainingDF)

Logistic_m = lr.fit(trainingDF)

2023/05/03 18:56:42 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '090416d79cdc4c13b1620991bde38dbd', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 18:56:47 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
#Making predictions using the Logistic Regression model.

Logisticprediction = Logistic_m .transform(testDF)
Logisticprediction.show()

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+--------------------+--------------------+----------+
|        1|        2|        3|        4|        5|        6|        7|        8|        9|       10|       11|       12|       13|       14|       15|       16|       17|       18|       19|       20|fault_detected|            features|label|       rawPrediction|         probability|prediction|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+--------------------+--------------------+----------+
|0.0253125| 0.039375|0.0528125| 0.056875|   0.0725| 0.041875|0.5109375|0.5628125|    0.605|   0.6025|0.625937

In [0]:
#Calculating the area under ROC curve for Logistic Regression model predictions using BinaryClassificationEvaluator.

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")

auc2 = evaluator.evaluate(Logisticprediction)

print("Area under ROC: {:.2f}".format(auc2))

Area under ROC: 0.93


In [0]:
# Setting hyperparameters to be tested
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.1, 0.2, 0.3]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2, 0.3]) \
    .addGrid(lr.maxIter, [10, 50, 100, 200]) \
    .addGrid(lr.fitIntercept, [True, False]) \
    .build()

In [0]:
# Defining  the TrainValidationSplit object with the logistic regression model, hyperparameter grid, and evaluator

tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)


In [0]:
# Fitting the TrainValidationSplit model to the training data

model = tvs.fit(trainingDF)


2023/05/03 18:57:35 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '6f3fc760b85d4d53a35a123a652e9e9b', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 19:04:23 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
2023/05/03 19:05:31 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
# Getting the best model from the TrainValidationSplit

bestModel = model.bestModel

In [0]:
# Evaluating the best model on the test data using the evaluator

auc3 = evaluator.evaluate(bestModel.transform(testDF))

In [0]:
# Printing out the area under ROC for the best model
print("Area under ROC 1: {:.2f}".format(auc3))



Area under ROC 1: 0.93


In [0]:
# TRAINING LINEARSVC

In [0]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Defining the SVM model
svm = LinearSVC(labelCol="label", featuresCol="features", maxIter=100, regParam=0.1)


In [0]:

# Fiting the model to the training data
svm_model = svm.fit(trainingDF)

2023/05/03 19:06:21 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '0b22e26dbc874ad580d4abff2c298199', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 19:06:32 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
# Evaluating the model before tuning
predictions = svm_model.transform(testDF)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
auc = evaluator.evaluate(predictions)
print("Accuracy before tuning:", auc)

Accuracy before tuning: 0.8066049203781454


In [0]:
# Defining the hyperparameter grid to search over
param_grid = ParamGridBuilder() \
    .addGrid(svm.maxIter, [10, 50, 100]) \
    .addGrid(svm.regParam, [0.01, 0.1, 1.0]) \
    .build()


In [0]:
# Defining the cross-validator
cv = CrossValidator(estimator=svm, estimatorParamMaps=param_grid, evaluator=BinaryClassificationEvaluator(), numFolds=5)

# Fitting the cross-validator to the training data
cv_model = cv.fit(trainingDF)

# Getting the best SVM model from the cross-validation
best_svm_model = cv_model.bestModel

2023/05/03 19:07:19 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'b4b14cf39dc34b128c371888b4c7a65b', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/05/03 19:13:49 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
2023/05/03 19:14:47 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


In [0]:
# Evaluating the model after tuning

predictions = best_svm_model.transform(testDF)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
auc = evaluator.evaluate(predictions)
print("Accuracy after tuning:", auc)


Accuracy after tuning: 0.8073046700890506


In [0]:
#Loading a Logged MLflow Model.
import mlflow
logged_model = 'runs:/55a6939108644610bac8988110380974/model'

# Loading model
loaded_model = mlflow.spark.load_model(logged_model)

# Performng inference via model.transform()
loaded_predictions=loaded_model.transform(testDF)
loaded_predictions.show()

2023/05/03 19:15:37 INFO mlflow.spark: 'runs:/55a6939108644610bac8988110380974/model' resolved as 'dbfs:/databricks/mlflow-tracking/1158369830169773/55a6939108644610bac8988110380974/artifacts/model'
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+--------------------+----------+
|        1|        2|        3|        4|        5|        6|        7|        8|        9|       10|       11|       12|       13|       14|       15|       16|       17|       18|       19|       20|fault_detected|            features|label|       rawPrediction|prediction|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------+--------------------+-----+------