In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

In [2]:
# config = SparkConf().setMaster("local").setAppName('Diabetes Data')
# spark = SparkContext(conf=config)

In [3]:
spark = SparkSession.builder.appName('Diabetes Data').getOrCreate()

In [4]:
spark

In [5]:
df = spark.read.csv("diabetes.csv", header=True)

In [6]:
df.show(5)

+---------+-----------+-------------+----------------------+----------------+------------+-----------+----------------+---+--------+
|PatientID|Pregnancies|PlasmaGlucose|DiastolicBloodPressure|TricepsThickness|SerumInsulin|        BMI|DiabetesPedigree|Age|Diabetic|
+---------+-----------+-------------+----------------------+----------------+------------+-----------+----------------+---+--------+
|  1354778|          0|          171|                    80|              34|          23|43.50972593|     1.213191354| 21|       0|
|  1147438|          8|           92|                    93|              47|          36|21.24057571|     0.158364981| 23|       0|
|  1640031|          7|          115|                    47|              52|          35|41.51152348|     0.079018568| 23|       0|
|  1883350|          9|          103|                    78|              25|         304|29.58219193|     1.282869847| 43|       1|
|  1424119|          1|           85|                    59|         

In [7]:
df.toPandas()

Unnamed: 0,PatientID,Pregnancies,PlasmaGlucose,DiastolicBloodPressure,TricepsThickness,SerumInsulin,BMI,DiabetesPedigree,Age,Diabetic
0,1354778,0,171,80,34,23,43.50972593,1.213191354,21,0
1,1147438,8,92,93,47,36,21.24057571,0.158364981,23,0
2,1640031,7,115,47,52,35,41.51152348,0.079018568,23,0
3,1883350,9,103,78,25,304,29.58219193,1.282869847,43,1
4,1424119,1,85,59,27,35,42.60453585,0.549541871,22,0
...,...,...,...,...,...,...,...,...,...,...
14995,1490300,10,65,60,46,177,33.51246773,0.14832658,41,1
14996,1744410,2,73,66,27,168,30.13263576,0.862252262,38,1
14997,1742742,0,93,89,43,57,18.69068305,0.427048955,24,0
14998,1099353,0,132,98,18,161,19.7916451,0.302257208,23,0


In [8]:
# How many rows we have
df.count()

15000

In [9]:
# The names of our columns
df.columns

['PatientID',
 'Pregnancies',
 'PlasmaGlucose',
 'DiastolicBloodPressure',
 'TricepsThickness',
 'SerumInsulin',
 'BMI',
 'DiabetesPedigree',
 'Age',
 'Diabetic']

In [10]:
# Types of our columns
df.dtypes

[('PatientID', 'string'),
 ('Pregnancies', 'string'),
 ('PlasmaGlucose', 'string'),
 ('DiastolicBloodPressure', 'string'),
 ('TricepsThickness', 'string'),
 ('SerumInsulin', 'string'),
 ('BMI', 'string'),
 ('DiabetesPedigree', 'string'),
 ('Age', 'string'),
 ('Diabetic', 'string')]

In [11]:
# Basics stats from our columns
df.describe().toPandas()

Unnamed: 0,summary,PatientID,Pregnancies,PlasmaGlucose,DiastolicBloodPressure,TricepsThickness,SerumInsulin,BMI,DiabetesPedigree,Age,Diabetic
0,count,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0,15000.0
1,mean,1502922.0028666668,3.224533333333333,107.85686666666666,71.22066666666667,28.814,137.85213333333334,31.509646041017334,0.3989677489566001,30.137733333333333,0.3333333333333333
2,stddev,289253.4434711824,3.3910202078566654,31.981974651810688,16.7587160365316,14.55571578192323,133.0682519590133,9.758999734051889,0.3779435321540775,12.089702515888606,0.4714202350607769
3,min,1000038.0,0.0,100.0,100.0,10.0,124.0,18.20051152,0.078043795,21.0,0.0
4,max,1999997.0,9.0,99.0,99.0,93.0,97.0,56.03462763,2.301594189,77.0,1.0


In [12]:
from pyspark.sql.functions import col
dataset = df.select(
    col('PatientID').cast('int'),
    col('Pregnancies').cast('int'),
    col('PlasmaGlucose').cast('int'),
    col('DiastolicBloodPressure').cast('int'),
    col('TricepsThickness').cast('int'),
    col('SerumInsulin').cast('int'),
    col('BMI').cast('float'),
    col('DiabetesPedigree').cast('float'),
    col('Age').cast('int'),
    col('Diabetic').cast('int')
)
dataset.toPandas()

Unnamed: 0,PatientID,Pregnancies,PlasmaGlucose,DiastolicBloodPressure,TricepsThickness,SerumInsulin,BMI,DiabetesPedigree,Age,Diabetic
0,1354778,0,171,80,34,23,43.509727,1.213191,21,0
1,1147438,8,92,93,47,36,21.240576,0.158365,23,0
2,1640031,7,115,47,52,35,41.511524,0.079019,23,0
3,1883350,9,103,78,25,304,29.582191,1.282870,43,1
4,1424119,1,85,59,27,35,42.604534,0.549542,22,0
...,...,...,...,...,...,...,...,...,...,...
14995,1490300,10,65,60,46,177,33.512466,0.148327,41,1
14996,1744410,2,73,66,27,168,30.132635,0.862252,38,1
14997,1742742,0,93,89,43,57,18.690683,0.427049,24,0
14998,1099353,0,132,98,18,161,19.791645,0.302257,23,0


In [13]:
from pyspark.sql.functions import isnull, when, count, col

dataset.select([count(when(isnull(c), c)).alias(c) for c in dataset.columns]).show()

+---------+-----------+-------------+----------------------+----------------+------------+---+----------------+---+--------+
|PatientID|Pregnancies|PlasmaGlucose|DiastolicBloodPressure|TricepsThickness|SerumInsulin|BMI|DiabetesPedigree|Age|Diabetic|
+---------+-----------+-------------+----------------------+----------------+------------+---+----------------+---+--------+
|        0|          0|            0|                     0|               0|           0|  0|               0|  0|       0|
+---------+-----------+-------------+----------------------+----------------+------------+---+----------------+---+--------+



In [14]:
dataset = dataset.replace('?', None).dropna(how='any')

In [15]:
from pyspark.ml.feature import VectorAssembler

# Assemble all the features with VectorAssembler
required_features = [
    'PatientID',
    'Pregnancies',
    'PlasmaGlucose',
    'DiastolicBloodPressure',
    'TricepsThickness',
    'SerumInsulin',
    'BMI',
    'DiabetesPedigree',
    'Age'
]

assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(dataset)

In [16]:
transformed_data.toPandas()

Unnamed: 0,PatientID,Pregnancies,PlasmaGlucose,DiastolicBloodPressure,TricepsThickness,SerumInsulin,BMI,DiabetesPedigree,Age,Diabetic,features
0,1354778,0,171,80,34,23,43.509727,1.213191,21,0,"[1354778.0, 0.0, 171.0, 80.0, 34.0, 23.0, 43.5..."
1,1147438,8,92,93,47,36,21.240576,0.158365,23,0,"[1147438.0, 8.0, 92.0, 93.0, 47.0, 36.0, 21.24..."
2,1640031,7,115,47,52,35,41.511524,0.079019,23,0,"[1640031.0, 7.0, 115.0, 47.0, 52.0, 35.0, 41.5..."
3,1883350,9,103,78,25,304,29.582191,1.282870,43,1,"[1883350.0, 9.0, 103.0, 78.0, 25.0, 304.0, 29...."
4,1424119,1,85,59,27,35,42.604534,0.549542,22,0,"[1424119.0, 1.0, 85.0, 59.0, 27.0, 35.0, 42.60..."
...,...,...,...,...,...,...,...,...,...,...,...
14995,1490300,10,65,60,46,177,33.512466,0.148327,41,1,"[1490300.0, 10.0, 65.0, 60.0, 46.0, 177.0, 33...."
14996,1744410,2,73,66,27,168,30.132635,0.862252,38,1,"[1744410.0, 2.0, 73.0, 66.0, 27.0, 168.0, 30.1..."
14997,1742742,0,93,89,43,57,18.690683,0.427049,24,0,"[1742742.0, 0.0, 93.0, 89.0, 43.0, 57.0, 18.69..."
14998,1099353,0,132,98,18,161,19.791645,0.302257,23,0,"[1099353.0, 0.0, 132.0, 98.0, 18.0, 161.0, 19...."


In [17]:
rddObj = transformed_data.rdd

In [18]:
(training_data, test_data) = rddObj.randomSplit([0.7, 0.3])

In [19]:
# from pyspark.ml.classification import RandomForestClassifier
# rf = RandomForestClassifier(labelCol='Diabetic', featuresCol='features', maxDepth=10)
# model = rf.fit(training_data)
# predictions = model.transform(test_data)


In [20]:
sc = spark.sparkContext

In [21]:
test_data.take(2)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 11) (DESKTOP-UO5THGD executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 14 more


In [None]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from time import *

start_time = time()

model = RandomForest.trainClassifier(
    training_data, 
    numClasses=2, 
    categoricalFeaturesInfo={},
    seed=9,
    numTrees=9,
    maxDepth=10
)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [None]:
# Evaluate our model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='Diabetic', predictionCol='prediction', metricName='accuracy')

In [None]:
accuracy = evaluator.evaluate(predictions)

print('Test Accuracy = ', accuracy)