# First Setup:

In [None]:
# Create the SparkSession used by every later cell (getOrCreate reuses one if it already exists)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Depression_Prediction_Project")
    .getOrCreate()
)

# Leave the session as the cell's final expression so the notebook renders its summary
spark

# Part 1 - Model Training & Evaluation

In [None]:
from pyspark.sql.functions import col, isnan, when, count, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder, ChiSqSelector, StringIndexerModel, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
import pandas as pd
import pandas as pd
import seaborn as sns
import json

In [None]:
# Location of the raw student-depression CSV
data_path = '/home/linuxu/Desktop/Submission/Student_Depression_Dataset.csv'

# Read the CSV: first line holds the column names, column types are inferred by Spark
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

# Print the inferred schema so the column types can be checked
df.printSchema()

# Preview a few rows as pandas (final expression -> rendered by the notebook)
df.limit(5).toPandas()

# Remove Unwanted Information

## Remove non-student rows and irrelevant columns:

In [None]:
# Professions present before filtering
display(df.select("Profession").distinct().toPandas())

# Restrict the data to rows whose Profession is exactly "Student"
df_students = df.where(col("Profession") == "Student")

# After filtering, "Student" should be the only remaining profession
display(df_students.select("Profession").distinct().toPandas())

# Row count after filtering (expected to be smaller than the original)
student_row_count = df_students.count()
print(f"New row count after filtering students: {student_row_count}")

In [None]:
# Work-related columns are treated as irrelevant for students, and "Profession" is
# constant ("Student") after the filter, so all three are dropped.
columns_to_remove = [
    "Work Pressure",
    "Job Satisfaction",
    "Profession",
]
df_students_clean = df_students.drop(*columns_to_remove)

# Confirm the dropped columns no longer appear in the schema
df_students_clean.printSchema()

In [None]:
# Preview the first 5 rows of the cleaned, student-only data
display(df_students_clean.limit(5).toPandas())

# Count students with and without depression.
# Use df_students_clean rather than the raw `df`: `df` still contains the
# non-student rows filtered out earlier, so its counts would not describe students.
display(df_students_clean.groupBy("Depression").count().toPandas())

## Check and remove rows with missing values:

In [None]:
# One aggregate per column: how many of its values are null or NaN
null_or_nan_counts = [
    count(when(col(name).isNull() | isnan(name), name)).alias(name)
    for name in df_students_clean.columns
]

# The aggregation yields a single row; transpose so each column name becomes a row
missing_counts = df_students_clean.select(null_or_nan_counts).toPandas().T
missing_counts.columns = ["Missing Values"]
display(missing_counts)

In [None]:
# Show the rows whose Financial Stress value is null
rows_missing_stress = df_students_clean.where(df_students_clean["Financial Stress"].isNull())
display(rows_missing_stress.toPandas())

In [None]:
# Drop rows with a null Financial Stress (only that column is checked for nulls)
df_students_final = df_students_clean.dropna(subset=["Financial Stress"])

# Exploratory Data Analysis

## Check basic statistics and categorical variables distributions:

In [None]:
# Summary statistics from Spark's describe(), indexed by its "summary" column
summary_pdf = df_students_final.describe().toPandas()
summary_pdf.set_index("summary")

In [None]:
# Row counts per category, one table per categorical column
for categorical_col in (
    "Gender",
    "City",
    "Degree",
    "Sleep Duration",
    "Dietary Habits",
    "Family History of Mental Illness",
):
    display(df_students_final.groupBy(categorical_col).count().toPandas())

## Visualize numeric features correlations:

In [None]:
# Numeric columns (plus the Depression label) whose pairwise correlations are inspected
numeric_cols = [
    "Age", "Academic Pressure", "CGPA", "Study Satisfaction",
    "Work/Study Hours", "Financial Stress", "Depression",
]

# Correlation.corr operates on one vector column, so pack the numeric columns together
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features")
df_vector = assembler.transform(df_students_final).select("numeric_features")

# The result is a one-row DataFrame whose single cell is the correlation Matrix
correlation_matrix = Correlation.corr(df_vector, "numeric_features").head()[0]
corr_array = correlation_matrix.toArray()

# Label rows and columns with the original column names
corr_df = pd.DataFrame(corr_array, columns=numeric_cols, index=numeric_cols)

display(corr_df)

## Visualize depression by key categorical features:

In [None]:
# Depression counts broken down by each categorical column
for categorical_col in (
    "Gender",
    "City",
    "Degree",
    "Sleep Duration",
    "Dietary Habits",
    "Family History of Mental Illness",
):
    display(df_students_final.groupBy(categorical_col, "Depression").count().toPandas())

## Create a correlation heatmap of numerical features:

In [None]:
# Numeric correlation matrix as a labelled pandas DataFrame
corr_df = pd.DataFrame(
    correlation_matrix.toArray(),
    index=numeric_cols,
    columns=numeric_cols,
)

# Annotated heatmap of the pairwise correlations
plt.figure(figsize=(10, 8))
sns.heatmap(corr_df, cmap="coolwarm", annot=True)
plt.title("Correlation Heatmap")
plt.show()

## Remove rows with rare (likely misspelled) values in the "City" column:

In [None]:
# Cities with fewer than 5 rows are treated as probable typos
city_counts = df_students_final.groupBy("City").count()
rare_cities = city_counts.where(col("count") < 5).select("City").collect()
rare_city_list = [city_row["City"] for city_row in rare_cities]

# Keep only rows whose City is not in the rare list
df_students_final = df_students_final.where(~col("City").isin(rare_city_list))

# Per-city counts after the clean-up
display(df_students_final.groupBy("City").count().toPandas())

# Encoding Categorical Features - Feature Engineering:

In [None]:
# Binary / ordinal-style columns -> one numeric index column each.
# The two yes/no-style questions use alphabetical ordering so their mapping does not
# depend on class frequency. NOTE(review): Sleep Duration and Dietary Habits use
# StringIndexer's default frequency-based ordering, so their indices do not necessarily
# follow the categories' natural order — consider an explicit ordinal mapping.
# NOTE(review): all indexers keep the default handleInvalid, so categories not seen
# during fit raise an error at transform time.
indexers = [
    StringIndexer(inputCol=source, outputCol=target, **options)
    for source, target, options in (
        ("Gender", "GenderIdx", {}),
        ("Family History of Mental Illness", "FamilyHistoryIdx", {"stringOrderType": "alphabetAsc"}),
        ("Have you ever had suicidal thoughts ?", "SuicidalThoughtsIdx", {"stringOrderType": "alphabetAsc"}),
        ("Sleep Duration", "SleepDurationIdx", {}),
        ("Dietary Habits", "DietaryHabitsIdx", {}),
    )
]

# Nominal columns: index first, then one-hot encode the index
nominal_specs = [
    ("City", "CityIdx", "CityVec"),
    ("Degree", "DegreeIdx", "DegreeVec"),
]
indexers += [StringIndexer(inputCol=source, outputCol=idx) for source, idx, _ in nominal_specs]
encoders = [OneHotEncoder(inputCol=idx, outputCol=vec) for _, idx, vec in nominal_specs]

# Fit every indexer/encoder on the cleaned data and encode it
pipeline = Pipeline(stages=indexers + encoders)
pipeline_model = pipeline.fit(df_students_final)
df_encoded = pipeline_model.transform(df_students_final)

# Side-by-side view of raw and encoded columns for the first 10 rows
preview_cols = [
    "Gender", "GenderIdx",
    "Family History of Mental Illness", "FamilyHistoryIdx",
    "Have you ever had suicidal thoughts ?", "SuicidalThoughtsIdx",
    "Sleep Duration", "SleepDurationIdx",
    "Dietary Habits", "DietaryHabitsIdx",
    "City", "CityVec",
    "Degree", "DegreeVec",
]
df_encoded.select(*preview_cols).limit(10).toPandas()

## Create a correlation heatmap of all features:

In [None]:
# Numeric features, indexed categorical features, and the label.
# NOTE(review): CityIdx / DegreeIdx are nominal indices, so their linear correlations
# depend on the arbitrary index order and should be read with care.
heatmap_cols = [
    "Age", "Academic Pressure", "CGPA", "Study Satisfaction",
    "Work/Study Hours", "Financial Stress",
    "GenderIdx", "FamilyHistoryIdx", "SuicidalThoughtsIdx",
    "SleepDurationIdx", "DietaryHabitsIdx",
    "CityIdx", "DegreeIdx",
    "Depression",
]

# Pack the columns into one vector column, as Correlation.corr requires
assembler = VectorAssembler(inputCols=heatmap_cols, outputCol="heatmap_features")
df_heatmap_vector = assembler.transform(df_encoded).select("heatmap_features")

# Single-row result holding the correlation Matrix
corr_matrix = Correlation.corr(df_heatmap_vector, "heatmap_features").head()[0]

# Labelled pandas view for plotting
corr_df = pd.DataFrame(corr_matrix.toArray(), columns=heatmap_cols, index=heatmap_cols)

plt.figure(figsize=(14, 10))
sns.heatmap(corr_df, cmap="coolwarm", annot=True, fmt=".2f")
plt.title("Comprehensive Feature Correlation Heatmap")
plt.show()

# Feature Selection and Model Training

## Final feature preparation:

In [None]:
# Model inputs: raw numeric columns, indexed binary/ordinal columns, and one-hot vectors
feature_cols = [
    "Age", "Academic Pressure", "CGPA", "Study Satisfaction",
    "Work/Study Hours", "Financial Stress",
    "GenderIdx", "FamilyHistoryIdx", "SuicidalThoughtsIdx",
    "SleepDurationIdx", "DietaryHabitsIdx",
    "CityVec", "DegreeVec",
]

# Concatenate them into a single "features" vector and keep only features + label
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
df_final = assembler.transform(df_encoded).select("features", "Depression")

# Preview the assembled rows
df_final.limit(5).toPandas()

## Splitting data into training and test sets:

In [None]:
# 80/20 random split; the fixed seed keeps it reproducible for the same input data
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)
for split_label, split_df in (("Training set count:", train_data), ("Test set count:", test_data)):
    print(split_label, split_df.count())

## Training a logistic regression model:

In [None]:
# Logistic regression on the full assembled feature vector (default hyperparameters)
lr = LogisticRegression(labelCol="Depression", featuresCol="features")
lr_model = lr.fit(train_data)

# Learned parameters
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)

## Evaluating the model:

In [None]:
# Score the held-out test set
predictions = lr_model.transform(test_data)

# AUC (area under ROC) computed from the rawPrediction column
binary_evaluator = BinaryClassificationEvaluator(labelCol="Depression", rawPredictionCol="rawPrediction")
auc = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderROC"})

# Accuracy computed directly: share of test rows whose prediction equals the label
n_correct = predictions.where(col("Depression") == col("prediction")).count()
accuracy = n_correct / predictions.count()

# Per-label precision / recall; the label of interest is supplied at evaluation time
precision_evaluator = MulticlassClassificationEvaluator(
    metricName="precisionByLabel", labelCol="Depression", predictionCol="prediction")
recall_evaluator = MulticlassClassificationEvaluator(
    metricName="recallByLabel", labelCol="Depression", predictionCol="prediction")

# Positive class is Depression == 1
precision = precision_evaluator.evaluate(predictions, {precision_evaluator.metricLabel: 1.0})
recall = recall_evaluator.evaluate(predictions, {recall_evaluator.metricLabel: 1.0})

print(f"Accuracy: {accuracy:.2f}")
print(f"AUC: {auc:.2f}")
print(f"Precision (Depression=1): {precision:.2f}")
print(f"Recall (Depression=1): {recall:.2f}")

# Part 2 - Influential Feature Analysis

## Chi-square feature selection:

In [None]:
# Keep the 12 features with the strongest chi-square association with the label.
# NOTE(review): the chi-square test treats each distinct feature value as a category,
# so continuous columns (e.g. Age, CGPA) are effectively binned by value.
selector = ChiSqSelector(
    featuresCol="features",
    labelCol="Depression",
    outputCol="selectedFeatures",
    numTopFeatures=12,
)

# Decide which features to keep using the training split only
selector_model = selector.fit(train_data)

# Apply the same selection to both splits, keeping just the reduced vector and the label
train_selected, test_selected = (
    selector_model.transform(split).select("selectedFeatures", "Depression")
    for split in (train_data, test_data)
)

# Preview the reduced training vectors
train_selected.limit(5).toPandas()

## Retrain and evaluate model on selected features:

In [None]:
# Logistic regression trained only on the chi-square-selected features
lr_selected = LogisticRegression(labelCol="Depression", featuresCol="selectedFeatures")
lr_selected_model = lr_selected.fit(train_selected)

# Score the reduced test set
predictions_selected = lr_selected_model.transform(test_selected)

# AUC (area under ROC) from the rawPrediction column
binary_evaluator_2 = BinaryClassificationEvaluator(labelCol="Depression", rawPredictionCol="rawPrediction")
auc_2 = binary_evaluator_2.evaluate(predictions_selected, {binary_evaluator_2.metricName: "areaUnderROC"})

# Accuracy: matching predictions over all test rows
n_correct_2 = predictions_selected.where(col("Depression") == col("prediction")).count()
accuracy_2 = n_correct_2 / predictions_selected.count()

# Per-label precision / recall evaluators
precision_evaluator_2 = MulticlassClassificationEvaluator(
    metricName="precisionByLabel", labelCol="Depression", predictionCol="prediction")
recall_evaluator_2 = MulticlassClassificationEvaluator(
    metricName="recallByLabel", labelCol="Depression", predictionCol="prediction")

# Positive class is Depression == 1
precision_2 = precision_evaluator_2.evaluate(predictions_selected, {precision_evaluator_2.metricLabel: 1.0})
recall_2 = recall_evaluator_2.evaluate(predictions_selected, {recall_evaluator_2.metricLabel: 1.0})

print(f"Accuracy: {accuracy_2:.2f}")
print(f"AUC: {auc_2:.2f}")
print(f"Precision (Depression=1): {precision_2:.2f}")
print(f"Recall (Depression=1): {recall_2:.2f}")

Precision improved by 0.01 when we selected the top 12 features using chi-square feature selection.

## Extract actual feature names from chi-square selector:

In [None]:
# Sizes of the one-hot encoded vectors, read from an encoded row
city_vec_size = df_encoded.select("CityVec").first()["CityVec"].size
degree_vec_size = df_encoded.select("DegreeVec").first()["DegreeVec"].size

# Name every slot of the "features" vector, in the same order as the assembler's
# inputCols (each one-hot vector expands to one slot per position)
full_feature_names = (
    ["Age", "Academic Pressure", "CGPA", "Study Satisfaction",
     "Work/Study Hours", "Financial Stress", "GenderIdx",
     "FamilyHistoryIdx", "SuicidalThoughtsIdx", "SleepDurationIdx", "DietaryHabitsIdx"]
    + [f"CityVec_{i}" for i in range(city_vec_size)]
    + [f"DegreeVec_{i}" for i in range(degree_vec_size)]
)

# Indices kept by the chi-square selector.
# The selector's output vector, and therefore lr_selected_model.coefficients, lists
# the kept features in ascending index order, whereas selectedFeatures may be ordered
# by p-value. Sort so the names built below line up one-to-one with the coefficients.
selected_feature_indices = sorted(selector_model.selectedFeatures)

# Label mappings for City and Degree, taken from the pipeline that was already fitted
# and used to encode the data. Refitting a fresh pipeline here would be wasted work
# and could drift out of sync with pipeline_model.
fitted_stages = pipeline_model.stages
city_indexer_model = [stage for stage in fitted_stages if isinstance(stage, StringIndexerModel) and stage.getInputCol() == "City"][0]
degree_indexer_model = [stage for stage in fitted_stages if isinstance(stage, StringIndexerModel) and stage.getInputCol() == "Degree"][0]
city_labels = city_indexer_model.labels
degree_labels = degree_indexer_model.labels

# Raw slot names plus readable names that append the category for one-hot slots
selected_feature_names = []
pretty_feature_names = []

for idx in selected_feature_indices:
    raw_name = full_feature_names[idx]
    selected_feature_names.append(raw_name)

    if raw_name.startswith("CityVec_"):
        i = int(raw_name.split("_")[1])
        city = city_labels[i] if i < len(city_labels) else "Unknown"
        pretty_feature_names.append(f"{raw_name} ({city})")
    elif raw_name.startswith("DegreeVec_"):
        i = int(raw_name.split("_")[1])
        degree = degree_labels[i] if i < len(degree_labels) else "Unknown"
        pretty_feature_names.append(f"{raw_name} ({degree})")
    else:
        pretty_feature_names.append(raw_name)

print("Selected Features:", pretty_feature_names)

## Map coefficients to selected features:

In [None]:
# Coefficients of the model trained on the selected features, as a NumPy array
coefficients = lr_selected_model.coefficients.toArray()

# Pair each selected feature with its coefficient and rank by magnitude.
# NOTE(review): features were not standardized before training, so magnitudes also
# reflect each feature's scale — compare across features with care.
feature_importance_df = pd.DataFrame({
    "Feature": pretty_feature_names,
    "Coefficient": coefficients,
})
feature_importance_df["Abs_Coefficient"] = feature_importance_df["Coefficient"].abs()
feature_importance_df = (
    feature_importance_df
    .sort_values(by="Abs_Coefficient", ascending=False)
    .reset_index(drop=True)
)

# 1-based ranks for display
feature_importance_df.index += 1

display(feature_importance_df)

## Classify features (behavioral vs. demographic):

In [None]:
# Substrings that mark a feature as behavioral; anything else counts as demographic
behavioral_keywords = [
    "Academic Pressure", "CGPA", "Study Satisfaction", "Work/Study Hours",
    "SuicidalThoughtsIdx", "SleepDurationIdx", "DietaryHabitsIdx"
]

def classify_feature(f):
    """Return "Behavioral" if *f* contains any behavioral keyword, else "Demographic"."""
    for keyword in behavioral_keywords:
        if keyword in f:
            return "Behavioral"
    return "Demographic"

# Label each ranked feature as Behavioral or Demographic
feature_importance_df["Category"] = feature_importance_df["Feature"].map(classify_feature)

display(feature_importance_df)

## Visualize influential features:

In [None]:
# Bar chart of coefficient magnitudes (one bar per feature), coloured by category
plt.figure(figsize=(10, 6))
sns.barplot(
    x="Abs_Coefficient",
    y="Feature",
    hue="Category",
    data=feature_importance_df,
    dodge=False,
)

# Titles, axis labels and legend
plt.title("Top 12 Influential Features by Logistic Regression Coefficient")
plt.xlabel("Absolute Coefficient Value")
plt.ylabel("Feature")
plt.legend(title="Feature Type")
plt.tight_layout()
plt.show()

# Part 2 Summary - Identifying and Classifying Influential Features:

### In this stage, we aimed to determine which features contribute most significantly to predicting student depression, using the trained logistic regression model and Spark ML tools studied in class.


#### 1. Feature Selection:

We applied Chi-Square feature selection on the training data and selected the top 12 most predictive features. 
These included a mix of numerical, indexed categorical, and one-hot encoded variables.


#### 2. Feature Importance Analysis:

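Using the coefficients from the retrained logistic regression model, we ranked the selected features by the absolute value of their coefficients, which indicate how strongly each feature shifts the predicted log-odds of depression. Because the features were not standardized, these magnitudes also depend on each feature's scale (e.g., Age in years versus a 0/1 indicator), so the ranking should be interpreted with that in mind.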


#### 3. Behavioral vs. Demographic Classification:

We grouped the influential features into:
 - Behavioral Factors: e.g., Suicidal Thoughts, Academic Pressure, Dietary Habits, etc.
 - Demographic Factors: e.g., Age, Financial Stress, City, Degree.


#### 4. Key Findings:

- Most influential feature: "SuicidalThoughtsIdx" had the largest absolute coefficient, indicating a strong association with depression.
 
- Other highly impactful features included Academic Pressure, Financial Stress, and Dietary Habits.
 
- Both behavioral and demographic factors contribute meaningfully, but behavioral features generally had higher absolute coefficients, highlighting their critical role in depression prediction.
 
- Notably, CityVec_2 (Hyderabad) and DegreeVec_0 (Class 12) were the most influential one-hot encoded demographic attributes.

# Part 3 - Real-Time Depression Prediction Using Kafka & Spark Streaming

## Spark Consumer:

In [None]:
# Stop a streaming query left over from an earlier run of the consumer cell, if any
if "query" in globals():
    query.stop()
    print("Stopped previous query.")
else:
    print("No active query to stop.")

In [None]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.ml import PipelineModel
from pyspark.ml.feature import StringIndexerModel, VectorAssembler

# Define the schema for incoming student JSON data
student_schema = StructType([
    StructField("id", IntegerType()),
    StructField("Gender", StringType()),
    StructField("Age", DoubleType()),
    StructField("City", StringType()),
    StructField("Academic Pressure", DoubleType()),
    StructField("CGPA", DoubleType()),
    StructField("Study Satisfaction", DoubleType()),
    StructField("Sleep Duration", StringType()),
    StructField("Dietary Habits", StringType()),
    StructField("Degree", StringType()),
    StructField("Have you ever had suicidal thoughts ?", StringType()),
    StructField("Work/Study Hours", DoubleType()),
    StructField("Financial Stress", DoubleType()),
    StructField("Family History of Mental Illness", StringType()),
    StructField("Depression", IntegerType())
])

# Step 1: Read from Kafka
raw_kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "depression-stream") \
    .option("startingOffsets", "latest") \
    .load()

# Step 2: Parse the JSON
# NOTE(review): dropDuplicates on a stream without a watermark keeps every seen id in
# the state store, so state grows without bound on a long-running query.
json_df = raw_kafka_df.selectExpr("CAST(value AS STRING) as json_string") \
    .select(from_json(col("json_string"), student_schema).alias("data")) \
    .select("data.*") \
    .dropDuplicates(["id"])

# Step 3: Apply preprocessing pipeline.
# The fitted StringIndexerModels were built without handleInvalid, so they use the
# default "error": one record with a category never seen in training (e.g. one of the
# rare cities removed earlier) or a null/unparseable categorical value would terminate
# the whole streaming query. Use copies of the indexers that skip such records
# instead; pipeline_model itself is left unchanged for the batch analysis.
streaming_pipeline_model = PipelineModel(stages=[
    stage.copy().setHandleInvalid("skip") if isinstance(stage, StringIndexerModel) else stage
    for stage in pipeline_model.stages
])
preprocessed_df = streaming_pipeline_model.transform(json_df)

# Step 4: Assemble the features (same column order as used for training).
# handleInvalid="skip" drops records with null/NaN numeric values instead of failing the query.
feature_cols = [
    "Age", "Academic Pressure", "CGPA", "Study Satisfaction",
    "Work/Study Hours", "Financial Stress", "GenderIdx", 
    "FamilyHistoryIdx", "SuicidalThoughtsIdx", "SleepDurationIdx", 
    "DietaryHabitsIdx", "CityVec", "DegreeVec"
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
features_df = assembler.transform(preprocessed_df)

# Step 5: Select top features and predict
selected_df = selector_model.transform(features_df) \
    .select("selectedFeatures", "Depression", "id", "Gender", "City")
predictions_df = lr_selected_model.transform(selected_df)

# Step 6: Output predictions to console
query = predictions_df.select(
    col("id"),
    col("Gender"),
    col("City"),
    col("Depression").alias("Actual"),
    col("prediction").alias("Predicted")
).writeStream \
    .trigger(processingTime="5 seconds") \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()