In [42]:
import pandas as pd
import numpy as np

In [46]:
# Initialize (or reuse) a SparkSession and load the raw diabetes dataset.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Diabetes")
    .getOrCreate()
)

# header=True: the first CSV row holds the column names.
# inferSchema=True: let Spark infer column types from the data instead of
# reading every column as a string.
df = spark.read.options(header=True, inferSchema=True).csv("diabetes_dataset.csv")

In [47]:
# Remove the columns that are excluded from this analysis.
# NOTE: Spark's DataFrame.drop is a no-op for names missing from the schema,
# so a misspelled entry here would silently leave that column in place.
cols_to_drop = [
    "ethnicity", "gender", "education_level",
    "smoking_status", "alcohol_consumption_per_week",
    "sleep_hours_per_day", "screen_time_hours_per_day",
    "hypertension_history", "cardiovascular_history",
    "waist_to_hip_ratio", "diastolic_bp", "heart_rate",
    "cholesterol_total", "ldl_cholesterol",
    "glucose_postprandial", "insulin_level",
    "income_level", "employment_status",
    "diabetes_stage", "diagnosed_diabetes",
]

df = df.drop(*cols_to_drop)

In [48]:
# Check for missing data: count the nulls in every column in a single
# aggregation and display the counts as one row.
# Spark's `sum` is imported under an alias. A plain `import sum` would shadow
# Python's builtin `sum` for every later cell in this notebook.
from pyspark.sql.functions import col, sum as spark_sum

df.select([
    # isNull() -> boolean, cast to int so summing counts the null rows
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns
]).show()

+---+----------------------------------+----------+-----------------------+---+-----------+---------------+-------------+---------------+-----+-------------------+
|age|physical_activity_minutes_per_week|diet_score|family_history_diabetes|bmi|systolic_bp|hdl_cholesterol|triglycerides|glucose_fasting|hba1c|diabetes_risk_score|
+---+----------------------------------+----------+-----------------------+---+-----------+---------------+-------------+---------------+-----+-------------------+
|  0|                                 0|         0|                      0|  0|          0|              0|            0|              0|    0|                  0|
+---+----------------------------------+----------+-----------------------+---+-----------+---------------+-------------+---------------+-----+-------------------+



In [None]:
# Check for duplicated rows (rows identical in every column).
total = df.count()
distinct_all = df.dropDuplicates().count()
duplicates_all = total - distinct_all

print(f"Total rows: {total}")
print(f"Distinct rows: {distinct_all}")
print(f"Duplicate rows: {duplicates_all}")

if duplicates_all <= 0:
    print("No duplicates found.")
else:
    from pyspark.sql.functions import col
    # Group on every column so each distinct row forms one group. Keep the
    # groups that occur more than once, most frequent first.
    # Grouping by all columns may be expensive on large datasets.
    dup_groups = (
        df.groupBy(df.columns)
        .count()
        .filter(col("count") > 1)
        .orderBy(col("count").desc())
    )
    print("Duplicate groups:")
    dup_groups.show(10, truncate=False)

Total rows: 100000
Distinct rows: 100000
Duplicate rows: 0
No duplicates found.


In [41]:
# Count IQR outliers per numeric column using Tukey's fences: values below
# Q1 - 1.5*IQR or above Q3 + 1.5*IQR.
# NOTE(review): this cell was executed as In [41], before the cells that
# create and trim `df` (In [46]/[47]). On a fresh kernel it must run after them.
outlier_cols = [
    "bmi",
    "age",
    "physical_activity_minutes_per_week",
    "systolic_bp",
    "triglycerides",
    "glucose_fasting",
    "hba1c",
    "diabetes_risk_score"
]

# Compute Q1/Q3 for all columns in one call instead of one call per column.
# relativeError=0.01 means the quartiles are approximate.
quartiles = df.approxQuantile(outlier_cols, [0.25, 0.75], 0.01)

for col_name, (q1, q3) in zip(outlier_cols, quartiles):
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    count_outlier = df.filter(
        (df[col_name] < lower) | (df[col_name] > upper)
    ).count()

    print(f"{col_name}: {count_outlier} outliers")

# Collect the selected columns to a pandas DataFrame once, after the loop.
# This line was previously indented inside the loop and re-collected the same
# data to the driver on every iteration.
pdf = df.select(outlier_cols).toPandas()

bmi: 809 outliers
age: 0 outliers
physical_activity_minutes_per_week: 3352 outliers
systolic_bp: 360 outliers
triglycerides: 389 outliers
glucose_fasting: 745 outliers
hba1c: 687 outliers
diabetes_risk_score: 1182 outliers


In [50]:
# Preview the first 20 rows; long cell values are truncated in the display.
df.show(n=20, truncate=True)

+---+----------------------------------+----------+-----------------------+----+-----------+---------------+-------------+---------------+-----+-------------------+
|age|physical_activity_minutes_per_week|diet_score|family_history_diabetes| bmi|systolic_bp|hdl_cholesterol|triglycerides|glucose_fasting|hba1c|diabetes_risk_score|
+---+----------------------------------+----------+-----------------------+----+-----------+---------------+-------------+---------------+-----+-------------------+
| 58|                               215|       5.7|                      0|30.5|        134|             41|          145|            136| 8.18|               29.6|
| 48|                               143|       6.7|                      0|23.1|        129|             55|           30|             93| 5.63|               23.0|
| 60|                                57|       6.4|                      1|22.2|        115|             66|           36|            118| 7.51|               44.7|
| 74|     

In [52]:
# Export the cleaned DataFrame as a single CSV file.
# NOTE(review): the glob/shutil steps below assume `out_dir` is on the
# driver's local filesystem (not HDFS/S3) — confirm for cluster runs.
import os, glob, shutil
out_dir = "output/cleaned_data_tmp"
dest = "output/cleaned_data.csv"

# Coalesce to a single partition so Spark writes exactly one part file.
df.coalesce(1).write.csv(out_dir, header=True, mode="overwrite")

# Find the single part file Spark produced and move it to its final name.
part_files = glob.glob(os.path.join(out_dir, "part-*.csv"))
if part_files:
    part_file = part_files[0]
    shutil.move(part_file, dest)
    # Remove the whole temporary directory. The previous cleanup used
    # glob("*"), which does not match dot-files such as the `.crc` checksum
    # files Spark/Hadoop may leave next to the outputs. os.rmdir then failed
    # on the non-empty directory and the error was swallowed, so the temp
    # directory was left behind.
    shutil.rmtree(out_dir, ignore_errors=True)
    if os.path.exists(out_dir):
        # Cleanup is best-effort, but report a failure instead of hiding it.
        print("Warning: could not fully remove temporary folder", out_dir)
    print("Written:", dest)
else:
    print("No part file found in", out_dir)

Written: output/cleaned_data.csv
