In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/19 23:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.csv("./data/diabetes.csv", header=True, inferSchema=True)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+---------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|         Classe|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+---------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|tested_positive|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|tested_negative|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|tested_positive|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|tested_negative|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|tested_positive|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|tested_negative|
|          3|     78|       

In [5]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Classe: string (nullable = true)



In [39]:
from pyspark.ml.feature import StringIndexer
# conversion
df = df.na.drop()
indexer = StringIndexer(inputCol='Classe', outputCol='Class').fit(df)
new_df = indexer.transform(df)

In [40]:
new_df  = new_df.drop('Classe')
new_df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-----+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Class|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-----+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|  1.0|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|  0.0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|  1.0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|  0.0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|  1.0|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|  0.0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|  1.0|
|         10|    115

In [41]:
new_df=new_df.withColumnRenamed("class","classe")

In [42]:
new_df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|classe|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|   1.0|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|   0.0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|   1.0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|   0.0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|   1.0|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|   0.0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|   1.0|
|         

In [44]:
print((new_df.count(),len(new_df.columns)))
new_df.groupBy('classe').count().show()

(659, 9)
+------+-----+
|classe|count|
+------+-----+
|   0.0|  435|
|   1.0|  224|
+------+-----+



In [48]:
new_df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------------+------------------+-------------------+
|summary|       Pregnancies|           Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|             classe|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------------+------------------+-------------------+
|  count|               659|               659|               659|               659|               659|               659|                     659|               659|                659|
|   mean| 3.786039453717754|120.17450682852807|  68.5948406676783|20.400606980273142| 79.88770864946889|31.826707132018186|     0.47700758725341375| 32.99696509863429|0.33990895295902884|
| stddev|3.3564417861012616| 32.20288390950785|19.5676219159

## Data Cleaning & Preparation

In [53]:
#check for null values
for col in new_df.columns:
  print(col+":",new_df[new_df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
classe: 0


In [54]:
#look for the unnecessary values present
def count_zeros():
  columns_list = ["Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI"]
  for i in columns_list:
    print(i+":",new_df[new_df[i]==0].count())

In [55]:
count_zeros()

Glucose: 5
BloodPressure: 32
SkinThickness: 195
Insulin: 317
BMI: 9


In [56]:
#calculate and replace the unnecessary values by the mean value
from pyspark.sql.functions import *

for i in new_df.columns[1:6]:
  data = new_df.agg({i:"mean"}).first()[0]
  print("mean value for {} is {}".format(i,int(data)))
  new_df = new_df.withColumn(i, when(new_df[i]==0, int(data)).otherwise(new_df[i]))

mean value for Glucose is 120
mean value for BloodPressure is 68
mean value for SkinThickness is 20
mean value for Insulin is 79
mean value for BMI is 31


In [57]:
#display the dataframe 
new_df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|classe|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|   1.0|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|   0.0|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|   1.0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|   0.0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|   1.0|
|          5|    116|           74|           20|     79|25.6|                   0.201| 30|   0.0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|   1.0|
|         

## Correlation Analysis & Feature Selection

In [58]:
#find the correlation among the set of input & output variables
for i in new_df.columns:
    print("Correlation to classe for {} is {}".format(i, new_df.stat.corr("classe",i)))

Correlation to classe for Pregnancies is 0.22439606485792232
Correlation to classe for Glucose is 0.4867228097216097
Correlation to classe for BloodPressure is 0.14954404356951914
Correlation to classe for SkinThickness is 0.16904645113988484
Correlation to classe for Insulin is 0.19877004207874557
Correlation to classe for BMI is 0.3229720240561377
Correlation to classe for DiabetesPedigreeFunction is 0.1877744951239446
Correlation to classe for Age is 0.22846348116796988
Correlation to classe for classe is 1.0


In [59]:
#feature selection
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'], outputCol='features')
output_data = assembler.transform(new_df)

In [60]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- classe: double (nullable = false)
 |-- features: vector (nullable = true)



In [61]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|classe|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+------+--------------------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|   1.0|[6.0,148.0,72.0,3...|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|   0.0|[1.0,85.0,66.0,29...|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|   1.0|[8.0,183.0,64.0,2...|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|   0.0|[1.0,89.0,66.0,23...|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|   1.0|[0.0,137.0,40.0,3...|
|          5|    116|           74|     

## Split Dataset & Build the Model

In [62]:
#create final data
from pyspark.ml.classification import LogisticRegression

final_data = output_data.select('features','classe')

In [63]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- classe: double (nullable = false)



In [64]:
train, test = final_data.randomSplit([0.8, 0.2])
models = LogisticRegression(labelCol= 'classe')
model = models.fit(train)

                                                                                

In [65]:
#summary of the model
summary = model.summary
summary.predictions.describe().show()

                                                                                

+-------+-------------------+-------------------+
|summary|             classe|         prediction|
+-------+-------------------+-------------------+
|  count|                517|                517|
|   mean|0.33849129593810445|0.26499032882011603|
| stddev| 0.4736548133309049|0.44175549410613285|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



## Evaluate and Save the Model

In [66]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.evaluate(test)

In [67]:
predictions.predictions.show(20)

+--------------------+------+--------------------+--------------------+----------+
|            features|classe|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|   0.0|[3.84377494040818...|[0.97903627107118...|       0.0|
|[0.0,86.0,68.0,32...|   0.0|[2.50375768161128...|[0.92440482793639...|       0.0|
|[0.0,93.0,60.0,25...|   0.0|[2.78627145819572...|[0.94192943528251...|       0.0|
|[0.0,93.0,100.0,3...|   0.0|[0.95133397647584...|[0.72138337232106...|       0.0|
|[0.0,105.0,90.0,2...|   0.0|[2.27412203020615...|[0.90671103936416...|       0.0|
|[0.0,119.0,64.0,1...|   0.0|[1.02007470172075...|[0.73498715020073...|       0.0|
|[0.0,119.0,68.0,2...|   1.0|[1.72018708407139...|[0.84815293238342...|       0.0|
|[0.0,123.0,88.0,3...|   0.0|[1.32487372499690...|[0.78999142454247...|       0.0|
|[0.0,126.0,84.0,2...|   0.0|[1.63025638870762...|[0.83620475843186...|       0.0|
|[0.

In [68]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol= 'rawPrediction', labelCol='classe')
evaluator.evaluate(model.transform(test))

0.837173579109063

In [69]:
model.save("model")

                                                                                

In [None]:
from fastapi import FastAPI
from pyspark.ml.classification import LogisticRegressionModel
import nest_asyncio
import asyncio
import uvicorn
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.types import StructField
from pyspark.sql import Row
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = [
    "http://localhost.tiangolo.com",
    "https://localhost.tiangolo.com",
    "http://localhost",
    "http://localhost:4200",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/predict")
def predict(data):
    model = LogisticRegressionModel.load('model')

    x=data.split(",")
    test_df = spark.createDataFrame(
     [   (int(x[0]), int(x[1]),int(x[2]),int(x[3]),int(x[4]),float(x[5]),float(x[6]), int(x[7]))
    ],
    ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'])

    test_data = assembler.transform(test_df)
    results = model.transform(test_data)
    result=results.collect()[0][-1]
    if result==1:
        msg='Tested Positive'
    else:
        msg='Tested Negative'
    return msg

if __name__ == "__main__":
    nest_asyncio.apply()
    uvicorn.run(app)

INFO:     Started server process [31397]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:36158 - "GET /predict?data=1,4,2,4,2,5,6,24 HTTP/1.1" 200 OK
INFO:     127.0.0.1:34988 - "GET /predict?data=3,4,2,1,2,1,3,24 HTTP/1.1" 200 OK
INFO:     127.0.0.1:37140 - "GET /predict?data=1,3,23,1,2,1,3,30 HTTP/1.1" 200 OK
INFO:     127.0.0.1:37140 - "GET /predict?data=1,3,23,1,2,1,3,30 HTTP/1.1" 200 OK
INFO:     127.0.0.1:36962 - "GET /predict?data=1,30,23,1,2,1,3,20 HTTP/1.1" 200 OK
INFO:     127.0.0.1:36968 - "GET /predict?data=1,30,23,1,2,1,3,20 HTTP/1.1" 200 OK
INFO:     127.0.0.1:36964 - "GET /predict?data=1,30,23,1,2,1,3,20 HTTP/1.1" 200 OK
INFO:     127.0.0.1:41448 - "GET /predict?data=1,1,23,1,2,1,3,20 HTTP/1.1" 200 OK
INFO:     127.0.0.1:41448 - "GET /predict?data=1,1,1,1,2,1,3,20 HTTP/1.1" 200 OK
INFO:     127.0.0.1:49300 - "GET /predict?data=1,1,1,1,2,1,3,20 HTTP/1.1" 200 OK
INFO:     127.0.0.1:46112 - "GET /predict?data=1,1,1,1,2,1,1,20 HTTP/1.1" 200 OK
INFO:     127.0.0.1:46112 - "GET /predict?data=1,1,1,1,2,1,1,201 HTTP/1.1" 200 OK
INFO:     127.0.0.

In [168]:
from pyspark.sql import Row
data1 = {"Pregnancies": 0,"Glucose":137,"BloodPressure":40,"SkinThickness":35,"Insulin":168,"BMI":43.1,"DiabetesPedigreeFunction":2.288,"Age":33}
        
data1=Row(data1)
# Create data frame
df1 = spark.createDataFrame(data1)
test_data = assembler.transform(df1)
results = model.transform(test_data)
results.select('features','prediction').show()
print(results.collect()[0][-1])

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[0.0,137.0,40.0,3...|       1.0|
+--------------------+----------+

1.0


In [177]:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost:8000/ws");
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""


@app.get("/")
async def get():
    return HTMLResponse(html)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message text was: {data}")


if __name__ == "__main__":
    nest_asyncio.apply()
    uvicorn.run(app)

INFO:     Started server process [31397]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:56370 - "GET / HTTP/1.1" 200 OK




INFO:     127.0.0.1:56372 - "GET /ws HTTP/1.1" 404 Not Found


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [31397]
