In [1]:
pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Using cached findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.appName('deeplearn').getOrCreate()

In [4]:
# read a csv file
my_data = spark.read.csv('diabetes.csv',header=True)

# see the default schema of the dataframe
my_data.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [5]:
import pyspark.sql.types as tp

# define the schema
my_schema = tp.StructType([
    tp.StructField(name= 'Pregnancies', dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Glucose', dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'BloodPressure',       dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'SkinThickness',  dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'Insulin',   dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'BMI',       dataType= tp.DoubleType(),    nullable= True),
    tp.StructField(name= 'DiabetesPedigreeFunction',    dataType= tp.DoubleType(),   nullable= True),
    tp.StructField(name= 'Age',           dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Outcome',       dataType= tp.IntegerType(),   nullable= True)
])

# read the data again with the defined schema
my_data = spark.read.csv('diabetes.csv',schema= my_schema,header= True)

# print the schema
my_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [6]:
(my_data.count() , len(my_data.columns))

(768, 9)

In [7]:
my_data.head()

Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1)

In [8]:
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=my_data.columns,
    outputCols=["{}_imputed".format(c) for c in my_data.columns]
    ).setStrategy("median")


my_data1 = imputer.fit(my_data).transform(my_data)

ModuleNotFoundError: No module named 'distutils'

In [None]:
my_data1.head(10)

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1, Pregnancies_imputed=6, Glucose_imputed=148, BloodPressure_imputed=72, SkinThickness_imputed=35, Insulin_imputed=0, BMI_imputed=33.6, DiabetesPedigreeFunction_imputed=0.627, Age_imputed=50, Outcome_imputed=1),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0, Pregnancies_imputed=1, Glucose_imputed=85, BloodPressure_imputed=66, SkinThickness_imputed=29, Insulin_imputed=0, BMI_imputed=26.6, DiabetesPedigreeFunction_imputed=0.351, Age_imputed=31, Outcome_imputed=0),
 Row(Pregnancies=8, Glucose=183, BloodPressure=64, SkinThickness=0, Insulin=0, BMI=23.3, DiabetesPedigreeFunction=0.672, Age=32, Outcome=1, Pregnancies_imputed=8, Glucose_imputed=183, BloodPressure_imputed=64, SkinThickness_imputed=0, Insulin_imputed=0, BMI_imputed=23.3, DiabetesPedigreeFunction_imput

In [None]:
from pyspark.ml.feature import VectorAssembler

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['Pregnancies',
                                       'Glucose',
                                       'BloodPressure',
                                       'SkinThickness',
                                       'Insulin',
                                       'BMI',
                                       'DiabetesPedigreeFunction',
                                       'Age'],
                           outputCol='features')


# transform the data
final_data = assembler.transform(my_data1)

# view the transformed vector

final_data.select("features","Outcome").show(5)

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[6.0,148.0,72.0,3...|      1|
|[1.0,85.0,66.0,29...|      0|
|[8.0,183.0,64.0,0...|      1|
|[1.0,89.0,66.0,23...|      0|
|[0.0,137.0,40.0,3...|      1|
+--------------------+-------+
only showing top 5 rows



In [None]:
from pyspark.ml.classification import LogisticRegression
xtrain, xtest = final_data.randomSplit([0.7, 0.3])

In [None]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'Outcome', maxIter=10)

In [None]:
lrModel = lr.fit(xtrain)

In [None]:
predictions = lrModel.transform(xtest)

In [None]:
predictions.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-------------------+---------------+---------------------+---------------------+---------------+-----------+--------------------------------+-----------+---------------+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|Pregnancies_imputed|Glucose_imputed|BloodPressure_imputed|SkinThickness_imputed|Insulin_imputed|BMI_imputed|DiabetesPedigreeFunction_imputed|Age_imputed|Outcome_imputed|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-------------------+---------------+---------------------+---------------------+---------------+-----------+--------------------------------+-----------+---------------+--------------------+--------------------+----

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()
evaluator.setLabelCol("Outcome")

MulticlassClassificationEvaluator_79c6b000bee7

In [None]:
evaluator.setPredictionCol("prediction")

MulticlassClassificationEvaluator_79c6b000bee7

In [None]:
evaluator.evaluate(predictions)

0.767017617357176