In [200]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rcParams
import squarify

from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from sklearn.metrics import roc_curve, auc
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Kafka client library (the Spark session itself is created in the "Connect to Spark" section)
from kafka import KafkaProducer



# Connect to Spark

In [201]:
# Versions used to build the Maven coordinates of the Spark <-> Kafka connector.
scala_version = '2.12'  # your scala version
spark_version = '3.0.1' # your spark version

# Extra JVM packages Spark must download before the Kafka source/sink can be used.
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:2.8.0' #your kafka version
]

# Local-mode session; the package list is handed over as one comma-separated string.
session_builder = (
    SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
)
spark = session_builder.getOrCreate()


In [202]:
# Echo the resolved Maven coordinates that were passed to spark.jars.packages.
packages

['org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1',
 'org.apache.kafka:kafka-clients:2.8.0']

In [203]:
# Reuse the SparkContext that already backs the session. getOrCreate() is a
# SparkContext classmethod, so calling it through this instance added nothing.
sc = spark.sparkContext

In [204]:
# Kafka topic the input records are read from.
topic_name = "RandomNumber"
# Kafka topic the parsed dataset is republished to.
topic_send = "SendChart"
# Kafka topic the model-metrics rows are published to.
topic_result = "SendResult"
# Bootstrap address of the Kafka broker used for every read and write.
kafka_server = "localhost:29092"

In [205]:
# Batch (non-streaming) read of the input topic, bounded by the earliest and
# the latest offsets available at the time the cell runs.
kafka_reader = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
)
df = kafka_reader.load()


In [206]:
# Step 1: expose the Kafka `value` payload as a text column named `json`.
# Step 2: parse that text into one struct column `data`; every field is
# declared as STRING here and is cast to its real type in a later cell.
df = df.selectExpr("CAST(value AS STRING) as json").selectExpr(
    "from_json(json, '"
    "gender STRING, age STRING, hypertension STRING, heart_disease STRING, "
    "ever_married STRING, work_type STRING, Residence_type STRING, "
    "avg_glucose_level STRING, bmi STRING, smoking_status STRING, stroke STRING"
    "') as data"
)

In [207]:
# Spark type for each numeric field; fields not listed are selected as-is.
numeric_types = {
    "age": DoubleType(),
    "hypertension": IntegerType(),
    "heart_disease": IntegerType(),
    "avg_glucose_level": DoubleType(),
    "bmi": DoubleType(),
    "stroke": IntegerType(),
}

# Column order of the flattened DataFrame.
field_order = [
    "gender", "age", "hypertension", "heart_disease", "ever_married",
    "work_type", "Residence_type", "avg_glucose_level", "bmi",
    "smoking_status", "stroke",
]


def _flatten(field):
    """Return the `data.<field>` column, cast and re-aliased when it is numeric."""
    nested = col(f"data.{field}")
    if field in numeric_types:
        return nested.cast(numeric_types[field]).alias(field)
    return nested


# Lift every field out of the `data` struct into a top-level column.
df = df.select(*[_flatten(field) for field in field_order])


# Examine data

In [208]:
# path = 'brain_stroke.csv'
# df = spark.read.csv(path, header='True', inferSchema='True')
# df.show(10)

In [209]:
# Bring only the first 10 rows to the driver so they render as a pandas table.
df.limit(10).toPandas()

In [None]:
# Print each column's name, type and nullability.
df.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [None]:
# One aggregate per column: the number of rows whose value is NaN or NULL.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(missing_counts).show()

+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
|gender|age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|bmi|smoking_status|stroke|
+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
|     0|  0|           0|            0|           0|        0|             0|                0|  0|             0|     0|
+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+



## Convert the DataFrame to pandas to enable Seaborn and Matplotlib plotting

In [None]:
# Preview the first 20 rows as a plain-text table.
df.show(20)

+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
|Female|79.0|           1|            0|         Yes|Self-employed|         Rural|           174.12|24.0|   never smoked|     1|
|  Male|81.0|           0|            0|         Yes|      Private|         Urban|           186.

In [None]:
from pyspark.sql.functions import struct, to_json

# Driver-side pandas copy of the whole dataset.
df_pd = df.toPandas()

# The Kafka sink expects the payload in a column called `value`, so every row
# is packed into one JSON document under that name before being written to
# the topic_send topic.
chart_payload = (
    df
    .select(to_json(struct("*")).alias("value"))
    .selectExpr("CAST(value AS STRING)")
)
(
    chart_payload.write
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("topic", topic_send)
    .save()
)
# df_pd.loc[(df_pd.stroke == 1), 'stroke']='Stroke'
# df_pd.loc[(df_pd.stroke == 0), 'stroke']='No stroke'

# df_pd.loc[(df_pd.heart_disease == 1), 'heart_disease']='Heart disease'
# df_pd.loc[(df_pd.heart_disease == 0), 'heart_disease']='No heart disease'

# df_pd.loc[(df_pd.hypertension == 1), 'hypertension']='Hypertension'
# df_pd.loc[(df_pd.hypertension == 0), 'hypertension']='No hypertension'

23/09/29 23:12:55 WARN NetworkClient: [Producer clientId=producer-4] Error while fetching metadata with correlation id 3 : {SendChart=LEADER_NOT_AVAILABLE}


# Convert categorical string columns to numeric indices

In [None]:
# String-typed columns that get label-indexed and are dropped afterwards.
# Names use the DataFrame's exact casing ('Residence_type') so the drop does
# not depend on Spark's case-insensitive column resolution.
categorical = ('gender', 'ever_married', 'work_type', 'Residence_type', 'smoking_status')

In [None]:
# (source string column, name of its label-indexed replacement), in output order.
index_columns = [
    ('gender', 'gender_vec'),
    ('ever_married', 'married_vec'),
    ('work_type', 'work_vec'),
    ('Residence_type', 'residence_vec'),
    ('smoking_status', 'smoking_vec'),
]

# One StringIndexer per categorical column, fitted and applied in a single
# Pipeline pass; the stages run in list order, so the *_vec columns are
# appended in the order given above.
indexers = [
    StringIndexer(inputCol=source, outputCol=target)
    for source, target in index_columns
]
df_indexed = Pipeline(stages=indexers).fit(df).transform(df)

# Keep only numeric columns: the original string columns are dropped.
df = df_indexed.drop(*categorical)
df.show(3)

+----+------------+-------------+-----------------+----+------+----------+-----------+--------+-------------+-----------+
| age|hypertension|heart_disease|avg_glucose_level| bmi|stroke|gender_vec|married_vec|work_vec|residence_vec|smoking_vec|
+----+------------+-------------+-----------------+----+------+----------+-----------+--------+-------------+-----------+
|67.0|           0|            1|           228.69|36.6|     1|       1.0|        0.0|     0.0|          0.0|        2.0|
|80.0|           0|            1|           105.92|32.5|     1|       1.0|        0.0|     0.0|          1.0|        0.0|
|49.0|           0|            0|           171.23|34.4|     1|       0.0|        0.0|     0.0|          0.0|        3.0|
+----+------------+-------------+-----------------+----+------+----------+-----------+--------+-------------+-----------+
only showing top 3 rows



In [None]:
# Every column except the label goes into the assembled feature vector.
feature_columns = [name for name in df.columns if name != 'stroke']
feature = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Append the `features` vector column and preview three rows.
feature_vector = feature.transform(df)
feature_vector.show(3)

+----+------------+-------------+-----------------+----+------+----------+-----------+--------+-------------+-----------+--------------------+
| age|hypertension|heart_disease|avg_glucose_level| bmi|stroke|gender_vec|married_vec|work_vec|residence_vec|smoking_vec|            features|
+----+------------+-------------+-----------------+----+------+----------+-----------+--------+-------------+-----------+--------------------+
|67.0|           0|            1|           228.69|36.6|     1|       1.0|        0.0|     0.0|          0.0|        2.0|[67.0,0.0,1.0,228...|
|80.0|           0|            1|           105.92|32.5|     1|       1.0|        0.0|     0.0|          1.0|        0.0|[80.0,0.0,1.0,105...|
|49.0|           0|            0|           171.23|34.4|     1|       0.0|        0.0|     0.0|          0.0|        3.0|(10,[0,3,4,9],[49...|
+----+------------+-------------+-----------------+----+------+----------+-----------+--------+-------------+-----------+--------------------+

In [None]:
# Only the assembled features and the label are needed for modelling.
ml_df = feature_vector.select(['features', 'stroke'])

# 80/20 train/test split. The explicit seed fixes the random sampling so the
# split does not change on every run over the same input.
train, test = ml_df.randomSplit([0.8, 0.2], seed=42)

# Logistic Regression

In [None]:
lr = LogisticRegression(labelCol='stroke')

# 2 x 2 x 2 x 2 = 16 candidate settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, (0.01, 0.1))
    .addGrid(lr.maxIter, (5, 10))
    .addGrid(lr.tol, (1e-4, 1e-5))
    .addGrid(lr.elasticNetParam, (0.25, 0.75))
    .build()
)

# Hold out 20% of the training data to score each candidate with the
# evaluator's default metric. The seed pins that random hold-out so the
# selected parameters are repeatable.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(labelCol='stroke'),
                           trainRatio=0.8,
                           seed=42)

# Fit on the training split and score the held-out test split with the best model.
lr_model = tvs.fit(train)
lr_model_pred = lr_model.transform(test)

                                                                                

In [None]:
results = lr_model_pred.select(['probability', 'stroke'])

# Build the (score, label) pairs directly on the prediction RDD instead of
# collecting every row to the driver and re-parallelising it.
# The score is the first entry of the probability vector and the label is
# inverted (1 - stroke), so label 1.0 here means stroke == 0.
scoreAndLabels = results.rdd.map(
    lambda row: (float(row[0][0]), 1.0 - float(row[1]))
)

metrics = metric(scoreAndLabels)

# Accuracy and weighted precision come from the DataFrame-based evaluator;
# the ROC area comes from the RDD-based binary metrics above.
lr_acc = round(MulticlassClassificationEvaluator(labelCol='stroke', metricName='accuracy').evaluate(lr_model_pred), 4)
lr_prec = round(MulticlassClassificationEvaluator(labelCol='stroke', metricName='weightedPrecision').evaluate(lr_model_pred), 4)
lr_roc = round(metrics.areaUnderROC, 4)

# One summary row for this model; 'Precision' holds the weighted precision.
lr_dict = {'Models':'Logistic Regression','Accuracy': lr_acc, 'Precision': lr_prec, 'ROC_Score': lr_roc}



# Random Forest

In [None]:
# Fixed seed so the forest's randomised training is repeatable.
rf = RandomForestClassifier(labelCol='stroke', seed=42)

# 3 x 3 x 3 x 2 x 3 = 162 candidate settings, each trained once, so this
# cell dominates the notebook's run time.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [5, 10, 20])
    .addGrid(rf.maxBins, [20, 32, 50])
    .addGrid(rf.numTrees, [20, 40, 60])
    .addGrid(rf.impurity, ['gini', 'entropy'])
    .addGrid(rf.minInstancesPerNode, [1, 5, 10])
    .build()
)

# Hold out 20% of the training data to score each candidate with the
# evaluator's default metric. The seed pins that random hold-out so the
# selected parameters are repeatable.
tvs = TrainValidationSplit(estimator=rf,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(labelCol='stroke'),
                           trainRatio=0.8,
                           seed=42)

# Fit on the training split and score the held-out test split with the best model.
rf_model = tvs.fit(train)
rf_model_pred = rf_model.transform(test)

23/09/29 23:14:30 WARN DAGScheduler: Broadcasting large task binary with size 1210.0 KiB
23/09/29 23:14:31 WARN DAGScheduler: Broadcasting large task binary with size 1499.6 KiB
23/09/29 23:14:31 WARN DAGScheduler: Broadcasting large task binary with size 1015.2 KiB
23/09/29 23:14:32 WARN DAGScheduler: Broadcasting large task binary with size 1016.9 KiB
23/09/29 23:14:32 WARN DAGScheduler: Broadcasting large task binary with size 1173.1 KiB
23/09/29 23:14:34 WARN DAGScheduler: Broadcasting large task binary with size 1026.2 KiB
23/09/29 23:14:34 WARN DAGScheduler: Broadcasting large task binary with size 1286.0 KiB
23/09/29 23:14:36 WARN DAGScheduler: Broadcasting large task binary with size 1089.0 KiB
23/09/29 23:14:38 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
23/09/29 23:14:38 WARN DAGScheduler: Broadcasting large task binary with size 1733.9 KiB
23/09/29 23:14:38 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/09/29 23:14:39 WARN D

In [None]:
results = rf_model_pred.select(['probability', 'stroke'])

# Build the (score, label) pairs directly on the prediction RDD instead of
# collecting every row to the driver and re-parallelising it.
# The score is the first entry of the probability vector and the label is
# inverted (1 - stroke), so label 1.0 here means stroke == 0.
scoreAndLabels = results.rdd.map(
    lambda row: (float(row[0][0]), 1.0 - float(row[1]))
)

metrics = metric(scoreAndLabels)

# Accuracy and weighted precision come from the DataFrame-based evaluator;
# the ROC area comes from the RDD-based binary metrics above.
rf_acc = round(MulticlassClassificationEvaluator(labelCol='stroke', metricName='accuracy').evaluate(rf_model_pred), 4)
rf_prec = round(MulticlassClassificationEvaluator(labelCol='stroke', metricName='weightedPrecision').evaluate(rf_model_pred), 4)
rf_roc = round(metrics.areaUnderROC, 4)

# One summary row for this model; 'Precision' holds the weighted precision.
rf_dict = {'Models':'Random Forest', 'Accuracy': rf_acc, 'Precision': rf_prec, 'ROC_Score': rf_roc}

In [None]:
# One row per model, built from the per-model summary dictionaries.
dicts = [lr_dict, rf_dict]
results = spark.createDataFrame(dicts)

# Pack each metrics row into one JSON document in a `value` column and
# publish the rows to the topic_result topic.
result_payload = (
    results
    .select(to_json(struct("*")).alias("value"))
    .selectExpr("CAST(value AS STRING)")
)
(
    result_payload.write
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("topic", topic_result)
    .save()
)

# results = pd.DataFrame(dicts)
# results['Models'] = ['Logistic Regression', 'Random Forest']
# results.set_index(['Models'])

23/09/29 23:21:57 WARN NetworkClient: [Producer clientId=producer-4] Error while fetching metadata with correlation id 332 : {SendResult=LEADER_NOT_AVAILABLE}
