In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.appName('deeplearn').getOrCreate()

In [None]:
!wget https://raw.githubusercontent.com/neelamdoshi/Spark_neelam/main/diabetes.csv

--2024-02-05 04:59:06--  https://raw.githubusercontent.com/neelamdoshi/Spark_neelam/main/diabetes.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 23875 (23K) [text/plain]
Saving to: ‘diabetes.csv’


2024-02-05 04:59:06 (9.76 MB/s) - ‘diabetes.csv’ saved [23875/23875]



In [None]:
# read a csv file
my_data = spark.read.csv('diabetes.csv',header=True)

# see the default schema of the dataframe
my_data.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [None]:
import pyspark.sql.types as tp

# define the schema
my_schema = tp.StructType([
    tp.StructField(name= 'Pregnancies', dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Glucose', dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'BloodPressure',       dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'SkinThickness',  dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'Insulin',   dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'BMI',       dataType= tp.DoubleType(),    nullable= True),
    tp.StructField(name= 'DiabetesPedigreeFunction',    dataType= tp.DoubleType(),   nullable= True),
    tp.StructField(name= 'Age',           dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Outcome',       dataType= tp.IntegerType(),   nullable= True)
])

# read the data again with the defined schema
my_data = spark.read.csv('diabetes.csv',schema= my_schema,header= True)

# print the schema
my_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [None]:
# get the dimensions of the data
(my_data.count() , len(my_data.columns))


(768, 9)

In [None]:
my_data.head()

Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1)

In [None]:
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=my_data.columns,
    outputCols=["{}_imputed".format(c) for c in my_data.columns]
    ).setStrategy("median")


my_data1 = imputer.fit(my_data).transform(my_data)

In [None]:
my_data1.head(10)

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1, Glucose_imputed=148, SkinThickness_imputed=35, BloodPressure_imputed=72, Outcome_imputed=1, Insulin_imputed=0, BMI_imputed=33.6, Pregnancies_imputed=6, DiabetesPedigreeFunction_imputed=0.627, Age_imputed=50),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0, Glucose_imputed=85, SkinThickness_imputed=29, BloodPressure_imputed=66, Outcome_imputed=0, Insulin_imputed=0, BMI_imputed=26.6, Pregnancies_imputed=1, DiabetesPedigreeFunction_imputed=0.351, Age_imputed=31),
 Row(Pregnancies=8, Glucose=183, BloodPressure=64, SkinThickness=0, Insulin=0, BMI=23.3, DiabetesPedigreeFunction=0.672, Age=32, Outcome=1, Glucose_imputed=183, SkinThickness_imputed=0, BloodPressure_imputed=64, Outcome_imputed=1, Insulin_imputed=0, BMI_imputed=23.3, Pregnancies_imputed=8, DiabetesPed

In [None]:
from pyspark.ml.feature import VectorAssembler

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['Pregnancies',
                                       'Glucose',
                                       'BloodPressure',
                                       'SkinThickness',
                                       'Insulin',
                                       'BMI',
                                       'DiabetesPedigreeFunction',
                                       'Age'],
                           outputCol='features')


# transform the data
final_data = assembler.transform(my_data1)

# view the transformed vector

final_data.select("features","Outcome").show(5)

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[6.0,148.0,72.0,3...|      1|
|[1.0,85.0,66.0,29...|      0|
|[8.0,183.0,64.0,0...|      1|
|[1.0,89.0,66.0,23...|      0|
|[0.0,137.0,40.0,3...|      1|
+--------------------+-------+
only showing top 5 rows



In [None]:
from pyspark.ml.classification import LogisticRegression
xtrain, xtest = final_data.randomSplit([0.7, 0.3])


In [None]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'Outcome', maxIter=10)

In [None]:
lrModel = lr.fit(xtrain)

In [None]:
predictions = lrModel.transform(xtest)

In [None]:
predictions.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|          0|     78|           88|           29|     40|36.9|                   0.434| 21|      0|[0.0,78.0,88.0,29...|[3.19015991237759...|[0.96046229348500...|       0.0|
|          0|     91|           68|           32|    210|39.9|                   0.381| 25|      0|[0.0,91.0,68.0,32...|[2.47009025333634...|[0.92201825438620...|       0.0|
|          0|     93|           60|            0|      0|35.3|                   0.263| 25|      0|[0.0,93.0,60.0,0....|[2.5037946

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()
evaluator.setLabelCol("Outcome")

MulticlassClassificationEvaluator_adbee31a16e0

In [None]:
evaluator.setPredictionCol("prediction")

MulticlassClassificationEvaluator_adbee31a16e0

In [None]:
evaluator.evaluate(predictions)

0.7372298608253991