## Define ML Classification models for the detection of Stress conditions

### Prepare Spark environment 

In [35]:
import findspark
findspark.init()

In [36]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [37]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = ' pyspark-shell'

In [38]:
from pyspark.sql import SparkSession

# Set Spark configurations and create a Spark session
spark = (SparkSession.builder
         .appName("StressAlert.ipynb")
         .config("spark.driver.memory", "4g")
         .getOrCreate())

In [39]:
from pyspark.sql.functions import col, collect_list, rand, when
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

### Define columns to drop

In [40]:
columns_to_drop = ["subject id", "SSSQ class", "SSSQ Label"]

### Load and analyze Electrodermal Activity dataset

In [41]:
eda_path = "wesad-chest-combined-classification-eda.csv"
df_eda = spark.read.csv(eda_path, header=True, inferSchema=True)
df_eda.limit(10).toPandas()

Unnamed: 0,MEAN,MAX,MIN,RANGE,KURT,SKEW,MEAN_1ST_GRAD,STD_1ST_GRAD,MEAN_2ND_GRAD,STD_2ND_GRAD,...,RMSC_YEO_JONSON,STD_1ST_GRAD_YEO_JONSON,RANGE_SQRT,RMSC_SQUARED,MEAN_2ND_GRAD_CUBE,INSC_APSC,condition,SSSQ class,SSSQ Label,condition label
0,1.872854,2.114105,1.421738,0.692368,-0.937305,0.36963,1.570384e-06,0.007657,1.139868e-07,0.006519,...,0.65499,0.032437,0.832086,3.515246,1.481029e-21,111883.868597,baseline,low,0,0
1,3.54041,3.94249,3.097916,0.844574,-1.006881,-0.097972,-2.68482e-06,0.006157,-2.783821e-07,0.005311,...,0.78436,0.019009,0.919007,12.55839,-2.157366e-20,59202.342697,stress,low,0,2
2,4.551093,5.1651,3.959274,1.205826,-0.730921,-0.41641,-3.86556e-06,0.006059,-3.633045e-09,0.005318,...,0.987237,0.014851,1.098101,20.775159,-4.795262e-26,46003.472148,stress,low,0,2
3,4.307184,5.098724,3.757095,1.341629,-0.856817,0.504575,5.04085e-06,0.006952,-1.026335e-07,0.005975,...,4.458571,0.002856,1.158287,18.639046,-1.081105e-21,48527.628638,baseline,high,2,0
4,5.51004,5.620193,5.24025,0.379944,-0.743814,-0.40243,-2.761114e-07,0.00436,1.498631e-08,0.003777,...,0.849475,0.001257,0.616396,30.361335,3.365768e-24,38111.248823,baseline,low,0,0
5,6.323011,6.559753,6.009293,0.550461,-1.024826,0.539063,1.32152e-06,0.003533,-5.994524e-08,0.003035,...,2.512088,0.000491,0.74193,39.987024,-2.154092e-22,33206.581881,baseline,medium,1,0
6,0.821117,0.964355,0.515747,0.448608,4.315578,1.165453,-1.771109e-07,0.009361,9.536743e-09,0.008126,...,0.415702,0.116791,0.669782,0.674449,8.673617e-25,255667.415736,baseline,low,0,0
7,1.711677,1.923752,1.353073,0.570679,-0.871243,-0.127887,-1.047225e-06,0.006417,-9.763808e-08,0.005471,...,0.663279,0.041374,0.755433,2.933405,-9.30803e-22,122537.528919,baseline,medium,1,0
8,5.376535,5.522537,5.158234,0.364304,-0.676116,0.54372,-6.530398e-07,0.004483,-2.316066e-08,0.003853,...,0.846229,0.001261,0.603576,28.909055,-1.2423760000000002e-23,39056.014078,baseline,low,0,0
9,3.646182,3.837204,3.387833,0.449371,0.251447,-0.815357,-6.67572e-07,0.005805,2.81561e-08,0.005085,...,0.840716,0.030505,0.670352,13.297762,2.23212e-23,57580.990827,stress,medium,1,2


In [42]:
df_eda = df_eda.drop(*columns_to_drop)

In [43]:
df_eda.printSchema()

root
 |-- MEAN: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MIN: double (nullable = true)
 |-- RANGE: double (nullable = true)
 |-- KURT: double (nullable = true)
 |-- SKEW: double (nullable = true)
 |-- MEAN_1ST_GRAD: double (nullable = true)
 |-- STD_1ST_GRAD: double (nullable = true)
 |-- MEAN_2ND_GRAD: double (nullable = true)
 |-- STD_2ND_GRAD: double (nullable = true)
 |-- ALSC: double (nullable = true)
 |-- INSC: double (nullable = true)
 |-- APSC: double (nullable = true)
 |-- RMSC: double (nullable = true)
 |-- MEAN_LOG: double (nullable = true)
 |-- INSC_LOG: double (nullable = true)
 |-- APSC_LOG: double (nullable = true)
 |-- RMSC_LOG: double (nullable = true)
 |-- RANGE_LOG: double (nullable = true)
 |-- ALSC_LOG: double (nullable = true)
 |-- MIN_LOG: double (nullable = true)
 |-- MEAN_1ST_GRAD_LOG: double (nullable = true)
 |-- MEAN_2ND_GRAD_LOG: double (nullable = true)
 |-- MIN_LOG_LOG: double (nullable = true)
 |-- MEAN_1ST_GRAD_LOG_LOG: double (n

### Load and analyze Heart Rate Variability dataset

In [44]:
hrv_path = "wesad-chest-combined-classification-hrv.csv"
df_hrv = spark.read.csv(hrv_path, header=True, inferSchema=True)
df_hrv.limit(10).toPandas()

                                                                                

Unnamed: 0,MEAN_RR,MEDIAN_RR,SDRR,RMSSD,SDSD,SDRR_RMSSD,HR,pNN25,pNN50,SD1,...,MEAN_RR_MEAN_MEAN_REL_RR,SD2_LF,HR_LF,HR_HF,HF_VLF,subject id,condition,SSSQ class,SSSQ Label,condition label
0,660.756625,657.242305,34.857082,7.408307,7.408129,4.705134,91.050077,1.75,0.0,5.244915,...,10928560.0,0.13441,0.249678,7.06844,0.033515,13,baseline,medium,1,0
1,762.004543,769.028815,106.035926,13.59178,13.580221,7.801475,80.267696,6.0,1.0,9.614723,...,-1039564.0,0.125816,0.067484,5.971604,0.003921,9,baseline,low,0,0
2,978.587122,973.028365,80.323113,20.318045,20.316451,3.953289,61.720802,20.5,1.5,14.383937,...,-3073518.0,0.069888,0.038281,10.945519,0.003154,4,amusement,medium,1,1
3,690.333891,669.54846,79.677822,15.17368,15.173087,5.251055,87.99627,8.75,1.5,10.742463,...,-4341078.0,0.094345,0.074014,2.435759,0.009692,8,stress,low,0,2
4,724.270331,721.597785,71.663062,13.580379,13.58037,5.276956,83.637107,5.0,0.75,9.614828,...,13111400.0,0.076385,0.063323,3.259253,0.014438,4,stress,medium,1,2
5,720.45951,717.918925,80.255342,16.184569,16.184498,4.958757,84.311188,9.75,1.25,11.458537,...,-22307740.0,0.092779,0.069274,3.381279,0.008509,4,stress,medium,1,2
6,899.380126,895.144395,88.156912,17.028414,17.026013,5.177048,67.356005,12.75,0.0,12.054324,...,-3108511.0,0.11167,0.060615,3.099106,0.009211,16,baseline,low,0,0
7,663.783777,640.516725,78.40655,15.474744,15.470725,5.066743,91.568244,9.0,1.25,10.953189,...,-1179011.0,0.052694,0.043729,2.736483,0.019757,8,stress,low,0,2
8,739.661574,729.443895,70.944729,10.76638,10.766372,6.589469,81.849742,3.5,0.0,7.622533,...,-22283160.0,0.18837,0.154117,5.367461,0.006001,15,baseline,medium,1,0
9,827.177718,824.218205,60.616615,19.143861,19.140281,3.166374,72.917243,15.75,1.5,13.551215,...,1227272.0,0.071478,0.061573,1.617537,0.042293,2,baseline,high,2,0


In [45]:
df_hrv = df_hrv.drop(*columns_to_drop)

In [46]:
df_hrv.printSchema()

root
 |-- MEAN_RR: double (nullable = true)
 |-- MEDIAN_RR: double (nullable = true)
 |-- SDRR: double (nullable = true)
 |-- RMSSD: double (nullable = true)
 |-- SDSD: double (nullable = true)
 |-- SDRR_RMSSD: double (nullable = true)
 |-- HR: double (nullable = true)
 |-- pNN25: double (nullable = true)
 |-- pNN50: double (nullable = true)
 |-- SD1: double (nullable = true)
 |-- SD2: double (nullable = true)
 |-- KURT: double (nullable = true)
 |-- SKEW: double (nullable = true)
 |-- MEAN_REL_RR: double (nullable = true)
 |-- MEDIAN_REL_RR: double (nullable = true)
 |-- SDRR_REL_RR: double (nullable = true)
 |-- RMSSD_REL_RR: double (nullable = true)
 |-- SDSD_REL_RR: double (nullable = true)
 |-- SDRR_RMSSD_REL_RR: double (nullable = true)
 |-- KURT_REL_RR: double (nullable = true)
 |-- SKEW_REL_RR: double (nullable = true)
 |-- VLF: double (nullable = true)
 |-- VLF_PCT: double (nullable = true)
 |-- LF: double (nullable = true)
 |-- LF_PCT: double (nullable = true)
 |-- LF_NU: dou

### Checking the distribution of label values

In [47]:
column_name = "condition"
distinct_values = df_hrv.select(column_name).distinct().collect()
for value in distinct_values:
    print(value[0])

baseline
stress
amusement


In [48]:
condition_column = "condition"
condition_label_column = "condition label"

# Group by "condition" and collect corresponding "condition label" values
grouped_df = df_hrv.groupBy(condition_column).agg(collect_list(condition_label_column).alias("condition_labels"))

grouped_df.show()

+---------+--------------------+
|condition|    condition_labels|
+---------+--------------------+
| baseline|[0, 0, 0, 0, 0, 0...|
|   stress|[2, 2, 2, 2, 2, 2...|
|amusement|[1, 1, 1, 1, 1, 1...|
+---------+--------------------+



In [49]:
condition_column = "condition"
condition_label_column = "condition label"

# Group by "condition" and collect corresponding "condition label" values
grouped_df1 = df_eda.groupBy(condition_column).agg(collect_list(condition_label_column).alias("condition_labels"))

grouped_df1.show()

+---------+--------------------+
|condition|    condition_labels|
+---------+--------------------+
| baseline|[0, 0, 0, 0, 0, 0...|
|   stress|[2, 2, 2, 2, 2, 2...|
|amusement|[1, 1, 1, 1, 1, 1...|
+---------+--------------------+



### Analyzing signficant correlations with the response variable in the two datasets

In [50]:
response_variable = "condition label"
df_hrv = df_hrv.drop("condition")

# Index the response variable to convert it to numerical format
indexer = StringIndexer(inputCol=response_variable, outputCol="label")
df_hrv_indexed = indexer.fit(df_hrv).transform(df_hrv)

# Compute the correlation of each column with the response variable
correlations = []
for column in df_hrv.columns:
    if column != response_variable:
        correlation = df_hrv_indexed.corr(column, "label")
        correlations.append((column, correlation))

# Filter columns with correlation stronger than 0.15 or -0.15
significant_correlations = [(column, correlation) for column, correlation in correlations if abs(correlation) > 0.15]

# Print the significant correlations
print(f"Significant Correlations with {response_variable} (>|0.15|) for Heart Rate Variability features:")
for column, correlation in significant_correlations:
    print(f"{column}: {correlation}")

Significant Correlations with condition label (>|0.15|) for Heart Rate Variability features:
MEAN_RR: -0.186856779417934
MEDIAN_RR: -0.19395421549162511
SDRR_RMSSD: 0.1610164569630079
HR: 0.2034270630811824
MEDIAN_REL_RR: 0.26526477190787895
HF: 0.19457815803848869
MEAN_RR_LOG: -0.1950400986157958
MEAN_RR_SQRT: -0.19177801113815912
MEDIAN_REL_RR_LOG: 0.26531910338126735
HF_LOG: 0.1791821918757007
LF_HF_LOG: -0.17516380902217013
SDRR_RMSSD_LOG: 0.15644375486132953
HF_BOXCOX: 0.155131224431366
HR_SQRT: 0.20413045399235033


In [51]:
response_variable = "condition label"
df_eda = df_eda.drop("condition")

# Index the response variable to convert it to numerical format
indexer = StringIndexer(inputCol=response_variable, outputCol="label")
df_eda_indexed = indexer.fit(df_eda).transform(df_eda)

# Compute the correlation of each column with the response variable
correlations = []
for column in df_eda.columns:
    if column != response_variable:
        correlation = df_eda_indexed.corr(column, "label")
        correlations.append((column, correlation))

# Filter columns with correlation stronger than 0.15 or -0.15
significant_correlations = [(column, correlation) for column, correlation in correlations if abs(correlation) > 0.15]

# Print the significant correlations
print(f"Significant Correlations with {response_variable} (>|0.15|) for Electrodermal Activity features:")
for column, correlation in significant_correlations:
    print(f"{column}: {correlation}")

Significant Correlations with condition label (>|0.15|) for Electrodermal Activity features:
MAX: 0.16118046469546218
MEAN_LOG: 0.16379956316020963
INSC_LOG: 0.163314248585941
APSC_LOG: 0.165940801808046
RMSC_LOG: 0.16416291537592015
RANGE_LOG: 0.18625636202521512
APSC_LOG_LOG: 0.1662152056331066
RANGE_BOXCOX: 0.19209397468582648
RANGE_SQRT: 0.18130691051976275


### Delete IDs and randomize the order of rows

In [52]:
df_hrv = df_hrv.drop("subject id")

# Shuffle the rows using the rand function
df_hrv_shuffled = df_hrv.orderBy(rand())

In [53]:
df_eda = df_eda.drop("subject id")

# Shuffle the rows using the rand function
df_eda_shuffled = df_eda.orderBy(rand())

### Model and optimize HRV dataset and analyze best parameters 

In [None]:
# Mapping labels for binary classification
df_hrv = df_hrv.withColumn("label", when(col("condition label").isin([0, 1]), 0).otherwise(1))

#Define pre-modelling processing
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
assembler = VectorAssembler(inputCols=df_hrv.columns[:-2], outputCol="features")

# Define the Decision Tree Classifier
classifier = DecisionTreeClassifier(featuresCol="features", labelCol="indexedLabel")

# Create a pipeline for HRV
hrv_pipeline = Pipeline(stages=[indexer, assembler, classifier])

# Split the data into training and testing sets
(train_data, test_data) = df_hrv.randomSplit([0.8, 0.2], seed=42)

# Define the parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(classifier.maxDepth, [5, 10, 15]) \
    .addGrid(classifier.maxBins, [20, 30, 40]) \
    .build()

# Define the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")

# Perform cross-validation using the parameter grid
crossval = CrossValidator(estimator=hrv_pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5)

# Fit the model
cv_model = crossval.fit(train_data)

# Get the best model and parameters from cross-validation
best_model_hrv = cv_model.bestModel
best_params = best_model_hrv.stages[-1].extractParamMap()

# Generate predictions on the test set
predictions = cv_model.transform(test_data)

# Evaluate the model using MulticlassClassificationEvaluator optimized for F1-score and Recall
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

print("Best parameters:")
for param, value in best_params.items():
    print(f"{param.name}: {value}")

print(f"F1-score: {f1_score}")
print(f"Weighted Recall: {recall}")

                                                                                

### Model and optimize EDA dataset and analyze best parameters

In [27]:
# Mapping labels for binary classification
df_eda = df_eda.withColumn("label", when(col("condition label").isin([0, 1]), 0).otherwise(1))

#Define pre-modelling processing
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
assembler = VectorAssembler(inputCols=df_eda.columns[:-2], outputCol="features")

# Define RandomForestClassifier
classifier = RandomForestClassifier(featuresCol="features", labelCol="indexedLabel")

# Create a pipeline for EDA
eda_pipeline = Pipeline(stages=[indexer, assembler, classifier])

# Split the data into training and testing sets
(train_data, test_data) = df_eda.randomSplit([0.8, 0.2], seed=42)

# Define the parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(classifier.numTrees, [10, 20]) \
    .addGrid(classifier.maxDepth, [3, 5]) \
    .build()

# Define evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")

# Define cross-validation
crossval = CrossValidator(estimator=eda_pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)  # Use 3 folds for cross-validation

# Fit the model
cv_model = crossval.fit(train_data)

# Get the best model and paraneters from cross-validation
best_model_eda = cv_model.bestModel
best_params = best_model_eda.stages[-1].extractParamMap()

# Generate predictions on the test set
predictions = cv_model.transform(test_data)

# Evaluate the model using MulticlassClassificationEvaluator optimized for F1-score and Recall
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

print("Best parameters:")
for param, value in best_params.items():
    print(f"{param.name}: {value}")

print(f"F1-score: {f1_score}")
print(f"Weighted Recall: {recall}")

Best parameters:
bootstrap: True
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: auto
featuresCol: features
impurity: gini
labelCol: indexedLabel
leafCol: 
maxBins: 32
maxDepth: 5
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
numTrees: 20
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: -7573080781903287948
subsamplingRate: 1.0
F1-score: 0.9758834036126002
Weighted Recall: 0.9759418974126192
