# Applying a Spark MLlib Model to a Kafka Data Stream

Step 1: Connect to the UPC VPN, since the Kafka stream is served from a UPC server.

In [1]:
from kafka import KafkaConsumer
from pyspark.ml import PipelineModel
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

Structured Streaming Programming Guide:
- https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

RDD Programming Guide: 
- https://spark.apache.org/docs/latest/rdd-programming-guide.html

Spark Streaming Programming Guide:
- https://spark.apache.org/docs/latest/streaming-programming-guide.html

In [2]:
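# Initialize the Spark session. PYSPARK_SUBMIT_ARGS must be set before the session is created so the Kafka connector packages are loaded.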
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
spark = SparkSession.builder.master("local[*]").appName("StreamPredictor").config('spark.driver.extraClassPath','./drivers/monetdb-jdbc-3.2.jre8.jar').getOrCreate()

22/06/21 20:48:25 WARN Utils: Your hostname, Kathryns-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.37 instead (on interface en0)
22/06/21 20:48:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/kat/opt/anaconda3/envs/bdm/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/kat/.ivy2/cache
The jars for the packages stored in: /Users/kat/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ffbb12da-6c0c-4418-8f7e-3a4e86451fb8;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in local-m2-cache
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in local-m2-cache
	found commons-logging#c

In [3]:
# Connection properties for the Kafka stream
server = 'venomoth.fib.upc.edu:9092'
topic = 'bdm_p2'
consumer = KafkaConsumer(topic, bootstrap_servers=server)

References: 
- https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/
- https://spark.apache.org/docs/3.3.0/api/python/reference/api/pyspark.ml.PipelineModel.html

In [4]:
# Load the fitted pipeline model previously saved by the notebook MachineLearining.ipynb
model = PipelineModel.load('Fitted-ML-Pipeline')

                                                                                

Reference:
- https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns

The streaming DataFrame must use the same column names as the Machine Learning Pipeline Model expects: "neighborhood_id" and "price".
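
As a rough illustration of what the split in the next cell does to a single message, the sketch below mimics it in plain Python rather than Spark. The sample record and the `parse_record` helper are hypothetical, made up for this example (the real messages come from the Kafka topic); only the field order date, neighborhood_id, price is taken from the notebook. Returning `None` for a missing field is an assumption meant to resemble how `getItem` behaves under Spark's default (non-ANSI) settings.

```python
from typing import Dict, Optional


def parse_record(value: str) -> Dict[str, Optional[str]]:
    """Split one comma-separated message into the columns the model expects."""
    parts = value.split(",")

    def item(index: int) -> Optional[str]:
        # Assumption: represent a missing field as None instead of raising IndexError.
        return parts[index] if index < len(parts) else None

    return {
        "date": item(0),
        "neighborhood_id": item(1),
        "price": item(2),
    }


# Hypothetical message value, made up for illustration only.
sample = "2022-01-01 00:00:00,Q0000000,100000"
record = parse_record(sample)
print(record)
```

Note that every value is still a string after the split, including the price.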

In [5]:
# Create a streaming DataFrame from the Kafka topic
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", server).option("subscribe", topic).load()
# Kafka delivers the message payload as bytes, so cast it to a string
df = df.selectExpr("CAST(value AS STRING)")
# Each message is a comma-separated string: date, neighborhood_id, price
split_col = split(df['value'], ',')
df = df.withColumn('date', split_col.getItem(0))
df = df.withColumn('neighborhood_id', split_col.getItem(1))
df = df.withColumn('price', split_col.getItem(2))

Pim and Enric provided guidance on the following code, which makes the predictions and allows them to be saved to an in-memory DataFrame for periodic querying.

In [6]:
# Create predictions with the loaded pipeline model
predict = model.transform(df).select("date","neighborhood_id", "price", "prediction")
# Add an error column: actual price minus predicted price
predict = predict.withColumn('error', (predict['price']-predict['prediction']))
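
To make the sign of the error column concrete, here is a small plain-Python check using three rows copied from the console output shown later in this notebook (batches 1, 3 and 12). The `rows` list and the `prediction_error` helper are illustrative names, not part of the pipeline. A positive error means the model predicted less than the actual price; a negative error means it predicted more.

```python
def prediction_error(price: float, prediction: float) -> float:
    """Error as defined in the notebook: actual price minus predicted price."""
    return price - prediction


# (neighborhood_id, price, prediction) copied from batches 1, 3 and 12 of the output.
rows = [
    ("Q1904302", 3479966, 1121793.0438250077),
    ("Q3298510", 245007, 312176.4716073198),
    ("Q3320705", 239949, 235611.1218030587),
]

errors = {}
for neighborhood_id, price, prediction in rows:
    errors[neighborhood_id] = prediction_error(price, prediction)
    direction = "under" if errors[neighborhood_id] > 0 else "over"
    print(f"{neighborhood_id}: error={errors[neighborhood_id]:.2f} (model {direction}-predicts)")
```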

There are two options for displaying the predictions: print them to the console in real time, or save them to an in-memory DataFrame and query it periodically. Both options work; here I have chosen to print them to the console in real time.


In [7]:
# Start streaming the data and predictions to the console.
# awaitTermination() blocks until the query is stopped or the cell is interrupted.
query = predict.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

22/06/21 20:48:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/fm/mghr685s1l70_6sxmqyh0zlc0000gn/T/temporary-c36de88a-6026-4d97-a2eb-8e93747341ff. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/06/21 20:48:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+----+---------------+-----+----------+-----+
|date|neighborhood_id|price|prediction|error|
+----+---------------+-----+----------+-----+
+----+---------------+-----+----------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+---------------+-------+------------------+-----------------+
|                date|neighborhood_id|  price|        prediction|            error|
+--------------------+---------------+-------+------------------+-----------------+
|2022-06-21 18:48:...|       Q1904302|3479966|1121793.0438250077|2358172.956174992|
+--------------------+---------------+-------+------------------+-----------------+




-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+---------------+------+-----------------+-----------------+
|                date|neighborhood_id| price|       prediction|            error|
+--------------------+---------------+------+-----------------+-----------------+
|2022-06-21 18:48:...|       Q1026658|560034|502431.0746205747|57602.92537942529|
+--------------------+---------------+------+-----------------+-----------------+




-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+---------------+------+-----------------+------------------+
|                date|neighborhood_id| price|       prediction|             error|
+--------------------+---------------+------+-----------------+------------------+
|2022-06-21 18:48:...|       Q3298510|245007|312176.4716073198|-67169.47160731978|
+--------------------+---------------+------+-----------------+------------------+




-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+---------------+------+------------------+-------------------+
|                date|neighborhood_id| price|        prediction|              error|
+--------------------+---------------+------+------------------+-------------------+
|2022-06-21 18:48:...|        Q720994|350027|512946.81859019975|-162919.81859019975|
+--------------------+---------------+------+------------------+-------------------+




-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+---------------+------+------------------+------------------+
|                date|neighborhood_id| price|        prediction|             error|
+--------------------+---------------+------+------------------+------------------+
|2022-06-21 18:48:...|       Q3753110|289034|277663.04870485375|11370.951295146253|
+--------------------+---------------+------+------------------+------------------+




-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+---------------+------+-----------------+------------------+
|                date|neighborhood_id| price|       prediction|             error|
+--------------------+---------------+------+-----------------+------------------+
|2022-06-21 18:48:...|       Q3291762|449963|732745.2440426771|-282782.2440426771|
+--------------------+---------------+------+-----------------+------------------+




-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+---------------+------+------------------+------------------+
|                date|neighborhood_id| price|        prediction|             error|
+--------------------+---------------+------+------------------+------------------+
|2022-06-21 18:48:...|       Q3753110|268928|277663.04870485375|-8735.048704853747|
+--------------------+---------------+------+------------------+------------------+




-------------------------------------------
Batch: 8
-------------------------------------------
+--------------------+---------------+------+-----------------+-----------------+
|                date|neighborhood_id| price|       prediction|            error|
+--------------------+---------------+------+-----------------+-----------------+
|2022-06-21 18:48:...|       Q3750859|765099|384644.9921087892|380454.0078912108|
+--------------------+---------------+------+-----------------+-----------------+



                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+--------------------+---------------+------+------------------+------------------+
|                date|neighborhood_id| price|        prediction|             error|
+--------------------+---------------+------+------------------+------------------+
|2022-06-21 18:48:...|       Q3321657|950057|1688292.4283983978|-738235.4283983978|
+--------------------+---------------+------+------------------+------------------+




-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+---------------+------+-----------------+-----------------+
|                date|neighborhood_id| price|       prediction|            error|
+--------------------+---------------+------+-----------------+-----------------+
|2022-06-21 18:49:...|       Q1026658|749901|502431.0746205747|247469.9253794253|
+--------------------+---------------+------+-----------------+-----------------+




-------------------------------------------
Batch: 11
-------------------------------------------
+--------------------+---------------+------+-----------------+-------------------+
|                date|neighborhood_id| price|       prediction|              error|
+--------------------+---------------+------+-----------------+-------------------+
|2022-06-21 18:49:...|       Q3296693|449918|571935.8509607569|-122017.85096075688|
+--------------------+---------------+------+-----------------+-------------------+




-------------------------------------------
Batch: 12
-------------------------------------------
+--------------------+---------------+------+-----------------+----------------+
|                date|neighborhood_id| price|       prediction|           error|
+--------------------+---------------+------+-----------------+----------------+
|2022-06-21 18:49:...|       Q3320705|239949|235611.1218030587|4337.87819694131|
+--------------------+---------------+------+-----------------+----------------+




-------------------------------------------
Batch: 13
-------------------------------------------
+--------------------+---------------+------+-----------------+------------------+
|                date|neighborhood_id| price|       prediction|             error|
+--------------------+---------------+------+-----------------+------------------+
|2022-06-21 18:49:...|        Q980253|249950|288158.1964944844|-38208.19649448438|
+--------------------+---------------+------+-----------------+------------------+




-------------------------------------------
Batch: 14
-------------------------------------------
+--------------------+---------------+------+-----------------+-----------------+
|                date|neighborhood_id| price|       prediction|            error|
+--------------------+---------------+------+-----------------+-----------------+
|2022-06-21 18:49:...|       Q1026658|599936|502431.0746205747|97504.92537942529|
+--------------------+---------------+------+-----------------+-----------------+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/kat/opt/anaconda3/envs/bdm/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/kat/opt/anaconda3/envs/bdm/lib/python3.9/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/kat/opt/anaconda3/envs/bdm/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

To save the predictions to an in-memory DataFrame instead, comment out the two code chunks above and uncomment the three code chunks below.

In [8]:
query.stop()

22/06/21 20:49:15 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@b33eea1 is aborting.
22/06/21 20:49:15 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@b33eea1 aborted.
22/06/21 20:49:16 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException
	at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:216)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.datasour

In [None]:
## save streaming data and predictions into the dataframe in memory.
# predict.writeStream.queryName("predict").format("memory").start()

In [None]:
## query the "predict" dataframe after a few seconds to see results
# spark.sql("select * from predict").show()

In [None]:
#spark.streams.active[0].stop()

In [9]:
spark.stop()