# Example of a simple Machine Learning project with pyspark
 author: Francisco Mena

This is a simple example of a classification problem using pyspark

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("classify").getOrCreate()

In [0]:
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("dbfs:/FileStore/tables/diabetes.csv")

In [0]:
df.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [0]:
#All columns are numeric, no need for one-hot encoding
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [0]:
#Some imbalance present 
df.groupby("Outcome").count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



In [0]:
#Let's take a look at the statistics of each feature
for col in df.columns:
    print(col)
    df.select(col).describe().show()

Pregnancies
+-------+------------------+
|summary|       Pregnancies|
+-------+------------------+
|  count|               768|
|   mean|3.8450520833333335|
| stddev|  3.36957806269887|
|    min|                 0|
|    max|                17|
+-------+------------------+

Glucose
+-------+-----------------+
|summary|          Glucose|
+-------+-----------------+
|  count|              768|
|   mean|     120.89453125|
| stddev|31.97261819513622|
|    min|                0|
|    max|              199|
+-------+-----------------+

BloodPressure
+-------+------------------+
|summary|     BloodPressure|
+-------+------------------+
|  count|               768|
|   mean|       69.10546875|
| stddev|19.355807170644777|
|    min|                 0|
|    max|               122|
+-------+------------------+

SkinThickness
+-------+------------------+
|summary|     SkinThickness|
+-------+------------------+
|  count|               768|
|   mean|20.536458333333332|
| stddev|15.952217567727642|
|

In [0]:
#Count NaNs

from pyspark.sql.functions import isnan, when, count, col 
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()


+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [0]:
df.columns

Out[28]: ['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

In [0]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=['Pregnancies','Glucose','BloodPressure','SkinThickness',
 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'],
    outputCol="features")


In [0]:
new_df = assembler.transform(df)
new_df.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|[8.0,183.0,64.0,0...|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|[1.0,89.0,66.0,23...|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|[0.0,137.0,40.0,3...|
+-----------+-------+-----------

In [0]:
df2 = new_df.select("features", "Outcome")
df2.show(5)

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[6.0,148.0,72.0,3...|      1|
|[1.0,85.0,66.0,29...|      0|
|[8.0,183.0,64.0,0...|      1|
|[1.0,89.0,66.0,23...|      0|
|[0.0,137.0,40.0,3...|      1|
+--------------------+-------+
only showing top 5 rows



In [0]:
#Split and normalize
train_data,test_data = df2.randomSplit([0.7,0.3])

In [0]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(train_data)
train_scaled = scalerModel.transform(train_data)
test_scaled = scalerModel.transform(test_data)


In [0]:
train_scaled.show(3)
test_scaled.show(3)

+--------------------+-------+--------------------+
|            features|Outcome|      scaledFeatures|
+--------------------+-------+--------------------+
|(8,[0,1,6,7],[2.0...|      0|(8,[0,1,6,7],[0.5...|
|(8,[0,1,6,7],[2.0...|      0|(8,[0,1,6,7],[0.5...|
|(8,[0,1,6,7],[6.0...|      0|(8,[0,1,6,7],[1.7...|
+--------------------+-------+--------------------+
only showing top 3 rows

+--------------------+-------+--------------------+
|            features|Outcome|      scaledFeatures|
+--------------------+-------+--------------------+
|(8,[0,1,6,7],[3.0...|      0|(8,[0,1,6,7],[0.8...|
|(8,[0,1,6,7],[10....|      1|(8,[0,1,6,7],[2.9...|
|(8,[1,5,6,7],[73....|      0|(8,[1,5,6,7],[2.3...|
+--------------------+-------+--------------------+
only showing top 3 rows



In [0]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lista_modelos = [("logres", LogisticRegression(featuresCol="scaledFeatures", labelCol="Outcome") ), 
                 ("randomForest", RandomForestClassifier(featuresCol="scaledFeatures", labelCol="Outcome") ),
                 ("SVC", LinearSVC(featuresCol="scaledFeatures", labelCol="Outcome")),
                 ("GradientBoost", GBTClassifier(featuresCol="scaledFeatures", labelCol="Outcome"))
                ]
                  

In [0]:
for name, model in lista_modelos:
    
    trainedModel = model.fit(train_scaled)
    
    predictions = trainedModel.transform(test_scaled)
    
    multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'Outcome', metricName = 'accuracy')
    print(name, multi_evaluator.evaluate(predictions))
    print("\n")

logres 0.7805907172995781


randomForest 0.7721518987341772


SVC 0.7805907172995781


GradientBoost 0.7257383966244726




In [0]:
#Interesting! LinearSVC/Logistic Regression have the best performance