In [10]:
# Import required libraries
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier, MultilayerPerceptronClassifier, NaiveBayes, OneVsRest
from pyspark.ml.classification import GBTClassifier  # If using GBT in OneVsRest
import mlflow
import pandas as pd

In [11]:
# Configure SparkSession
spark = SparkSession.builder \
    .appName("Spark_MLlib_MultiClass_Models") \
    .master("local[*]") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "6g") \
    .getOrCreate()

In [12]:
# Configure MLflow for local tracking
mlflow.set_tracking_uri("file:./mlruns")  # Use local directory for tracking
experiment_name = "MLlib_MultiClass_Classifier"

# Create the experiment if it doesn't exist
if not mlflow.get_experiment_by_name(experiment_name):
    mlflow.create_experiment(experiment_name)
mlflow.set_experiment(experiment_name)

# Configure paths for saving models and logs
MODEL_SAVE_DIR = "./models"
LOG_DIR = "./logs"

os.makedirs(MODEL_SAVE_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

In [13]:
# Load Reference Table
REFERENCE_TABLE_PATH = "../EDA/Results/reference_table_real.csv"
reference_table_missing_values_real = spark.read.csv(REFERENCE_TABLE_PATH, header=True)

# Sort by 'Value Type' column (optional)
reference_table_missing_values_real = reference_table_missing_values_real.orderBy("Value Type")
reference_table_missing_values_real.show(truncate=False, n=99)

+-------------+------------------------------------------------------------+--------------------+-----------+--------------------+-------------------------+
|Tag          |Name                                                        |Unit                |Value Type |Unique Values (Real)|Missing Values (%) (Real)|
+-------------+------------------------------------------------------------+--------------------+-----------+--------------------+-------------------------+
|ESTADO-DHSV  |State of the DHSV (downhole safety valve)                   |[0, 0.5, 1]         |Categorical|2                   |57.8                     |
|ESTADO-M1    |State of the PMV (production master valve)                  |[0, 0.5, 1]         |Categorical|2                   |45.9                     |
|ESTADO-PXO   |State of the PXO (pig-crossover) valve                      |[0, 0.5, 1]         |Categorical|2                   |44.7                     |
|ESTADO-M2    |State of the AMV (annulus master valve)    

                                                                                

In [14]:
# Read datasets into PySpark DataFrames
data_dir = "../Cleaning & Preparation/Train Test (Scaled) Data"
train_data_spark = spark.read.parquet(os.path.join(data_dir, "scaled_train_data.parquet"))
test_data_spark = spark.read.parquet(os.path.join(data_dir, "scaled_test_data.parquet"))

# Define columns to exclude (non-feature columns)
exclude_columns = ['Instance', 'label', 'timestamp', 'class', 'target', 'state', 'well', 'id', 'DataType', '__index_level_0__']

# Extract all columns from the training data
all_columns = train_data_spark.columns

# Define feature names by excluding non-feature columns
feature_names = [col_name for col_name in all_columns if col_name not in exclude_columns]

target_name = "class"  # Set target variable to 'class'

# Print the feature names
print("Feature Names:")
for feature in feature_names:
    print(feature)

Feature Names:
ABER-CKGL_missing
ABER-CKP_missing
ESTADO-DHSV_missing
ESTADO-M1_missing
ESTADO-PXO_missing
ESTADO-M2_missing
ESTADO-SDV-GL_missing
ESTADO-SDV-P_missing
ESTADO-W1_missing
ESTADO-W2_missing
P-ANULAR_missing
ESTADO-XO_missing
P-JUS-CKGL_missing
P-MON-CKP_missing
P-JUS-CKP_missing
P-MON-CKGL_missing
P-PDG_missing
P-TPT_missing
QGL_missing
T-JUS-CKP_missing
T-MON-CKP_missing
T-PDG_missing
T-TPT_missing
ABER-CKGL
ABER-CKP
ESTADO-DHSV
ESTADO-M1
ESTADO-PXO
ESTADO-M2
ESTADO-SDV-GL
ESTADO-SDV-P
ESTADO-W1
ESTADO-W2
P-ANULAR
ESTADO-XO
P-JUS-CKGL
P-MON-CKP
P-JUS-CKP
P-MON-CKGL
P-PDG
P-TPT
QGL
T-JUS-CKP
T-MON-CKP
T-PDG
T-TPT


In [15]:
# Define the list of classes to keep
classes_to_keep = [3, 4, 101, 102, 105, 106, 107, 108, 109]

# Function to transform 'class' column
def transform_class_column(df):
    # Replace values not in the list with 0
    df = df.withColumn(target_name, when(col(target_name).isin(classes_to_keep), col(target_name)).otherwise(0))
    # Handle missing or invalid values in 'class'
    df = df.na.fill({target_name: 0})
    return df

# Transform 'class' column in each dataset
train_data_spark = transform_class_column(train_data_spark)
test_data_spark = transform_class_column(test_data_spark)

# Fit StringIndexer on training data only
string_indexer = StringIndexer(inputCol=target_name, outputCol="indexed_" + target_name, handleInvalid='keep')
string_indexer_model = string_indexer.fit(train_data_spark)

# Transform datasets
train_data_spark = string_indexer_model.transform(train_data_spark)
test_data_spark = string_indexer_model.transform(test_data_spark)

# Assemble features into a feature vector in each dataset
assembler = VectorAssembler(inputCols=feature_names, outputCol="features")

train_data = assembler.transform(train_data_spark)
test_data = assembler.transform(test_data_spark)

# Verify that 'Instance's are unique to each dataset
train_instances = train_data.select('Instance').distinct()
test_instances = test_data.select('Instance').distinct()

common_instances_train_test = train_instances.join(test_instances, on='Instance', how='inner')

if common_instances_train_test.count() > 0:
    print("There are common Instances between train and test datasets!")
else:
    print("No common Instances between train and test datasets.")



No common Instances between train and test datasets.


                                                                                

In [7]:
# Import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier

# Define the classifier
rf_classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="indexed_" + target_name,  # Use 'indexed_class' as label column
    predictionCol="prediction",
    probabilityCol="probability",
    rawPredictionCol="rawPrediction",
    maxDepth=10,
    numTrees=100,
    seed=123
)

# Start an MLflow run for Random Forest
with mlflow.start_run(run_name="Random_Forest_Classifier"):
    # Train the model on the training set
    rf_model = rf_classifier.fit(train_data)
    
    # Predict on the test set
    rf_test_predictions = rf_model.transform(test_data)
    
    # Evaluate the model on the test set
    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexed_" + target_name,
        predictionCol="prediction",
        metricName="accuracy"
    )
    
    accuracy_test = evaluator.evaluate(rf_test_predictions)
    f1_score_test = evaluator.evaluate(rf_test_predictions, {evaluator.metricName: "f1"})
    weighted_precision_test = evaluator.evaluate(rf_test_predictions, {evaluator.metricName: "weightedPrecision"})
    weighted_recall_test = evaluator.evaluate(rf_test_predictions, {evaluator.metricName: "weightedRecall"})
    
    print(f"Random Forest - Test Accuracy: {accuracy_test}")
    print(f"Random Forest - Test F1 Score: {f1_score_test}")
    print(f"Random Forest - Test Weighted Precision: {weighted_precision_test}")
    print(f"Random Forest - Test Weighted Recall: {weighted_recall_test}")
    
    # Compute confusion matrix
    confusion_matrix_test_rf = rf_test_predictions.groupBy("indexed_" + target_name, "prediction").count().orderBy("indexed_" + target_name, "prediction")
    print("Random Forest - Test Confusion Matrix:")
    confusion_matrix_test_rf.show()
    
    # Save confusion matrix to a file
    confusion_matrix_rf_pd = confusion_matrix_test_rf.toPandas()
    confusion_matrix_rf_path = os.path.join(LOG_DIR, "confusion_matrix_test_rf.csv")
    confusion_matrix_rf_pd.to_csv(confusion_matrix_rf_path, index=False)
    mlflow.log_artifact(confusion_matrix_rf_path, artifact_path="confusion_matrix")
    
    # Capture feature importance
    importances_rf = rf_model.featureImportances.toArray()
    feature_importance_rf = pd.DataFrame({
        "feature": feature_names,
        "importance": importances_rf
    }).sort_values(by="importance", ascending=False)
    
    # Save feature importances to a file
    feature_importance_rf_path = os.path.join(LOG_DIR, "feature_importance_rf.csv")
    feature_importance_rf.to_csv(feature_importance_rf_path, index=False)
    mlflow.log_artifact(feature_importance_rf_path, artifact_path="feature_importance")
    
    # Save the model
    model_path_rf = os.path.join(MODEL_SAVE_DIR, "random_forest_model")
    rf_model.write().overwrite().save(model_path_rf)
    mlflow.log_artifact(model_path_rf, artifact_path="models")
    
    # Log parameters and metrics to MLflow
    mlflow.log_param("model_type", "RandomForestClassifier")
    mlflow.log_param("maxDepth", rf_classifier.getOrDefault("maxDepth"))
    mlflow.log_param("numTrees", rf_classifier.getOrDefault("numTrees"))
    
    mlflow.log_metric("accuracy_test", accuracy_test)
    mlflow.log_metric("f1_score_test", f1_score_test)
    mlflow.log_metric("weighted_precision_test", weighted_precision_test)
    mlflow.log_metric("weighted_recall_test", weighted_recall_test)
    
    # End the MLflow run
    mlflow.end_run()
    
print("\nRandom Forest Classifier processing complete.")

24/11/27 16:23:28 WARN MemoryStore: Not enough space to cache rdd_68_10 in memory! (computed 480.0 MiB so far)
24/11/27 16:23:28 WARN MemoryStore: Not enough space to cache rdd_68_6 in memory! (computed 480.0 MiB so far)
24/11/27 16:23:28 WARN MemoryStore: Not enough space to cache rdd_68_1 in memory! (computed 480.0 MiB so far)
24/11/27 16:23:28 WARN MemoryStore: Not enough space to cache rdd_68_12 in memory! (computed 480.0 MiB so far)
24/11/27 16:23:28 WARN MemoryStore: Not enough space to cache rdd_68_14 in memory! (computed 480.0 MiB so far)
24/11/27 16:23:28 WARN BlockManager: Persisting block rdd_68_10 to disk instead.
24/11/27 16:23:28 WARN BlockManager: Persisting block rdd_68_6 to disk instead.
24/11/27 16:23:28 WARN BlockManager: Persisting block rdd_68_1 to disk instead.
24/11/27 16:23:28 WARN BlockManager: Persisting block rdd_68_14 to disk instead.
24/11/27 16:23:28 WARN BlockManager: Persisting block rdd_68_12 to disk instead.
24/11/27 16:23:28 WARN MemoryStore: Not enou

Random Forest - Test Accuracy: 0.8656467367308538
Random Forest - Test F1 Score: 0.8542216903904085
Random Forest - Test Weighted Precision: 0.8496324148092613
Random Forest - Test Weighted Recall: 0.8656467367308538
Random Forest - Test Confusion Matrix:


24/11/27 17:03:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/11/27 17:03:28 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/11/27 17:04:44 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

+-------------+----------+-------+
|indexed_class|prediction|  count|
+-------------+----------+-------+
|          0.0|       0.0|3317039|
|          0.0|       1.0| 247960|
|          0.0|       2.0|  30808|
|          0.0|       3.0| 111560|
|          0.0|       4.0|  45190|
|          0.0|       5.0|   7227|
|          0.0|       7.0|   5775|
|          1.0|       1.0|1281249|
|          2.0|       0.0|   9491|
|          2.0|       2.0| 518240|
|          3.0|       0.0| 264446|
|          3.0|       3.0| 226256|
|          3.0|       4.0|   2725|
|          4.0|       0.0|  17976|
|          4.0|       4.0| 107543|
|          5.0|       0.0|   3678|
|          5.0|       5.0|  39512|
|          6.0|       0.0|  82179|
|          6.0|       4.0|   8611|
|          7.0|       0.0|   5845|
+-------------+----------+-------+
only showing top 20 rows



24/11/27 17:04:45 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/11/27 17:06:01 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/11/27 17:06:01 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/11/27 17:06:01 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB



Random Forest Classifier processing complete.


In [None]:
# Import MultilayerPerceptronClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Define the classifier
num_classes_mlp = train_data.select("indexed_" + target_name).distinct().count()
layers_mlp = [len(feature_names), 100, 50, num_classes_mlp]

mlp_classifier = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="indexed_" + target_name,
    predictionCol="prediction",
    maxIter=100,
    layers=layers_mlp,
    blockSize=128,
    seed=123
)

# Start an MLflow run for Multilayer Perceptron
with mlflow.start_run(run_name="Multilayer_Perceptron_Classifier"):
    # Train the model on the training set
    mlp_model = mlp_classifier.fit(train_data)
    
    # Predict on the test set
    mlp_test_predictions = mlp_model.transform(test_data)
    
    # Evaluate the model on the test set
    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexed_" + target_name,
        predictionCol="prediction",
        metricName="accuracy"
    )
    
    accuracy_test_mlp = evaluator.evaluate(mlp_test_predictions)
    f1_score_test_mlp = evaluator.evaluate(mlp_test_predictions, {evaluator.metricName: "f1"})
    weighted_precision_test_mlp = evaluator.evaluate(mlp_test_predictions, {evaluator.metricName: "weightedPrecision"})
    weighted_recall_test_mlp = evaluator.evaluate(mlp_test_predictions, {evaluator.metricName: "weightedRecall"})
    
    print(f"Multilayer Perceptron - Test Accuracy: {accuracy_test_mlp}")
    print(f"Multilayer Perceptron - Test F1 Score: {f1_score_test_mlp}")
    print(f"Multilayer Perceptron - Test Weighted Precision: {weighted_precision_test_mlp}")
    print(f"Multilayer Perceptron - Test Weighted Recall: {weighted_recall_test_mlp}")
    
    # Compute confusion matrix
    confusion_matrix_test_mlp = mlp_test_predictions.groupBy("indexed_" + target_name, "prediction").count().orderBy("indexed_" + target_name, "prediction")
    print("Multilayer Perceptron - Test Confusion Matrix:")
    confusion_matrix_test_mlp.show()
    
    # Save confusion matrix to a file
    confusion_matrix_mlp_pd = confusion_matrix_test_mlp.toPandas()
    confusion_matrix_mlp_path = os.path.join(LOG_DIR, "confusion_matrix_test_mlp.csv")
    confusion_matrix_mlp_pd.to_csv(confusion_matrix_mlp_path, index=False)
    mlflow.log_artifact(confusion_matrix_mlp_path, artifact_path="confusion_matrix")
    
    # Note: Multilayer Perceptron does not provide feature importances
    
    # Save the model
    model_path_mlp = os.path.join(MODEL_SAVE_DIR, "mlp_model")
    mlp_model.write().overwrite().save(model_path_mlp)
    mlflow.log_artifact(model_path_mlp, artifact_path="models")
    
    # Log parameters and metrics to MLflow
    mlflow.log_param("model_type", "MultilayerPerceptronClassifier")
    mlflow.log_param("layers", layers_mlp)
    mlflow.log_param("maxIter", mlp_classifier.getOrDefault("maxIter"))
    mlflow.log_param("blockSize", mlp_classifier.getOrDefault("blockSize"))
    
    mlflow.log_metric("accuracy_test", accuracy_test_mlp)
    mlflow.log_metric("f1_score_test", f1_score_test_mlp)
    mlflow.log_metric("weighted_precision_test", weighted_precision_test_mlp)
    mlflow.log_metric("weighted_recall_test", weighted_recall_test_mlp)
    
    # End the MLflow run
    mlflow.end_run()
    
print("\nMultilayer Perceptron Classifier processing complete.")

24/11/27 17:17:43 WARN MemoryStore: Not enough space to cache rdd_266_4 in memory! (computed 417.6 MiB so far)
24/11/27 17:17:43 WARN BlockManager: Persisting block rdd_266_4 to disk instead.
24/11/27 17:17:45 WARN MemoryStore: Not enough space to cache rdd_266_10 in memory! (computed 417.6 MiB so far)
24/11/27 17:17:45 WARN BlockManager: Persisting block rdd_266_10 to disk instead.
24/11/27 17:17:45 WARN MemoryStore: Not enough space to cache rdd_266_2 in memory! (computed 417.6 MiB so far)
24/11/27 17:17:45 WARN BlockManager: Persisting block rdd_266_2 to disk instead.
24/11/27 17:17:46 WARN MemoryStore: Not enough space to cache rdd_266_7 in memory! (computed 417.6 MiB so far)
24/11/27 17:17:46 WARN BlockManager: Persisting block rdd_266_7 to disk instead.
24/11/27 17:17:47 WARN MemoryStore: Not enough space to cache rdd_266_15 in memory! (computed 417.6 MiB so far)
24/11/27 17:17:47 WARN BlockManager: Persisting block rdd_266_15 to disk instead.
24/11/27 17:18:20 WARN MemoryStore: 

Multilayer Perceptron - Test Accuracy: 0.8783463749461456
Multilayer Perceptron - Test F1 Score: 0.8776945886929792
Multilayer Perceptron - Test Weighted Precision: 0.8859232212717691
Multilayer Perceptron - Test Weighted Recall: 0.8783463749461455
Multilayer Perceptron - Test Confusion Matrix:


                                                                                

+-------------+----------+-------+
|indexed_class|prediction|  count|
+-------------+----------+-------+
|          0.0|       0.0|3211662|
|          0.0|       1.0| 220687|
|          0.0|       2.0|  32102|
|          0.0|       3.0| 248666|
|          0.0|       4.0|  21585|
|          0.0|       5.0|   4846|
|          0.0|       6.0|  16061|
|          0.0|       7.0|   9950|
|          1.0|       0.0|  14065|
|          1.0|       1.0|1267184|
|          2.0|       0.0|   6951|
|          2.0|       2.0| 520780|
|          3.0|       0.0|  85387|
|          3.0|       3.0| 407235|
|          3.0|       4.0|    805|
|          4.0|       0.0|  21386|
|          4.0|       1.0|   2996|
|          4.0|       3.0|   1059|
|          4.0|       4.0| 100078|
|          5.0|       0.0|   5640|
+-------------+----------+-------+
only showing top 20 rows



                                                                                


Multilayer Perceptron Classifier processing complete.


24/11/28 08:22:02 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 324274 ms exceeds timeout 120000 ms
24/11/28 08:22:02 WARN SparkContext: Killing executors is not supported by current scheduler.
24/11/28 08:22:05 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
# stop the SparkSession
spark.stop()