# Installing OpenJDK 8 (Java 8)

# Install the headless OpenJDK 8 package; -qq plus the redirect to /dev/null
# keeps apt's output out of the notebook.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Installing pyspark v2.4.4

In [2]:
!pip install pyspark==2.4.4

Collecting pyspark==2.4.4
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 59kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 54.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130388 sha256=2fa536984b0102ab3e94a233546e033ee6d88900602cba650191c72b0c1bb2f7
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4

# Setting Java environment variable

In [3]:
import os

# Export JAVA_HOME for this process, pointing at the OpenJDK 8 install path.
JAVA_HOME_DIR = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ.update(JAVA_HOME=JAVA_HOME_DIR)

# Run a Spark Session with session name 'spark'

In [5]:
from pyspark.sql import SparkSession

# Create the session named 'spark', or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('spark')
    .getOrCreate()
)

#Downloading Dataset

In [6]:
!git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 3, done.[K
remote: Counting objects: 100% (3/3), done.[K
remote: Compressing objects: 100% (2/2), done.[K
remote: Total 3 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (3/3), done.


# Reading the dataset
* A Spark DataFrame is similar to a pandas DataFrame.
* The main difference is that a Spark DataFrame is immutable: to change values, you create a new DataFrame instead of updating the existing one in place.


---


* **if header is true, pyspark treats the first row as heading.**
* **if inferSchema is true, pyspark identifies the datatypes of the values in the dataset. else, it treats everything as string**



In [8]:
# Read the CSV with the first row as column names and with column types
# inferred from the data (instead of everything being read as string).
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv('/content/diabetes_dataset/diabetes.csv')

# Display dataframe head


```
df.show(n=20, truncate=True, vertical=False)
Prints the first n rows to the console.

:param n: Number of rows to show. :param truncate: If set to True, truncate 
strings longer than 20 chars by default.

    If set to a number greater than one, truncates long strings to length `truncate`  
    and align cells right.  
:param vertical: If set to True, print output rows vertically (one line

    per column value).  
>>> df
DataFrame[age: int, name: string]
>>> df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
>>> df.show(truncate=3)
+---+----+
|age|name|
+---+----+
|  2| Ali|
|  5| Bob|
+---+----+
>>> df.show(vertical=True)

-RECORD 0-----  
 age  | 2  
 name | Alice  
-RECORD 1-----  
 age  | 5  
 name | Bob  

```



In [9]:
# Print the first rows of the raw dataset (show() defaults to n=20).
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


## printSchema prints each column's name, data type, and nullability

In [10]:
# Print the inferred schema as a tree: one line per column with type and nullability.
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



# df.count() shows the number of rows
# df.columns returns a list of column names

In [12]:
# Report the dataset's dimensions as "<row count> <column count>".
n_rows = df.count()
n_cols = len(df.columns)
print(n_rows, n_cols)

2000 9


`groupBy('Column')` (or its alias `groupby`) returns a grouped-data object rather than a DataFrame.


Apply an aggregation such as `count()` and then call `show()` to view the result.

In [17]:
# Count how many rows fall into each Outcome class and display the result.
outcome_counts = df.groupBy('Outcome').count()
outcome_counts.show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



## describe().show() summarizes each column with basic statistics
Note: the columns from Glucose through BMI have a minimum of 0, which is not a valid measurement for these fields.
# Cleaning the data

In [18]:
# Display summary statistics (count, mean, stddev, ...) for every column.
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

#Counting the null values

In [31]:
# Count the null values in each column. The filter has to test the column being
# reported; testing a fixed column would print that one column's null count
# under every column name.
for col in df.columns:
  null_count = df.filter(df[col].isNull()).count()
  print(col,':',null_count)

Pregnancies : 0
Glucose : 0
BloodPressure : 0
SkinThickness : 0
Insulin : 0
BMI : 0
DiabetesPedigreeFunction : 0
Age : 0
Outcome : 0


#Count the 0 values in all the columns

In [38]:
# Columns in which the notebook treats a value of 0 as a missing measurement.
columns_list = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
# Print how many rows hold 0 in each of those columns.
for column_name in columns_list:
  zero_rows = df.filter(df[column_name] == 0)
  print(column_name, ':', zero_rows.count())

Glucose : 13
BloodPressure : 90
SkinThickness : 573
Insulin : 956
BMI : 28


# Calculate Mean values for those columns


```
df.agg(*exprs)
Aggregate on the entire :class:DataFrame without groups (shorthand for df.groupBy.agg()).

df.agg({"age": "max"}).collect() [Row(max(age)=5)] from pyspark.sql import functions as F df.agg(F.min(df.age)).collect() [Row(min(age)=2)]
```



In [40]:
# df.columns[1:6] selects Glucose through BMI. Print each column's mean,
# truncated to an integer for display.
for column_name in df.columns[1:6]:
  mean_row = df.agg({column_name: 'mean'}).first()
  column_mean = mean_row[0]
  print("Mean Value for {} is {}".format(column_name, int(column_mean)))

Mean Value for Glucose is 121
Mean Value for BloodPressure is 69
Mean Value for SkinThickness is 20
Mean Value for Insulin is 80
Mean Value for BMI is 32


# Replacing 0 values with mean values in a new dataframe
Alternatively, we could drop the rows that contain 0 values, but then too little data would remain for prediction.


```
df.withColumn(colName, col)
Returns a new :class:DataFrame by adding a column or replacing the existing column that has the same name.

The column expression must be an expression over this DataFrame; attempting to add a column from some other dataframe will raise an error.

:param colName: string, name of the new column. :param col: a :class:Column expression for the new column.

df.withColumn('age2', df.age + 2).collect() [Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]
```



In [42]:
# Import only what is used: a wildcard import from pyspark.sql.functions
# silently replaces Python builtins such as sum, min, max, abs and round.
from pyspark.sql.functions import when

# For Glucose through BMI (df.columns[1:6]), replace every 0 with the column mean.
# NOTE: the mean is taken over all rows, including the 0 entries being replaced.
for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print('Mean Value for {} is {}'.format(i,int(data)))
  # int(data) truncates the mean; withColumn returns a new DataFrame with the
  # column replaced, which is rebound to df.
  df = df.withColumn(i,when(df[i]==0,int(data)).otherwise(df[i]))


Mean Value for Glucose is 121
Mean Value for BloodPressure is 69
Mean Value for SkinThickness is 20
Mean Value for Insulin is 80
Mean Value for BMI is 32


In [43]:
# Display the DataFrame again to confirm the 0 values were replaced.
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


#Correlation with each column and Outcome

```
df.stat.corr(col1, col2, method=None)

Calculates the correlation of two columns of a DataFrame as a double value. 
Currently only supports the Pearson Correlation Coefficient. :func:DataFrame.
corr and :func:DataFrameStatFunctions.corr are aliases of each other.

:param col1: The name of the first column :param col2: The name of the second 
column :param method: The correlation method. Currently only supports "pearson"
```



In [45]:
# Print the correlation between Outcome and every column
# (including Outcome itself, which is 1.0).
for column_name in df.columns:
  outcome_corr = df.stat.corr('Outcome', column_name)
  print('correlation for outcome to {} is {}'.format(column_name, outcome_corr))

correlation for outcome to Pregnancies is 0.22443699263363961
correlation for outcome to Glucose is 0.48796646527321064
correlation for outcome to BloodPressure is 0.17171333286446713
correlation for outcome to SkinThickness is 0.1659010662889893
correlation for outcome to Insulin is 0.1711763270226193
correlation for outcome to BMI is 0.2827927569760082
correlation for outcome to DiabetesPedigreeFunction is 0.1554590791569403
correlation for outcome to Age is 0.23650924717620253
correlation for outcome to Outcome is 1.0


#Create a Vector column based on all the columns with column name 'features'

it is used by MlLib for prediction
```
VectorAssembler(inputCols=None, outputCol=None, handleInvalid='error')
A feature transformer that merges multiple columns into a vector column.

df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"]) vecAssembler = 
VectorAssembler(inputCols=["a", "b", "c"], outputCol="features") vecAssembler.
transform(df).head().features DenseVector([1.0, 0.0, 3.0]) vecAssembler.
setParams(outputCol="freqs").transform(df).head().freqs DenseVector([1.0, 0.0, 
3.0]) params = {vecAssembler.inputCols: ["b", "a"], vecAssembler.outputCol: 
"vector"} vecAssembler.transform(df, params).head().vector DenseVector([0.0, 1.
0]) vectorAssemblerPath = temp_path + "/vector-assembler" vecAssembler.save
(vectorAssemblerPath) loadedAssembler =VectorAssembler.load
(vectorAssemblerPath) loadedAssembler.transform(df).head().freqs == 
vecAssembler.transform(df).head().freqs True dfWithNullsAndNaNs = spark.
createDataFrame( ... [(1.0, 2.0, None), (3.0, float("nan"), 4.0), (5.0, 6.0, 7.
0)], ["a", "b", "c"]) vecAssembler2 = VectorAssembler(inputCols=["a", "b", "c"]
, outputCol="features", ... handleInvalid="keep") vecAssembler2.transform
(dfWithNullsAndNaNs).show() +---+---+----+-------------+ | a| b| c| features| 
+---+---+----+-------------+ |1.0|2.0|null|[1.0,2.0,NaN]| |3.0|NaN| 4.0|[3.0,
NaN,4.0]| |5.0|6.0| 7.0|[5.0,6.0,7.0]| +---+---+----+-------------+ ... 
vecAssembler2.setParams(handleInvalid="skip").transform(dfWithNullsAndNaNs).show
() +---+---+---+-------------+ | a| b| c| features| +---+---+---+-------------+ 
|5.0|6.0|7.0|[5.0,6.0,7.0]| +---+---+---+-------------+ ...
```



```
assembler.transform(dataset, params=None)
Transforms the input dataset with optional parameters.

:param dataset: input dataset, which is an instance of :py:class:pyspark.sql.
DataFrame :param params: an optional param map that overrides embedded params. 
:returns: transformed dataset
```



In [46]:
from pyspark.ml.feature import VectorAssembler

In [74]:
# Every column except the label (Outcome) is a model input.
feature_columns = [
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'SkinThickness',
    'Insulin',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age',
]
# Merge the input columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
output_data = assembler.transform(df)

In [75]:
# Print the schema to confirm the assembler added the 'features' vector column.
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [76]:
# Display the assembled rows; long 'features' values are truncated in the output.
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           



```
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','Outcome')
output_data.select()
output_data.select(*cols)
Projects a set of expressions and returns a new :class:DataFrame.

:param cols: list of column names (string) or expressions (:class:Column).

    If one of the column names is '*', that column is expanded to include all columns  
    in the current DataFrame.  
df.select('*').collect() [Row(age=2, name='Alice'), Row(age=5, name='Bob')] df.
select('name', 'age').collect() [Row(name='Alice', age=2), Row(name='Bob', 
age=5)] df.select(df.name, (df.age + 10).alias('age')).collect() [Row
(name='Alice', age=12), Row(name='Bob', age=15)]
```



In [77]:
from pyspark.ml.classification import LogisticRegression

# Keep only the assembled feature vector and the label column.
model_columns = ['features', 'Outcome']
final_data = output_data.select(model_columns)

In [78]:
# Print the schema of the two-column (features, Outcome) DataFrame.
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



## Divide the data into 70% training data and 30% test data
Then train the model on the training split.

```
final_data.randomSplit(weights, seed=None)
Randomly splits this :class:DataFrame with the provided weights.

:param weights: list of doubles as weights with which to split the DataFrame. Weights will

    be normalized if they don't sum up to 1.0.  
:param seed: The seed for sampling.

splits = df4.randomSplit([1.0, 2.0], 24) splits[0].count()

1  
splits[1].count()

3  

```


```
LogisticRegression(featuresCol='features', labelCol='label', 
predictionCol='prediction', maxIter=100, regParam=0.0, elasticNetParam=0.0, 
tol=1e-06, fitIntercept=True, threshold=0.5, thresholds=None, 
probabilityCol='probability', rawPredictionCol='rawPrediction', 
standardization=True, weightCol=None, aggregationDepth=2, family='auto', 
lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, 
lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None)
Logistic regression. This class supports multinomial logistic (softmax) and 
binomial logistic regression.

from pyspark.sql import Row from pyspark.ml.linalg import Vectors bdf = sc.
parallelize([ ... Row(label=1.0, weight=1.0, features=Vectors.dense(0.0, 5.0)), 
... Row(label=0.0, weight=2.0, features=Vectors.dense(1.0, 2.0)), ... Row
(label=1.0, weight=3.0, features=Vectors.dense(2.0, 1.0)), ... Row(label=0.0, 
weight=4.0, features=Vectors.dense(3.0, 3.0))]).toDF() blor = LogisticRegression
(regParam=0.01, weightCol="weight") blorModel = blor.fit(bdf) blorModel.
coefficients DenseVector([-1.080..., -0.646...]) blorModel.intercept

3.112...  
data_path = "data/mllib/sample_multiclass_classification_data.txt" mdf = spark.
read.format("libsvm").load(data_path) mlor = LogisticRegression(regParam=0.1, 
elasticNetParam=1.0, family="multinomial") mlorModel = mlor.fit(mdf) mlorModel.
coefficientMatrix SparseMatrix(3, 4, [0, 1, 2, 3], [3, 2, 1], [1.87..., -2.75...
, -0.50...], 1) mlorModel.interceptVector DenseVector([0.04..., -0.42..., 0.37..
.]) test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 1.0))]).toDF() 
result = blorModel.transform(test0).head() result.prediction

1.0  
result.probability DenseVector([0.02..., 0.97...]) result.rawPrediction 
DenseVector([-3.54..., 3.54...]) test1 = sc.parallelize([Row(features=Vectors.
sparse(2, [0], [1.0]))]).toDF() blorModel.transform(test1).head().prediction

1.0  
blor.setParams("vector") Traceback (most recent call last):

    ...  
TypeError: Method setParams forces keyword arguments. lr_path = temp_path + "/
lr" blor.save(lr_path) lr2 = LogisticRegression.load(lr_path) lr2.getRegParam()

0.01  
model_path = temp_path + "/lr_model" blorModel.save(model_path) model2 = 
LogisticRegressionModel.load(model_path) blorModel.coefficients[0] == model2.
coefficients[0] True blorModel.intercept == model2.intercept True model2 
LogisticRegressionModel: uid = ..., numClasses = 2, numFeatures = 2


```




In [79]:
# Fix the seed so the 70/30 split, and every metric derived from it, is the same
# on each run. Outputs recorded from an earlier unseeded run may differ slightly.
SPLIT_SEED = 42
train, test = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
# labelCol must be set because the label column is 'Outcome', not the default 'label'.
models = LogisticRegression(labelCol='Outcome')
# Fit the estimator on the training split to obtain the trained model.
model = models.fit(train)

In [80]:
# Training summary of the fitted model; its .predictions attribute is inspected next.
summary = model.summary

#Actual Outcome vs Prediction

In [81]:
# Compare summary statistics of the actual Outcome and the predicted label on the training data.
summary.predictions.describe().show()

+-------+-------------------+------------------+
|summary|            Outcome|        prediction|
+-------+-------------------+------------------+
|  count|               1413|              1413|
|   mean| 0.3503184713375796|0.2724699221514508|
| stddev|0.47723854260684256|0.4453879805717055|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



#BinaryClassificationEvaluator is used to evaluate the data

In [82]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the fitted model on the held-out test split. The result is a summary
# object (not a DataFrame); its .predictions attribute holds the per-row predictions.
predictions = model.evaluate(test)

## rawPrediction is computed by Spark from the features
## probability is derived from rawPrediction by the LogisticRegression model
Thus the Outcome and prediction values are the same for most rows

In [85]:
# Show the first 50 test rows with their rawPrediction, probability and prediction.
predictions.predictions.show(50)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[4.06797466346299...|[0.98317588345203...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.03506519141300...|[0.98262278071083...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.50145449592718...|[0.97072912573549...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.62581525334092...|[0.93250463878771...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.62854866325751...|[0.93267647563549...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.28867375794008...|[0.90793465035678...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.28867375794008...|[0.90793465035678...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.36400615927768...|[0.91404109188373...|       0.0|
|[0.0,93.0,60.0,25...|      0|[2.69490231838718...|[0.93672517043538...|    

# Measuring how well the model predicts (area under the ROC curve, the evaluator's default metric)



```
BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', 
labelCol='label', metricName='areaUnderROC')
Evaluator for binary classification, which expects two input columns: 
rawPrediction and label. The rawPrediction column can be of type double (binary 
0/1 prediction, or probability of label

1) or of type vector (length-2 vector of raw predictions, scores, or label 
probabilities).  
from pyspark.ml.linalg import Vectors scoreAndLabels = map(lambda x: (Vectors.
dense([1.0 - x[0], x[0]]), x[1]), ... [(0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.
6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)]) dataset = spark.createDataFrame
(scoreAndLabels, ["raw", "label"]) ... evaluator = BinaryClassificationEvaluator
(rawPredictionCol="raw") evaluator.evaluate(dataset)

0.70...  
evaluator.evaluate(dataset, {evaluator.metricName: "areaUnderPR"})

0.83...  
bce_path = temp_path + "/bce" evaluator.save(bce_path) evaluator2 = 
BinaryClassificationEvaluator.load(bce_path) str(evaluator2.getRawPredictionCol
()) 'raw'


```



In [88]:
# Score the model on the test split, reading the label from 'Outcome'.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='Outcome',
)
test_predictions = model.transform(test)
# Last expression of the cell, so the metric value is displayed as the output.
evaluator.evaluate(test_predictions)

0.845643561723964

Save your model

In [89]:
# Persist the fitted model to the relative path "model".
# write().overwrite() replaces an earlier save; a plain save() raises when the
# path already exists, which makes the cell fail on re-run.
model.write().overwrite().save("model")
#model is saved in local
#check the Files section

How to use the model again

In [None]:
from pyspark.ml.classification import LogisticRegressionModel
# Reload the fitted model from the "model" path it was saved to.
model = LogisticRegressionModel.load("model")