# DATA MODELING: ML PIPELINES IN SPARK

#### Import Libraries

In [167]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler, MinMaxScaler, PCA
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#### Create Spark Session

In [16]:
# Create (or reuse) the Spark session for this notebook.
# The placeholder `.config("spark.some.config.option", "some-value")` copied from
# the Spark docs was removed: it set a meaningless key and had no effect.
# NOTE(review): master "local" runs a single worker thread; "local[*]" would use
# all cores, at the cost of more concurrent memory use.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Classification in Spark")
    .getOrCreate()
)

#### Define Schema

In [17]:
# Column layout of DiabetesCleaned_2018.csv, in file order.
# Every column is declared non-nullable.
_diabetes_columns = [
    ("Index", IntegerType()),
    *[(name, DoubleType()) for name in (
        "EXERCISE",
        "HEARTATTACK",
        "CORONARYHEARTDISEASE",
        "STROKE",
        "ASTHMA",
        "SKINCANCER",
        "OTHERCANCER",
        "CHRONICBRONCHITIS",
        "ARTHRITIS",
        "DEPRESSIVEDISORDER",
        "KIDNEYDISEASE",
        "DIABETES",
    )],
    ("SLEEPTIME_GROUP", StringType()),
    ("SEX_GROUP", StringType()),
    ("WEIGHT_KILOGRAM", DoubleType()),
    ("HEIGHT_METER", DoubleType()),
    ("BMI_GROUP", StringType()),
    ("RACE_GROUP", StringType()),
    ("AGE_GROUP", StringType()),
]

diabetesSchema = StructType(
    [StructField(name, dtype, False) for name, dtype in _diabetes_columns]
)

#### Read & Store Data

In [18]:
# Load the cleaned CSV with the explicit schema (no type inference);
# header=True makes Spark skip the file's first line.
df = spark.read.csv('DiabetesCleaned_2018.csv', header=True, schema=diabetesSchema)

# Preview the demographic/biometric columns together with the label.
preview_cols = ['WEIGHT_KILOGRAM', 'HEIGHT_METER', 'SLEEPTIME_GROUP', 'SEX_GROUP',
                'BMI_GROUP', 'RACE_GROUP', 'AGE_GROUP', 'DIABETES']
df.select(*preview_cols).show(5)

+---------------+------------+---------------+---------+------------+----------+----------+--------+
|WEIGHT_KILOGRAM|HEIGHT_METER|SLEEPTIME_GROUP|SEX_GROUP|   BMI_GROUP|RACE_GROUP| AGE_GROUP|DIABETES|
+---------------+------------+---------------+---------+------------+----------+----------+--------+
|         58.967|       1.626|    NormalSleep|   Female|NormalWeight|     White|   Elderly|     0.0|
|         90.718|       1.651|      LessSleep|   Female|       Obese|     Black|YoungAdult|     0.0|
|          64.41|       1.473|    NormalSleep|   Female|  OverWeight|     White|   Elderly|     1.0|
|         86.182|       1.778|      LessSleep|     Male|  OverWeight|     White|   Elderly|     0.0|
|         78.018|       1.575|      LessSleep|   Female|       Obese|     White|   Elderly|     0.0|
+---------------+------------+---------------+---------+------------+----------+----------+--------+
only showing top 5 rows



In [None]:
# First five rows as a list of Row objects (head(n) delegates to take(n)).
df.take(5)

In [None]:
# Limit in Spark first so only 5 rows are shipped to the driver; calling
# toPandas() on the whole DataFrame would collect every row just to show 5.
df.limit(5).toPandas()

In [19]:
# (column name, Spark SQL type name) pairs, in schema order -- same as df.dtypes.
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]

[('Index', 'int'),
 ('EXERCISE', 'double'),
 ('HEARTATTACK', 'double'),
 ('CORONARYHEARTDISEASE', 'double'),
 ('STROKE', 'double'),
 ('ASTHMA', 'double'),
 ('SKINCANCER', 'double'),
 ('OTHERCANCER', 'double'),
 ('CHRONICBRONCHITIS', 'double'),
 ('ARTHRITIS', 'double'),
 ('DEPRESSIVEDISORDER', 'double'),
 ('KIDNEYDISEASE', 'double'),
 ('DIABETES', 'double'),
 ('SLEEPTIME_GROUP', 'string'),
 ('SEX_GROUP', 'string'),
 ('WEIGHT_KILOGRAM', 'double'),
 ('HEIGHT_METER', 'double'),
 ('BMI_GROUP', 'string'),
 ('RACE_GROUP', 'string'),
 ('AGE_GROUP', 'string')]

#### Balance Data (Class Weights)

In [48]:
# Count rows per DIABETES label. groupBy().collect() returns rows in no
# guaranteed order, so the counts are looked up by label value instead of by
# list position (the previous y_collect[0]/[1] indexing was order-dependent).
y_collect = df.groupBy("DIABETES").count().collect()
label_counts = {row["DIABETES"]: row["count"] for row in y_collect}
num_1 = label_counts.get(1.0, 0)
num_0 = label_counts.get(0.0, 0)

ratio_1 = num_1 / (num_1 + num_0)   # share of rows labelled 1
ratio_0 = num_0 / (num_1 + num_0)   # share of rows labelled 0


def balanced_weight(labels):
    """Return a Column of inverse-frequency class weights for a 0/1 label column.

    Rows with label 1 get the class-0 share (ratio_0) and all other rows get the
    class-1 share (ratio_1), so whichever class is rarer receives the larger weight.

    Args:
        labels: Spark Column holding the label values.

    Returns:
        Spark Column expression with one weight per row.
    """
    return when(labels == 1, ratio_0).otherwise(ratio_1)


df = df.withColumn('weights', balanced_weight(col('DIABETES')))

#### Split Data (Train & Test)

In [49]:
# Approximately 80/20 train/test split. A fixed seed makes the split -- and every
# score computed downstream -- reproducible; without one it changes run to run.
dividedData  = df.randomSplit([0.80, 0.20], seed=42)
trainingData, testingData = dividedData   # index 0 = training, index 1 = testing
train_rows   = trainingData.count()
test_rows    = testingData.count()

print("Training data rows:", train_rows)
print("Testing data rows:", test_rows)

Training data rows: 311722
Testing data rows: 77846


#### Transform Data

In [139]:
# Categorical survey columns: each is label-indexed, then one-hot encoded.
categorical_cols = ['BMI_GROUP', 'RACE_GROUP', 'AGE_GROUP', 'SLEEPTIME_GROUP', 'SEX_GROUP']

(string_indexer_1,
 string_indexer_2,
 string_indexer_3,
 string_indexer_4,
 string_indexer_5) = [StringIndexer(inputCol=name, outputCol=name + '_INDEX')
                      for name in categorical_cols]

onehot_encoder = OneHotEncoder(
    inputCols=[name + '_INDEX' for name in categorical_cols],
    outputCols=[name + '_INDEX_OHE' for name in categorical_cols],
)

# Health-condition columns (double-typed in the schema) come first in the
# feature vector, followed by the encoded and numeric columns in this fixed order.
condition_cols = ['EXERCISE', 'HEARTATTACK', 'CORONARYHEARTDISEASE', 'STROKE', 'ASTHMA',
                  'SKINCANCER', 'OTHERCANCER', 'CHRONICBRONCHITIS', 'ARTHRITIS',
                  'DEPRESSIVEDISORDER', 'KIDNEYDISEASE']
vector_assembler = VectorAssembler(
    inputCols=condition_cols + ['SLEEPTIME_GROUP_INDEX_OHE', 'SEX_GROUP_INDEX_OHE',
                                'WEIGHT_KILOGRAM', 'HEIGHT_METER', 'BMI_GROUP_INDEX_OHE',
                                'RACE_GROUP_INDEX_OHE', 'AGE_GROUP_INDEX_OHE'],
    outputCol='features',
)

# Two alternative scalers writing the same output column; a pipeline uses only one.
standard_scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
minmax_scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')

# Project the scaled feature vector onto its top 3 principal components.
pca = PCA(k=3, inputCol='scaled_features', outputCol='reduced_features')

#### Pipeline Creation

In [149]:
from pyspark.ml.classification import LogisticRegression

# Weighted logistic regression on the 3 PCA components of the standardized features.
lr_classifier = LogisticRegression(
    featuresCol='reduced_features',
    labelCol='DIABETES',
    weightCol='weights',
)
lr_stages = [
    string_indexer_1, string_indexer_2, string_indexer_3, string_indexer_4, string_indexer_5,
    onehot_encoder, vector_assembler, standard_scaler, pca,
]
lr_stages.append(lr_classifier)
pipeline_lr = Pipeline(stages=lr_stages)

In [150]:
from pyspark.ml.classification import LinearSVC

# Weighted linear SVM on the 3 PCA components of the standardized features.
svc_classifier = LinearSVC(
    featuresCol='reduced_features',
    labelCol='DIABETES',
    weightCol='weights',
)
svc_stages = [
    string_indexer_1, string_indexer_2, string_indexer_3, string_indexer_4, string_indexer_5,
    onehot_encoder, vector_assembler, standard_scaler, pca,
]
svc_stages.append(svc_classifier)
pipeline_svc = Pipeline(stages=svc_stages)

In [158]:
from pyspark.ml.classification import NaiveBayes

# Weighted Naive Bayes on min-max-scaled features (no PCA). Spark's default
# multinomial model rejects negative feature values, which is presumably why this
# pipeline uses min-max scaling instead of standardization + PCA.
# NOTE(review): test rows outside the training min/max can still scale below 0.
nb_classifier = NaiveBayes(
    featuresCol='scaled_features',
    labelCol='DIABETES',
    weightCol='weights',
)
nb_stages = [
    string_indexer_1, string_indexer_2, string_indexer_3, string_indexer_4, string_indexer_5,
    onehot_encoder, vector_assembler, minmax_scaler,
]
nb_stages.append(nb_classifier)
pipeline_nb = Pipeline(stages=nb_stages)

In [152]:
from pyspark.ml.classification import DecisionTreeClassifier

# Weighted decision tree on the 3 PCA components of the standardized features.
dt_classifier = DecisionTreeClassifier(
    featuresCol='reduced_features',
    labelCol='DIABETES',
    weightCol='weights',
)
dt_stages = [
    string_indexer_1, string_indexer_2, string_indexer_3, string_indexer_4, string_indexer_5,
    onehot_encoder, vector_assembler, standard_scaler, pca,
]
dt_stages.append(dt_classifier)
pipeline_dt = Pipeline(stages=dt_stages)

In [153]:
from pyspark.ml.classification import RandomForestClassifier

# Weighted random forest on the 3 PCA components of the standardized features.
rf_classifier = RandomForestClassifier(
    featuresCol='reduced_features',
    labelCol='DIABETES',
    weightCol='weights',
)
rf_stages = [
    string_indexer_1, string_indexer_2, string_indexer_3, string_indexer_4, string_indexer_5,
    onehot_encoder, vector_assembler, standard_scaler, pca,
]
rf_stages.append(rf_classifier)
pipeline_rf = Pipeline(stages=rf_stages)

In [154]:
from pyspark.ml.classification import GBTClassifier

# Weighted gradient-boosted trees on the 3 PCA components of the standardized features.
gbt_classifier = GBTClassifier(
    featuresCol='reduced_features',
    labelCol='DIABETES',
    weightCol='weights',
)
gbt_stages = [
    string_indexer_1, string_indexer_2, string_indexer_3, string_indexer_4, string_indexer_5,
    onehot_encoder, vector_assembler, standard_scaler, pca,
]
gbt_stages.append(gbt_classifier)
pipeline_gbt = Pipeline(stages=gbt_stages)

#### Fit Pipelines on Training Data & Evaluate on Testing Data

In [159]:
# Candidate pipelines; list position i matches key i in pipe_dict.
pipelines = [
    pipeline_lr,
    pipeline_svc,
    pipeline_nb,
    pipeline_dt,
    pipeline_rf,
    pipeline_gbt,
]

In [160]:
# Display names keyed by the position of each pipeline in `pipelines`.
pipe_dict = dict(enumerate([
    'Logistic Regression',
    'Support Vector Machine',
    'Naive Bayes',
    'Decision Tree',
    'Random Forest',
    'Gradient Boosting',
]))

In [164]:
# Fit every candidate pipeline on the training split, score it on the test split,
# and remember the most accurate one.
# NOTE(review): picking the winner by test accuracy makes that accuracy an
# optimistic estimate; a separate validation split would avoid this.
best_accuracy   = 0
best_classifier = 0
best_pipeline   = ""
best_model      = None   # fitted PipelineModel of the winner

# One evaluator serves every model, so build it once outside the loop.
multi_evaluator = MulticlassClassificationEvaluator(labelCol='DIABETES', metricName='accuracy')

for i, pipe in enumerate(pipelines):
    model = pipe.fit(trainingData)
    predictions = model.transform(testingData)
    # evaluate() runs a Spark job over the whole test set: compute it once
    # (previously it was recomputed three times per model).
    accuracy = multi_evaluator.evaluate(predictions)
    print("{0:s} Test Accuracy: {1:.3f}".format(pipe_dict[i], accuracy))
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_pipeline = pipe
        best_model = model
        best_classifier = i
print('Classifier with best accuracy: {}'.format(pipe_dict[best_classifier]))

Logistic Regression Test Accuracy: 0.706
Support Vector Machine Test Accuracy: 0.713
Naive Bayes Test Accuracy: 0.719
Decision Tree Test Accuracy: 0.659
Random Forest Test Accuracy: 0.684
Gradient Boosting Test Accuracy: 0.680
Classifier with best accuracy: Naive Bayes


#### Hyperparameter Tuning

In [172]:
# maxIter only caps optimizer iterations: once the solver converges (tolerance
# `tol`), every larger cap yields the same model, so the previous maxIter grid
# gave CrossValidator nothing to choose between (tuned and untuned test accuracy
# were identical). Tune the regularization strength instead; 0.0 is Spark's
# default, so the untuned model remains one of the candidates.
param_grid_lr = (
    ParamGridBuilder()
    .addGrid(lr_classifier.regParam, [0.0, 0.01, 0.1, 1.0])
    .build()
)

# 3-fold cross-validation on the training data, selecting by accuracy.
crossval_lr = CrossValidator(estimator=pipeline_lr,
                             estimatorParamMaps=param_grid_lr,
                             evaluator=MulticlassClassificationEvaluator(labelCol='DIABETES', metricName='accuracy'),
                             numFolds=3)

In [176]:
# Candidate iteration caps for LinearSVC; 3-fold CV on accuracy picks one.
svc_iter_caps = [100, 200, 500, 1000]
param_grid_svc = ParamGridBuilder().addGrid(svc_classifier.maxIter, svc_iter_caps).build()

svc_evaluator = MulticlassClassificationEvaluator(labelCol='DIABETES', metricName='accuracy')
crossval_svc = CrossValidator(
    estimator=pipeline_svc,
    estimatorParamMaps=param_grid_svc,
    evaluator=svc_evaluator,
    numFolds=3,
)

In [177]:
# 2 impurity measures x 4 depths = 8 candidate trees, each scored by 3-fold CV accuracy.
param_grid_dt = (
    ParamGridBuilder()
    .addGrid(dt_classifier.impurity, ['gini', 'entropy'])
    .addGrid(dt_classifier.maxDepth, [2, 3, 5, 9])
    .build()
)

dt_evaluator = MulticlassClassificationEvaluator(labelCol='DIABETES', metricName='accuracy')
crossval_dt = CrossValidator(
    estimator=pipeline_dt,
    estimatorParamMaps=param_grid_dt,
    evaluator=dt_evaluator,
    numFolds=3,
)

In [197]:
# 2 impurities x 4 depths x 4 forest sizes = 32 candidates, each fitted on
# 3 folds. The grid includes 1000-tree forests, which makes it very expensive.
param_grid_rf = (
    ParamGridBuilder()
    .addGrid(rf_classifier.impurity, ['gini', 'entropy'])
    .addGrid(rf_classifier.maxDepth, [2, 3, 5, 9])
    .addGrid(rf_classifier.numTrees, [100, 200, 300, 1000])
    .build()
)

rf_evaluator = MulticlassClassificationEvaluator(labelCol='DIABETES', metricName='accuracy')
crossval_rf = CrossValidator(
    estimator=pipeline_rf,
    estimatorParamMaps=param_grid_rf,
    evaluator=rf_evaluator,
    numFolds=3,
)

In [201]:
# 4 depths x 4 learning rates = 16 candidates, each scored by 3-fold CV accuracy.
param_grid_gbt = (
    ParamGridBuilder()
    .addGrid(gbt_classifier.maxDepth, [2, 3, 5, 9])
    .addGrid(gbt_classifier.stepSize, [1, 0.1, 0.01, 0.001])
    .build()
)

gbt_evaluator = MulticlassClassificationEvaluator(labelCol='DIABETES', metricName='accuracy')
crossval_gbt = CrossValidator(
    estimator=pipeline_gbt,
    estimatorParamMaps=param_grid_gbt,
    evaluator=gbt_evaluator,
    numFolds=3,
)

#### Fit Cross-Validated Pipelines on Training Data & Evaluate on Testing Data

In [202]:
# Cross-validated estimators to compare; list position i matches key i in
# pipe_grid_dict. crossval_rf is deliberately left out because of limited RAM.
pipelines_grid = [
    crossval_lr,
    crossval_svc,
    crossval_dt,
    crossval_gbt,
]

In [203]:
# Display names keyed by position in pipelines_grid.
# Random Forest is omitted because of limited RAM.
pipe_grid_dict = dict(enumerate([
    'Logistic Regression',
    'Linear Support Vector Machine',
    'Decision Tree',
    'Gradient Boost',
]))

In [204]:
# Run each CrossValidator on the training split (it tunes hyperparameters with
# k-fold CV), score the resulting model on the test split, and keep the best.
# NOTE(review): choosing the winner by test accuracy makes that accuracy an
# optimistic estimate.
best_accuracy   = 0
best_classifier = 0
best_pipeline   = ""
best_model      = None   # fitted CrossValidatorModel of the winner

# One evaluator serves every model, so build it once outside the loop.
multi_evaluator = MulticlassClassificationEvaluator(labelCol='DIABETES', metricName='accuracy')

for i, pipe in enumerate(pipelines_grid):
    model = pipe.fit(trainingData)            # CrossValidatorModel; transform uses its bestModel
    predictions = model.transform(testingData)
    # evaluate() runs a Spark job over the whole test set: compute it once
    # (previously it was recomputed three times per model).
    accuracy = multi_evaluator.evaluate(predictions)
    print("{0:s} Test Accuracy: {1:.3f}".format(pipe_grid_dict[i], accuracy))
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_pipeline = pipe
        best_model = model
        best_classifier = i
print('Classifier with best accuracy: {}'.format(pipe_grid_dict[best_classifier]))

Logistic Regression Test Accuracy: 0.706
Linear Support Vector Machine Test Accuracy: 0.712
Decision Tree Test Accuracy: 0.798
Gradient Boost Test Accuracy: 0.786
Classifier with best accuracy: Decision Tree


##### End of document.