In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator
import os
from pyspark.sql.functions import col, count, when

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("DiabetesClassification") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [4]:
offline_path = './data/offline.csv'
df_spark = spark.read.csv(offline_path, header=True, inferSchema=True)

In [5]:
print(f"Number of rows: {df_spark.count():,}")
print(f"Number of columns: {len(df_spark.columns)}")

Number of rows: 202,944
Number of columns: 22


In [7]:
df_spark.show(5)

+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|Diabetes_binary|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex| Age|Education|Income|
+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|            0.0|   1.0|     1.0|      1.0|24.0|   1.0|   0.0|                 0.0|         1.0|   0.0|    1.0|              0.0|          1.0|        0.0|    3.0|     0.0|     0.0|     0.0|1.0|11.0|      4.0|   8.0|
|            0.0|   1.0|     0.0|      1.0|27.0|   1.0|   0.0|                 0.0|         0.0|   0.0|    1.0|              0.0|   

In [8]:
df_spark.printSchema()

root
 |-- Diabetes_binary: double (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- HeartDiseaseorAttack: double (nullable = true)
 |-- PhysActivity: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Veggies: double (nullable = true)
 |-- HvyAlcoholConsump: double (nullable = true)
 |-- AnyHealthcare: double (nullable = true)
 |-- NoDocbcCost: double (nullable = true)
 |-- GenHlth: double (nullable = true)
 |-- MentHlth: double (nullable = true)
 |-- PhysHlth: double (nullable = true)
 |-- DiffWalk: double (nullable = true)
 |-- Sex: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



In [9]:
df_spark.describe().show()

+-------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-----------------+-----------------+------------------+-------------------+------------------+------------------+------------------+
|summary|    Diabetes_binary|            HighBP|          HighChol|         CholCheck|               BMI|            Smoker|             Stroke|HeartDiseaseorAttack|      PhysActivity|             Fruits|            Veggies|  HvyAlcoholConsump|      AnyHealthcare|        NoDocbcCost|           GenHlth|         MentHlth|         PhysHlth|          DiffWalk|                Sex|               Age|         Education|            Income|
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+----

In [6]:
df_spark.select([count(when(col(c).isNull(), c)).alias(c)
    for c in df_spark.columns]).show()

+---------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|Diabetes_binary|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex|Age|Education|Income|
+---------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|              0|     0|       0|        0|  0|     0|     0|                   0|           0|     0|      0|                0|            0|          0|      0|       0|       0|       0|  0|  0|        0|     0|
+---------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+------------

In [13]:
def create_transformation_pipeline(input_cols):
    assembler = VectorAssembler(
        inputCols=input_cols,
        outputCol="features_raw")

    scaler = StandardScaler(
        inputCol="features_raw",
        outputCol="features",
        withStd=True,
        withMean=True)

    pipeline = Pipeline(stages=[assembler, scaler])
    return pipeline

In [10]:
feature_cols = [col for col in df_spark.columns if col != 'Diabetes_binary']
label_col = 'Diabetes_binary'

print(f"Feature columns ({len(feature_cols)}): {feature_cols}")
print(f"Target columns: {label_col}")

Feature columns (21): ['HighBP', 'HighChol', 'CholCheck', 'BMI', 'Smoker', 'Stroke', 'HeartDiseaseorAttack', 'PhysActivity', 'Fruits', 'Veggies', 'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost', 'GenHlth', 'MentHlth', 'PhysHlth', 'DiffWalk', 'Sex', 'Age', 'Education', 'Income']
Target columns: Diabetes_binary


In [14]:
transform_pipeline = create_transformation_pipeline(feature_cols)

In [15]:
transform_model = transform_pipeline.fit(df_spark)

In [16]:
df_transformed = transform_model.transform(df_spark)

In [17]:
df_transformed.select('features', label_col).show(5, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                                                  |Diabetes_binary|
+-------------------------------------------------------------------------------------------------------------

In [18]:
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)
print(f"Train dataset: {train_data.count():,} row")
print(f"Test dataset: {test_data.count():,} row")

Train dataset: 162,465 row
Test dataset: 40,479 row


In [35]:
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="f1")

accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="accuracy"
)

In [31]:
logistic_regression = [
    LogisticRegression(featuresCol="features",
        labelCol="Diabetes_binary",
        maxIter=20,
        regParam=0.0,
        elasticNetParam=0.0),
    LogisticRegression(
        featuresCol="features",
        labelCol="Diabetes_binary",
        maxIter=50,
        regParam=0.1,
        elasticNetParam=0.5),
    LogisticRegression(
        featuresCol="features",
        labelCol="Diabetes_binary",
        maxIter=100,
        regParam=0.3,
        elasticNetParam=1.0
    )
]

In [24]:
random_forest = [
    RandomForestClassifier(
        featuresCol="features",
        labelCol="Diabetes_binary",
        numTrees=20,
        maxDepth=5,
        maxBins=32),
    RandomForestClassifier(
        featuresCol="features",
        labelCol="Diabetes_binary",
        numTrees=50,
        maxDepth=10,
        maxBins=32),
    RandomForestClassifier(
        featuresCol="features",
        labelCol="Diabetes_binary",
        numTrees=100,
        maxDepth=15,
        maxBins=64)
]

In [28]:
decision_tree = [
    DecisionTreeClassifier(
        featuresCol="features",
        labelCol="Diabetes_binary",
        maxDepth=5,
        maxBins=32),
    DecisionTreeClassifier(
        featuresCol="features",
        labelCol="Diabetes_binary",
        maxDepth=10,
        maxBins=32 ),
    DecisionTreeClassifier(
        featuresCol="features",
        labelCol="Diabetes_binary",
        maxDepth=15,
        maxBins=64)]

In [40]:
def calculate_f1_and_accuracy(model_list,train,test):
    for i, model in enumerate(model_list):
        trained_model = model.fit(train)
        predictions = trained_model.transform(test)
        f1_score = f1_evaluator.evaluate(predictions)
        accuracy = accuracy_evaluator.evaluate(predictions)

        print(f'Model {i+1} results:')
        print(f'F1 score: {f1_score}')
        print(f'Accuracy: {accuracy}\n')

In [41]:
calculate_f1_and_accuracy(logistic_regression,train_data,test_data)

Model 1 results:
F1 score: 0.831270712824732
Accuracy: 0.864868203265891

Model 2 results:
F1 score: 0.7980822848024183
Accuracy: 0.8619778156575014

Model 3 results:
F1 score: 0.7980822848024183
Accuracy: 0.8619778156575014



Најдобар модел од Логистичка регресија е тој со параметри: maxIter=20, regParam=0.0 и elasticNetParam=0.0 со f1 score: 0.831270712824732

In [42]:
calculate_f1_and_accuracy(random_forest,train_data,test_data)

Model 1 results:
F1 score: 0.7982498867392818
Accuracy: 0.8620272239926876

Model 2 results:
F1 score: 0.8247336874363377
Accuracy: 0.8671903950196398

Model 3 results:
F1 score: 0.8313472369510807
Accuracy: 0.867906815879839



Најдобар модел од Random Forest е тој со параметри: numTrees=100, maxDepth=15 и maxBins=64 со f1 score: 0.8313472369510807

In [43]:
calculate_f1_and_accuracy(decision_tree,train_data,test_data)

Model 1 results:
F1 score: 0.8218349428302372
Accuracy: 0.8659057783048

Model 2 results:
F1 score: 0.8256851351235526
Accuracy: 0.8637071073890166

Model 3 results:
F1 score: 0.825024398868763
Accuracy: 0.8421403690802638



Најдобар модел од Decision tree е тој со параметри: maxDepth=10 и maxBins=32 со f1 score: 8256851351235526.

In [46]:
model = RandomForestClassifier(
        featuresCol="features",
        labelCol="Diabetes_binary",
        numTrees=100,
        maxDepth=15,
        maxBins=64)

In [47]:
trained_model = model.fit(train_data)
predictions = trained_model.transform(test_data)

In [49]:
model_dir = './model'
os.makedirs(model_dir, exist_ok=True)

In [50]:
transform_model_path = f'{model_dir}/transform_pipeline'
transform_model.write().overwrite().save(transform_model_path)

best_model_path = f'{model_dir}/best_model'
trained_model.write().overwrite().save(best_model_path)