In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession, functions as F

# Export directory; created here if it does not already exist.
EXPORT_DIR = "exports"
os.makedirs(EXPORT_DIR, exist_ok=True)

# getOrCreate() reuses an active session when there is one, otherwise starts a new one.
spark = (
    SparkSession.builder
    .appName("StudentMentalHealthBigData")
    .getOrCreate()
)

In [None]:
# Read the CSV: the first row supplies column names and Spark infers the column types.
data_path = "students_mental_health.csv"
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv(data_path)

# Inspect the result: schema, five rows as text, then a ten-row pandas preview as the cell output.
df.printSchema()
df.show(5)
df.limit(10).toPandas()

In [None]:
def _null_count(column_name):
    """Aggregate expression counting the null cells of one column.

    F.when yields a value only where the column is null, and F.count counts
    non-null values, so the result is the number of nulls.
    """
    flagged = F.when(F.col(column_name).isNull(), column_name)
    return F.count(flagged).alias(column_name)


missing_counts = df.select([_null_count(name) for name in df.columns])
missing_counts.show()

# Transpose the single-row result so each source column becomes a row labelled 'Missing'.
missing_counts_pd = missing_counts.toPandas().T
missing_counts_pd.columns = ['Missing']
missing_counts_pd

In [None]:
# Show stats and value counts for Depression_Score
depression_score = F.col("Depression_Score")
df.select("Depression_Score").summary().show()
df.groupBy("Depression_Score").count().orderBy("Depression_Score").show()

# Compute approximate quantiles (relative error 0.01) used as label thresholds.
quantiles = df.approxQuantile("Depression_Score", [0.33, 0.5, 0.66], 0.01)
if len(quantiles) != 3:
    # approxQuantile returns an empty list when the column has no non-null values.
    raise ValueError("Depression_Score has no non-null values; cannot derive label thresholds.")
q33, median, q66 = quantiles
print(f"Quantiles: 33%={q33}, median={median}, 66%={q66}")

# Binary label for classification (above/below median).
# A null score yields a null label because the comparison itself is null.
df = df.withColumn("Depression_Class", (depression_score > median).cast("int"))

# Multi-class label: Low, Moderate, High.
# Every band is an explicit condition (no otherwise()), so a null score stays
# null instead of being counted as "High" in the table printed below.
df = df.withColumn(
    "Depression_Level",
    F.when(depression_score <= q33, "Low")
     .when((depression_score > q33) & (depression_score <= q66), "Moderate")
     .when(depression_score > q66, "High")
)
df.groupBy("Depression_Class").count().show()
df.groupBy("Depression_Level").count().show()


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# --- Missing-value handling only -------------------------------------------
# Categorical encoding and feature assembly are done in the dedicated cells
# that follow. This cell used to run the same steps as well, which made those
# cells fail on a fresh Run All because their output columns (e.g. Gender_idx,
# features_vec) already existed.

key_columns = ['Age', 'CGPA', 'Depression_Score', 'Anxiety_Score']
# The remaining numeric model inputs are dropped on null as well: they are fed
# to VectorAssembler later in the notebook, which raises on null values by
# default (handleInvalid='error').
numeric_model_inputs = key_columns + ['Stress_Level', 'Financial_Stress', 'Semester_Credit_Load']
df = df.na.drop(subset=numeric_model_inputs)

# Categorical columns whose nulls are replaced with the placeholder "Unknown".
fillna_cols = [
    'Gender', 'Sleep_Quality', 'Physical_Activity', 'Diet_Quality',
    'Social_Support', 'Relationship_Status', 'Substance_Use',
    'Counseling_Service_Use', 'Family_History', 'Chronic_Illness',
    'Extracurricular_Involvement', 'Residence_Type'
]
# One fill over the whole subset instead of one DataFrame rewrite per column.
df = df.na.fill("Unknown", subset=fillna_cols)

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categorical_cols = fillna_cols + ['Course']
idx_cols = [c+"_idx" for c in categorical_cols]
onehot_cols = [c+"_onehot" for c in categorical_cols]

# Spark ML stages raise if their output column already exists. Drop any
# outputs left over from a previous run so this cell can be re-executed;
# DataFrame.drop ignores names that are not present.
df = df.drop(*idx_cols, *onehot_cols)

# handleInvalid='keep' assigns unseen or invalid values to an extra index
# instead of failing.
indexers = [
    StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid='keep')
    for c in categorical_cols
]
for indexer in indexers:
    df = indexer.fit(df).transform(df)

encoder = OneHotEncoder(inputCols=idx_cols, outputCols=onehot_cols)
df = encoder.fit(df).transform(df)
df.select(['Gender', 'Gender_idx', 'Gender_onehot']).show(5)

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Depression_Class and Depression_Level are computed directly from
# Depression_Score, so the score must not be a model input: with it in the
# vector every classifier can simply read the label back (target leakage).
# The same "features" vector also feeds the KMeans cell.
numeric_features = [
    'Age', 'CGPA', 'Stress_Level', 'Anxiety_Score',
    'Financial_Stress', 'Semester_Credit_Load'
]
feature_cols = numeric_features + [c+"_idx" for c in categorical_cols]
onehot_features = numeric_features + [c+"_onehot" for c in categorical_cols]

# Spark ML stages raise if their output column already exists; drop outputs of
# a previous run so the cell is re-runnable (drop ignores absent names).
df = df.drop("features_vec", "features", "features_onehot")

# Index-encoded feature vector, standardised to zero mean / unit variance.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
df = assembler.transform(df)
scaler = StandardScaler(inputCol="features_vec", outputCol="features", withMean=True, withStd=True)
scalerModel = scaler.fit(df)
df = scalerModel.transform(df)

# One-hot feature vector (left unscaled).
assembler2 = VectorAssembler(inputCols=onehot_features, outputCol="features_onehot")
df = assembler2.transform(df)

In [None]:
# Numeric columns used for the summary table, the distribution plots and the heatmap.
numeric_cols = ['Age','CGPA','Stress_Level','Depression_Score','Anxiety_Score','Financial_Stress']

# Summary statistics, printed by Spark and rendered again as a pandas table.
summary = df.select(numeric_cols).summary()
summary.show()
summary_pd = summary.toPandas()
display(summary_pd)


def _plot_value_counts(column, title, tick_rotation=None):
    """Draw a bar chart of row counts per distinct value of `column`.

    Returns the pandas frame of counts that was plotted. When `tick_rotation`
    is given, the x tick labels are rotated by that many degrees.
    """
    counts = df.groupBy(column).count().toPandas()
    sns.barplot(data=counts, x=column, y='count')
    plt.title(title)
    if tick_rotation is not None:
        plt.xticks(rotation=tick_rotation)
    plt.show()
    return counts


gender_counts = _plot_value_counts('Gender', 'Gender Distribution')
sleep_counts = _plot_value_counts('Sleep_Quality', 'Sleep Quality Distribution', tick_rotation=45)

# Histogram (with KDE) followed by a boxplot for every numeric column.
pd_df = df.select(numeric_cols).toPandas()
for numeric_col in numeric_cols:
    column_values = pd_df[numeric_col]

    sns.histplot(column_values.dropna(), kde=True)
    plt.title(f"Distribution of {numeric_col}")
    plt.show()

    sns.boxplot(x=column_values)
    plt.title(f"Boxplot of {numeric_col}")
    plt.show()

# Pairwise correlations between the numeric columns.
corr = pd_df.corr()
plt.figure(figsize=(8,6))
sns.heatmap(corr, annot=True, cmap='coolwarm')
plt.title("Correlation Heatmap (Numeric Features)")
plt.show()

In [None]:
# Register the DataFrame so the queries below can refer to it as `students`.
df.createOrReplaceTempView("students")


def _run_and_show(sql_text, max_rows=None):
    """Run a Spark SQL query, print it and return the resulting DataFrame.

    `max_rows` is forwarded to show(); when omitted, show() uses its default.
    """
    result = spark.sql(sql_text)
    if max_rows is None:
        result.show()
    else:
        result.show(max_rows)
    return result


# 1. Simple SELECT & WHERE
result1 = _run_and_show("SELECT Age, Gender, Depression_Score FROM students WHERE Depression_Score > 7", 5)

# 2. GROUP BY aggregation
result2 = _run_and_show("""
    SELECT Gender, COUNT(*) as count, AVG(Depression_Score) as avg_dep
    FROM students
    GROUP BY Gender
    ORDER BY count DESC
""")

# 3. DISTINCT count
result3 = _run_and_show("SELECT COUNT(DISTINCT Course) as unique_courses FROM students")

# 4. Cross-tab
result4 = _run_and_show("""
    SELECT Sleep_Quality, COUNT(*) as count
    FROM students
    GROUP BY Sleep_Quality
    ORDER BY count DESC
""")

# 5. Multi-condition filter
result5 = _run_and_show("""
    SELECT *
    FROM students
    WHERE Depression_Score > 7 AND Anxiety_Score > 7 AND Sleep_Quality = 'Poor'
    ORDER BY Age DESC
    LIMIT 10
""")


In [None]:
# Depression_Score at or above this value counts as "high risk" in the
# breakdowns below. It was previously repeated as a literal in every query.
HIGH_RISK_MIN_SCORE = 7


def high_risk_breakdown(*group_cols, min_score=HIGH_RISK_MIN_SCORE):
    """Summarise high depression risk per group from the `students` view.

    Parameters
    ----------
    *group_cols : str
        Column names to group by (at least one).
    min_score : numeric
        Rows with Depression_Score >= min_score are counted as high risk.

    Returns
    -------
    Spark DataFrame with the group columns followed by `high_risk`, `total`
    and `pct_high_risk` (percentage rounded to 2 decimals), ordered by
    `pct_high_risk` descending.
    """
    if not group_cols:
        raise ValueError("high_risk_breakdown needs at least one grouping column.")
    # Column names come from this notebook, not from user input, so building
    # the statement by interpolation is acceptable here.
    keys = ", ".join(group_cols)
    is_high_risk = f"CASE WHEN Depression_Score >= {min_score} THEN 1 ELSE 0 END"
    return spark.sql(f"""
        SELECT {keys},
               SUM({is_high_risk}) as high_risk,
               COUNT(*) as total,
               ROUND(100.0 * SUM({is_high_risk})/COUNT(*),2) as pct_high_risk
        FROM students
        GROUP BY {keys}
        ORDER BY pct_high_risk DESC
    """)


# (A) Average and Standard Deviation of CGPA by Course
result6 = spark.sql("""
    SELECT Course, COUNT(*) as n, AVG(CGPA) as avg_cgpa, STDDEV(CGPA) as std_cgpa
    FROM students
    GROUP BY Course
    ORDER BY avg_cgpa DESC
""")
result6.show()

# (B) Gender-wise Depression and Anxiety Score Distributions
result7 = spark.sql("""
    SELECT Gender, AVG(Depression_Score) as avg_depression, AVG(Anxiety_Score) as avg_anxiety
    FROM students
    GROUP BY Gender
    ORDER BY avg_depression DESC
""")
result7.show()

# (C) Crosstab of Residence Type and Sleep Quality
result8 = spark.sql("""
    SELECT Residence_Type, Sleep_Quality, COUNT(*) as count
    FROM students
    GROUP BY Residence_Type, Sleep_Quality
    ORDER BY Residence_Type, count DESC
""")
result8.show()

# (D) Proportion of High Depression Risk by Social Support
# Keeps this result's original column order and its `high_risk_pct` name.
result9 = high_risk_breakdown("Social_Support").select(
    "Social_Support",
    "total",
    "high_risk",
    F.col("pct_high_risk").alias("high_risk_pct"),
)
result9.show()

# (E) Top 10 Students by Stress Level and CGPA
result10 = spark.sql("""
    SELECT Age, Gender, Course, CGPA, Stress_Level
    FROM students
    ORDER BY Stress_Level DESC, CGPA DESC
    LIMIT 10
""")
result10.show()

# (F) Distribution of Physical Activity by High Depression Risk
result11 = high_risk_breakdown("Physical_Activity")
result11.show()

# (G) Gender vs Course vs High Depression Risk
result12 = high_risk_breakdown("Gender", "Course")
result12.show()

# (H) Fairness Table: High Depression Risk by Gender
result13 = high_risk_breakdown("Gender")
result13.show()

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import PCA

# KMeans and PCA both raise if their output column already exists. Drop the
# outputs of a previous run so this cell can be re-executed; DataFrame.drop
# ignores names that are not present.
df = df.drop("prediction", "pca_features")

# Cluster the standardised feature vector into 3 groups (fixed seed).
kmeans = KMeans(featuresCol='features', k=3, seed=42)
kmeans_model = kmeans.fit(df)
df = kmeans_model.transform(df)  # adds the cluster id as 'prediction'

clustering_evaluator = ClusteringEvaluator(featuresCol='features', predictionCol='prediction', metricName='silhouette')
silhouette = clustering_evaluator.evaluate(df)
print("KMeans Silhouette Score:", silhouette)

# PCA for 2D visualization (optional for plotting)
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(df)
df = pca_model.transform(df)

# Pull only the two columns needed for the scatter plot to the driver and
# split the 2-element PCA vector into plain numeric columns.
pca_pd = df.select('pca_features','prediction').toPandas()
pca_pd['PCA1'] = pca_pd['pca_features'].apply(lambda x: x[0])
pca_pd['PCA2'] = pca_pd['pca_features'].apply(lambda x: x[1])
sns.scatterplot(data=pca_pd, x='PCA1', y='PCA2', hue='prediction', palette='tab10')
plt.title("KMeans Clusters (PCA 2D)")
plt.show()


In [None]:
# Cluster means for each numeric col.
# All averages are computed in a single aggregation (one Spark job) rather
# than one groupBy per column; each result column keeps its source name.
cluster_means = (
    df.groupBy('prediction')
      .agg(*[F.avg(c).alias(c) for c in numeric_cols])
      .orderBy('prediction')
      .toPandas()
)

for col in numeric_cols:
    plt.bar(cluster_means['prediction'], cluster_means[col])
    plt.title(f"Cluster Means: {col}")
    plt.xlabel("Cluster")
    plt.ylabel(f"Mean {col}")
    plt.show()

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier

# Make sure both classes exist for Depression_Class
df.groupBy("Depression_Class").count().show()

# NOTE(review): Depression_Class is derived from Depression_Score. Confirm that
# the "features" / "features_onehot" vectors do not contain that column,
# otherwise the metrics below are inflated by target leakage.

train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# df may already carry a 'prediction' column written by an earlier model in
# this notebook; every classifier writes its own 'prediction', so remove it
# first. DataFrame.drop is a no-op when the column is absent.
train_df = train_df.drop("prediction")
test_df = test_df.drop("prediction")

train_class_counts = train_df.groupBy("Depression_Class").count().toPandas()
if len(train_class_counts) < 2:
    print("Warning: Training set contains only one class. Classification skipped.")
else:
    log_reg = LogisticRegression(featuresCol="features", labelCol="Depression_Class", maxIter=20)
    log_model = log_reg.fit(train_df)
    preds_lr = log_model.transform(test_df)

    # Explicit seeds make the randomised tree ensembles reproducible across
    # kernel restarts, matching the seeded split above.
    rf = RandomForestClassifier(featuresCol="features_onehot", labelCol="Depression_Class", numTrees=50, seed=42)
    rf_model = rf.fit(train_df)
    preds_rf = rf_model.transform(test_df)

    gbt = GBTClassifier(featuresCol="features", labelCol="Depression_Class", maxIter=30, seed=42)
    gbt_model = gbt.fit(train_df)
    preds_gbt = gbt_model.transform(test_df)

    print("Classification completed.")


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

def print_classification_metrics(preds, model_name="", skip_roc_auc=False,
                                 label_col="Depression_Class", positive_label=1.0):
    """Print and return evaluation metrics for a binary classifier's predictions.

    Parameters
    ----------
    preds : Spark DataFrame
        Model output containing `label_col`, "prediction" and, unless
        `skip_roc_auc` is set, "rawPrediction".
    model_name : str
        Label used in the printed line and in the returned row.
    skip_roc_auc : bool
        When True, ROC AUC is not computed and is reported as None.
    label_col : str
        Name of the true-label column.
    positive_label : float
        Class for which precision and recall are reported. Spark's
        precisionByLabel / recallByLabel default to label 0, which would
        describe the negative class.

    Returns
    -------
    list
        [model_name, accuracy, precision, recall, f1, roc_auc], where roc_auc
        is None if it was skipped or could not be computed.
    """
    roc_auc = None
    if not skip_roc_auc:
        evaluator_roc = BinaryClassificationEvaluator(
            rawPredictionCol="rawPrediction", labelCol=label_col, metricName="areaUnderROC"
        )
        try:
            roc_auc = evaluator_roc.evaluate(preds)
        except Exception as exc:
            # Best effort: keep reporting the other metrics, but say why ROC AUC
            # is missing instead of hiding the failure.
            print(f"{model_name} -- ROC AUC could not be computed: {exc}")

    def _metric(metric_name, **evaluator_params):
        """Evaluate one MulticlassClassificationEvaluator metric on `preds`."""
        evaluator = MulticlassClassificationEvaluator(
            labelCol=label_col, predictionCol="prediction",
            metricName=metric_name, **evaluator_params
        )
        return evaluator.evaluate(preds)

    accuracy = _metric("accuracy")
    precision = _metric("precisionByLabel", metricLabel=positive_label)
    recall = _metric("recallByLabel", metricLabel=positive_label)
    f1 = _metric("f1")

    if roc_auc is not None:
        print(f"{model_name} -- ROC AUC: {roc_auc:.3f}, Accuracy: {accuracy:.3f}, Precision: {precision:.3f}, Recall: {recall:.3f}, F1: {f1:.3f}")
    else:
        print(f"{model_name} -- Accuracy: {accuracy:.3f}, Precision: {precision:.3f}, Recall: {recall:.3f}, F1: {f1:.3f} (ROC AUC not available)")
    return [model_name, accuracy, precision, recall, f1, roc_auc]

# One metrics row per model. ROC AUC is requested for all three models: the
# ROC-curve cell already reads GBT's 'probability' output, and
# print_classification_metrics reports None if the metric cannot be computed.
results = [
    print_classification_metrics(preds_lr, "Logistic Regression"),
    print_classification_metrics(preds_rf, "Random Forest"),
    print_classification_metrics(preds_gbt, "Gradient Boosted Trees"),
]

comp_df = pd.DataFrame(results, columns=['Model', 'Accuracy', 'Precision', 'Recall', 'F1', 'ROC_AUC'])
display(comp_df)


In [None]:
from sklearn.metrics import roc_curve, auc

def plot_spark_roc(predictions, label='', label_col='Depression_Class'):
    """Add one model's ROC curve to the current matplotlib axes.

    Parameters
    ----------
    predictions : Spark DataFrame
        Classifier output containing a 'probability' column and `label_col`.
    label : str
        Legend text; the computed AUC is appended to it.
    label_col : str
        Name of the true-label column. Defaults to 'Depression_Class', the
        label the classifiers in this notebook are trained on (the previously
        hard-coded 'High_Depression_Risk' column is never created).
    """
    pred_pd = predictions.select('probability', label_col).toPandas()
    # Use the positive-class probability (index 1) when the value is
    # indexable; otherwise treat it as a plain number.
    pred_pd['prob1'] = pred_pd['probability'].apply(lambda x: x[1] if hasattr(x, "__getitem__") else float(x))
    fpr, tpr, _ = roc_curve(pred_pd[label_col], pred_pd['prob1'])
    auc_val = auc(fpr, tpr)
    plt.plot(fpr, tpr, label=f'{label} (AUC={auc_val:.2f})')

# Draw every model's ROC curve on one figure, plus the diagonal as the chance-level reference.
plt.figure(figsize=(6,5))

roc_inputs = [
    (preds_lr, 'Logistic Regression'),
    (preds_rf, 'Random Forest'),
    (preds_gbt, 'GBT'),
]
for model_preds, curve_label in roc_inputs:
    plot_spark_roc(model_preds, curve_label)

plt.plot([0,1],[0,1],'--',color='gray')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve (All Models)')
plt.legend()
plt.show()


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

def print_regression_metrics(preds, model_name=""):
    """Print RMSE and R² for a regression prediction frame and return them.

    `preds` must contain the label column "Depression_Score_Reg" and a
    "prediction" column. Returns [model_name, rmse, r2].
    """
    scores = {}
    for metric in ("rmse", "r2"):
        evaluator = RegressionEvaluator(
            labelCol="Depression_Score_Reg",
            predictionCol="prediction",
            metricName=metric,
        )
        scores[metric] = evaluator.evaluate(preds)

    rmse, r2 = scores["rmse"], scores["r2"]
    print(f"{model_name} -- RMSE: {rmse:.3f}, R²: {r2:.3f}")
    return [model_name, rmse, r2]

# NOTE(review): preds_lr_reg, preds_rf_reg, preds_gbt_reg and the
# "Depression_Score_Reg" label column are not created in any cell visible in
# this notebook; confirm the regression-training cell exists and runs first.
reg_results = [
    print_regression_metrics(preds_lr_reg, "Linear Regression"),
    print_regression_metrics(preds_rf_reg, "Random Forest Regressor"),
    print_regression_metrics(preds_gbt_reg, "Gradient Boosted Trees Regressor"),
]

# One row per model for side-by-side comparison.
reg_comp_df = pd.DataFrame(reg_results, columns=['Model', 'RMSE', 'R2'])
display(reg_comp_df)


In [None]:
importances = rf_model.featureImportances.toArray()

# Each one-hot column expands to several slots in the assembled vector. Read
# the width of every one-hot vector from a single row, fetched in one Spark
# job instead of one job per column.
onehot_inputs = [c for c in onehot_features if 'onehot' in c]
sample_row = df.select(onehot_inputs).head()
if sample_row is None:
    raise ValueError("df is empty; cannot determine one-hot vector sizes.")

# Build one name per vector slot, in the order the columns were assembled.
oh_cols = []
for c in onehot_features:
    if 'onehot' in c:
        oh_cols.extend(f"{c}_{i}" for i in range(len(sample_row[c])))
    else:
        oh_cols.append(c)

# Fail with a clear message if the names do not line up with the model,
# e.g. when onehot_features changed after rf_model was trained.
if len(oh_cols) != len(importances):
    raise ValueError(
        f"Feature name count ({len(oh_cols)}) does not match the number of "
        f"importances ({len(importances)}); re-train rf_model on the current features."
    )

fi_df = pd.DataFrame({'Feature': oh_cols, 'Importance': importances})
fi_df = fi_df.sort_values('Importance', ascending=False).head(15)
plt.figure(figsize=(6,4))
sns.barplot(data=fi_df, x='Importance', y='Feature')
plt.title("Random Forest Feature Importances")
plt.show()

In [None]:
# Columns to export; 'prediction' is whatever model output df currently holds.
out_cols = [
    'Age', 'Gender', 'CGPA', 'Stress_Level', 'Depression_Score', 'Anxiety_Score',
    'Sleep_Quality', 'Physical_Activity', 'Diet_Quality', 'Social_Support',
    'Relationship_Status', 'Substance_Use', 'Counseling_Service_Use', 'Family_History',
    'Chronic_Illness', 'Extracurricular_Involvement', 'Residence_Type', 'Course',
    'prediction', 'Depression_Class', 'Depression_Level'
]

# Columns absent from df are skipped, but reported so an incomplete export is visible.
missing_out_cols = [c for c in out_cols if c not in df.columns]
if missing_out_cols:
    print(f"Skipping columns not present in df: {missing_out_cols}")

# Write into EXPORT_DIR (created at the top of the notebook) rather than the
# current working directory; makedirs is repeated so this cell also works
# if the folder was removed in the meantime.
os.makedirs(EXPORT_DIR, exist_ok=True)
export_path = os.path.join(EXPORT_DIR, "student_mental_health_all_for_tableau.xlsx")

final_export = df.select([c for c in out_cols if c in df.columns]).toPandas()
final_export.to_excel(export_path, index=False)
print(f"Exported: {export_path}")
final_export.head()
