### Student Depression

#Bronze Layer - Data Retrival

In [0]:
spark.conf.set("fs.s3a.access.key", "<you access key>")
spark.conf.set("fs.s3a.secret.key", "<your access secret>")
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

In [0]:
s3_path = "s3://student-depression/student-data/student_depression_dataset.csv"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("City", StringType(), True),
    StructField("Profession", StringType(), True),
    StructField("Academic_Pressure", IntegerType(), True),
    StructField("Work_Pressure", IntegerType(), True),
    StructField("CGPA", DoubleType(), True),
    StructField("Study_Satisfaction", IntegerType(), True),
    StructField("Job_Satisfaction", IntegerType(), True),
    StructField("Sleep_Duration", StringType(), True),
    StructField("Dietary_Habits", StringType(), True),
    StructField("Degree", StringType(), True),
    StructField("Have_you_ever_had_suicidal_thoughts", StringType(), True),
    StructField("Work_and_Study_Hours", IntegerType(), True),
    StructField("Financial_Stress", IntegerType(), True),
    StructField("Family_History_of_Mental_Illness", StringType(), True),
    StructField("Depression", IntegerType(), True)
])


raw_data = spark.read.format("csv").option("header", "true").schema(schema).load(s3_path)

# Step 2: Add metadata for traceability
from pyspark.sql.functions import current_timestamp, input_file_name

bronze_data = raw_data.withColumn("load_date", current_timestamp()) \
                      .withColumn("source_file_name", input_file_name())

bronze_data.show()

+---+------+----+-------------+----------+-----------------+-------------+----+------------------+----------------+-------------------+--------------+----------+-----------------------------------+--------------------+----------------+--------------------------------+----------+--------------------+--------------------+
| id|Gender| Age|         City|Profession|Academic_Pressure|Work_Pressure|CGPA|Study_Satisfaction|Job_Satisfaction|     Sleep_Duration|Dietary_Habits|    Degree|Have_you_ever_had_suicidal_thoughts|Work_and_Study_Hours|Financial_Stress|Family_History_of_Mental_Illness|Depression|           load_date|    source_file_name|
+---+------+----+-------------+----------+-----------------+-------------+----+------------------+----------------+-------------------+--------------+----------+-----------------------------------+--------------------+----------------+--------------------------------+----------+--------------------+--------------------+
|  2|  Male|null|Visakhapatnam|   

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
bronze_data.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("bronze.bronze_data")


#Silver Layer - Data cleansing

In [0]:
from pyspark.sql.functions import col, trim, lower, when, regexp_replace

# Step 1: Trim and standardize string columns
string_cols = [
    "Gender", "City", "Profession", "Sleep_Duration", "Dietary_Habits",
    "Degree", "Have_you_ever_had_suicidal_thoughts", "Family_History_of_Mental_Illness"
]

silver_data = bronze_data

# Apply trimming and lowercasing to string columns
for col_name in string_cols:
    silver_data = silver_data.withColumn(col_name, lower(trim(col(col_name))))

# Step 2: Clean specific fields
silver_data = silver_data.withColumn(
    "Sleep_Duration", regexp_replace("Sleep_Duration", "'", "")  # remove single quotes
)

# Step 3: Replace yes/no with boolean 1/0 where meaningful
silver_data = silver_data.withColumn(
    "Have_you_ever_had_suicidal_thoughts",
    when(col("Have_you_ever_had_suicidal_thoughts") == "yes", 1)
    .when(col("Have_you_ever_had_suicidal_thoughts") == "no", 0)
    .otherwise(None)
)

silver_data = silver_data.withColumn(
    "Family_History_of_Mental_Illness",
    when(col("Family_History_of_Mental_Illness") == "yes", 1)
    .when(col("Family_History_of_Mental_Illness") == "no", 0)
    .otherwise(None)
)

# Step 4: Drop records with impossible or missing values
#silver_data = silver_data.filter(
#    (col("Age") >= 10) & (col("Age") <= 100) & (col("CGPA").isNotNull())
#)

# Step 5: Add transformed timestamp columns if needed
from pyspark.sql.functions import current_timestamp
silver_data = silver_data.withColumn("processed_date", current_timestamp())

# Optional: show cleaned data
silver_data.show(truncate=False)


+---+------+----+-------------+----------+-----------------+-------------+----+------------------+----------------+-----------------+--------------+----------+-----------------------------------+--------------------+----------------+--------------------------------+----------+-----------------------+-------------------------------------------------------------------+-----------------------+
|id |Gender|Age |City         |Profession|Academic_Pressure|Work_Pressure|CGPA|Study_Satisfaction|Job_Satisfaction|Sleep_Duration   |Dietary_Habits|Degree    |Have_you_ever_had_suicidal_thoughts|Work_and_Study_Hours|Financial_Stress|Family_History_of_Mental_Illness|Depression|load_date              |source_file_name                                                   |processed_date         |
+---+------+----+-------------+----------+-----------------+-------------+----+------------------+----------------+-----------------+--------------+----------+-----------------------------------+-----------------

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS silver")

silver_data.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver.silver_data")


#Gold Layer - Data aggregation

In [0]:
from pyspark.sql.functions import when, col, expr

# Start from silver layer
gold_data = silver_data

# 1. Create a new column: Overall_Stress = Academic_Pressure + Work_Pressure + Financial_Stress
gold_data = gold_data.withColumn(
    "Overall_Stress",
    col("Academic_Pressure") + col("Work_Pressure") + col("Financial_Stress")
)

# 2. Create a new column: Total_Satisfaction = Study_Satisfaction + Job_Satisfaction
gold_data = gold_data.withColumn(
    "Total_Satisfaction",
    col("Study_Satisfaction") + col("Job_Satisfaction")
)

# 3. Classify Sleep Quality from Sleep_Duration
gold_data = gold_data.withColumn(
    "Sleep_Quality",
    when(col("Sleep_Duration") == "less than 5 hours", "Poor")
    .when(col("Sleep_Duration") == "5-6 hours", "Moderate")
    .when(col("Sleep_Duration") == "7-8 hours", "Good")
    .otherwise("Unknown")
)

# 4. Create Mental_Health_Risk_Flag (1 if suicidal thoughts or depression or family history)
gold_data = gold_data.withColumn(
    "Mental_Health_Risk_Flag",
    when((col("Have_you_ever_had_suicidal_thoughts") == 1) | 
         (col("Depression") == 1) | 
         (col("Family_History_of_Mental_Illness") == 1), 1).otherwise(0)
)

# 5. Optional: Label-based version of depression
gold_data = gold_data.withColumn(
    "Depression_Label",
    when(col("Depression") == 1, "Depressed").otherwise("Not Depressed")
)

# 6. Preserve processed_date and add gold_timestamp
from pyspark.sql.functions import current_timestamp
gold_data = gold_data.withColumn("gold_processed_date", current_timestamp())

gold_data.show(truncate=False)


+---+------+----+-------------+----------+-----------------+-------------+----+------------------+----------------+-----------------+--------------+----------+-----------------------------------+--------------------+----------------+--------------------------------+----------+-----------------------+-------------------------------------------------------------------+-----------------------+--------------+------------------+-------------+-----------------------+----------------+-----------------------+
|id |Gender|Age |City         |Profession|Academic_Pressure|Work_Pressure|CGPA|Study_Satisfaction|Job_Satisfaction|Sleep_Duration   |Dietary_Habits|Degree    |Have_you_ever_had_suicidal_thoughts|Work_and_Study_Hours|Financial_Stress|Family_History_of_Mental_Illness|Depression|load_date              |source_file_name                                                   |processed_date         |Overall_Stress|Total_Satisfaction|Sleep_Quality|Mental_Health_Risk_Flag|Depression_Label|gold_proces

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

gold_data.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.gold_data")
