## This notebook sets up a Spark Structured Streaming application for real-time diabetes prediction

This application loads the saved model, subscribes to the Kafka stream, and writes the predictions to a new topic.

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml import PipelineModel
from pyspark.sql import functions as f

## Initialize Spark Session with Kafka support

In [25]:
# Create the session (or reuse an existing one via getOrCreate). The Kafka
# connector coordinates are supplied through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("StructuredStreamingSparkOnlinePrediction")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

## Define the schema for the incoming data (same structure as the offline.csv)

In [26]:
# Every field of an incoming message is declared as a double. The field order
# is the order of the names listed below.
schema = StructType(
    [
        StructField(name=column_name, dataType=DoubleType())
        for column_name in (
            "HighBP",
            "HighChol",
            "CholCheck",
            "BMI",
            "Smoker",
            "Stroke",
            "HeartDiseaseorAttack",
            "PhysActivity",
            "Fruits",
            "Veggies",
            "HvyAlcoholConsump",
            "AnyHealthcare",
            "NoDocbcCost",
            "GenHlth",
            "MentHlth",
            "PhysHlth",
            "DiffWalk",
            "Sex",
            "Age",
            "Education",
            "Income",
        )
    ]
)

## Reading from Kafka topic 'health_data'

In [27]:
# Streaming source: subscribe to the 'health_data' topic on the kafka:9092 broker.
# Option keys contain dots, so they are passed via dict unpacking.
df = (
    spark.readStream
    .format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "kafka:9092",
            "subscribe": "health_data",
        }
    )
    .load()
)

In [28]:
# Inspect the raw Kafka source columns (key, value, topic, partition, offset,
# timestamp, timestampType) before any parsing is applied.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## Loading the saved model pipeline

In [29]:
# Load the previously saved pipeline model from disk. Its stages are not
# visible in this notebook.
# NOTE(review): the path is absolute and environment-specific — confirm it
# exists wherever this notebook is run.
model = PipelineModel.load("/home/jovyan/work/saved_models/best_diabetes_model")

## Parsing of the JSON message

This step parses the JSON payload carried in the Kafka `value` field and transforms the raw stream into a structured DataFrame with one column per feature.

In [30]:
# The Kafka `value` column is binary: cast it to a string, parse it as JSON
# against `schema`, then flatten the resulting struct into top-level columns.
# NOTE(review): messages that do not match the schema presumably surface as
# null feature values — confirm the model tolerates such rows.
parsed_df = (
    df
    .select(f.from_json(f.col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

In [31]:
# Verify that parsing produced one nullable double column per schema field.
parsed_df.printSchema()

root
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- HeartDiseaseorAttack: double (nullable = true)
 |-- PhysActivity: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Veggies: double (nullable = true)
 |-- HvyAlcoholConsump: double (nullable = true)
 |-- AnyHealthcare: double (nullable = true)
 |-- NoDocbcCost: double (nullable = true)
 |-- GenHlth: double (nullable = true)
 |-- MentHlth: double (nullable = true)
 |-- PhysHlth: double (nullable = true)
 |-- DiffWalk: double (nullable = true)
 |-- Sex: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



## Making predictions using the loaded model

In [32]:
# Apply the loaded pipeline model to the parsed stream. The result is still a
# streaming DataFrame; nothing is computed until a query is started.
predicted = model.transform(parsed_df)

## Preparing the output DataFrame to write back to Kafka

In [33]:
# Pack all columns of each row into a single JSON string column named `value`.
# NOTE(review): struct(*) includes every column the model adds, not only the
# prediction — confirm that this full payload is what consumers expect.
output_df = predicted.selectExpr("to_json(struct(*)) AS value")

## Starting the streaming query to write predictions to a new Kafka topic 'health_data_predicted'

In [37]:
# Streaming sink: publish the `value` column of output_df to the
# 'health_data_predicted' topic on the kafka:9092 broker, keeping checkpoint
# data under /tmp/checkpoint. start() returns the handle of the running query.
query = (
    output_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "health_data_predicted")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
)

In [36]:
# Block this cell until the streaming query terminates. A failure inside the
# query is re-raised here as a StreamingQueryException; the recorded run of
# this cell failed with UnknownTopicOrPartitionException.
# NOTE(review): presumably a topic was missing on the broker — confirm that
# 'health_data' and 'health_data_predicted' exist before starting the query.
try:
    query.awaitTermination()
finally:
    # Interrupting the kernel only unblocks this cell; without an explicit
    # stop the query would keep running in the background.
    query.stop()

StreamingQueryException: [STREAM_FAILED] Query [id = f01979a8-ab5b-4fd8-85e1-1946e6fb1840, runId = 5796418d-3843-40f9-9dab-aae30f23e8f5] terminated with exception: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.