In [117]:
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import explode, col, isnan,\
count, when, round, collect_list,json_tuple, from_json
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, StringIndexer, ChiSqSelector
from sklearn.model_selection import train_test_split
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,\
LinearSVC, MultilayerPerceptronClassifier, OneVsRest, NaiveBayes,DecisionTreeClassifier
from pyspark.ml import Pipeline, PipelineModel
import pandas as pd
from pyspark.sql.functions import lit
import matplotlib.pyplot as plt
import seaborn as sns
import json
import pickle
import os
import time
import random
from kafka import KafkaConsumer, KafkaProducer
%matplotlib inline

In [16]:
# Make the local Spark installation importable as `pyspark`.
# NOTE(review): this only helps if it runs before the first `import pyspark`;
# the imports cell above already imports pyspark, so move this cell first.
from findspark import init as init_findspark

init_findspark()

In [2]:
# Input locations and feature-selection size.
# The offline dataset path can be overridden with the OFFLINE_DATASET_PATH
# environment variable instead of editing this machine-specific default.
path_offline_dataset = os.environ.get(
    'OFFLINE_DATASET_PATH', r'E:\Downloads\diabetes_dataset\offline.csv'
)
# CSV with per-feature importance scores (read in the feature-selection cell).
path_variable_importances = 'variable_importance.csv'
# Number of top-ranked features kept for the models.
best_top_n = 8

In [3]:
# Local Spark session using every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('online_part')
    .getOrCreate()
)

In [18]:
# Coordinates for the spark-sql-kafka connector artifact.
# The Spark version is read from the running session; the Scala suffix is fixed
# (assumes this Spark build uses Scala 2.12 — TODO confirm).
SPARK_VERSION = spark.version
SCALA_VERSION = '2.12'

In [36]:
# Ask spark-submit to fetch the Kafka connector needed by Structured Streaming.
# PySpark hands PYSPARK_SUBMIT_ARGS to spark-submit when it launches the JVM, and
# the argument list must end with the 'pyspark-shell' primary resource, otherwise
# the gateway fails to start.
# NOTE(review): this only affects a SparkSession created *after* this line. The
# session in this notebook already exists (SPARK_VERSION is read from it), so take
# the version from `pyspark.__version__` and move this cell above the session cell.
kafka_packages = ','.join([
    f'org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}',
    'org.apache.kafka:kafka-clients:2.8.1',
])
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {kafka_packages} pyspark-shell'


## Read offline dataset, selected features and initialize preprocessing tools

In [23]:
# Load the offline training data (header row, inferred column types) and cache it,
# since the following cells derive several DataFrames from it.
df = (
    spark.read
    .options(header=True, inferSchema=True, encoding='utf-8')
    .csv(path_offline_dataset)
)
df.cache()


DataFrame[HighBP: int, HighChol: int, CholCheck: int, BMI: int, Smoker: int, Stroke: int, HeartDiseaseorAttack: int, PhysActivity: int, Fruits: int, Veggies: int, HvyAlcoholConsump: int, AnyHealthcare: int, NoDocbcCost: int, GenHlth: int, MentHlth: int, PhysHlth: int, DiffWalk: int, Sex: int, Age: int, Education: int, Income: int, Diabetes_012: int]

In [24]:
# Feature-importance table produced offline (one row per feature, with its
# importance score and correlation with the target).
# Use the configured path instead of repeating the literal file name.
df_selected_features = spark.read.csv(
    path_variable_importances, inferSchema=True, encoding='utf-8', header=True
)
df_selected_features.show()


+---+--------------------+----------+-----------------------+
|_c0|         column_name|importance|correlation_with_target|
+---+--------------------+----------+-----------------------+
|  0|                 BMI|     0.143|                  0.226|
|  1|                 Age|     0.109|                  0.185|
|  2|             GenHlth|     0.094|                  0.304|
|  3|              Income|     0.089|                  0.172|
|  4|            PhysHlth|     0.088|                  0.177|
|  5|              HighBP|     0.064|                  0.271|
|  6|           Education|     0.064|                  0.131|
|  7|            HighChol|     0.035|                  0.209|
|  8|            DiffWalk|     0.027|                  0.227|
|  9|        PhysActivity|     0.027|                  0.124|
| 10|HeartDiseaseorAttack|     0.022|                   0.18|
+---+--------------------+----------+-----------------------+



In [86]:
# Names of the `best_top_n` most important features.
# Sort explicitly by importance (ties broken by the original row index `_c0`)
# rather than relying on the row order of the CSV file.
ranked_features = (
    df_selected_features
    .orderBy(col('importance').desc(), col('_c0').asc())
    .select('column_name')
    .take(best_top_n)
)
variables = [row.column_name for row in ranked_features]
variables

['BMI',
 'Age',
 'GenHlth',
 'Income',
 'PhysHlth',
 'HighBP',
 'Education',
 'HighChol']

In [26]:
# Keep only the selected features plus the target column. The StringIndexer below
# reads 'Diabetes_012', which the importance table does not list, so selecting
# `variables` alone would drop it on a fresh kernel.
columns_to_keep = variables if 'Diabetes_012' in variables else variables + ['Diabetes_012']
df = df.select(columns_to_keep)
df.show()

+---+---+-------+------+--------+------+---------+--------+------------+
|BMI|Age|GenHlth|Income|PhysHlth|HighBP|Education|HighChol|Diabetes_012|
+---+---+-------+------+--------+------+---------+--------+------------+
| 29|  8|      2|     8|       0|     0|        6|       1|           0|
| 24|  9|      3|     8|       4|     0|        6|       0|           0|
| 42|  9|      3|     7|       3|     1|        6|       1|           0|
| 21| 10|      2|     7|       0|     1|        5|       0|           0|
| 29|  2|      3|     3|       0|     0|        4|       0|           0|
| 28|  4|      2|     8|       7|     0|        5|       0|           0|
| 30|  9|      2|     8|       0|     1|        5|       0|           0|
| 24| 12|      3|     4|       5|     1|        6|       1|           0|
| 27|  5|      2|     8|       0|     1|        4|       0|           0|
| 28|  1|      3|     7|       4|     0|        5|       0|           0|
| 36|  9|      2|     6|      30|     0|        5| 

In [80]:
# Pipeline shell; its preprocessing stages are assigned later with setStages().
pipeline = Pipeline(stages=list())

In [28]:
# Assemble the model inputs into a single 'features' vector column.
# The input list is built from `variables` (minus the target) rather than from
# `df.columns`: once the pipeline has run, `df` also carries 'label' and 'features',
# so re-running this cell would otherwise feed those columns back in.
assembler = VectorAssembler()\
    .setInputCols([name for name in variables if name != 'Diabetes_012'])\
    .setOutputCol('features')

In [29]:
# Turn the target into the numeric 'label' column used by the classifiers.
# With alphabetAsc ordering, class values '0', '1', '2' map to indices 0.0, 1.0, 2.0
# (assumes the target only holds single-digit classes, as its name suggests).
indexer = StringIndexer(
    inputCol='Diabetes_012',
    outputCol='label',
    stringOrderType='alphabetAsc',
)

In [30]:
# Training-time preprocessing: index the label first, then assemble the features.
training_stages = [indexer, assembler]
pipeline.setStages(training_stages)


Pipeline_2c112ba0874b

In [31]:
# Fit the preprocessing stages on the offline data and add 'label' / 'features'.
fitted_pipeline = pipeline.fit(df)
df = fitted_pipeline.transform(df)

## Load saved hyperparameters and retrain the models

In [32]:
def load_pickled_params(path):
    """Return the hyperparameter object pickled at `path`.

    NOTE(review): pickle.load can execute arbitrary code, so only load trusted
    files. These hold plain dicts, which could be stored as JSON instead.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)


dt_params = load_pickled_params('dt_param.pkl')
print(dt_params)

lr_params = load_pickled_params('lr_param.pkl')
print(lr_params)

{'maxDepth': 10, 'maxBins': 20, 'impurity': 'gini'}
{'maxIter': 100, 'fitIntercept': True, 'regParam': 0.0}


In [61]:
# Preview the preprocessed training data (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+---+---+-------+------+--------+------+---------+--------+------------+-----+--------------------+
|BMI|Age|GenHlth|Income|PhysHlth|HighBP|Education|HighChol|Diabetes_012|label|            features|
+---+---+-------+------+--------+------+---------+--------+------------+-----+--------------------+
| 29|  8|      2|     8|       0|     0|        6|       1|           0|  0.0|[29.0,8.0,2.0,8.0...|
| 24|  9|      3|     8|       4|     0|        6|       0|           0|  0.0|[24.0,9.0,3.0,8.0...|
| 42|  9|      3|     7|       3|     1|        6|       1|           0|  0.0|[42.0,9.0,3.0,7.0...|
| 21| 10|      2|     7|       0|     1|        5|       0|           0|  0.0|[21.0,10.0,2.0,7....|
| 29|  2|      3|     3|       0|     0|        4|       0|           0|  0.0|[29.0,2.0,3.0,3.0...|
| 28|  4|      2|     8|       7|     0|        5|       0|           0|  0.0|[28.0,4.0,2.0,8.0...|
| 30|  9|      2|     8|       0|     1|        5|       0|           0|  0.0|[30.0,9.0,2.0,8.0...|


In [45]:
# Refit both classifiers on the full offline data with the saved hyperparameters.
# They use the default 'features' / 'label' columns produced by the pipeline above.
dt_estimator = DecisionTreeClassifier(
    maxDepth=dt_params['maxDepth'],
    maxBins=dt_params['maxBins'],
    impurity=dt_params['impurity'],
)
lr_estimator = LogisticRegression(
    maxIter=lr_params['maxIter'],
    fitIntercept=lr_params['fitIntercept'],
    regParam=lr_params['regParam'],
)

# `dt` and `lr` are bound to the fitted models; the streaming cell uses `lr`.
dt = dt_estimator.fit(df)
lr = lr_estimator.fit(df)

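## Online prediction over Kafka

The Spark Structured Streaming reader below is disabled (left commented out). Incoming records are instead consumed with `kafka-python`, scored with the logistic-regression model, and published to the `health_data_predicted` topic.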

In [None]:
# Spark Structured Streaming variant of the Kafka reader, kept for reference but disabled.
# NOTE(review): format("kafka") presumably needs the spark-sql-kafka connector on the
# classpath. PYSPARK_SUBMIT_ARGS is only set after the SparkSession above was created,
# which is a likely reason this path did not work — confirm.
# NOTE(review): the selectExpr(...) result on the last line is not assigned, so it
# would be discarded even if this cell were enabled.
# stream = spark \
#   .readStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", 'localhost:9092') \
#   .option("subscribe", 'health_data') \
#   .load()

# stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")




In [119]:
# Consume raw health records from Kafka, predict the diabetes class with the
# logistic-regression model `lr`, and publish the enriched record to another topic.
KAFKA_BOOTSTRAP = 'localhost:9092'
INPUT_TOPIC = 'health_data'
OUTPUT_TOPIC = 'health_data_predicted'

consumer = KafkaConsumer(
    INPUT_TOPIC,
    bootstrap_servers=[KAFKA_BOOTSTRAP],
    auto_offset_reset='latest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP, security_protocol='PLAINTEXT')

try:
    for message in consumer:
        record = message.value
        # Skip records lacking a model feature instead of letting `.select(variables)`
        # raise and stop the whole consumer loop.
        missing = [name for name in variables if name not in record]
        if missing:
            print(f'Skipping message at offset {message.offset}: missing features {missing}')
            continue

        row = Row(**record)
        print(row)
        # VectorAssembler is a plain transformer: apply it directly instead of
        # re-fitting a pipeline (and overwriting the shared `pipeline`) per message.
        df_row = assembler.transform(spark.createDataFrame([row]).select(variables))
        # The model predicts the StringIndexer index. With alphabetAsc ordering the
        # index equals the class value (assumes classes '0', '1', '2' only).
        predicted_class_row = int(lr.transform(df_row).select('prediction').first()['prediction'])

        row_dict = row.asDict()
        row_dict['Diabetes_012'] = predicted_class_row
        print(row_dict)
        producer.send(
            topic=OUTPUT_TOPIC,
            value=json.dumps(row_dict).encode('utf-8')
        )
        # Random 0.5–2 s pause between messages.
        time.sleep(random.randint(500, 2000) / 1000.0)
finally:
    # Deliver anything still buffered (e.g. after a KeyboardInterrupt) and release
    # the network connections.
    producer.flush()
    producer.close()
    consumer.close()
    
    
    



Row(HighBP=0, HighChol=0, CholCheck=1, BMI=24, Smoker=0, Stroke=0, HeartDiseaseorAttack=0, PhysActivity=0, Fruits=0, Veggies=1, HvyAlcoholConsump=0, AnyHealthcare=1, NoDocbcCost=0, GenHlth=2, MentHlth=0, PhysHlth=0, DiffWalk=0, Sex=1, Age=8, Education=4, Income=3)
{'HighBP': 0, 'HighChol': 0, 'CholCheck': 1, 'BMI': 24, 'Smoker': 0, 'Stroke': 0, 'HeartDiseaseorAttack': 0, 'PhysActivity': 0, 'Fruits': 0, 'Veggies': 1, 'HvyAlcoholConsump': 0, 'AnyHealthcare': 1, 'NoDocbcCost': 0, 'GenHlth': 2, 'MentHlth': 0, 'PhysHlth': 0, 'DiffWalk': 0, 'Sex': 1, 'Age': 8, 'Education': 4, 'Income': 3, 'Diabetes_012': 0}
Row(HighBP=0, HighChol=1, CholCheck=1, BMI=24, Smoker=1, Stroke=0, HeartDiseaseorAttack=0, PhysActivity=1, Fruits=1, Veggies=1, HvyAlcoholConsump=0, AnyHealthcare=1, NoDocbcCost=0, GenHlth=3, MentHlth=10, PhysHlth=0, DiffWalk=0, Sex=1, Age=8, Education=6, Income=4)
{'HighBP': 0, 'HighChol': 1, 'CholCheck': 1, 'BMI': 24, 'Smoker': 1, 'Stroke': 0, 'HeartDiseaseorAttack': 0, 'PhysActivity': 

{'HighBP': 0, 'HighChol': 0, 'CholCheck': 1, 'BMI': 24, 'Smoker': 1, 'Stroke': 0, 'HeartDiseaseorAttack': 0, 'PhysActivity': 1, 'Fruits': 1, 'Veggies': 1, 'HvyAlcoholConsump': 0, 'AnyHealthcare': 1, 'NoDocbcCost': 0, 'GenHlth': 5, 'MentHlth': 30, 'PhysHlth': 30, 'DiffWalk': 1, 'Sex': 1, 'Age': 5, 'Education': 4, 'Income': 1, 'Diabetes_012': 0}
Row(HighBP=1, HighChol=0, CholCheck=1, BMI=23, Smoker=0, Stroke=0, HeartDiseaseorAttack=0, PhysActivity=0, Fruits=1, Veggies=1, HvyAlcoholConsump=0, AnyHealthcare=1, NoDocbcCost=0, GenHlth=2, MentHlth=0, PhysHlth=0, DiffWalk=0, Sex=1, Age=13, Education=6, Income=6)
{'HighBP': 1, 'HighChol': 0, 'CholCheck': 1, 'BMI': 23, 'Smoker': 0, 'Stroke': 0, 'HeartDiseaseorAttack': 0, 'PhysActivity': 0, 'Fruits': 1, 'Veggies': 1, 'HvyAlcoholConsump': 0, 'AnyHealthcare': 1, 'NoDocbcCost': 0, 'GenHlth': 2, 'MentHlth': 0, 'PhysHlth': 0, 'DiffWalk': 0, 'Sex': 1, 'Age': 13, 'Education': 6, 'Income': 6, 'Diabetes_012': 0}
Row(HighBP=0, HighChol=0, CholCheck=1, BMI=

Row(HighBP=0, HighChol=0, CholCheck=0, BMI=31, Smoker=0, Stroke=0, HeartDiseaseorAttack=0, PhysActivity=1, Fruits=0, Veggies=0, HvyAlcoholConsump=0, AnyHealthcare=1, NoDocbcCost=0, GenHlth=2, MentHlth=0, PhysHlth=0, DiffWalk=0, Sex=1, Age=5, Education=6, Income=8)
{'HighBP': 0, 'HighChol': 0, 'CholCheck': 0, 'BMI': 31, 'Smoker': 0, 'Stroke': 0, 'HeartDiseaseorAttack': 0, 'PhysActivity': 1, 'Fruits': 0, 'Veggies': 0, 'HvyAlcoholConsump': 0, 'AnyHealthcare': 1, 'NoDocbcCost': 0, 'GenHlth': 2, 'MentHlth': 0, 'PhysHlth': 0, 'DiffWalk': 0, 'Sex': 1, 'Age': 5, 'Education': 6, 'Income': 8, 'Diabetes_012': 0}
Row(HighBP=0, HighChol=0, CholCheck=1, BMI=42, Smoker=0, Stroke=0, HeartDiseaseorAttack=0, PhysActivity=1, Fruits=1, Veggies=0, HvyAlcoholConsump=0, AnyHealthcare=0, NoDocbcCost=1, GenHlth=3, MentHlth=1, PhysHlth=30, DiffWalk=1, Sex=1, Age=5, Education=5, Income=4)
{'HighBP': 0, 'HighChol': 0, 'CholCheck': 1, 'BMI': 42, 'Smoker': 0, 'Stroke': 0, 'HeartDiseaseorAttack': 0, 'PhysActivity': 

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "E:\Anaconda\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "E:\Anaconda\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "E:\Anaconda\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 