In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [27]:
# Initialize Spark session
spark = SparkSession.builder.appName("DiabetesPrediction").getOrCreate()

In [28]:
data = spark.read.option("header", True) \
    .option("inferSchema", True) \
    .csv("/content/diabetes_prediction_india (1).csv")

In [29]:
# EDA: Display basic statistics and null value counts
data.printSchema()
data.show(5)
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Family_History: string (nullable = true)
 |-- Physical_Activity: string (nullable = true)
 |-- Diet_Type: string (nullable = true)
 |-- Smoking_Status: string (nullable = true)
 |-- Alcohol_Intake: string (nullable = true)
 |-- Stress_Level: string (nullable = true)
 |-- Hypertension: string (nullable = true)
 |-- Cholesterol_Level: double (nullable = true)
 |-- Fasting_Blood_Sugar: double (nullable = true)
 |-- Postprandial_Blood_Sugar: double (nullable = true)
 |-- HBA1C: double (nullable = true)
 |-- Heart_Rate: integer (nullable = true)
 |-- Waist_Hip_Ratio: double (nullable = true)
 |-- Urban_Rural: string (nullable = true)
 |-- Health_Insurance: string (nullable = true)
 |-- Regular_Checkups: string (nullable = true)
 |-- Medication_For_Chronic_Conditions: string (nullable = true)
 |-- Pregnancies: integer (nullable = true)
 |-- Polycystic_Ovary_Syndrome: string 

In [30]:
data.describe().show()

+-------+-----------------+------+------------------+--------------+-----------------+--------------+--------------+--------------+------------+------------+------------------+-------------------+------------------------+------------------+-----------------+-------------------+-----------+----------------+----------------+---------------------------------+------------------+-------------------------+-----------------------------+-----------------+------------------+-----------------+---------------+
|summary|              Age|Gender|               BMI|Family_History|Physical_Activity|     Diet_Type|Smoking_Status|Alcohol_Intake|Stress_Level|Hypertension| Cholesterol_Level|Fasting_Blood_Sugar|Postprandial_Blood_Sugar|             HBA1C|       Heart_Rate|    Waist_Hip_Ratio|Urban_Rural|Health_Insurance|Regular_Checkups|Medication_For_Chronic_Conditions|       Pregnancies|Polycystic_Ovary_Syndrome|Glucose_Tolerance_Test_Result|  Vitamin_D_Level|   C_Protein_Level|Thyroid_Condition|Diabete

In [31]:
# Encode categorical variables and assemble features
categorical_cols = [
    "Gender", "Family_History", "Physical_Activity", "Diet_Type", "Smoking_Status",
    "Alcohol_Intake", "Stress_Level", "Hypertension", "Urban_Rural", "Health_Insurance",
    "Regular_Checkups", "Medication_For_Chronic_Conditions", "Polycystic_Ovary_Syndrome",
    "Thyroid_Condition"]

In [32]:
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in categorical_cols]

In [33]:
# Transform categorical features
for indexer in indexers:
    data = indexer.fit(data).transform(data)

In [34]:
# Assemble all features
feature_cols = [
    "Age", "BMI", "Cholesterol_Level", "Fasting_Blood_Sugar", "Postprandial_Blood_Sugar",
    "HBA1C", "Heart_Rate", "Waist_Hip_Ratio", "Pregnancies", "Glucose_Tolerance_Test_Result",
    "Vitamin_D_Level", "C_Protein_Level"] + [f"{col}_index" for col in categorical_cols]

In [35]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(data)

In [36]:
# Standardize features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
data = scaler.fit(data).transform(data)

In [37]:
# Encode target variable
data = StringIndexer(inputCol="Diabetes_Status", outputCol="label").fit(data).transform(data)

In [38]:
# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [39]:
# Train a logistic regression model
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

In [40]:
# Hyperparameter tuning with CrossValidator
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 150]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

In [41]:
crossval_rf = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid_rf,
    evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy"),
    numFolds=5
)

In [42]:
# Fit the model
cv_model = crossval_rf.fit(train_data)

In [43]:
# Evaluate models and compare
best_model_rf = cv_model_rf.bestModel

Best numTrees: 50
Best maxDepth: <bound method _DecisionTreeParams.getMaxDepth of RandomForestClassificationModel: uid=RandomForestClassifier_42a2af1deb51, numTrees=50, numClasses=2, numFeatures=26>


In [44]:
# Evaluate the model on test data
predictions = best_model_rf.transform(test_data)

In [45]:
# Evaluate accuracy and other metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

In [46]:
print(f"Test Accuracy: {accuracy:.2f}")
print(f"Test Precision: {precision:.2f}")
print(f"Test Recall: {recall:.2f}")
print(f"Test F1 Score: {f1:.2f}")


Test Accuracy: 0.49
Test Precision: 0.49
Test Recall: 0.49
Test F1 Score: 0.48


In [47]:
# Feature importance
importances = best_model_rf.featureImportances
important_features = [(feature_cols[i], importance) for i, importance in enumerate(importances) if importance > 0]
important_features = sorted(important_features, key=lambda x: x[1], reverse=True)
print("Feature Importances:")
for feature, importance in important_features:
    print(f"{feature}: {importance:.4f}")


Feature Importances:
Age: 0.0777
C_Protein_Level: 0.0725
BMI: 0.0711
Waist_Hip_Ratio: 0.0679
Vitamin_D_Level: 0.0667
Glucose_Tolerance_Test_Result: 0.0666
Heart_Rate: 0.0653
Cholesterol_Level: 0.0651
Fasting_Blood_Sugar: 0.0633
Postprandial_Blood_Sugar: 0.0616
HBA1C: 0.0567
Physical_Activity_index: 0.0265
Diet_Type_index: 0.0252
Smoking_Status_index: 0.0248
Stress_Level_index: 0.0248
Alcohol_Intake_index: 0.0243
Pregnancies: 0.0232
Gender_index: 0.0171
Thyroid_Condition_index: 0.0134
Polycystic_Ovary_Syndrome_index: 0.0128
Hypertension_index: 0.0127
Health_Insurance_index: 0.0125
Medication_For_Chronic_Conditions_index: 0.0125
Urban_Rural_index: 0.0122
Regular_Checkups_index: 0.0121
Family_History_index: 0.0113


In [48]:
spark.stop()