In [1]:
import time

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean

# Build the Spark session for this analysis, or reuse one that is already
# active; the configuration asks for two executor instances.
spark = (
    SparkSession.builder
    .appName("Mental Health Analysis with Spark ML")
    .config("spark.executor.instances", "2")
    .getOrCreate()
)

# Step 1: Load and preprocess data
# Read the dataset CSV: the first row supplies the column names and Spark
# infers an initial type for every column.
df = spark.read.csv("/opt/spark/mental_health_dataset.csv", header=True, inferSchema=True)

# Pin the columns used downstream to explicit types rather than relying on
# schema inference. Casts are applied one column at a time, in this order.
column_types = {
    "Age": "int",
    "Gender": "string",
    "self_employed": "string",
    "family_history": "string",
    "treatment": "string",
    "work_interfere": "string",
    "no_employees": "string",
}
for column_name, spark_type in column_types.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

# Index categorical columns
# "treatment" is the prediction target. It is indexed together with the other
# categorical columns (to produce the numeric label) but must never be part of
# the feature vector, otherwise the model is trained on its own label.
label_column = "treatment"
categorical_columns = ["Gender", "self_employed", "family_history", "treatment", "work_interfere", "no_employees"]

# StringIndexer and VectorAssembler both default to handleInvalid="error", so
# a null in any used column would abort the pipeline. Drop such rows up front
# so the failure mode is explicit rather than a late Spark job error.
df = df.dropna(subset=categorical_columns + ["Age"])

# One StringIndexer per categorical column; each writes "<column>_index".
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in categorical_columns
]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Assemble feature vector
# Every indexed categorical column except the label, plus the numeric Age.
# The loop variable is named `column` so it does not shadow the imported
# pyspark.sql.functions.col.
feature_columns = [
    column + "_index"
    for column in categorical_columns
    if column != label_column
] + ["Age"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df).select("features", label_column + "_index")

# Rename target column
data = data.withColumnRenamed(label_column + "_index", "target")

# Split data into training and test sets
# 80% / 20% random split; the fixed seed keeps the split reproducible.
split_weights = [0.8, 0.2]
train_data, test_data = data.randomSplit(split_weights, seed=42)

# Step 2: Initialize the RandomForestClassifier for classification
# A forest of 50 trees reading the "features" vector and the "target" label.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="target",
    numTrees=50,
)

# Step 3: Train the model
# Wall-clock time is measured around fit() only.
fit_started_at = time.time()
rf_model = rf.fit(train_data)
train_time = time.time() - fit_started_at
print(f"Training Time: {train_time} seconds")

# Step 4: Predict and evaluate on the test set
# Score the held-out rows with the trained forest.
predictions = rf_model.transform(test_data)

# Step 5: Evaluate model performance metrics
# Build every evaluator first; all of them compare against the "target" label.
evaluator_accuracy = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="target",
)
evaluator_auc = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="target",
)
evaluator_precision = MulticlassClassificationEvaluator(
    metricName="weightedPrecision",
    labelCol="target",
)
evaluator_recall = MulticlassClassificationEvaluator(
    metricName="weightedRecall",
    labelCol="target",
)

# Compute each metric on the same predictions DataFrame.
accuracy = evaluator_accuracy.evaluate(predictions)
auc = evaluator_auc.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)

# Print Evaluation Metrics
print(f"Accuracy: {accuracy}")
print(f"AUC: {auc}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")


ModuleNotFoundError: No module named 'pyspark'