In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from imblearn.over_sampling import SMOTE
import pandas as pd

spark = SparkSession \
    .builder \
    .appName("HealthCarePrediction") \
    .getOrCreate()

In [2]:
from pyspark.ml.classification import RandomForestClassificationModel,GBTClassificationModel,DecisionTreeClassificationModel
from pyspark.ml.feature import OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
# rfModel = RandomForestClassificationModel.load('model/random_forest')
# gbtModel = GBTClassificationModel.load('model/gbt')
# dtModel = DecisionTreeClassificationModel.load('model/decision_tree')

In [3]:
def select_features_to_scale(df, lower_skew=-2, upper_skew=2, dtypes='double'):
    
    selected_features = []

    feature_list = list(df.toPandas().select_dtypes(include=[dtypes]).columns)

    for feature in feature_list:

        if df.toPandas()[feature].kurtosis() < -2 or df.toPandas()[feature].kurtosis() > 2:
            
            selected_features.append(feature)

    return selected_features

In [4]:
rawDF = spark.read.csv('healthcare-dataset-stroke-data.csv', header=True, inferSchema=True)
rawDF = rawDF.dropna()
rawDF = rawDF.filter(rawDF['bmi'] != "N/A")
rawDF = rawDF.filter(rawDF['gender'] != "Other")
rawDF = rawDF.withColumn("bmi",rawDF.bmi.cast('double'))
cat_features = ['gender', 'ever_married', 'work_type', 'Residence_type', 'smoking_status']
stringIndexedDF = rawDF
for features in cat_features:
    # Index Categorical Features
    string_indexer = StringIndexer(inputCol=features, outputCol=features + "_index")
    stringIndexedDF = string_indexer.fit(stringIndexedDF).transform(stringIndexedDF)
for features in cat_features:     
    stringIndexedDF = stringIndexedDF.withColumn(features+"_index",stringIndexedDF[features+"_index"].cast('int'))

stringIndexedDF = stringIndexedDF.drop(*cat_features)

X = stringIndexedDF.drop('stroke')
Y = stringIndexedDF.select('stroke')
stk = SMOTE(random_state=42)
X_res,y_res = stk.fit_resample(X.toPandas(),Y.toPandas())
joinDF = pd.concat([X_res, y_res], axis=1, join="inner")
stringIndexedDF = spark.createDataFrame(joinDF)

stages = []
num_features = ['age','avg_glucose_level', 'bmi', 'hypertension', 'heart_disease']
index_features = ['gender_index', 'ever_married_index', 'work_type_index', 'Residence_type_index', 'smoking_status_index']
for features in index_features:
    encoder = OneHotEncoder(inputCols=[features],
                                    outputCols=[features + "_class_vec"])
    stages += [encoder]

unscaled_features = select_features_to_scale(df=stringIndexedDF, lower_skew=-2, upper_skew=2, dtypes='double')
unscaled_assembler = VectorAssembler(inputCols=unscaled_features, outputCol="unscaled_features")
scaler = StandardScaler(inputCol="unscaled_features", outputCol="scaled_features")
stages += [unscaled_assembler, scaler]
num_unscaled_diff_list = list(set(num_features) - set(unscaled_features))
assembler_inputs = [feature + "_class_vec" for feature in index_features] + num_unscaled_diff_list
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs") 
stages += [assembler]
assembler_final = VectorAssembler(inputCols=["scaled_features","assembled_inputs"], outputCol="features")
stages += [assembler_final]

In [5]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(stringIndexedDF)

In [6]:

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="stroke", featuresCol="features", maxIter=10)
df_transform_fin = pipeline_model.transform(stringIndexedDF)
train_data, test_data = df_transform_fin.randomSplit([.7, .3])
gbtModel = gbt.fit(train_data)
gbtPredictions = gbtModel.transform(test_data)
accuracy = evaluator.evaluate(gbtPredictions)
# gbtModel.save('model/gbt')
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.154822


In [40]:
# start your server at this point
from pyspark.sql.functions import regexp_extract
from functools import partial

inputStream = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load() \

fields = partial(
    regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)\s*,\s*(\d+\.?\d*)"
)

topic = inputStream.select(
    fields(idx=1).alias("id"),
    fields(idx=2).cast('long').alias("gender_index"), 
    fields(idx=3).cast('double').alias("age"), 
    fields(idx=4).cast('long').alias("hypertension"),
    fields(idx=5).cast('long').alias("heart_disease"),
    fields(idx=6).cast('long').alias("ever_married_index"),
    fields(idx=7).cast('long').alias("work_type_index"),
    fields(idx=8).cast('long').alias("Residence_type_index"),
    fields(idx=9).cast('double').alias("avg_glucose_level"),
    fields(idx=10).cast('double').alias("bmi"),
    fields(idx=11).cast('long').alias("smoking_status_index")
)

query = topic \
    .writeStream \
    .queryName("eleven")\
    .outputMode("append") \
    .format("memory") \
    .start()

In [41]:
import socket
s = socket.socket()
s.connect(("localhost",9999))

In [42]:
import time
while 1:
    data = s.recv(1024)
    row = data.decode().split(',')
    sql = spark.sql(f"SELECT * FROM {query.name}")   
    sql_row = sql.filter(sql.id == row[0])
    while (sql_row.count() == 0):
        time.sleep(5)
        sql = spark.sql(f"SELECT * FROM {query.name}") 
        sql_row = sql.filter(sql.id == row[0])
    row_transform = pipeline_model.transform(sql_row)
    predict = gbtModel.transform(row_transform)
    x = predict.select('id','prediction').rdd.collect()
    predictMess = "result:"+";".join([",".join(map(str, item)) for item in x])
    s.send(predictMess.encode())

In [31]:
sql = spark.sql(f"SELECT * FROM {query.name}")   
sql.show()
# sql_row = sql.filter(sql.id == 64778)
# row_transform = pipeline_model.transform(sql_row)
# # row_transform.select('gender_index_class_vec','ever_married_index_class_vec','work_type_index_class_vec','Residence_type_index_class_vec','smoking_status_index_class_vec','unscaled_features','scaled_features','assembled_inputs').filter(row_transform.id == 64778).show(1, False)
# predict = gbtModel.transform(row_transform)
# # predict.select('features','rawPrediction','probability','prediction').show(1, False)
# x = predict.select('id','prediction').rdd.collect()
# predictMess = ";".join([",".join(map(str, item)) for item in x])
# print(predictMess)

+---+------------+----+------------+-------------+------------------+---------------+--------------------+-----------------+----+--------------------+
| id|gender_index| age|hypertension|heart_disease|ever_married_index|work_type_index|Residence_type_index|avg_glucose_level| bmi|smoking_status_index|
+---+------------+----+------------+-------------+------------------+---------------+--------------------+-----------------+----+--------------------+
|   |        null|null|        null|         null|              null|           null|                null|             null|null|                null|
+---+------------+----+------------+-------------+------------------+---------------+--------------------+-----------------+----+--------------------+

