## Machine Learning model with Apache Spark and Python

We will build a machine learning model with Apache Spark and Python to predict whether a person has diabetes.

The workflow is:

- Installing Spark and creating a Spark session
- Data exploration
- Data preparation
- Training and evaluating the machine learning model

In [1]:
# Import necessary packages
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import isnull, when, count, col
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
import warnings

# Suppress Python warnings for the rest of the notebook
warnings.filterwarnings("ignore")

# Installing and creating the Spark session

In [2]:
# Create Spark session
spark = SparkSession \
    .builder \
    .appName('spark session') \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/14 10:02:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

# Data description

Pregnancies: Number of times pregnant.

PlasmaGlucose: Plasma glucose concentration.

DiastolicBloodPressure: Diastolic blood pressure (mm Hg).

TricepsThickness: Triceps skin fold thickness (mm).

SerumInsulin: Serum insulin (mu U/ml).

BMI: Body mass index (weight in kg / (height in m)²).

DiabetesPedigree: Diabetes pedigree function (a function that scores the likelihood of diabetes based on family history).

Age: Age (years).

Diabetic: Class variable (0 if non-diabetic, 1 if diabetic).


In [4]:
# read the csv file
df = (spark.read
          .format("csv")
          .option('header', 'true')
          .option("inferSchema","true")
          .load("data_diab.csv"))

In [5]:
# Print the first ten rows
df.show(10)

+---+---------+-----------+-------------+----------------------+----------------+------------+-----------+----------------+---+--------+
|_c0|PatientID|Pregnancies|PlasmaGlucose|DiastolicBloodPressure|TricepsThickness|SerumInsulin|        BMI|DiabetesPedigree|Age|Diabetic|
+---+---------+-----------+-------------+----------------------+----------------+------------+-----------+----------------+---+--------+
|  0|  1354778|          0|          171|                    80|              34|          23|43.50972593|     1.213191354| 21|       0|
|  1|  1147438|          8|           92|                    93|              47|          36|21.24057571|     0.158364981| 23|       0|
|  2|  1640031|          7|          115|                    47|              52|          35|41.51152348|     0.079018568| 23|       0|
|  3|  1883350|          9|          103|                    78|              25|         304|29.58219193|     1.282869847| 43|       1|
|  4|  1424119|          1|           85|

22/01/14 10:02:20 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , PatientID, Pregnancies, PlasmaGlucose, DiastolicBloodPressure, TricepsThickness, SerumInsulin, BMI, DiabetesPedigree, Age, Diabetic
 Schema: _c0, PatientID, Pregnancies, PlasmaGlucose, DiastolicBloodPressure, TricepsThickness, SerumInsulin, BMI, DiabetesPedigree, Age, Diabetic
Expected: _c0 but found: 
CSV file: file:///Users/nesrine.rabhi/Desktop/Workdir/Lecture2/data_diab.csv
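The warning above says that the first header cell of the CSV file is empty, so Spark falls back to the positional name `_c0`. A minimal sketch in plain Python of that situation, using a three-column excerpt of the rows shown above. The fallback naming here only mimics the `_c0` name from the warning and is not Spark's own code. An empty first header often comes from a row index written when the file was saved, but that is an assumption about this file.

```python
import csv
import io

# Three-column excerpt of the file; note the empty first header cell
sample = ",PatientID,Pregnancies\n0,1354778,0\n1,1147438,8\n"

reader = csv.reader(io.StringIO(sample))
header = next(reader)

# Give unnamed columns a positional name, mimicking the `_c0` in the warning
named = [name if name else f"_c{i}" for i, name in enumerate(header)]

print(header)  # raw header as stored in the file
print(named)   # header after filling in the unnamed column
```

Since this column only repeats the row number, dropping it before modelling is a reasonable choice.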


In [6]:
# Return the contents of this DataFrame as Pandas
df.toPandas()


Unnamed: 0,_c0,PatientID,Pregnancies,PlasmaGlucose,DiastolicBloodPressure,TricepsThickness,SerumInsulin,BMI,DiabetesPedigree,Age,Diabetic
0,0,1354778,0,171,80,34,23,43.509726,1.213191,21,0
1,1,1147438,8,92,93,47,36,21.240576,0.158365,23,0
2,2,1640031,7,115,47,52,35,41.511523,0.079019,23,0
3,3,1883350,9,103,78,25,304,29.582192,1.282870,43,1
4,4,1424119,1,85,59,27,35,42.604536,0.549542,22,0
...,...,...,...,...,...,...,...,...,...,...,...
14995,14995,1490300,10,65,60,46,177,33.512468,0.148327,41,1
14996,14996,1744410,2,73,66,27,168,30.132636,0.862252,38,1
14997,14997,1742742,0,93,89,43,57,18.690683,0.427049,24,0
14998,14998,1099353,0,132,98,18,161,19.791645,0.302257,23,0


In [7]:
df = df.drop('_c0')

# Print the dataset size and column names
print("The dataset has", df.count(), "data points", "and", len(df.columns), "columns:\n", df.columns)


The dataset has 15000 data points and 10 columns:
 ['PatientID', 'Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure', 'TricepsThickness', 'SerumInsulin', 'BMI', 'DiabetesPedigree', 'Age', 'Diabetic']


# Data Exploration

In [8]:
df.dtypes

[('PatientID', 'int'),
 ('Pregnancies', 'int'),
 ('PlasmaGlucose', 'int'),
 ('DiastolicBloodPressure', 'int'),
 ('TricepsThickness', 'int'),
 ('SerumInsulin', 'int'),
 ('BMI', 'double'),
 ('DiabetesPedigree', 'double'),
 ('Age', 'int'),
 ('Diabetic', 'int')]

In [9]:
# Statistics to describe the data
df.describe().toPandas()

Unnamed: 0,summary,PatientID,Pregnancies,PlasmaGlucose,DiastolicBloodPressure,TricepsThickness,SerumInsulin,BMI,DiabetesPedigree,Age,Diabetic
0,count,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0
1,mean,1502922.0028666668,3.224533333333333,107.85686666666666,71.22066666666667,28.814,137.85213333333334,31.509646041017334,0.3989677489566001,30.137733333333333,0.3333333333333333
2,stddev,289253.4434711824,3.3910202078566654,31.981974651810688,16.7587160365316,14.55571578192323,133.0682519590133,9.758999734051889,0.3779435321540775,12.089702515888606,0.4714202350607769
3,min,1000038.0,0.0,44.0,24.0,7.0,14.0,18.20051152,0.078043795,21.0,0.0
4,max,1999997.0,14.0,192.0,117.0,93.0,799.0,56.03462763,2.301594189,77.0,1.0
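The mean of `Diabetic` in the summary (about 0.333) is the share of diabetic patients, so the classes are imbalanced roughly 1 to 2. A short sketch of the arithmetic, using only the count and mean reported above, gives the accuracy of a trivial model that always predicts the majority class. The variable names are illustrative.

```python
n_rows = 15000                      # count from describe()
positive_rate = 0.3333333333333333  # mean of Diabetic from describe()

n_positive = round(n_rows * positive_rate)
n_negative = n_rows - n_positive

# Accuracy of always predicting the more frequent class
baseline_accuracy = max(n_positive, n_negative) / n_rows

print("Diabetic:", n_positive, "Non-diabetic:", n_negative)
print("Majority-class baseline accuracy:", baseline_accuracy)
```

Any trained classifier should be compared against this baseline of about 0.667 rather than against 0.5.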


In [10]:
# Keep the label and the feature columns, casting the features to float
new_dataset = df.select(col('Diabetic').cast('int'),
                         col('PlasmaGlucose').cast('float'),
                         col('DiastolicBloodPressure').cast('float'),
                         col('TricepsThickness').cast('float'),
                         col('SerumInsulin').cast('float'),
                         col('Age').cast('float'),
                         col('BMI').cast('float'),
                         col('DiabetesPedigree').cast('float')
                        )
new_dataset.show()

+--------+-------------+----------------------+----------------+------------+----+---------+----------------+
|Diabetic|PlasmaGlucose|DiastolicBloodPressure|TricepsThickness|SerumInsulin| Age|      BMI|DiabetesPedigree|
+--------+-------------+----------------------+----------------+------------+----+---------+----------------+
|       0|        171.0|                  80.0|            34.0|        23.0|21.0|43.509727|       1.2131914|
|       0|         92.0|                  93.0|            47.0|        36.0|23.0|21.240576|      0.15836498|
|       0|        115.0|                  47.0|            52.0|        35.0|23.0|41.511524|      0.07901857|
|       1|        103.0|                  78.0|            25.0|       304.0|43.0|29.582191|       1.2828698|
|       0|         85.0|                  59.0|            27.0|        35.0|22.0|42.604534|       0.5495419|
|       0|         82.0|                  92.0|             9.0|       253.0|26.0|19.724161|       0.1034245|
|       0|

In [11]:
# Count null values
new_dataset.select([count(when(isnull(c), c)).alias(c) for c in new_dataset.columns]).show()

+--------+-------------+----------------------+----------------+------------+---+---+----------------+
|Diabetic|PlasmaGlucose|DiastolicBloodPressure|TricepsThickness|SerumInsulin|Age|BMI|DiabetesPedigree|
+--------+-------------+----------------------+----------------+------------+---+---+----------------+
|       0|            0|                     0|               0|           0|  0|  0|               0|
+--------+-------------+----------------------+----------------+------------+---+---+----------------+



In [12]:
new_dataset.dtypes

[('Diabetic', 'int'),
 ('PlasmaGlucose', 'float'),
 ('DiastolicBloodPressure', 'float'),
 ('TricepsThickness', 'float'),
 ('SerumInsulin', 'float'),
 ('Age', 'float'),
 ('BMI', 'float'),
 ('DiabetesPedigree', 'float')]

# Vector Assembler

In [13]:
# Assemble all the feature columns into a single vector column with VectorAssembler.
# Spark ML estimators expect the features in one vector column.
feature_cols = ['PlasmaGlucose',
                'DiastolicBloodPressure',
                'TricepsThickness',
                'SerumInsulin',
                'Age',
                'BMI',
                'DiabetesPedigree'
               ]

# Use a distinct variable name so the VectorAssembler class is not shadowed
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
data_transformed = assembler.transform(new_dataset)
data_transformed


DataFrame[Diabetic: int, PlasmaGlucose: float, DiastolicBloodPressure: float, TricepsThickness: float, SerumInsulin: float, Age: float, BMI: float, DiabetesPedigree: float, features: vector]
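Conceptually, the assembler concatenates the selected columns of each row, in the given order, into one numeric vector. A rough plain-Python sketch of that idea, using the first row shown earlier. The helper `assemble` is hypothetical and returns a list, whereas Spark stores a dense or sparse vector.

```python
def assemble(row, input_cols):
    """Collect the chosen columns of one row into a single list of floats."""
    return [float(row[name]) for name in input_cols]

input_cols = ["PlasmaGlucose", "DiastolicBloodPressure", "TricepsThickness",
              "SerumInsulin", "Age", "BMI", "DiabetesPedigree"]

# First row of new_dataset as printed above
row = {"Diabetic": 0, "PlasmaGlucose": 171.0, "DiastolicBloodPressure": 80.0,
       "TricepsThickness": 34.0, "SerumInsulin": 23.0, "Age": 21.0,
       "BMI": 43.509727, "DiabetesPedigree": 1.2131914}

features = assemble(row, input_cols)
print(features)
```

The label column `Diabetic` is left out of the vector on purpose, since it is what the model has to predict.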

In [14]:
# Splitting data between train and test
(training_data, test_data) = data_transformed.randomSplit([0.7,0.3])
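The split above is random and unseeded, so the two sets (and the accuracy reported later) can change from run to run. `randomSplit` accepts an optional `seed` argument, for example `data_transformed.randomSplit([0.7, 0.3], seed=42)`, which makes the split repeatable on the same data and partitioning. The plain-Python sketch below illustrates the general idea of a seeded, weighted per-row assignment. It is an illustration only, not Spark's implementation, and `weighted_split` is a hypothetical helper.

```python
import random

def weighted_split(rows, weights, seed):
    """Assign each row to one bucket with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    buckets = [[] for _ in weights]
    last = len(bounds) - 1
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket catches any rounding leftovers
            if u < bound or i == last:
                buckets[i].append(row)
                break
    return buckets

train, test = weighted_split(range(15000), [0.7, 0.3], seed=42)
train_again, test_again = weighted_split(range(15000), [0.7, 0.3], seed=42)

print(len(train), len(test))
```

Because each row is assigned independently, the split sizes are only approximately 70% and 30%, which is also what to expect from a random split in Spark.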

# Training and Evaluating ML Model

In [15]:
# Random forest classifier with 10 trees
RandomF = RandomForestClassifier(labelCol = "Diabetic", featuresCol = "features", numTrees = 10)

In [16]:
RF_model = RandomF.fit(training_data)

In [17]:
predictions = RF_model.transform(test_data)

In [18]:
# Evaluate the model on the test set
# Use a distinct variable name so the evaluator class is not shadowed
evaluator = MulticlassClassificationEvaluator(
    labelCol='Diabetic',
    predictionCol='prediction',
    metricName='accuracy')

In [19]:
accuracy = evaluator.evaluate(predictions)
print('Test Accuracy = ', accuracy)


Test Accuracy =  0.8581794195250659
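Accuracy is the share of test rows whose prediction matches the label. With about one third of the patients being diabetic, it can help to look at precision and recall for the diabetic class as well. The sketch below shows how these metrics relate, using a small set of made-up labels and predictions. These values are hypothetical and are not taken from the model above.

```python
def confusion_counts(labels, predictions):
    """Count true/false positives and negatives for binary 0/1 labels."""
    tp = fp = tn = fn = 0
    for y, p in zip(labels, predictions):
        if y == 1 and p == 1:
            tp += 1
        elif y == 0 and p == 1:
            fp += 1
        elif y == 0 and p == 0:
            tn += 1
        else:
            fn += 1
    return tp, fp, tn, fn

# Hypothetical toy data, for illustration only
labels      = [1, 0, 0, 1, 0, 1, 0, 0]
predictions = [1, 0, 0, 0, 0, 1, 1, 0]

tp, fp, tn, fn = confusion_counts(labels, predictions)
toy_accuracy = (tp + tn) / len(labels)   # correct predictions / all predictions
toy_precision = tp / (tp + fp)           # how many predicted positives are right
toy_recall = tp / (tp + fn)              # how many actual positives are found

print("accuracy:", toy_accuracy)
print("precision:", toy_precision)
print("recall:", toy_recall)
```

In Spark, the same evaluator class can report related metrics by changing `metricName`, for example to `'f1'`, `'weightedPrecision'` or `'weightedRecall'`.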


                                                                                