#DATA CLEANING

In [0]:
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/rsomashe@gmu.edu/diabetes_health_indicators.csv")
df.display()

Diabetes_012,HighBP,HighChol,CholCheck,BMI,Smoker,Stroke,HeartDiseaseorAttack,PhysActivity,Fruits,Veggies,HvyAlcoholConsump,AnyHealthcare,NoDocbcCost,GenHlth,MentHlth,PhysHlth,DiffWalk,Sex,Age,Education,Income
0.0,1.0,1.0,1.0,40.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,5.0,18.0,15.0,1.0,0.0,9.0,4.0,3.0
0.0,0.0,0.0,0.0,25.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,3.0,0.0,0.0,0.0,0.0,7.0,6.0,1.0
0.0,1.0,1.0,1.0,28.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,1.0,5.0,30.0,30.0,1.0,0.0,9.0,4.0,8.0
0.0,1.0,0.0,1.0,27.0,0.0,0.0,0.0,1.0,1.0,1.0,0.0,1.0,0.0,2.0,0.0,0.0,0.0,0.0,11.0,3.0,6.0
0.0,1.0,1.0,1.0,24.0,0.0,0.0,0.0,1.0,1.0,1.0,0.0,1.0,0.0,2.0,3.0,0.0,0.0,0.0,11.0,5.0,4.0
0.0,1.0,1.0,1.0,25.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,1.0,0.0,2.0,0.0,2.0,0.0,1.0,10.0,6.0,8.0
0.0,1.0,0.0,1.0,30.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,3.0,0.0,14.0,0.0,0.0,9.0,6.0,7.0
0.0,1.0,1.0,1.0,25.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,3.0,0.0,0.0,1.0,0.0,11.0,4.0,4.0
2.0,1.0,1.0,1.0,30.0,1.0,0.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,5.0,30.0,30.0,1.0,0.0,9.0,5.0,1.0
0.0,0.0,0.0,1.0,24.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,2.0,0.0,0.0,0.0,1.0,8.0,4.0,3.0


#GENERATION OF A NEW DATASET ON A NEW CSV FILE

In [0]:
from pyspark.sql.functions import col, mean, when, count, stddev , abs
from pyspark.sql.window import Window

print("Head of the dataset:")
df.show(5)

# Check for missing values
print("\nMissing values:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Drop rows with missing values
data = df.na.drop()

# Impute missing values (example: filling missing numeric values with mean)
numeric_columns = ['BMI', 'HighBP', 'CholCheck', 'PhysActivity', 'Fruits', 'Veggies', 'HvyAlcoholConsump']
for col_name in numeric_columns:
    data = data.withColumn(col_name, when(col(col_name).isNull(), mean(col(col_name)).over(Window.partitionBy().rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))).otherwise(col(col_name)))

# Identify outliers using z-score method for numerical columns
z_scores = data.select(*[((col(c) - mean(col(c)).over(Window.partitionBy())) / stddev(col(c)).over(Window.partitionBy())).alias(c) for c in numeric_columns])
outliers = z_scores.select(*[(abs(col(c)) > 3).alias(c) for c in numeric_columns])

# Count outliers per column
outlier_count = outliers.agg(*[count(when(col(c), 1)).alias(c) for c in numeric_columns])
print("\nOutliers (count per column):")
outlier_count.show()

# Print summary statistics
print("\nSummary statistics:")
data.describe().show()

# Print the shape of the dataset
print("\nShape of the dataset:", (data.count(), len(data.columns)))

# Save the cleaned dataset
data.write.csv("dbfs:/FileStore/cleaned_diabetes_dataset.csv", header=True)

Head of the dataset:
+------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|Diabetes_012|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex| Age|Education|Income|
+------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|         0.0|   1.0|     1.0|      1.0|40.0|   1.0|   0.0|                 0.0|         0.0|   0.0|    1.0|              0.0|          1.0|        0.0|    5.0|    18.0|    15.0|     1.0|0.0| 9.0|      4.0|   3.0|
|         0.0|   0.0|     0.0|      0.0|25.0|   1.0|   0.0|                 0.0|         1.0|   0.0|    0.0|              0