## Read bronze data

In [0]:
# Load the raw bronze-layer table, then preview its first five rows.
bronze_df = spark.table("bronze.prediabetes_raw")
bronze_preview = bronze_df.limit(5)
display(bronze_preview)

sex,age,HE_wt,HE_ht,EC1_1,EC_stt_1,EC_occp,EC_wht_23,BE3_31,BE5_1,BE3_72,BE3_82,BE3_76,BE3_86,BO3_01,diabetes
1,47,82.9,171.8,1,2,5,36,7,0,0,0,0,2,0,0
0,53,45.6,157.5,1,2,5,20,7,0,0,0,0,0,0,1
1,40,82.6,173.5,1,1,5,40,2,3,0,3,5,0,1,0
1,36,79.2,177.2,1,1,2,45,6,0,0,0,0,0,0,1
1,19,73.0,178.5,1,1,9,8,7,4,1,0,1,0,0,0


## Change column names

In [0]:
# Mapping from the original bronze column names to descriptive names.
# Insertion order is preserved, so renames are applied in the order listed.
rename_map = dict(
    [
        ("sex", "gender"),
        ("age", "age"),
        ("HE_wt", "weight"),
        ("HE_ht", "height"),
        ("EC1_1", "employment_status"),
        ("EC_stt_1", "employment_type"),
        ("EC_occp", "occupation"),
        ("EC_wht_23", "weekly_work_hours"),
        ("BE3_31", "walking_days"),
        ("BE5_1", "strength_training_days"),
        ("BE3_72", "work_high_intensity_days"),
        ("BE3_82", "work_moderate_intensity_days"),
        ("BE3_76", "leisure_high_intensity_days"),
        ("BE3_86", "leisure_moderate_intensity_days"),
        ("BO3_01", "weight_control_exercise"),
        ("diabetes", "diabetes_status"),
    ]
)

# Start from the untouched bronze frame and rename one column per iteration.
df_renamed = bronze_df
for source_col in rename_map:
    df_renamed = df_renamed.withColumnRenamed(source_col, rename_map[source_col])

In [0]:
# Preview the first five rows to confirm the new column names.
renamed_preview = df_renamed.limit(5)
display(renamed_preview)

gender,age,weight,height,employment_status,employment_type,occupation,weekly_work_hours,walking_days,strength_training_days,work_high_intensity_days,work_moderate_intensity_days,leisure_high_intensity_days,leisure_moderate_intensity_days,weight_control_exercise,diabetes_status
1,47,82.9,171.8,1,2,5,36,7,0,0,0,0,2,0,0
0,53,45.6,157.5,1,2,5,20,7,0,0,0,0,0,0,1
1,40,82.6,173.5,1,1,5,40,2,3,0,3,5,0,1,0
1,36,79.2,177.2,1,1,2,45,6,0,0,0,0,0,0,1
1,19,73.0,178.5,1,1,9,8,7,4,1,0,1,0,0,0


## Handle missing values

In [0]:
# Report the row count, then show summary statistics
# (count, mean, stddev, min, max) for the renamed frame.
row_total = df_renamed.count()
print(f"Total Rows: {row_total}")

summary_stats = df_renamed.describe()
display(summary_stats)

Total Rows: 16137


summary,gender,age,weight,height,employment_status,employment_type,occupation,weekly_work_hours,walking_days,strength_training_days,work_high_intensity_days,work_moderate_intensity_days,leisure_high_intensity_days,leisure_moderate_intensity_days,weight_control_exercise,diabetes_status
count,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0,16137.0
mean,0.4987915969511061,42.06296089731673,66.06522897688545,166.17460494515652,1.0,1.2841296399578608,4.391956373551466,41.16799900848981,3.851087562744005,0.7972981347214476,0.1066493152382723,0.4699138625518994,0.4686744748094441,0.9613930718225197,0.4799529032657867,0.3754725165768111
stddev,0.5000140327815483,11.009962000181211,13.020579416743606,8.811481969336523,0.0,0.5382186598218585,2.423339711133829,15.510670824879789,2.5982570553472044,1.5141859807665976,0.6955956297651774,1.4302846320481464,1.2546496306835808,1.709450312115738,0.4996134328791001,0.4842596804964338
min,0.0,19.0,30.6,128.8,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
max,1.0,60.0,138.1,195.0,1.0,4.0,10.0,145.0,7.0,5.0,7.0,7.0,7.0,7.0,1.0,1.0


In [0]:
from pyspark.sql import functions as F
from functools import reduce
from operator import or_

cols = df_renamed.columns

# One isNull() check per column, OR-ed together:
# the predicate is true when at least one column of the row is null.
per_column_null = [F.col(name).isNull() for name in cols]
null_condition = reduce(or_, per_column_null)

# 'is_valid' is False for rows containing any null and True for all others.
is_valid_flag = F.when(null_condition, False).otherwise(True)
validated_df = df_renamed.withColumn("is_valid", is_valid_flag)

# Show how many rows carry each flag value, then a sample of the flagged rows.
valid_breakdown = validated_df.groupBy("is_valid").count()
display(valid_breakdown)
display(validated_df.limit(5))

is_valid,count
True,16137


gender,age,weight,height,employment_status,employment_type,occupation,weekly_work_hours,walking_days,strength_training_days,work_high_intensity_days,work_moderate_intensity_days,leisure_high_intensity_days,leisure_moderate_intensity_days,weight_control_exercise,diabetes_status,is_valid
1,47,82.9,171.8,1,2,5,36,7,0,0,0,0,2,0,0,True
0,53,45.6,157.5,1,2,5,20,7,0,0,0,0,0,0,1,True
1,40,82.6,173.5,1,1,5,40,2,3,0,3,5,0,1,0,True
1,36,79.2,177.2,1,1,2,45,6,0,0,0,0,0,0,1,True
1,19,73.0,178.5,1,1,9,8,7,4,1,0,1,0,0,0,True


## Handle outliers

In [0]:
from pyspark.sql import functions as F

numeric_cols = ["age", "weight", "height", "weekly_work_hours"]
outlier_bounds = {}

# Header row and separator for the printed bounds table.
print(f"{'Column':<20} | {'Q1':<7} | {'Q3':<7} | {'IQR':<7} | {'Lower Bound':<12} | {'Upper Bound':<12}")
print("-" * 85)

# For each numeric column: approximate Q1/Q3 (relative error 0.01), then
# place the bounds 1.5 * IQR below Q1 and above Q3.
for col_name in numeric_cols:
    quantiles = validated_df.approxQuantile(col_name, [0.25, 0.75], 0.01)
    q1, q3 = quantiles[0], quantiles[1]
    iqr = q3 - q1

    whisker = 1.5 * iqr
    lower_bound = q1 - whisker
    upper_bound = q3 + whisker

    # Keep the (lower, upper) pair so later cells can look it up by column name.
    outlier_bounds[col_name] = (lower_bound, upper_bound)

    print(f"{col_name:<20} | {q1:<7.2f} | {q3:<7.2f} | {iqr:<7.2f} | {lower_bound:<12.2f} | {upper_bound:<12.2f}")

Column               | Q1      | Q3      | IQR     | Lower Bound  | Upper Bound 
-------------------------------------------------------------------------------------
age                  | 34.00   | 51.00   | 17.00   | 8.50         | 76.50       
weight               | 56.20   | 74.00   | 17.80   | 29.50        | 100.70      
height               | 159.40  | 172.60  | 13.20   | 139.60       | 192.40      
weekly_work_hours    | 35.00   | 49.00   | 14.00   | 14.00        | 70.00       


In [0]:
from operator import and_

# Derive the filter thresholds from `outlier_bounds` (filled in by the IQR
# cell above) instead of hand-copying the printed numbers, so the thresholds
# cannot drift out of sync when the underlying data changes.
# Rounding to 2 decimals matches the precision of the printed table and
# strips floating-point noise from the computed edges.
bounds = {
    col_name: (round(low, 2), round(up, 2))
    for col_name, (low, up) in outlier_bounds.items()
}

# One range check per numeric column; `between` includes both endpoints.
iqr_conditions = [
    F.col(col_name).between(low, up)
    for col_name, (low, up) in bounds.items()
]

# A row passes only when every numeric column lies inside its range.
all_iqr_pass = reduce(and_, iqr_conditions)

# Recompute 'is_valid': True only for rows with no nulls AND no out-of-range
# value; everything else is flagged False.
validated_df = validated_df.withColumn(
    "is_valid",
    F.when(~null_condition & all_iqr_pass, True).otherwise(False)
)

display(validated_df.limit(5))

gender,age,weight,height,employment_status,employment_type,occupation,weekly_work_hours,walking_days,strength_training_days,work_high_intensity_days,work_moderate_intensity_days,leisure_high_intensity_days,leisure_moderate_intensity_days,weight_control_exercise,diabetes_status,is_valid
1,47,82.9,171.8,1,2,5,36,7,0,0,0,0,2,0,0,True
0,53,45.6,157.5,1,2,5,20,7,0,0,0,0,0,0,1,True
1,40,82.6,173.5,1,1,5,40,2,3,0,3,5,0,1,0,True
1,36,79.2,177.2,1,1,2,45,6,0,0,0,0,0,0,1,True
1,19,73.0,178.5,1,1,9,8,7,4,1,0,1,0,0,0,False


## Add BMI column

In [0]:
# BMI = weight / height^2, rounded to two decimals.
# Height is divided by 100 before squaring (presumably cm -> m; confirm units).
height_scaled = F.col("height") / 100
bmi_expr = F.round(F.col("weight") / F.pow(height_scaled, 2), 2)

validated_df = validated_df.withColumn("bmi", bmi_expr)

height,weight,bmi
171.8,82.9,28.09
157.5,45.6,18.38
173.5,82.6,27.44
177.2,79.2,25.22
178.5,73.0,22.91


In [0]:
# Preview the first five rows, now including the 'is_valid' and 'bmi' columns.
validated_preview = validated_df.limit(5)
display(validated_preview)

gender,age,weight,height,employment_status,employment_type,occupation,weekly_work_hours,walking_days,strength_training_days,work_high_intensity_days,work_moderate_intensity_days,leisure_high_intensity_days,leisure_moderate_intensity_days,weight_control_exercise,diabetes_status,is_valid,bmi
1,47,82.9,171.8,1,2,5,36,7,0,0,0,0,2,0,0,True,28.09
0,53,45.6,157.5,1,2,5,20,7,0,0,0,0,0,0,1,True,18.38
1,40,82.6,173.5,1,1,5,40,2,3,0,3,5,0,1,0,True,27.44
1,36,79.2,177.2,1,1,2,45,6,0,0,0,0,0,0,1,True,25.22
1,19,73.0,178.5,1,1,9,8,7,4,1,0,1,0,0,0,False,22.91


## Save the data as a table

In [0]:
#Save the full validated dataset to Silver Layer (Contains both True/False)
validated_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.prediabetes_clean_full")

In [0]:
#Create a subset of ONLY valid records for analysis
clean_true_df = validated_df.filter(F.col("is_valid") == True)

#Save the clean subset as a separate table
clean_true_df .write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.prediabetes_clean_true")

## Silver layer report

In [0]:
#Quality Audit Report
total_count = validated_df.count()
valid_count = validated_df.filter("is_valid == True").count()
invalid_count = total_count - valid_count

print(f"- Total Records: {total_count:,}")
print(f"- Valid Records: {valid_count:,} ({round(valid_count/total_count*100, 1)}%)")
print(f"- Invalid Records (Outliers/Nulls): {invalid_count:,} ({round(invalid_count/total_count*100, 1)}%)")

#Verify Saved Data
display(spark.table("silver.prediabetes_clean_true").limit(5))

- Total Records: 16,137
- Valid Records: 14,570 (90.3%)
- Invalid Records (Outliers/Nulls): 1,567 (9.7%)


gender,age,weight,height,employment_status,employment_type,occupation,weekly_work_hours,walking_days,strength_training_days,work_high_intensity_days,work_moderate_intensity_days,leisure_high_intensity_days,leisure_moderate_intensity_days,weight_control_exercise,diabetes_status,is_valid,bmi
1,47,82.9,171.8,1,2,5,36,7,0,0,0,0,2,0,0,True,28.09
0,53,45.6,157.5,1,2,5,20,7,0,0,0,0,0,0,1,True,18.38
1,40,82.6,173.5,1,1,5,40,2,3,0,3,5,0,1,0,True,27.44
1,36,79.2,177.2,1,1,2,45,6,0,0,0,0,0,0,1,True,25.22
0,44,55.7,156.7,1,1,9,43,7,0,0,0,0,0,0,0,True,22.68
