# PART 1 : Install Dependencies & Run Spark Session

In [4]:
#install pyspark
! pip install pyspark

from pyspark.sql.functions import when, col

Exception in thread _colab_inspector_thread:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/dist-packages/google/colab/_debugpy.py", line 64, in inspector_thread
    _variable_inspector.run(shell, time)
  File "/usr/local/lib/python3.10/dist-packages/google/colab/_variable_inspector.py", line 27, in run
    globals().clear()
TypeError: 'module' object is not callable




In [None]:
# Build a Spark session for the application named "spark" (getOrCreate
# returns the session produced by this builder).
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("spark")
spark = session_builder.getOrCreate()

# PART 2: Clone & Explore dataset

In [None]:
#clone the diabetes dataset from the github repository
# if you get an error saying the "Project-4" directory already exists, move on to the next cell
! git clone https://github.com/elluis1001/Project-4/

In [None]:
#check if the dataset exists
! ls /content/Project-4/Luis/Dataset/

In [None]:
# Load the diabetes CSV into a Spark DataFrame; the header row supplies the
# column names and the column types are inferred from the data.
df_diabetes_data = spark.read.csv(
    "/content/Project-4/Luis/Dataset/diabetes_prediction_dataset.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the contents of the freshly loaded DataFrame.
df_diabetes_data.show()

In [None]:
# Count the total number of rows in the DataFrame.
df_diabetes_data.count()

In [None]:
# Print the DataFrame schema (column names and their types).
df_diabetes_data.printSchema()

In [None]:
# Print the DataFrame shape as (rows, columns), then tally the rows per
# 'diabetes' class (1 = diabetes present, 0 = diabetes absent).
row_total = df_diabetes_data.count()
column_total = len(df_diabetes_data.columns)
print((row_total, column_total))
df_diabetes_data.groupBy('diabetes').count().show()

In [None]:
# Print the DataFrame shape as (rows, columns), then tally the rows per
# 'gender' value.
row_total = df_diabetes_data.count()
column_total = len(df_diabetes_data.columns)
print((row_total, column_total))
df_diabetes_data.groupBy('gender').count().show()

In [None]:
# Count the rows whose 'gender' value is an empty string (0 means there are none).
# A bare filter expression only builds a lazy DataFrame and displays its
# column list; an action such as count() is required to inspect the data.
df_diabetes_data.filter(df_diabetes_data['gender'] == '').count()

In [None]:
# Compute and display summary statistics for the DataFrame's columns.
df_diabetes_data.describe().show()

# PART 3: Data Cleaning & Preparation

In [None]:
# Count the null values in every column.
# The loop variable is deliberately not named `col`: that name is imported
# from pyspark.sql.functions, and rebinding it to a string here would break
# any later col(...) call that runs before the function is imported again.
for column_name in df_diabetes_data.columns:
    null_rows = df_diabetes_data.filter(df_diabetes_data[column_name].isNull())
    print(column_name + ":", null_rows.count())

In [None]:
# Look for zero-valued entries in the numeric measurement columns.
def count_zeros(df=None, columns=("age", "bmi", "HbA1c_level", "blood_glucose_level")):
    """Print, for each listed column, how many rows hold the value 0.

    Args:
        df: DataFrame to inspect. When omitted, the notebook-level
            ``df_diabetes_data`` is used (looked up at call time, so the
            most recent reassignment is picked up).
        columns: Names of the columns to check. Defaults to the four
            measurement columns of the diabetes dataset.
    """
    if df is None:
        df = df_diabetes_data
    for name in columns:
        print(name + ":", df[df[name] == 0].count())

In [None]:
# Print the number of zero-valued rows for each checked column.
count_zeros()

In [None]:
# Preview the DataFrame before any rows are dropped.
df_diabetes_data.show()

In [None]:
# Drop the rows whose 'gender' value is "Other".
# The column is referenced by its exact name, 'gender', so the filter does not
# depend on Spark's default case-insensitive column resolution.
string_to_remove = "Other"
df_diabetes_data = df_diabetes_data[df_diabetes_data['gender'] != string_to_remove]

In [None]:
# Re-check the DataFrame shape as (rows, columns) and the per-'gender' row
# counts.
row_total = df_diabetes_data.count()
column_total = len(df_diabetes_data.columns)
print((row_total, column_total))
df_diabetes_data.groupBy('gender').count().show()

In [None]:
# Print the DataFrame shape as (rows, columns), then tally the rows per
# 'smoking_history' category.
row_total = df_diabetes_data.count()
column_total = len(df_diabetes_data.columns)
print((row_total, column_total))
df_diabetes_data.groupBy('smoking_history').count().show()

In [None]:
#drop the 'No Info' rows in the 'smoking_history' column
string_to_remove_1= "No Info"
df_diabetes_data = df_diabetes_data[df_diabetes_data['smoking_history'] != string_to_remove_1]

In [None]:
# Re-check the DataFrame shape as (rows, columns) and the per-'smoking_history'
# row counts.
row_total = df_diabetes_data.count()
column_total = len(df_diabetes_data.columns)
print((row_total, column_total))
df_diabetes_data.groupBy('smoking_history').count().show()

In [None]:
# Print the DataFrame shape as (rows, columns) and the per-'gender' row counts
# once more, now that both filters have been applied.
row_total = df_diabetes_data.count()
column_total = len(df_diabetes_data.columns)
print((row_total, column_total))
df_diabetes_data.groupBy('gender').count().show()

In [None]:
# Encode the 'gender' column as a number: 'Female' -> 0, 'Male' -> 1.
# Any other value is passed through unchanged.
from pyspark.sql.functions import when, col

gender_value = col("gender")
df_diabetes_data = df_diabetes_data.withColumn(
    "gender",
    when(gender_value == "Female", 0)
    .when(gender_value == "Male", 1)
    .otherwise(gender_value),
)
df_diabetes_data.show()

In [None]:
# Encode the 'smoking_history' column as a number:
#   "never" -> 0, "ever" -> 1, "not current" -> 2, "current" -> 3, "former" -> 4
# Any other value is passed through unchanged.
smoking_value = col("smoking_history")
df_diabetes_data = df_diabetes_data.withColumn(
    "smoking_history",
    when(smoking_value == "never", 0)
    .when(smoking_value == "ever", 1)
    .when(smoking_value == "not current", 2)
    .when(smoking_value == "current", 3)
    .when(smoking_value == "former", 4)
    .otherwise(smoking_value),
)
df_diabetes_data.show()

# PART 4: Correlation Analysis & Feature Selection

In [None]:
# The encoded 'gender' and 'smoking_history' columns must be of float type for
# the model to work, so cast each of them in turn.
for encoded_name in ("gender", "smoking_history"):
    df_diabetes_data = df_diabetes_data.withColumn(encoded_name, col(encoded_name).cast('float'))
df_diabetes_data.show()


In [None]:
# Report the correlation between the 'diabetes' outcome and every column of
# the DataFrame.
report_line = "Correlation to outcome for {} is {}"
for feature_name in df_diabetes_data.columns:
    correlation = df_diabetes_data.stat.corr("diabetes", feature_name)
    print(report_line.format(feature_name, correlation))

In [None]:
# Feature selection: pack the eight predictor columns into a single vector
# column named 'features'.
from pyspark.ml.feature import VectorAssembler

feature_columns = [
    'gender', 'age', 'hypertension', 'heart_disease',
    'smoking_history', 'bmi', 'HbA1c_level', 'blood_glucose_level',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
output_data = assembler.transform(df_diabetes_data)

In [None]:
# Print the schema of the assembled DataFrame, which now includes the 'features' column.
output_data.printSchema()

In [None]:
# Preview the assembled DataFrame.
output_data.show()

# PART 5: Split Dataset & Build the Model

In [None]:
# Create the modelling dataset: keep only the assembled 'features' vector and
# the 'diabetes' label column.
from pyspark.ml.classification import LogisticRegression

final_data = output_data.select('features','diabetes')

In [None]:
# Print the schema of the modelling dataset.
final_data.printSchema()

In [None]:
# Split the dataset 70/30 into training and test sets, then fit a logistic
# regression model (label column: 'diabetes') on the training portion.
# A fixed seed keeps the random split repeatable, so the fitted model and its
# evaluation figures can be compared between notebook runs.
train, test = final_data.randomSplit([0.7, 0.3], seed=42)
models = LogisticRegression(labelCol= 'diabetes')
model = models.fit(train)

In [None]:
# Take the summary attached to the fitted model and display descriptive
# statistics of its predictions DataFrame.
summary = model.summary
summary.predictions.describe().show()

# PART 6: Evaluate and Save the Model

In [None]:
# Evaluate the fitted model on the held-out test split.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.evaluate(test)

In [None]:
# Display the first 100 rows of the test-set predictions.
predictions.predictions.show(100)

In [None]:
# Score the model's raw predictions on the test split against the 'diabetes'
# label. No metricName is passed, so the evaluator's default metric is used.
evaluator = BinaryClassificationEvaluator(rawPredictionCol= 'rawPrediction', labelCol='diabetes')
evaluator.evaluate(model.transform(test))

In [None]:
# Save the model to the "model" directory, replacing any copy left behind by
# an earlier run (a plain save() raises an error when the path already exists).
model.write().overwrite().save("model")

In [None]:
# Load the saved model from the "model" directory back into the environment,
# rebinding `model` to the loaded copy.
from pyspark.ml.classification import LogisticRegressionModel

model = LogisticRegressionModel.load('model')

# PART 7: Prediction on New Data with the saved model


In [None]:
# Load the separate test CSV into a new Spark DataFrame; the header row
# supplies the column names and the column types are inferred.
test_df = spark.read.csv('/content/Project-4/Luis/Dataset/diabetes_test_dataset.csv', header=True, inferSchema=True)

In [None]:
# Print the schema of the new test DataFrame.
test_df.printSchema()

In [None]:
# Add the merged 'features' vector column, reusing the assembler built for the
# training data.
# NOTE(review): assumes the test CSV already holds numeric 'gender' and
# 'smoking_history' codes, since no encoding step is applied here — confirm.
test_data = assembler.transform(test_df)

In [None]:
# Print the schema of the test data after the 'features' column was added.
test_data.printSchema()

In [None]:
# Use the loaded model to make predictions on the new data, then print the
# schema of the resulting DataFrame.
results = model.transform(test_data)
results.printSchema()

In [None]:
# Display the feature vector together with the model's raw prediction,
# probability and predicted class for each row.
results.select('features','rawPrediction','probability','prediction').show()

# PART 8: Host a Flask App which will take a subject's parameters and return the probability and prediction of being diabetic or not, based on the ML model above

In [1]:
# import dependencies to run Flask app and host it on publicly accessible colab URL
from flask import *
from google.colab import output
from google.colab.output import eval_js

In [2]:
# Initialize the Flask app that the API routes are registered on.
app=Flask(__name__)


In [3]:
# Static Root API for Diabetes Prediction Model
@app.route('/')
def home():
    """Return a fixed text message identifying the API at the root URL."""
    return 'Root API for Diabetes Prediction Model'

# Dynamic API that takes a subject's parameters from the URL and uses the ML model above to return the probability & prediction for being diabetic
@app.route('/api/v1.0/predict/<gender>/<age>/<hypertension>/<heart_disease>/<smoking_history>/<bmi>/<HbA1c_level>/<blood_glucose_level>')
def predict(gender,age,hypertension,heart_disease,smoking_history,bmi,HbA1c_level,blood_glucose_level):
    """Score one subject with the trained model and return the result as JSON.

    Every argument arrives as a string taken from the URL path. The code /
    flag fields (gender, hypertension, heart_disease, smoking_history) must
    be integers; the measurements (age, bmi, HbA1c_level,
    blood_glucose_level) may be any decimal number.

    Returns:
        A JSON response holding the model's output row for the subject, or
        a 400 response with an ``error`` message when a parameter cannot be
        parsed as a number.
    """
    # Imported here so the handler explicitly names the Flask helpers it uses.
    from flask import Response, jsonify

    # URL path segments are strings; convert them before building the row.
    # Measurements are parsed as float so values such as "54.5" are accepted.
    try:
        data = [(
            int(gender),
            float(age),
            int(hypertension),
            int(heart_disease),
            int(smoking_history),
            float(bmi),
            float(HbA1c_level),
            float(blood_glucose_level),
        )]
    except ValueError as err:
        # Report bad input to the caller instead of failing with a server error.
        return jsonify(error=f"Invalid parameter: {err}"), 400

    columns = ['gender','age','hypertension','heart_disease','smoking_history','bmi','HbA1c_level','blood_glucose_level']

    # Create a one-row Spark DataFrame for the input parameters
    test_df = spark.createDataFrame(data, columns)

    # Build the feature vector and run the ML model on it
    test_data = assembler.transform(test_df)
    results = model.transform(test_data)

    # toJSON() already yields a JSON string; label the response as JSON so
    # clients do not receive it with Flask's default HTML content type.
    return Response(results.toJSON().first(), mimetype='application/json')

In [None]:
# When run, the Flask app is served on a local address. Because this notebook runs in the cloud (Google Colab)
# and not on a local machine, the local IP (127.0.0.1) will not be accessible.
# The code below asks Colab for a publicly accessible URL for port 5000.

print(eval_js("google.colab.kernel.proxyPort(5000)"))
output.serve_kernel_port_as_window(5000)
# Start the Flask server on port 5000, listening on all network interfaces.
if __name__ == '__main__':
    app.run(host='0.0.0.0',port=5000)