# Build Anxiety Models 
This notebook contains all the code necessary to build, cross validate, and evaluate our models for Anxiety. 

# Load in Data and Explore Schema

In [1]:
from pyspark.sql import *
DATA_FILEPATH = 'data/anxiety.csv'

spark = SparkSession \
    .builder \
    .appName("Build Anxiety Models") \
    .getOrCreate()

train = spark.read.csv(DATA_FILEPATH,  inferSchema=True, header = True)
DATA_FILEPATH = 'data/test.csv'
test = spark.read.csv(DATA_FILEPATH,  inferSchema=True, header = True)

In [2]:
train.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- SES: double (nullable = true)
 |-- RuralVsUrban: double (nullable = true)
 |-- EducationDegree: double (nullable = true)
 |-- SecondaryVsPrimary: double (nullable = true)
 |-- TeachPublicVsOther: double (nullable = true)
 |-- YearsAsTeacher: double (nullable = true)
 |-- EmployedVsNot: double (nullable = true)
 |-- PastCOVIDpositive: double (nullable = true)
 |-- COVIDvaccinated: double (nullable = true)
 |-- PrePandemicChronIllness: double (nullable = true)
 |-- PrePandemicMentIllness: double (nullable = true)
 |-- PrePandemicNeuroDis: double (nullable = true)
 |-- Depression: integer (nullable = true)
 |-- Anxiety: integer (nullable = true)
 |-- OverallHealth: double (nullable = true)
 |-- COVIDfear: double (nullable = true)
 |-- RelatImprov: double (nullable = true)
 |-- WorkloadNowVsPreCOVID: double (nullable = true)
 |-- ResourceSatisfaction: double (nullable = true)
 |-- SufficientCOVIDmeasures: doub

# Train/Validation/Test Split

In [3]:
column_list = train.columns
column_list.remove('Anxiety')
column_list.remove('_c0')
train = train.withColumnRenamed('Anxiety', 'label')
test = test.withColumnRenamed('Anxiety', 'label')

# Benchmark Model: Logistic Regression

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator



In [5]:
assembler = VectorAssembler(inputCols=column_list,
                            outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

lr = LogisticRegression(labelCol='label',
                        featuresCol='scaledFeatures')

pipeline = Pipeline(stages = [assembler, scaler, lr])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0, .2, .4, .5, .6, .8, 1]) \
    .addGrid(lr.regParam, [0.1, 0.01, .001]) \
    .build()

# Fit the model
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)

cvModel = crossval.setParallelism(4).fit(train) # train 4 models in parallel

In [6]:
import numpy as np
best_params = cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)]
for k, v in best_params.items():
    print(f"{k}: {v}")

LogisticRegression_943ffa6f5912__elasticNetParam: 1.0
LogisticRegression_943ffa6f5912__regParam: 0.01


In [7]:
metrics = cvModel.avgMetrics
all_params = cvModel.getEstimatorParamMaps()
params= []
for i in range(0, len(all_params)):
    param = all_params[i]
    for k, v in param.items():
        params.append({f"{k}": f"{v}"})     
for i in range(0, len(metrics)):
    s = f'Param set {i+1} Avg Metrics: {metrics[i]}\n'
    params = all_params[i]
    for k, v in params.items():
        s+= f"  {k.name}: {v}\n"
    print(s)

Param set 1 Avg Metrics: 0.8168180615298125
  elasticNetParam: 0.0
  regParam: 0.1

Param set 2 Avg Metrics: 0.8176656808738504
  elasticNetParam: 0.0
  regParam: 0.01

Param set 3 Avg Metrics: 0.8174651277141399
  elasticNetParam: 0.0
  regParam: 0.001

Param set 4 Avg Metrics: 0.8212512213366452
  elasticNetParam: 0.2
  regParam: 0.1

Param set 5 Avg Metrics: 0.8198105597707631
  elasticNetParam: 0.2
  regParam: 0.01

Param set 6 Avg Metrics: 0.8176026627215145
  elasticNetParam: 0.2
  regParam: 0.001

Param set 7 Avg Metrics: 0.8169008109949264
  elasticNetParam: 0.4
  regParam: 0.1

Param set 8 Avg Metrics: 0.8211121271851531
  elasticNetParam: 0.4
  regParam: 0.01

Param set 9 Avg Metrics: 0.8178036554557384
  elasticNetParam: 0.4
  regParam: 0.001

Param set 10 Avg Metrics: 0.8142942017931029
  elasticNetParam: 0.5
  regParam: 0.1

Param set 11 Avg Metrics: 0.8215592397987189
  elasticNetParam: 0.5
  regParam: 0.01

Param set 12 Avg Metrics: 0.8179790643694763
  elasticNetParam: 

In [8]:
prediction = cvModel.transform(test)

In [9]:
# set up evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                          labelCol="label")
evaluator_2 = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
#Evaluate Metrics
accuracy = evaluator.evaluate(prediction, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(prediction, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(prediction, {evaluator.metricName: "recallByLabel"})
f1_score = evaluator.evaluate(prediction, {evaluator.metricName: "fMeasureByLabel"})
auc = evaluator_2.evaluate(prediction)

#Print Results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")
print(f"AUROC: {auc}")

Accuracy: 0.7611464968152867
Precision: 0.7979274611398963
Recall: 0.8105263157894737
F1 Score: 0.8041775456919059
AUROC: 0.7480050933786079


# Random Forest

In [10]:
from pyspark.ml.classification import RandomForestClassifier

In [11]:
assembler = VectorAssembler(inputCols=column_list,
                            outputCol="features")

rf = RandomForestClassifier(labelCol='label',
                        featuresCol='features')

pipeline = Pipeline(stages = [assembler, rf])

paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [1, 5, 10, 15]) \
    .addGrid(rf.numTrees, [10, 15, 20, 25, 30, 35, 40, 45]) \
    .build()

# Fit the model
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)

rfModel = crossval.setParallelism(4).fit(train) # train 4 models in parallel

In [12]:
import numpy as np
best_params = rfModel.getEstimatorParamMaps()[np.argmax(rfModel.avgMetrics)]
for k, v in best_params.items():
    print(f"{k}: {v}")

RandomForestClassifier_6c0b1f91d28f__maxDepth: 15
RandomForestClassifier_6c0b1f91d28f__numTrees: 40


In [13]:
metrics = rfModel.avgMetrics
all_params = rfModel.getEstimatorParamMaps()
params= []
for i in range(0, len(all_params)):
    param = all_params[i]
    for k, v in param.items():
        params.append({f"{k}": f"{v}"})     
for i in range(0, len(metrics)):
    s = f'Param set {i+1} Avg Metrics: {metrics[i]}\n'
    params = all_params[i]
    for k, v in params.items():
        s+= f"  {k.name}: {v}\n"
    print(s)

Param set 1 Avg Metrics: 0.7810112863426171
  maxDepth: 1
  numTrees: 10

Param set 2 Avg Metrics: 0.7790771091324109
  maxDepth: 1
  numTrees: 15

Param set 3 Avg Metrics: 0.7948861153624555
  maxDepth: 1
  numTrees: 20

Param set 4 Avg Metrics: 0.799008780748075
  maxDepth: 1
  numTrees: 25

Param set 5 Avg Metrics: 0.7920682057940021
  maxDepth: 1
  numTrees: 30

Param set 6 Avg Metrics: 0.7943010265083048
  maxDepth: 1
  numTrees: 35

Param set 7 Avg Metrics: 0.7956082998686351
  maxDepth: 1
  numTrees: 40

Param set 8 Avg Metrics: 0.793205218968601
  maxDepth: 1
  numTrees: 45

Param set 9 Avg Metrics: 0.8208347628280914
  maxDepth: 5
  numTrees: 10

Param set 10 Avg Metrics: 0.8297609155170109
  maxDepth: 5
  numTrees: 15

Param set 11 Avg Metrics: 0.8326307482226427
  maxDepth: 5
  numTrees: 20

Param set 12 Avg Metrics: 0.8327495734897481
  maxDepth: 5
  numTrees: 25

Param set 13 Avg Metrics: 0.8332627906708371
  maxDepth: 5
  numTrees: 30

Param set 14 Avg Metrics: 0.83423160

In [14]:
rf_prediction = rfModel.transform(test)

In [15]:
# set up evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                          labelCol="label")
evaluator_2 = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
#Evaluate Metrics
accuracy = evaluator.evaluate(rf_prediction, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(rf_prediction, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(rf_prediction, {evaluator.metricName: "recallByLabel"})
f1_score = evaluator.evaluate(rf_prediction, {evaluator.metricName: "fMeasureByLabel"})
auc = evaluator_2.evaluate(rf_prediction)

#Print Results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")
print(f"AUROC: {auc}")

Accuracy: 0.7356687898089171
Precision: 0.7488372093023256
Recall: 0.8473684210526315
F1 Score: 0.7950617283950617
AUROC: 0.7059422750424448


# MLP Model

In [16]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [17]:
layers = [[len(column_list), k, k, 2] for k in [50, 75, 100]]
assembler = VectorAssembler(inputCols=column_list,
                            outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")


mlp = MultilayerPerceptronClassifier(labelCol='label', featuresCol='scaledFeatures', maxIter=1000, 
                                     blockSize=128, seed=1234, solver = 'gd')

pipeline = Pipeline(stages = [assembler, scaler, mlp])

paramGrid = ParamGridBuilder() \
    .addGrid(mlp.layers, layers)\
    .addGrid(mlp.stepSize, [.001, .005, .01, .05])\
    .build()

# Fit the model
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)

# train the model
mlpModel = crossval.setParallelism(6).fit(train) # train 6 models in parallel

In [18]:
import numpy as np
best_params = mlpModel.getEstimatorParamMaps()[np.argmax(mlpModel.avgMetrics)]
for k, v in best_params.items():
    print(f"{k}: {v}")

MultilayerPerceptronClassifier_867684e7b607__layers: [45, 100, 100, 2]
MultilayerPerceptronClassifier_867684e7b607__stepSize: 0.05


In [19]:
mlp_prediction = mlpModel.transform(test)

In [20]:
# set up evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                          labelCol="label")
evaluator_2 = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
#Evaluate Metrics
accuracy = evaluator.evaluate(mlp_prediction, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(mlp_prediction, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(mlp_prediction, {evaluator.metricName: "recallByLabel"})
f1_score = evaluator.evaluate(mlp_prediction, {evaluator.metricName: "fMeasureByLabel"})
auc = evaluator_2.evaluate(mlp_prediction)

#Print Results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")
print(f"AUROC: {auc}")

Accuracy: 0.7452229299363057
Precision: 0.8089887640449438
Recall: 0.7578947368421053
F1 Score: 0.782608695652174
AUROC: 0.7418505942275042


# SVM

In [21]:
from pyspark.ml.classification import LinearSVC

In [22]:
assembler = VectorAssembler(inputCols=column_list,
                            outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

svm = LinearSVC(labelCol='label',
                        featuresCol='scaledFeatures')

pipeline = Pipeline(stages = [assembler, scaler, svm])

paramGrid = ParamGridBuilder() \
    .addGrid(svm.maxIter, [10, 20, 30, 40, 50]) \
    .addGrid(svm.regParam, [0.1, 0.01, .001]) \
    .build()

# Fit the model
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)

svmModel = crossval.setParallelism(6).fit(train) # train 6 models in parallel

In [23]:
import numpy as np
best_params = svmModel.getEstimatorParamMaps()[np.argmax(svmModel.avgMetrics)]
for k, v in best_params.items():
    print(f"{k}: {v}")

LinearSVC_5b9a141b5de4__maxIter: 10
LinearSVC_5b9a141b5de4__regParam: 0.01


In [24]:
svm_prediction = svmModel.transform(test)

In [25]:
# set up evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                          labelCol="label")
evaluator_2 = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
#Evaluate Metrics
accuracy = evaluator.evaluate(svm_prediction, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(svm_prediction, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(svm_prediction, {evaluator.metricName: "recallByLabel"})
f1_score = evaluator.evaluate(svm_prediction, {evaluator.metricName: "fMeasureByLabel"})
auc = evaluator_2.evaluate(svm_prediction)

#Print Results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")
print(f"AUROC: {auc}")

Accuracy: 0.7547770700636943
Precision: 0.7897435897435897
Recall: 0.8105263157894737
F1 Score: 0.7999999999999999
AUROC: 0.7399405772495755


# Gradient Boosted Tree

In [26]:
from pyspark.ml.classification import GBTClassifier

In [27]:
assembler = VectorAssembler(inputCols=column_list,
                            outputCol="features")

gbt = GBTClassifier(labelCol='label',
                        featuresCol='features')

pipeline = Pipeline(stages = [assembler, gbt])

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10, 20, 30]) \
    .addGrid(gbt.maxBins, [8, 16, 32,64]) \
    .build()

# Fit the model
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)

gbtModel = crossval.setParallelism(6).fit(train) # train 6 models in parallel

In [28]:
import numpy as np
best_params = gbtModel.getEstimatorParamMaps()[np.argmax(gbtModel.avgMetrics)]
for k, v in best_params.items():
    print(f"{k}: {v}")

GBTClassifier_499f0517b4b1__maxDepth: 10
GBTClassifier_499f0517b4b1__maxBins: 32


In [29]:
gbt_prediction = gbtModel.transform(test)

In [31]:
# set up evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                          labelCol="label")
evaluator_2 = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
#Evaluate Metrics
accuracy = evaluator.evaluate(gbt_prediction, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(gbt_prediction, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(gbt_prediction, {evaluator.metricName: "recallByLabel"})
f1_score = evaluator.evaluate(gbt_prediction, {evaluator.metricName: "fMeasureByLabel"})
auc = evaluator_2.evaluate(gbt_prediction)


#Print Results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")
print(f"AUROC: {auc}")

Accuracy: 0.6910828025477707
Precision: 0.7360406091370558
Recall: 0.7631578947368421
F1 Score: 0.7493540051679587
AUROC: 0.6719015280135824
