In [0]:
# MENTAL HEALTH RISK MODEL TRAINING

from pyspark.sql.functions import col, when, count, round
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
import mlflow
import os

print("✅ Imports loaded")

# ---------------- MLflow Setup ----------------
os.environ["MLFLOW_DFS_TMP"] = "/Volumes/ecommerce/mental_health/mlflow_volume/mlflow_tmp"
mlflow.set_experiment("/Shared/Mental_Health_Risk_Prediction")

print("✅ MLflow configured")

# ---------------- Load Data ----------------
df = spark.table("ecommerce.mental_health.gold_ml_features")
print("✅ Data loaded:", df.count(), "rows")

# ---------------- Feature Vector ----------------
feature_cols = [
    "High_Screen_Time",
    "Sleep_Deprived",
    "LateNight_Sleep_Risk",
    "Passive_Usage_Risk",
    "Social_Comparison_Risk"
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# ---------------- Model ----------------
rf = RandomForestClassifier(
    labelCol="High_Mental_Health_Risk",
    featuresCol="features",
    numTrees=50,
    maxDepth=5
)

pipeline = Pipeline(stages=[assembler, rf])

# ---------------- Train + Log ----------------
with mlflow.start_run(run_name="Mental_Health_Risk_Model"):

    model = pipeline.fit(df)
    predictions = model.transform(df)

    mlflow.spark.log_model(model, "mental_health_model")

    print("✅ Model trained and logged")

# ---------------- AI Insight Table ----------------
gender_risk = predictions.groupBy("Gender") \
    .agg(
        round(
            (count(when(col("prediction") == 1, True)) / count("*")) * 100,
            2
        ).alias("High_Risk_Percentage")
    )

display(gender_risk)

#-----------------Log Metrics ----------------------
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Accuracy
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="High_Mental_Health_Risk",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predictions)

# AUC
auc_evaluator = BinaryClassificationEvaluator(
    labelCol="High_Mental_Health_Risk",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = auc_evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy}")
print(f"AUC: {auc}")


#----------------- Confusion metrics ---------------
confusion_matrix = predictions.groupBy(
    "High_Mental_Health_Risk", "prediction"
).count()

display(confusion_matrix)


# ---------------- Save Predictions ----------------
pred_df_fixed = predictions.select(
    "Age",
    "Gender",
    "prediction",
    "probability",
    "High_Mental_Health_Risk"
)

pred_df_fixed.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("ecommerce.mental_health.gold_user_risk_predictions_fixed")

print("🎯 Predictions saved to GOLD")


✅ Imports loaded
✅ MLflow configured
✅ Data loaded: 8000 rows




✅ Model trained and logged


Gender,High_Risk_Percentage
Female,34.56
Male,33.56


Accuracy: 0.824625
AUC: 0.867984455489115


High_Mental_Health_Risk,prediction,count
0,0.0,4306
1,0.0,969
1,1.0,2291
0,1.0,434


🎯 Predictions saved to GOLD
