<a href="https://colab.research.google.com/github/PrasadSAmbekar/pysparkproject/blob/main/pysparkmainproject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installing and importing libraries

In [None]:
# install the libraries
! pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegressionModel
import pandas as pd

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=df651ffca7aea80d107157e46f6470a7528038708d42ebd4171614b4f44b55bf
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Start a Spark session (SparkSession)

This code creates a new Spark session.

The SparkSession.builder attribute returns a builder used to configure a Spark session (in PySpark it is an attribute, not a method call). The call to appName("spark") sets the name of the Spark application. Finally, getOrCreate() returns the existing Spark session if there is one, or creates a new one otherwise. The resulting session is stored in the spark variable.

In [None]:
spark = SparkSession.builder.appName("spark").getOrCreate()

## Data import & exploration

The dataset is downloaded from GitHub; the data originally comes from Kaggle.



In [None]:
df = spark.read.csv('diabetes.csv', header=True, inferSchema=True) # inferSchema=True makes Spark infer each column's type
df.show(6)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+


The `Outcome` column is the output variable: `0: normal, 1: diabetic`.

In [None]:
# check the type of columns in the df
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [None]:
# Check the dimensions of the df
print(df.count(), ':', len(df.columns))

768 : 9


In [None]:
# Calculate the number of sick and normal people in the dataset
df.groupBy("Outcome").count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



In [None]:
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

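The table shows basic statistics for the numeric columns. The minimum value of Glucose, Insulin and BloodPressure is 0. A value of 0 is not a plausible measurement for these variables, so these zeros are treated as missing measurements that need to be cleaned.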

## Data cleaning

In [None]:
# Check for missing (null) values in df
for c in df.columns:  # `c` avoids shadowing pyspark.sql.functions.col
  print(c + ":", df.filter(df[c].isNull()).count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


Our dataset does not contain any missing values.

In [None]:
# create a function to count the number of 0 values and their percentage per column
def count_zeros(df, columns):
  total_rows = df.count()  # count the rows once instead of once per column
  for c in columns:  # `c` avoids shadowing pyspark.sql.functions.col
    num_zeros = df.filter(df[c] == 0).count()
    percentage = (num_zeros / total_rows) * 100
    print("{} : {} ({:.2f}%)".format(c, num_zeros, percentage))

In [None]:
liste_cols = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
count_zeros(df, liste_cols)

Glucose : 5 (0.65%)
BloodPressure : 35 (4.56%)
SkinThickness : 227 (29.56%)
Insulin : 374 (48.70%)
BMI : 11 (1.43%)


These percentages indicate that the Glucose column has the lowest percentage of zero values, with only 0.65%. The Insulin column has the highest percentage of zero values, at 48.70%. The other columns have percentages of zero values between 1.43% and 29.56%.
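The percentages can be checked by hand from the zero counts printed above and the 768 rows of the dataset. A minimal plain-Python sketch (the dictionary `zero_counts` and the other variable names are introduced here for illustration only):

```python
# Zero counts per column, copied from the output of count_zeros above
zero_counts = {
    "Glucose": 5,
    "BloodPressure": 35,
    "SkinThickness": 227,
    "Insulin": 374,
    "BMI": 11,
}
total_rows = 768  # number of rows reported by df.count()

# Percentage of zeros per column, rounded to two decimals
percentages = {name: round(100 * n / total_rows, 2) for name, n in zero_counts.items()}

for name, pct in percentages.items():
    print("{} : {:.2f}%".format(name, pct))
```

The largest share belongs to Insulin, where almost half of the rows carry a zero.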

In [None]:
# show the average value for each column and do the replacement
## method 1:
for i in df.columns[1:6]:  # Glucose, BloodPressure, SkinThickness, Insulin, BMI
  mean_val = df.agg({i:'mean'}).first()[0]
  print("the average value of column {} is: {}".format(i, int(mean_val)))
  # update the values: replace 0 with the truncated column mean, keep other values unchanged
  df = df.withColumn(i, when(df[i]==0, int(mean_val)).otherwise(df[i]))

df.show(10)

the average value of column Glucose is: 121
the average value of column BloodPressure is: 72
the average value of column SkinThickness is: 26
the average value of column Insulin is: 118
the average value of column BMI is: 32
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|             

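The zeros are replaced: for example, the Insulin zeros now read 79 and the SkinThickness zero reads 20. The printed averages for these columns (118 and 26) do not match the imputed values, which suggests that the cell was executed a second time after the zeros had already been replaced; the values in the table come from the first run.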
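The printed averages and the values in the table can be reconciled with a little arithmetic. The sketch below is plain Python, not Spark. It assumes (this is an inference from the numbers, not something the notebook states) that the cell was executed twice, so that the second run averaged a column whose zeros had already been replaced. The inputs are taken from the outputs above: 768 rows, an original Insulin mean of about 79.80, and 374 zeros.

```python
total_rows = 768                      # from df.count()
insulin_mean_raw = 79.79947916666667  # Insulin mean from df.describe(), zeros included
insulin_zeros = 374                   # from count_zeros

# First run: zeros are replaced by the truncated mean
fill_value = int(insulin_mean_raw)

# Column sum before and after the replacement
sum_raw = insulin_mean_raw * total_rows
sum_after_fill = sum_raw + insulin_zeros * fill_value

# Second run (assumed): the mean is recomputed on the already-imputed column
mean_second_run = int(sum_after_fill / total_rows)

print(fill_value)       # value written into the table
print(mean_second_run)  # value a second execution of the cell would print
```

Under that assumption the sketch gives 79 for the imputed value and 118 for the recomputed mean, matching the table and the printed output respectively. A common alternative, not used in the notebook, is to compute the mean over the non-zero values only, so that the placeholder zeros do not pull it down.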

## Build and train the machine learning model
### Feature engineering

In [None]:
# calculate the correlation between the response variable and the other variables
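for c in df.columns:  # `c` avoids shadowing pyspark.sql.functions.col
  print('The correlation of {} with the outcome variable is {}.'.format(c, df.stat.corr('Outcome', c)))

# f-string equivalent (before Python 3.12 the inner quotes must differ from the outer ones):
# print(f"The correlation of {c} with the outcome variable is {df.stat.corr('Outcome', c)}.")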

The correlation of Pregnancies with the outcome variable is 0.22189815303398638.
The correlation of Glucose with the outcome variable is 0.49288410274882094.
The correlation of BloodPressure with the outcome variable is 0.16287909949861834.
The correlation of SkinThickness with the outcome variable is 0.171856814176564.
The correlation of Insulin with the outcome variable is 0.17869558803050842.
The correlation of BMI with the outcome variable is 0.31289043493401536.
The correlation of DiabetesPedigreeFunction with the outcome variable is 0.17384406565296007.
The correlation of Age with the outcome variable is 0.23835598302719757.
The correlation of Outcome with the outcome variable is 1.0.


All correlations with the outcome are positive, and some features are more strongly associated with it than others. Correlation only measures linear association, so it is a rough guide to which features are likely to be useful in the logistic regression, not a measure of their effect.

Glucose (0.49) and BMI (0.31) have the highest correlations with the outcome, so they are the most likely to be informative for the logistic regression.
Age (0.24) and Pregnancies (0.22) have moderate correlations with the outcome.
BloodPressure, SkinThickness, Insulin and DiabetesPedigreeFunction have the lowest correlations (0.16 to 0.18), so they are likely to contribute less.
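`df.stat.corr` computes the Pearson correlation coefficient. As an illustration of what that number measures, here is a plain-Python sketch applied to the Glucose and Outcome values of the six rows shown by `df.show(6)`. The helper `pearson` is a name introduced here, and six rows are far too few to reproduce the coefficients printed above.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (unnormalised covariance)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (unnormalised variances)
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Glucose and Outcome for the first six rows of the dataset
glucose = [148, 85, 183, 89, 137, 116]
outcome = [1, 0, 1, 0, 1, 0]

r = pearson(glucose, outcome)
print(round(r, 3))
```

The coefficient always lies between -1 and 1; a positive value here means that higher glucose readings tend to go with `Outcome = 1` in these rows.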

In [None]:
# create a vectorAssembler: it is a feature transformer which merges the different columns into a single vector (features).
inputCols = ['Pregnancies' , 'Glucose' , 'BloodPressure', 'SkinThickness' , 'Insulin' , 'BMI' , 'DiabetesPedigreeFunction' ,'Age'  ]
assembler = VectorAssembler(inputCols= inputCols, outputCol='features')
output_data = assembler.transform(df)

In [None]:
# check if the features column is added to the dataframe
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
# show data (3 rows)
output_data.show(3)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|[8.0,183.0,64.0,2...|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
only showing top 3 rows




### Train the model

In [None]:
# select the columns of interest
final_df = output_data.select('features', 'Outcome')
final_df.show(2)

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[6.0,148.0,72.0,3...|      1|
|[1.0,85.0,66.0,29...|      0|
+--------------------+-------+
only showing top 2 rows



In [None]:
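# split into training (70%) and test (30%) sets; no seed is set, so the split changes between runs
train, test = final_df.randomSplit([0.7, 0.3])

# create the estimator (an untrained logistic regression model)
models = LogisticRegression(labelCol='Outcome')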

# train the model
model = models.fit(train)

In [None]:
# get the training summary
summary = model.summary

In [None]:
# predictions on the training data
summary.predictions.show()
summary.predictions.describe().show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|    0.0|[3.79596707671449...|[0.97803224786297...|       0.0|
|[0.0,67.0,76.0,20...|    0.0|[2.01192226059791...|[0.88204316690786...|       0.0|
|[0.0,73.0,69.0,20...|    0.0|[4.62549974482811...|[0.99029632699917...|       0.0|
|[0.0,74.0,52.0,10...|    0.0|[3.67224996675610...|[0.97521090608918...|       0.0|
|[0.0,78.0,88.0,29...|    0.0|[2.91989281798889...|[0.94882109461500...|       0.0|
|[0.0,84.0,64.0,22...|    0.0|[2.40152317426970...|[0.91694337943944...|       0.0|
|[0.0,86.0,68.0,32...|    0.0|[2.66076718775379...|[0.93467152719219...|       0.0|
|[0.0,91.0,68.0,32...|    0.0|[2.06774954765924...|[0.88772886275497...|       0.0|
|[0.0,91.0,80.0,20...|    0.0|[2.50615914813553...|[0.92457247277558...|    

## Model evaluation

BinaryClassificationEvaluator from PySpark ML is an evaluation tool that measures the performance of a binary classification model. It supports two metrics, selected with metricName: the area under the ROC curve (areaUnderROC, the default) and the area under the precision-recall curve (areaUnderPR). These metrics can be used to compare models and choose the best one.

In [None]:
# feed the test data to the model and evaluate it
predictions = model.evaluate(test)

In [None]:
# predictions on the test data
predictions.predictions.show(15)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,84.0,82.0,31...|      0|[2.75975543234654...|[0.94046194135391...|       0.0|
|[0.0,93.0,100.0,3...|      0|[0.97087648510295...|[0.72529416538706...|       0.0|
|[0.0,94.0,69.0,20...|      0|[2.79187073785516...|[0.94223494998653...|       0.0|
|[0.0,94.0,70.0,27...|      0|[1.53166019195979...|[0.82224909026757...|       0.0|
|[0.0,95.0,80.0,45...|      0|[2.34359446832630...|[0.91242373325688...|       0.0|
|[0.0,95.0,85.0,25...|      1|[2.24111638345219...|[0.90388149313445...|       0.0|
|[0.0,99.0,69.0,20...|      0|[3.3115734710956,...|[0.96482372175932...|       0.0|
|[0.0,100.0,88.0,6...|      0|[0.49000253798499...|[0.62010703022731...|       0.0|
|[0.0,101.0,62.0,2...|      0|[3.33704060758054...|[0.96567789096433...|    

In most of the rows displayed, the model's prediction matches the Outcome column. One visible exception is the sixth row, where Outcome is 1 but the model predicts 0.0.
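For a binary logistic regression, the first entry of `probability` is the logistic (sigmoid) function applied to the first entry of `rawPrediction`, and a row is predicted as class 1 only when the class-1 probability exceeds the default threshold of 0.5. The plain-Python sketch below checks this on two rows of the table above. The displayed values are truncated, so they are compared approximately, and the helper name `sigmoid` is introduced here.

```python
from math import exp

def sigmoid(z):
    """Logistic function: maps a raw score to a probability in (0, 1)."""
    return 1.0 / (1.0 + exp(-z))

# (first entry of rawPrediction, first entry of probability), as displayed above
rows = [
    (2.75975543234654, 0.94046194135391),
    (0.49000253798499, 0.62010703022731),
]

results = []
for raw, shown_probability in rows:
    p_class0 = sigmoid(raw)      # probability of class 0
    p_class1 = 1.0 - p_class0    # probability of class 1
    predicted = 1.0 if p_class1 > 0.5 else 0.0
    results.append((p_class0, shown_probability, predicted))
    print(raw, p_class0, predicted)
```

Both rows have a class-1 probability below 0.5, so both are predicted as 0.0, in line with the `prediction` column.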

In [None]:
# model evaluation: with the default metricName, evaluate() returns the area under the ROC curve
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8291461412151067

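The value returned is the area under the ROC curve (AUC), not the accuracy: the model reaches an AUC of about 0.83 on the test set, which is quite good. This means that, given a randomly chosen diabetic individual and a randomly chosen healthy one, the model assigns the higher diabetes score to the diabetic individual about 83% of the time. The outcome is positive (1) for diabetic individuals and negative (0) for normal ones.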
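The area under the ROC curve and the accuracy are different quantities and can differ considerably on the same predictions. The following plain-Python sketch uses made-up labels and scores (not taken from the dataset). It computes the AUC as the fraction of (positive, negative) pairs in which the positive example receives the higher score, and the accuracy at a 0.5 threshold. The function names `auc` and `accuracy` are introduced here for illustration.

```python
def auc(labels, scores):
    """AUC as the share of positive/negative pairs ranked correctly (ties count as half)."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

def accuracy(labels, scores, threshold=0.5):
    """Share of examples whose thresholded score equals the label."""
    predictions = [1 if s > threshold else 0 for s in scores]
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical labels and predicted probabilities of class 1
labels = [0, 0, 0, 1, 1]
scores = [0.1, 0.3, 0.6, 0.4, 0.9]

auc_value = auc(labels, scores)
accuracy_value = accuracy(labels, scores)
print(auc_value)
print(accuracy_value)
```

On this toy data the AUC is 5/6 while the accuracy is 3/5, so one cannot be read as the other. To report accuracy in PySpark itself, one option is `MulticlassClassificationEvaluator` with `metricName='accuracy'`, applied to the `prediction` and `Outcome` columns.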