In [4]:
# Imports
import os.path

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_json, struct
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml import PipelineModel


In [2]:
# Paths
# Root of the project data directory. Set the MMDS_H3_DATA_PATH environment
# variable to run the notebook on another machine; the default keeps the
# original author-local location so existing runs are unaffected.
data_path = os.environ.get("MMDS_H3_DATA_PATH", "/Users/alek/Downloads/H3/MMDS_H3/data")
dataset_path = os.path.join(data_path, "dataset", "diabetes_binary_health_indicators_BRFSS2015.csv")

# Directory passed to PipelineModel.load() further down; it also holds the
# offline/online CSV files below.
trained_model_path = os.path.join(data_path, "trained_models")

offline_path = os.path.join(trained_model_path, "offline.csv")
online_path = os.path.join(trained_model_path, "online.csv")

In [5]:
# Start (or reuse) the Spark session; the Kafka source/sink connector is
# fetched through spark.jars.packages.
# NOTE(review): the connector coordinates pin Scala 2.12 / Spark 3.5.4 —
# confirm they match the installed pyspark, or the "kafka" format may fail.
# NOTE(review): if a session already exists in this kernel, getOrCreate()
# presumably returns it and these memory settings may not take effect.
spark = (
    SparkSession.builder
    .appName("OnlineSparkApp")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "12g")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4")
    .getOrCreate()
)

# Subscribe to the raw input topic as an unbounded streaming DataFrame; each
# Kafka record's payload is carried in the `value` column.
df_kafka = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'health_indicators')
    .load()
)

# JSON layout expected for each input message: a flat object of health
# indicators, all read as nullable doubles. Order is kept as listed because it
# determines the column order of the parsed (and later re-serialized) rows.
schema = StructType([
    StructField(field_name, DoubleType(), True)
    for field_name in (
        "HighBP", "HighChol", "CholCheck", "BMI", "Smoker", "Stroke",
        "HeartDiseaseorAttack", "PhysActivity", "Fruits", "Veggies",
        "HvyAlcoholConsump", "AnyHealthcare", "NoDocbcCost", "GenHlth",
        "MentHlth", "PhysHlth", "DiffWalk", "Sex", "Age", "Education",
        "Income",
    )
])


# Decode each Kafka payload as JSON against `schema`, flatten the struct so
# every indicator is a top-level column, then replace nulls with 0.0 in every
# schema field so the model always receives a complete feature row.
# NOTE(review): a malformed or empty payload presumably parses to all-null
# fields (from_json's permissive default) and is then scored as an all-zero
# record (BMI 0 included) rather than dropped — confirm that is acceptable.
df_parsed = (
    df_kafka
    .selectExpr("CAST(value AS STRING) as value")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .na.fill(0.0, subset=schema.fieldNames())
)



# Score the stream with the previously fitted pipeline.
# NOTE(review): trained_model_path is the same directory that holds
# offline.csv / online.csv — confirm the saved PipelineModel really lives at
# this path and not in a subdirectory.
bestModel = PipelineModel.load(trained_model_path)
df_predicted = bestModel.transform(df_parsed)

# Pack every column of each scored row (inputs plus whatever the pipeline
# appends) into a single JSON string column named `value` for the Kafka sink.
df_output = df_predicted.select(to_json(struct("*")).alias("value"))

# Publish the enriched records to the output topic. The checkpoint directory
# records progress so a restarted query can resume where it left off.
# NOTE(review): /tmp may be cleared on reboot, discarding that progress —
# consider a persistent checkpoint location.
query = (
    df_output.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "health_indicators_predicted")
    .option("checkpointLocation", "/tmp/spark_checkpoint_health_indicators_predicted")
    .start()
)

# awaitTermination() blocks this cell indefinitely. Interrupting the kernel
# only raises KeyboardInterrupt on the Python side; the query itself keeps
# running (a re-run then has to kill it mid-epoch). Stop it explicitly however
# the wait ends so the checkpoint and Kafka producer are released cleanly.
try:
    query.awaitTermination()
finally:
    query.stop()

25/02/13 18:49:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/02/13 18:49:12 WARN StreamingQueryManager: Stopping existing streaming query [id=bf01a924-9a6c-4488-90cd-eee52fe0f3fd, runId=b24f00a3-f06c-4915-a7e1-061f39c18da0], as a new run is being started.
25/02/13 18:49:12 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 1259, writer: org.apache.spark.sql.kafka010.KafkaStreamingWrite@24127779] is aborting.
25/02/13 18:49:12 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 1259, writer: org.apache.spark.sql.kafka010.KafkaStreamingWrite@24127779] aborted.
25/02/13 18:49:12 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException
	at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:267)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
	at scala.collection.Iterator$$anon$10.ha

KeyboardInterrupt: 