In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("train-model").getOrCreate()

df = spark.read.csv("data/offline.csv", header=True, inferSchema=True)

df.show()

24/01/20 13:13:09 WARN Utils: Your hostname, Danilos-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.100.165 instead (on interface en0)
24/01/20 13:13:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/20 13:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+------+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|    Id|Diabetes_binary|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex| Age|Education|Income|
+------+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+----+---------+------+
|204313|            0.0|   0.0|     1.0|      1.0|25.0|   0.0|   0.0|                 0.0|         1.0|   0.0|    1.0|              1.0|          1.0|        0.0|    2.0|    10.0|     2.0|     0.0|0.0| 3.0|      6.0|   8.0|
|175372|            1.0|   0.0|     1.0|      1.0|26.0|   1.0|   0.0|                 0.0|         0.0| 

In [3]:
from pyspark.sql.functions import col

df = df.withColumn("Diabetes_binary"
                   , col("Diabetes_binary").cast("int"))

In [4]:
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Diabetes_binary: integer (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- HeartDiseaseorAttack: double (nullable = true)
 |-- PhysActivity: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Veggies: double (nullable = true)
 |-- HvyAlcoholConsump: double (nullable = true)
 |-- AnyHealthcare: double (nullable = true)
 |-- NoDocbcCost: double (nullable = true)
 |-- GenHlth: double (nullable = true)
 |-- MentHlth: double (nullable = true)
 |-- PhysHlth: double (nullable = true)
 |-- DiffWalk: double (nullable = true)
 |-- Sex: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



In [8]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, LinearSVC
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

features_list = [
    'HighBP', 'HighChol', 'CholCheck', 'BMI', 'Smoker', 'Stroke', 'HeartDiseaseorAttack',
    'PhysActivity', 'Fruits', 'Veggies', 'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost',
    'GenHlth', 'MentHlth', 'PhysHlth', 'DiffWalk', 'Sex', 'Age', 'Education', 'Income'
]

assembler = VectorAssembler(inputCols=features_list, outputCol="features")
df_a = assembler.transform(df).cache()

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df_a)

(trainingData, testData) = df.randomSplit([0.75, 0.25])

rf = RandomForestClassifier(labelCol="Diabetes_binary", featuresCol="indexedFeatures", numTrees=20)
lr = LogisticRegression(labelCol="Diabetes_binary", featuresCol="indexedFeatures", maxIter=10)
svm = LinearSVC(labelCol="Diabetes_binary", featuresCol="indexedFeatures", maxIter=10)

# Create pipelines with stages
rf_pipeline = Pipeline(stages=[assembler, featureIndexer, rf])
lr_pipeline = Pipeline(stages=[assembler, featureIndexer, lr])
svm_pipeline = Pipeline(stages=[assembler, featureIndexer, svm])

rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.maxBins, [8, 16, 32, 64]) \
    .build()

lr_paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

svm_paramGrid = ParamGridBuilder() \
    .addGrid(svm.regParam, [0.1, 0.01]) \
    .build()

all_pipelines = [rf_pipeline, lr_pipeline, svm_pipeline]
all_paramGrids = [rf_paramGrid, lr_paramGrid, svm_paramGrid]

models = []

for i, (pipeline, paramGrid) in enumerate(zip(all_pipelines, all_paramGrids)):
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=MulticlassClassificationEvaluator(metricName='fMeasureByLabel', metricLabel=1,beta=1.0).setLabelCol("Diabetes_binary") ,
                              numFolds=5)  # Use 5 folds

    cvModel = crossval.fit(trainingData)

    predictions = cvModel.transform(testData)

    evaluator = MulticlassClassificationEvaluator(metricName='fMeasureByLabel', metricLabel=1,beta=1.0).setLabelCol("Diabetes_binary")
    f1 = evaluator.evaluate(predictions)
    print(f"Model {i + 1} - Test f1 = {f1}\n")

    evaluator = MulticlassClassificationEvaluator(metricName='accuracy').setLabelCol("Diabetes_binary")
    acc = evaluator.evaluate(predictions)
    print(f"Model {i + 1} - Test acc = {acc}\n")

    bestModel = cvModel.bestModel
    print(f"Model {i + 1} - Best Model:")
    print(bestModel.stages[-1])
    models.append((bestModel, f1))


24/01/20 13:30:54 WARN DAGScheduler: Broadcasting large task binary with size 1534.3 KiB
24/01/20 13:30:55 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/01/20 13:30:55 WARN DAGScheduler: Broadcasting large task binary with size 1338.6 KiB
24/01/20 13:30:57 WARN DAGScheduler: Broadcasting large task binary with size 1539.7 KiB
24/01/20 13:30:58 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/01/20 13:30:58 WARN DAGScheduler: Broadcasting large task binary with size 1400.1 KiB
24/01/20 13:31:00 WARN DAGScheduler: Broadcasting large task binary with size 1532.5 KiB
24/01/20 13:31:00 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/01/20 13:31:01 WARN DAGScheduler: Broadcasting large task binary with size 1425.5 KiB
24/01/20 13:31:03 WARN DAGScheduler: Broadcasting large task binary with size 1528.6 KiB
24/01/20 13:31:03 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/01/20 13:31:04 WARN DAGSchedul

Model 1 - Test f1 = 0.24352273990881798



24/01/20 13:35:57 WARN DAGScheduler: Broadcasting large task binary with size 12.8 MiB
                                                                                

Model 1 - Test f1 = 0.8654656198706666

Model 1 - Best Model:
RandomForestClassificationModel: uid=RandomForestClassifier_66754d402a8c, numTrees=20, numClasses=2, numFeatures=21
Model 2 - Test f1 = 0.21822297679683078

Model 2 - Test f1 = 0.8634089425910179

Model 2 - Best Model:
LogisticRegressionModel: uid=LogisticRegression_8d1e9602ddd7, numClasses=2, numFeatures=21
Model 3 - Test f1 = 0.010299234516353513

Model 3 - Test f1 = 0.8593746910040144

Model 3 - Best Model:
LinearSVCModel: uid=LinearSVC_c91f26a4be42, numClasses=2, numFeatures=21


In [10]:
models[0][0].save("best_model")

24/01/20 13:40:03 WARN TaskSetManager: Stage 7390 contains a task of very large size (1383 KiB). The maximum recommended task size is 1000 KiB.
