In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

scala_version = '2.13'
spark_version = '3.5.5'
# The Kafka connector is the only extra package needed: spark-sql-kafka-0-10
# declares its own compatible kafka-clients dependency, which Spark resolves
# transitively. The former explicit pin `org.apache.kafka:kafka-clients:2.10.0`
# does not match any Kafka release (Kafka jumped from 2.8.x to 3.0), so it was
# dropped rather than risk a failed resolution or a clashing client version.
# NOTE(review): the `_{scala_version}` suffix must match the Scala build of the
# running Spark (PyPI pyspark 3.5.x is presumably the Scala 2.12 build) — confirm.
packages = [f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}']

# Local single-threaded master with generous driver/off-heap memory.
# NOTE(review): spark.driver.memory is presumably only honoured when this call
# launches the JVM (first session in the kernel); getOrCreate() on an existing
# session will not resize it — confirm if settings appear to be ignored.
spark = (
    SparkSession.builder
    .master('local')
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

In [2]:
topic_name = 'VideoStreaming'
kafka_server = 'localhost:9092'

# Streaming source: one row per Kafka record on `topic_name`.
streamRawDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .load()
)

# Keep only the columns shown downstream; `value` is cast from the Kafka
# binary payload to a string column.
# NOTE(review): the captured output shows payloads starting with a JFIF (JPEG)
# header, so casting them to string mangles the frames when displayed. Keeping
# `value` as binary is probably what is wanted, but that changes this
# DataFrame's schema — confirm with downstream consumers before switching.
streamDF = streamRawDf.select(
    col('topic'),
    col('offset'),
    col('value').cast('string'),
    col('partition'),
)

# One filtered view per Kafka partition (0, 1, 2), each written to its own sink.
streamDF_partition_0, streamDF_partition_1, streamDF_partition_2 = (
    streamDF.filter(col('partition') == partition_id) for partition_id in range(3)
)

In [4]:
def _memory_sink_writer(partition_df, query_name):
    """Build an append-mode, 5-second-trigger writer into the memory table ``query_name``."""
    return (
        partition_df.writeStream
        .queryName(query_name)
        .trigger(processingTime="5 seconds")
        .outputMode("append")
        .format("memory")
    )

# NOTE(review): per the Spark docs the memory sink stores all output rows in
# driver memory (debugging / low-volume use only), so these tables keep growing
# for as long as the queries run.
stream_writer0 = _memory_sink_writer(streamDF_partition_0, "CameraStreaming0")
stream_writer1 = _memory_sink_writer(streamDF_partition_1, "CameraStreaming1")
stream_writer2 = _memory_sink_writer(streamDF_partition_2, "CameraStreaming2")

# start() launches each query and returns its StreamingQuery handle; the
# handle's `.name` is the queryName, which is also the memory table's name.
query0 = stream_writer0.start()
query1 = stream_writer1.start()
query2 = stream_writer2.start()


In [5]:
from time import sleep
from IPython.display import display, clear_output
# Live view: repeatedly snapshot the three in-memory tables and redraw them
# until N_REFRESHES iterations have elapsed or the kernel is interrupted.
REFRESH_SECONDS = 5  # same as the writers' processingTime="5 seconds" trigger
N_REFRESHES = 100
live_queries = (query0, query1, query2)

for x in range(N_REFRESHES):
    try:
        print(f"Showing live view refreshed every {REFRESH_SECONDS} seconds")
        # Previously the loop slept 3 s while reporting 5 s per step; both now
        # use REFRESH_SECONDS so the elapsed-time counter is accurate.
        print(f"Seconds passed: {x * REFRESH_SECONDS}")
        for query in live_queries:
            # The memory sink exposes each query's output as a table named after the query.
            # NOTE(review): this collects the whole (ever-growing) table every refresh;
            # consider a LIMIT / ORDER BY offset DESC if the stream runs for long.
            display(spark.sql(f"SELECT * FROM {query.name}").toPandas())

        sleep(REFRESH_SECONDS)
        # wait=True defers clearing until new output arrives, avoiding flicker.
        clear_output(wait=True)
    except KeyboardInterrupt:
        # Interrupting the kernel ends the live view cleanly.
        print("break")
        break
print("Live view ended...")
# NOTE(review): the streaming queries keep running after this cell; call
# q.stop() on query0/query1/query2 when they are no longer needed.


Showing live view refreshed every 5 seconds
Seconds passed: 35


Unnamed: 0,topic,offset,value,partition
0,VideoStreaming,641,�����JFIF���������C�...,0
1,VideoStreaming,642,�����JFIF���������C�...,0
2,VideoStreaming,643,�����JFIF���������C�...,0
3,VideoStreaming,644,�����JFIF���������C�...,0
4,VideoStreaming,645,�����JFIF���������C�...,0
...,...,...,...,...
277,VideoStreaming,918,�����JFIF���������C�...,0
278,VideoStreaming,919,�����JFIF���������C�...,0
279,VideoStreaming,920,�����JFIF���������C�...,0
280,VideoStreaming,921,�����JFIF���������C�...,0


break
Live view ended...
