In [3]:
import pandas as pd
from sklearn.model_selection import train_test_split

df = pd.read_csv("data/diabetes_binary_health_indicators_BRFSS2015.csv")

offline_df, online_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42
)

offline_df.to_csv("offline.csv", index=False)
online_df.to_csv("online.csv", index=False)

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

spark = SparkSession.builder \
    .appName("DiabetesOfflineCV") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

df = spark.read.csv("offline.csv", header=True, inferSchema=True)

label_col = "Diabetes_binary"
feature_cols = [c for c in df.columns if c != label_col]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_vec"
)

scaler = StandardScaler(
    inputCol="features_vec",
    outputCol="features",
    withMean=True,
    withStd=True
)

lr = LogisticRegression(featuresCol="features", labelCol=label_col)
rf = RandomForestClassifier(featuresCol="features", labelCol=label_col)
dt = DecisionTreeClassifier(featuresCol="features", labelCol=label_col)

pipelines = [
    (
        "lr",
        Pipeline(stages=[assembler, scaler, lr]),
        ParamGridBuilder()
            .addGrid(lr.regParam, [0.01, 0.1])
            .addGrid(lr.elasticNetParam, [0.0, 0.5])
            .build()
    ),
    (
        "rf",
        Pipeline(stages=[assembler, scaler, rf]),
        ParamGridBuilder()
            .addGrid(rf.numTrees, [20, 50])
            .addGrid(rf.maxDepth, [5, 10])
            .build()
    ),
    (
        "dt",
        Pipeline(stages=[assembler, scaler, dt]),
        ParamGridBuilder()
            .addGrid(dt.maxDepth, [5, 10, 20])
            .addGrid(dt.minInstancesPerNode, [1, 5])
            .build()
    )
]

evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    metricName="f1"
)

best_model = None
best_f1 = 0.0
best_name = ""

for name, pipeline, paramGrid in pipelines:
    cv = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=5
    )

    cvModel = cv.fit(df)
    f1 = evaluator.evaluate(cvModel.transform(df))

    print(f"{name} CV F1 = {f1:.4f}")

    if f1 > best_f1:
        best_f1 = f1
        best_model = cvModel.bestModel
        best_name = name

print(f"\nBest model: {best_name} | F1 = {best_f1}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/24 11:40:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/24 11:40:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/12/24 11:40:30 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

lr CV F1 = 0.8258


25/12/24 11:41:38 WARN DAGScheduler: Broadcasting large task binary with size 1460.3 KiB
25/12/24 11:41:41 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/12/24 11:41:45 WARN DAGScheduler: Broadcasting large task binary with size 1304.8 KiB
25/12/24 11:42:11 WARN DAGScheduler: Broadcasting large task binary with size 1867.6 KiB
25/12/24 11:42:15 WARN DAGScheduler: Broadcasting large task binary with size 3.4 MiB
25/12/24 11:42:20 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB
25/12/24 11:42:25 WARN DAGScheduler: Broadcasting large task binary with size 1583.9 KiB
25/12/24 11:42:27 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/12/24 11:42:38 WARN DAGScheduler: Broadcasting large task binary with size 1454.3 KiB
25/12/24 11:42:39 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/12/24 11:42:42 WARN DAGScheduler: Broadcasting large task binary with size 1308.6 KiB
25/12/24 11:42:56 WARN DAGScheduler:

rf CV F1 = 0.8264


25/12/24 11:45:45 WARN DAGScheduler: Broadcasting large task binary with size 1105.9 KiB
25/12/24 11:45:45 WARN DAGScheduler: Broadcasting large task binary with size 1520.3 KiB
25/12/24 11:45:45 WARN DAGScheduler: Broadcasting large task binary with size 2006.0 KiB
25/12/24 11:45:46 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/12/24 11:45:47 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/12/24 11:45:47 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
25/12/24 11:45:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/12/24 11:45:49 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/12/24 11:45:53 WARN DAGScheduler: Broadcasting large task binary with size 1299.1 KiB
25/12/24 11:45:53 WARN DAGScheduler: Broadcasting large task binary with size 1627.5 KiB
25/12/24 11:45:54 WARN DAGScheduler: Broadcasting large task binary with size 1945.4 KiB
25/12/24 11:45:54 WARN DAGScheduler:

dt CV F1 = 0.8319

Best model: dt | F1 = 0.8319


                                                                                

In [3]:
from pyspark.ml import PipelineModel

model_path = f"saved_models/best_{best_name}"

best_model.write().overwrite().save(model_path)

print(f"Model saved to: {model_path}")

Model saved to: saved_models/best_dt
