In [1]:
!pip install pyspark



In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
import pandas as pd
from sklearn.model_selection import train_test_split
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import findspark
import numpy as np
import pandas as pd

In [6]:
data = pd.read_csv("/content/drive/MyDrive/RNMP/diabetes_binary_health_indicators_BRFSS2015.csv")

In [None]:
data.isnull().sum()

Unnamed: 0,0
Diabetes_binary,0
HighBP,0
HighChol,0
CholCheck,0
BMI,0
Smoker,0
Stroke,0
HeartDiseaseorAttack,0
PhysActivity,0
Fruits,0


In [None]:
data.columns

Index(['Diabetes_binary', 'HighBP', 'HighChol', 'CholCheck', 'BMI', 'Smoker',
       'Stroke', 'HeartDiseaseorAttack', 'PhysActivity', 'Fruits', 'Veggies',
       'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost', 'GenHlth',
       'MentHlth', 'PhysHlth', 'DiffWalk', 'Sex', 'Age', 'Education',
       'Income'],
      dtype='object')

In [None]:
data

Unnamed: 0,Diabetes_binary,HighBP,HighChol,CholCheck,BMI,Smoker,Stroke,HeartDiseaseorAttack,PhysActivity,Fruits,...,AnyHealthcare,NoDocbcCost,GenHlth,MentHlth,PhysHlth,DiffWalk,Sex,Age,Education,Income
0,0.0,1.0,1.0,1.0,40.0,1.0,0.0,0.0,0.0,0.0,...,1.0,0.0,5.0,18.0,15.0,1.0,0.0,9.0,4.0,3.0
1,0.0,0.0,0.0,0.0,25.0,1.0,0.0,0.0,1.0,0.0,...,0.0,1.0,3.0,0.0,0.0,0.0,0.0,7.0,6.0,1.0
2,0.0,1.0,1.0,1.0,28.0,0.0,0.0,0.0,0.0,1.0,...,1.0,1.0,5.0,30.0,30.0,1.0,0.0,9.0,4.0,8.0
3,0.0,1.0,0.0,1.0,27.0,0.0,0.0,0.0,1.0,1.0,...,1.0,0.0,2.0,0.0,0.0,0.0,0.0,11.0,3.0,6.0
4,0.0,1.0,1.0,1.0,24.0,0.0,0.0,0.0,1.0,1.0,...,1.0,0.0,2.0,3.0,0.0,0.0,0.0,11.0,5.0,4.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
253675,0.0,1.0,1.0,1.0,45.0,0.0,0.0,0.0,0.0,1.0,...,1.0,0.0,3.0,0.0,5.0,0.0,1.0,5.0,6.0,7.0
253676,1.0,1.0,1.0,1.0,18.0,0.0,0.0,0.0,0.0,0.0,...,1.0,0.0,4.0,0.0,0.0,1.0,0.0,11.0,2.0,4.0
253677,0.0,0.0,0.0,1.0,28.0,0.0,0.0,0.0,1.0,1.0,...,1.0,0.0,1.0,0.0,0.0,0.0,0.0,2.0,5.0,2.0
253678,0.0,1.0,0.0,1.0,23.0,0.0,0.0,0.0,0.0,1.0,...,1.0,0.0,3.0,0.0,0.0,0.0,1.0,7.0,5.0,1.0


In [7]:
offline_data, online_data = train_test_split(data, test_size=0.2, stratify=data['Diabetes_binary'], random_state=42)

offline_data.to_csv('/content/drive/MyDrive/RNMP/offline.csv', index=False)
online_data.to_csv('/content/drive/MyDrive/RNMP/online.csv', index=False)

Offline phase

In [8]:
findspark.init()

In [9]:
spark = SparkSession.builder \
    .appName("DiabetesClassification") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
    .getOrCreate()

In [10]:
offline_data_spark = spark.read.option("delimiter", ",") \
    .option("header", True)\
    .csv("/content/drive/MyDrive/RNMP/offline.csv",
         inferSchema=True)

In [30]:
def transform_data(df):
    feature_columns = list(df.columns)
    feature_columns.remove('Diabetes_binary')

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    df_transformed = assembler.transform(df)

    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaler = scaler.fit(df_transformed)
    scaler.save("/content/drive/MyDrive/RNMP/scaler")
    df_scaled = scaler.transform(df_transformed)

    return df_scaled

In [31]:
offline_df_scaled = transform_data(offline_data_spark)

In [16]:
offline_df_scaled = offline_df_scaled.withColumnRenamed("Diabetes_binary", "label")

In [17]:
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")
dt = DecisionTreeClassifier(featuresCol="scaled_features", labelCol="label")
rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="label")

lr_paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

dt_paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .addGrid(dt.minInstancesPerNode, [1, 5, 10]) \
    .addGrid(dt.impurity, ["gini", "entropy"]) \
    .build()

rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_paramGrid, evaluator=MulticlassClassificationEvaluator(metricName="f1"), numFolds=3)
dt_cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_paramGrid, evaluator=MulticlassClassificationEvaluator(metricName="f1"), numFolds=3)
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_paramGrid, evaluator=MulticlassClassificationEvaluator(metricName="f1"), numFolds=3)


In [None]:
lr_model = lr_cv.fit(offline_df_scaled)


In [None]:
dt_model = dt_cv.fit(offline_df_scaled)

In [18]:
rf_model = rf_cv.fit(offline_df_scaled)

In [None]:
lr_f1_score = lr_model.avgMetrics[0]
dt_f1_score = dt_model.avgMetrics[0]
rf_f1_score = rf_model.avgMetrics[0]

best_model_name, best_f1_score = max([
    ("Logistic Regression", lr_f1_score),
    ("Decision Tree", dt_f1_score),
    ("Random ForestTree", rf_f1_score)
], key=lambda x: x[1])

In [None]:
print(f"\nBest Model: {best_model_name} with F1 Score: {best_f1_score}")


Best Model: Random ForestTree with F1 Score: 0.9037037037037037


In [19]:
rf_model.bestModel.save("/content/drive/MyDrive/RNMP/random_forest_tree")


In [27]:
from pyspark.ml.classification import RandomForestClassificationModel

best_model = RandomForestClassificationModel.load("/content/drive/MyDrive/RNMP/random_forest_tree")

In [32]:
from pyspark.ml.feature import  StandardScalerModel
scaler = StandardScalerModel.load("/content/drive/MyDrive/RNMP/scaler")

In [33]:
scaler.transform(offline_df_scaled)

IllegalArgumentException: requirement failed: Output column scaled_features already exists.