<a href="https://colab.research.google.com/github/gauravj5849/gauravj5849/blob/main/Logit_Regression_on_pysparkML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyarrow==0.15.1

Collecting pyarrow==0.15.1
  Downloading pyarrow-0.15.1-cp37-cp37m-manylinux2010_x86_64.whl (59.2 MB)
[K     |████████████████████████████████| 59.2 MB 1.2 MB/s 
Installing collected packages: pyarrow
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 6.0.1
    Uninstalling pyarrow-6.0.1:
      Successfully uninstalled pyarrow-6.0.1
Successfully installed pyarrow-0.15.1


# New Section

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz
!tar xf spark-3.0.3-bin-hadoop3.2.tgz
!pip install -q findspark

In [3]:
!ls /usr/lib/jvm/

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [6]:
import sys
from pyspark.sql.functions import *

In [7]:
df=spark.read.csv("/content/diabetes.csv",header=True)

In [8]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [9]:
df.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [10]:
from pyspark.sql.functions import col

In [11]:
ndf=df.select(*(col(c).cast("float").alias(c) for c in df.columns))

In [12]:
ndf.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [13]:
from pyspark.sql.functions import col, count, isnan, when
#checking for null ir nan type values in our columns
ndf.select([count(when(col(c).isNull(), c)).alias(c) for c in ndf.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [14]:
from pyspark.ml.feature import VectorAssembler

In [15]:
cols=ndf.columns

In [16]:
cols.remove("Outcome")

In [17]:
cols

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age']

In [None]:
assembler=VectorAssembler(inputCols=cols,outputCol="features")

In [None]:
data=assembler.transform(ndf)

In [None]:
data.select("features","Outcome").show(truncate=False)

+-----------------------------------------------------------------------+-------+
|features                                                               |Outcome|
+-----------------------------------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.35100001096725464,31.0]   |0.0    |
|[8.0,183.0,64.0,0.0,0.0,23.299999237060547,0.671999990940094,32.0]     |1.0    |
|[1.0,89.0,66.0,23.0,94.0,28.100000381469727,0.16699999570846558,21.0]  |0.0    |
|[0.0,137.0,40.0,35.0,168.0,43.099998474121094,2.2880001068115234,33.0] |1.0    |
|[5.0,116.0,74.0,0.0,0.0,25.600000381469727,0.20100000500679016,30.0]   |0.0    |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.24799999594688416,26.0]                |1.0    |
|[10.0,115.0,0.0,0.0,0.0,35.29999923706055,0.1340000033378601,29.0]     |0.0    |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.15800000727176666,53.0]              |1.0    |
|[8.0,125.0,96.0

In [None]:
from pyspark.ml.feature import StandardScaler

In [None]:
stdscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_feat")

In [None]:
data=stdscaler.fit(data).transform(data)

In [None]:
data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+--------------------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction| Age|Outcome|            features|         Scaled_feat|
+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+--------------------+--------------------+
|        6.0|  148.0|         72.0|         35.0|    0.0|33.6|                   0.627|50.0|    1.0|[6.0,148.0,72.0,3...|[1.78063837321943...|
|        1.0|   85.0|         66.0|         29.0|    0.0|26.6|                   0.351|31.0|    0.0|[1.0,85.0,66.0,29...|[0.29677306220323...|
|        8.0|  183.0|         64.0|          0.0|    0.0|23.3|                   0.672|32.0|    1.0|[8.0,183.0,64.0,0...|[2.37418449762590...|
|        1.0|   89.0|         66.0|         23.0|   94.0|28.1|                   0.167|21.0|    0.0|[1.0,89.0,66.0,23...|[0.29677306220323...|

In [None]:
finaldf=data.select("Scaled_feat","Outcome")

In [None]:
finaldf.show()

+--------------------+-------+
|         Scaled_feat|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [None]:
train,test=finaldf.randomSplit([0.7,0.3])

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
log_reg=LogisticRegression(labelCol="Outcome",featuresCol="Scaled_feat")

In [None]:
model=log_reg.fit(train)

In [None]:
pred_test=model.transform(test)

In [None]:
pred_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|         Scaled_feat|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.0...|    0.0|[3.18200312425599...|[0.96015137768523...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[1.87083943658427...|[0.86655537739739...|       0.0|
|(8,[1,5,6,7],[3.6...|    0.0|[-0.2042223084098...|[0.44912113278908...|       1.0|
|(8,[1,5,6,7],[3.7...|    1.0|[0.50621232955266...|[0.62391813721856...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-0.9440980391435...|[0.28007330151320...|       1.0|
|[0.0,2.0955431172...|    0.0|[1.78682788819423...|[0.85653792585076...|       0.0|
|[0.0,2.3144804578...|    0.0|[3.24263997244595...|[0.96240773795825...|       0.0|
|[0.0,2.6272480873...|    0.0|[2.28645575072865...|[0.90774908081230...|       0.0|
|[0.0,2.9087389538...|    0.0|[1.82385949349006...|[0.86102859150295...|    

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [None]:
predictionAndLabels = pred_test.select("Outcome","prediction").rdd.map(lambda row: row[0:])

In [None]:
summary=BinaryClassificationMetrics(predictionAndLabels)

In [None]:
summary.areaUnderROC

0.7706023651145603