In [20]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql.functions import col,count,when

In [21]:
sparkObj=SparkSession.builder.appName('Logistic Regression').getOrCreate()

22/12/30 19:24:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [22]:
dataframe=sparkObj.read.csv('../diabetes.csv',header=True)
dataframe.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [23]:
typecaseDF=dataframe.select(
    *(col(c).cast('float') for c in dataframe.columns)
)
typecaseDF.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [24]:
temp_df=typecaseDF.drop('Outcome')
assembler = VectorAssembler(inputCols=temp_df.columns,outputCol='Status')
data=assembler.transform(typecaseDF)

In [25]:
data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction| Age|Outcome|              Status|
+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+--------------------+
|        6.0|  148.0|         72.0|         35.0|    0.0|33.6|                   0.627|50.0|    1.0|[6.0,148.0,72.0,3...|
|        1.0|   85.0|         66.0|         29.0|    0.0|26.6|                   0.351|31.0|    0.0|[1.0,85.0,66.0,29...|
|        8.0|  183.0|         64.0|          0.0|    0.0|23.3|                   0.672|32.0|    1.0|[8.0,183.0,64.0,0...|
|        1.0|   89.0|         66.0|         23.0|   94.0|28.1|                   0.167|21.0|    0.0|[1.0,89.0,66.0,23...|
|        0.0|  137.0|         40.0|         35.0|  168.0|43.1|                   2.288|33.0|    1.0|[0.0,137.0,40.0,3...|
|        5.0|  116.0|   

In [26]:
from pyspark.ml.feature import StandardScaler

In [27]:
standardScalar=StandardScaler().setInputCol('Status').setOutputCol('Scaled Status')
data=standardScalar.fit(data).transform(data)
data.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)
 |-- Status: vector (nullable = true)
 |-- Scaled Status: vector (nullable = true)



In [28]:
assembled_data = data.select("Scaled status","Outcome")
assembled_data.show()

+--------------------+-------+
|       Scaled status|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [29]:
train,test=assembled_data.randomSplit([0.7,0.3])

In [30]:
logreg=LogisticRegression(featuresCol='Scaled status',labelCol='Outcome',maxIter=40)
model=logreg.fit(train)
predict_test=model.transform(test)
predict_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|       Scaled status|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.5...|    0.0|[4.24192003502424...|[0.98582389643343...|       0.0|
|(8,[0,1,6,7],[0.8...|    0.0|[3.82669916707880...|[0.97868292183254...|       0.0|
|(8,[0,1,6,7],[1.7...|    0.0|[2.16844486890623...|[0.89737984416042...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[0.96931140421929...|[0.72498222477029...|       0.0|
|(8,[1,5,6,7],[4.4...|    1.0|[-0.4979743813284...|[0.37801681465344...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-1.2955612964486...|[0.21491299125103...|       1.0|
|(8,[1,6,7],[2.940...|    0.0|[3.70973607494246...|[0.97610115460090...|       0.0|
|[0.0,1.7827754878...|    0.0|[3.50031943724133...|[0.97069685683765...|       0.0|
|[0.0,2.6898016132...|    0.0|[2.63944626906843...|[0.93335752993200...|    

In [31]:
pred_evalutor=RegressionEvaluator(predictionCol='prediction',labelCol='Outcome',metricName='r2')
pred_evalutor.evaluate(predict_test)

0.14429976082912577

In [32]:
sparkObj.stop()