# Modeling and Evaluation
In this phase, I focus on the modeling of various algorithms and subsequently conduct a thorough evaluation to gauge their performance and effectiveness in addressing the problem at hand.


To tackle the challenge of class imbalance, I have employed a weight-based model selection approach. By assigning different weights to the classes during model training, we aim to give more emphasis to the minority class, ensuring a more balanced learning process. This strategic use of weights aids in mitigating the impact of class imbalance and contributes to the overall robustness of the model.







In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, PCA, VectorAssembler, MinMaxScaler
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import Evaluator as evaluator
import pandas as pd
from pyspark.sql.types import *
import time
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

import numpy as np

In [2]:
#creating a spark session
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "15g") \
    .appName('Bosch Project Train') \
    .getOrCreate()

In [3]:
#loding the numeric data
df_numeric = spark.read.csv("pca_transformed.csv", header=True, inferSchema=True)
df_test = spark.read.csv("test_numeric.csv", header=True, inferSchema=True)

In [4]:
df_numeric.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- PC0: double (nullable = true)
 |-- PC1: double (nullable = true)
 |-- PC2: double (nullable = true)
 |-- PC3: double (nullable = true)
 |-- PC4: double (nullable = true)
 |-- PC5: double (nullable = true)
 |-- PC6: double (nullable = true)
 |-- PC7: double (nullable = true)
 |-- PC8: double (nullable = true)
 |-- PC9: double (nullable = true)
 |-- PC10: double (nullable = true)
 |-- PC11: double (nullable = true)
 |-- PC12: double (nullable = true)
 |-- PC13: double (nullable = true)
 |-- PC14: double (nullable = true)
 |-- PC15: double (nullable = true)
 |-- PC16: double (nullable = true)
 |-- PC17: double (nullable = true)
 |-- PC18: double (nullable = true)
 |-- PC19: double (nullable = true)
 |-- PC20: double (nullable = true)
 |-- PC21: double (nullable = true)
 |-- PC22: double (nullable = true)
 |-- PC23: double (nullable = true)
 |-- PC24: double (nullable = true)
 |-- PC25: double (nullable = true)
 |-- PC26: double (nullable = tru

In [40]:
# adding weights to each class
df_numeric = df_numeric.withColumn("weight",F.when(df_numeric["Response"]==0,0.5).otherwise(90))


In [30]:
df_numeric.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- PC0: double (nullable = true)
 |-- PC1: double (nullable = true)
 |-- PC2: double (nullable = true)
 |-- PC3: double (nullable = true)
 |-- PC4: double (nullable = true)
 |-- PC5: double (nullable = true)
 |-- PC6: double (nullable = true)
 |-- PC7: double (nullable = true)
 |-- PC8: double (nullable = true)
 |-- PC9: double (nullable = true)
 |-- PC10: double (nullable = true)
 |-- PC11: double (nullable = true)
 |-- PC12: double (nullable = true)
 |-- PC13: double (nullable = true)
 |-- PC14: double (nullable = true)
 |-- PC15: double (nullable = true)
 |-- PC16: double (nullable = true)
 |-- PC17: double (nullable = true)
 |-- PC18: double (nullable = true)
 |-- PC19: double (nullable = true)
 |-- PC20: double (nullable = true)
 |-- PC21: double (nullable = true)
 |-- PC22: double (nullable = true)
 |-- PC23: double (nullable = true)
 |-- PC24: double (nullable = true)
 |-- PC25: double (nullable = true)
 |-- PC26: double (nullable = tru

In [41]:
assembler = VectorAssembler(inputCols=df_numeric.columns[1:-2], outputCol="features")
assembled_data = assembler.transform(df_numeric)


# Logestic Regression

In [38]:
# Separate the data based on labels
#df_label1 = assembled_data.filter(F.col("Response") == 1)
#df_label2 = assembled_data.filter(F.col("Response") == 0)


# Perform train-test split for each label
#train_label1, test_label1 = df_label1.randomSplit([0.9, 0.1], seed=42)
#train_label2, test_label2 = df_label2.randomSplit([0.9, 0.1], seed=42)

#train_data = train_label1.union(train_label2)
# Concatenate the test sets
#test_data = test_label1.union(test_label2)

# Create a Logistic Regression model
log_reg = LogisticRegression(featuresCol='features', labelCol='Response',weightCol="weight")

# Specify parameter grid for tuning
paramGrid = ParamGridBuilder()\
    .addGrid(log_reg.elasticNetParam, [1, 0, 0.5])\
    .addGrid(log_reg.regParam, [0.0, 0.2, 0.4, 0.6, 0.8, 1])\
    .addGrid(log_reg.maxIter, [10, 15, 20])\
    .build()

# Create a CrossValidator
evaluator_lr = MulticlassClassificationEvaluator(weightCol="weight",labelCol='Response')
crossval = CrossValidator(estimator=log_reg,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_lr,
                          numFolds=3)

# Fit the model on the training data
start_time = time.time()
cvModel = crossval.fit(assembled_data)
end_time = time.time()

training_time = end_time - start_time
print("The time taken to train the data is: %0.3f seconds" % training_time)


The time taken to train the data is: 152.272 seconds


In [27]:
best_model = cvModel.bestModel

# Print or access the best parameters
best_elastic_net_param = best_model.getOrDefault("elasticNetParam")
best_reg_param = best_model.getOrDefault("regParam")
best_max_iter = best_model.getOrDefault("maxIter")

print("Best Elastic Net Parameter:", best_elastic_net_param)
print("Best Regularization Parameter:", best_reg_param)
print("Best Max Iterations Parameter:", best_max_iter)

Best Elastic Net Parameter: 1.0
Best Regularization Parameter: 0.0
Best Max Iterations Parameter: 10


In [78]:
log_reg = LogisticRegression(featuresCol='features', labelCol='Response',weightCol="weight",elasticNetParam=1,regParam=0,maxIter=10)
model = log_reg.fit(assembled_data)
prediction = model.transform(assembled_data)
output = prediction.select("Response", "probability", "prediction")
conf_matrix = prediction.groupBy("Response", "prediction").count()
conf_matrix.show()


+--------+----------+------+
|Response|prediction| count|
+--------+----------+------+
|       1|       0.0|  3690|
|       0|       0.0|722631|
|       1|       1.0|  3189|
|       0|       1.0|454237|
+--------+----------+------+



In [85]:
evaluator = MulticlassClassificationEvaluator(metricName="accuracy",labelCol='Response',predictionCol="prediction")

# Calculate accuracy
accuracy = evaluator.evaluate(prediction)
accuracy

0.6131546690297842

In [88]:
tp = 3189
tn = 722631
fp = 454237
fn = 3690
mcc = (tp * tn - fp * fn) / ((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))**0.5
mcc

0.012115476519323112

# Random Forest 

In [24]:
rf = RandomForestClassifier(featuresCol="features", labelCol="Response", bootstrap= True, weightCol="weight",seed=22,featureSubsetStrategy="log2")

random_forest_paramGrid = ParamGridBuilder()\
               .addGrid(rf.numTrees, [60,100,120])\
               .addGrid(rf.maxDepth, [10,15,20])\
               .build()

# Create a CrossValidator
rf_evaluator = MulticlassClassificationEvaluator(weightCol="weight",labelCol='Response')
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=random_forest_paramGrid,
                          evaluator=rf_evaluator,
                          numFolds=2)

# Fit the model on the training data
start_time = time.time()
cvModel = crossval.fit(assembled_data)
end_time = time.time()

In [18]:
rf = RandomForestClassifier(featuresCol="features", labelCol="Response", bootstrap= True, weightCol="weight",numTrees=100,maxDepth=30, featureSubsetStrategy ="log2")
model = rf.fit(assembled_data)

In [19]:
predic = model.transform(assembled_data)

In [20]:
predic.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- PC0: double (nullable = true)
 |-- PC1: double (nullable = true)
 |-- PC2: double (nullable = true)
 |-- PC3: double (nullable = true)
 |-- PC4: double (nullable = true)
 |-- PC5: double (nullable = true)
 |-- PC6: double (nullable = true)
 |-- PC7: double (nullable = true)
 |-- PC8: double (nullable = true)
 |-- PC9: double (nullable = true)
 |-- PC10: double (nullable = true)
 |-- PC11: double (nullable = true)
 |-- PC12: double (nullable = true)
 |-- PC13: double (nullable = true)
 |-- PC14: double (nullable = true)
 |-- PC15: double (nullable = true)
 |-- PC16: double (nullable = true)
 |-- PC17: double (nullable = true)
 |-- PC18: double (nullable = true)
 |-- PC19: double (nullable = true)
 |-- PC20: double (nullable = true)
 |-- PC21: double (nullable = true)
 |-- PC22: double (nullable = true)
 |-- PC23: double (nullable = true)
 |-- PC24: double (nullable = true)
 |-- PC25: double (nullable = true)
 |-- PC26: double (nullable = tru

In [21]:
conf_matrix = predic.groupBy("Response", "prediction").count()
conf_matrix.show()


+--------+----------+------+
|Response|prediction| count|
+--------+----------+------+
|       1|       0.0|  3045|
|       0|       0.0|943388|
|       1|       1.0|  3834|
|       0|       1.0|233480|
+--------+----------+------+



In [36]:
tp = 3834
tn = 943388
fp = 233480
fn = 3045
mcc = (tp * tn - fp * fn) / ((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))**0.5
mcc

0.06814953676418936

# Neural Network 

In [2]:
from keras.layers import Dense, Dropout, Activation
from keras.models import Sequential
import optuna
from optuna.samplers import TPESampler
import tensorflow as tf
from sklearn.metrics import f1_score


In [3]:
df = pd.read_csv('pca_transformed.csv')

In [4]:
from sklearn.model_selection import train_test_split

# Assuming df is your DataFrame with features and labels
X = df.iloc[:, 1:-1]  # Features
y = df.iloc[:, -1]    # Labels

# Split the data with stratification
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)


In [49]:
from keras import backend as K

def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def mcc(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    true_negatives = K.sum(K.round(K.clip((1 - y_true) * (1 - y_pred), 0, 1)))
    false_positives = K.sum(K.round(K.clip((1 - y_true) * y_pred, 0, 1)))
    false_negatives = K.sum(K.round(K.clip(y_true * (1 - y_pred), 0, 1)))

    numerator = true_positives * true_negatives - false_positives * false_negatives
    denominator = K.sqrt((true_positives + false_positives) * (true_positives + false_negatives) * (true_negatives + false_positives) * (true_negatives + false_negatives) + K.epsilon())

    mcc_value = numerator / denominator
    return mcc_value

def log(study, trial):
      print(f"Trial No.={trial.number}, HP_Set={trial.params}, \
            Score={trial.value}")
      print(f"Best Value ={study.best_value}")
    
def objective_func(trial):
    
    model = Sequential()
    hidden_layer_unit_choice = [64, 256, 512, 1024]
    hidden_layers = trial.suggest_int('hidden_layers', 5, 15)
    model.add(Dense(units=trial.suggest_categorical('layer1', [8, 16]),
                 input_shape=((30,)),
                 name='dense1'))
    model.add(Activation(activation=trial.suggest_categorical(
                                                f'activation1',
                                                       ['relu',
                                                      'elu'])))
    for i in range(1, hidden_layers):
        model.add(Dense(units=trial.suggest_categorical(f'layer{i+1}',
                                              hidden_layer_unit_choice)))
        model.add(Dropout(trial.suggest_uniform(f'dropout{i+1}', 0, 0.8)))
        
        model.add(Activation(activation=trial.suggest_categorical(
                                            f'activation{i+1}',
                                                       ['relu',
                                                      'elu'])))
    model.add(Dense(1))
    model.add(Activation(activation='sigmoid'))
    model.compile(loss='binary_crossentropy',
                  metrics=[mcc],
                  optimizer=trial.suggest_categorical('optimizer',
                  ['rmsprop', 'adam', 'sgd']))
    stop_on_negative_mcc = StopOnNegativeMCC(threshold=0.0)
    result = model.fit(X_train, y_train,
                    batch_size=4,
                    epochs=1,
                    validation_split=0.2,class_weight={0:0.5,1:86}
                                    )
     
    validation_f1_m = np.amax(result.history['mcc'])        
      
      
    print('validation mcc :', validation_f1_m)
    return validation_f1_m

study = optuna.create_study(direction='maximize', sampler=TPESampler())
study.optimize(objective_func, n_trials=50,callbacks=[log])
best_trial = study.best_trial.value
print(f"Best trial mcc : {best_trial}")
print("parameters for best trail are :")
for key, value in study.best_trial.params.items():
   print(f"{key}: {value}")

[I 2024-01-20 12:52:01,935] A new study created in memory with name: no-name-8f144bec-cd48-46ee-bde7-e4bf9b38be0b
  model.add(Dropout(trial.suggest_uniform(f'dropout{i+1}', 0, 0.8)))




[I 2024-01-20 13:03:35,874] Trial 0 finished with value: 1.4079520042287186e-05 and parameters: {'hidden_layers': 8, 'layer1': 8, 'activation1': 'elu', 'layer2': 512, 'dropout2': 0.15616557604249268, 'activation2': 'relu', 'layer3': 64, 'dropout3': 0.2763162500501685, 'activation3': 'elu', 'layer4': 256, 'dropout4': 0.6949797984513861, 'activation4': 'relu', 'layer5': 512, 'dropout5': 0.7773205336868814, 'activation5': 'elu', 'layer6': 512, 'dropout6': 0.292025652342486, 'activation6': 'relu', 'layer7': 64, 'dropout7': 0.07021480764410991, 'activation7': 'elu', 'layer8': 64, 'dropout8': 0.4039501306893736, 'activation8': 'elu', 'optimizer': 'rmsprop'}. Best is trial 0 with value: 1.4079520042287186e-05.


validation F1 score: 1.4079520042287186e-05
Trial No.=0, HP_Set={'hidden_layers': 8, 'layer1': 8, 'activation1': 'elu', 'layer2': 512, 'dropout2': 0.15616557604249268, 'activation2': 'relu', 'layer3': 64, 'dropout3': 0.2763162500501685, 'activation3': 'elu', 'layer4': 256, 'dropout4': 0.6949797984513861, 'activation4': 'relu', 'layer5': 512, 'dropout5': 0.7773205336868814, 'activation5': 'elu', 'layer6': 512, 'dropout6': 0.292025652342486, 'activation6': 'relu', 'layer7': 64, 'dropout7': 0.07021480764410991, 'activation7': 'elu', 'layer8': 64, 'dropout8': 0.4039501306893736, 'activation8': 'elu', 'optimizer': 'rmsprop'},             Score=1.4079520042287186e-05
Best Value =1.4079520042287186e-05


  model.add(Dropout(trial.suggest_uniform(f'dropout{i+1}', 0, 0.8)))




[W 2024-01-20 13:28:30,215] Trial 1 failed with parameters: {'hidden_layers': 10, 'layer1': 8, 'activation1': 'elu', 'layer2': 1024, 'dropout2': 0.6652835216312506, 'activation2': 'relu', 'layer3': 256, 'dropout3': 0.3158502490458037, 'activation3': 'elu', 'layer4': 64, 'dropout4': 0.6787881779742483, 'activation4': 'relu', 'layer5': 1024, 'dropout5': 0.056928539445858566, 'activation5': 'relu', 'layer6': 512, 'dropout6': 0.029581655580786227, 'activation6': 'elu', 'layer7': 512, 'dropout7': 0.1719089228674948, 'activation7': 'relu', 'layer8': 256, 'dropout8': 0.043020760448456846, 'activation8': 'relu', 'layer9': 512, 'dropout9': 0.7627237194312421, 'activation9': 'relu', 'layer10': 512, 'dropout10': 0.5631044770428193, 'activation10': 'relu', 'optimizer': 'adam'} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "C:\Users\akhil\anaconda3\Lib\site-packages\optuna\study\_optimize.py", line 200, in _run_trial
    value_or_values = func(trial)

KeyboardInterrupt: 

#### The training duration for the neural network is excessively long, and the achieved Matthews Correlation Coefficient (MCC) score is notably low, approximately 5 x 10^-5.

# XGBoost


In [43]:
from xgboost.spark import SparkXGBClassifier

regressor = SparkXGBClassifier(features_col="features", label_col="Response", device="cuda",weight_col="weight")

# train and return the model
model = regressor.fit(assembled_data)

# predict on test data
predict_df = model.transform(assembled_data)

2024-01-20 21:36:07,323 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cuda', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-01-20 21:36:09,011 INFO SparkXGBClassifier: _skip_stage_level_scheduling Stage-level scheduling in xgboost requires spark standalone or local-cluster mode
2024-01-20 21:36:17,488 INFO XGBoost-PySpark: _fit Finished xgboost training!


In [44]:
conf_matrix = predict_df.groupBy("Response", "prediction").count()
conf_matrix.show()

+--------+----------+------+
|Response|prediction| count|
+--------+----------+------+
|       1|       0.0|  1998|
|       0|       0.0|855091|
|       1|       1.0|  4881|
|       0|       1.0|321777|
+--------+----------+------+



In [45]:
tp = 4881
tn = 855091
fp = 321777
fn = 1998
mcc = (tp * tn - fp * fn) / ((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))**0.5
mcc

0.07416270967845472

#### Distributed XGBoost implemented with PySpark demonstrates superior performance compared to other classification algorithms, boasting an impressive Matthews Correlation Coefficient (MCC) of 0.075 with higher True Positive Rate .