In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

scala_version = '2.12'  # your scala version
spark_version = '3.0.1' # your spark version
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:2.8.0' #your kafka version
]
spark = SparkSession.builder.master("local").appName("kafka-example").config("spark.jars.packages", ",".join(packages)).getOrCreate()
spark

In [2]:
packages

['org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1',
 'org.apache.kafka:kafka-clients:2.8.0']

## Creating a Kafka Source for Batch Queries
If you have a use case that is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets.

In [3]:
topic_name = 'RandomNumber'
kafka_server = 'localhost:9092'

kafkaDf = spark.read.format("kafka").option("kafka.bootstrap.servers", kafka_server).option("subscribe", topic_name).option("startingOffsets", "earliest").load()

In [4]:
kafkaDf.toPandas()

Unnamed: 0,key,value,topic,partition,offset,timestamp,timestampType
0,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,0,2023-12-15 23:21:39.118,0
1,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,1,2023-12-15 23:21:44.128,0
2,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,2,2023-12-15 23:21:49.141,0
3,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,3,2023-12-15 23:21:54.148,0
4,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,4,2023-12-15 23:21:59.149,0


In [5]:
from pyspark.sql.functions import col, concat, lit
#keep running this cell to get new message from the Kafka topic
kafkaDf.select(col('topic'),col('offset'),col('value').cast('string').substr(12,1).alias('rand_number')).toPandas()

Unnamed: 0,topic,offset,rand_number
0,RandomNumber,0,0
1,RandomNumber,1,1
2,RandomNumber,2,2
3,RandomNumber,3,3
4,RandomNumber,4,4
5,RandomNumber,5,5


In [6]:
batchDF = kafkaDf.select(col('topic'),col('offset'),col('value').cast('string').substr(12,1).alias('rand_number'))


from time import sleep
from IPython.display import display, clear_output

for x in range(0, 100):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        display(batchDF.toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Showing live view refreshed every 5 seconds
Seconds passed: 405


Unnamed: 0,topic,offset,rand_number
0,RandomNumber,0,0
1,RandomNumber,1,1
2,RandomNumber,2,2
3,RandomNumber,3,3
4,RandomNumber,4,4
...,...,...,...
84,RandomNumber,84,8
85,RandomNumber,85,8
86,RandomNumber,86,8
87,RandomNumber,87,8


break
Live view ended...


In [7]:
batchCountDF = batchDF.groupBy('rand_number').count()
for x in range(0, 100):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        display(batchCountDF.toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Live view ended...


## Creating a Kafka Source for Streaming Queries

In [8]:

streamRawDf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafka_server).option("subscribe", topic_name).load()
streamDF = streamRawDf.select(col('topic'),col('offset'),col('value').cast('string').substr(12,1).alias('rand_number'))
checkEvenDF = streamDF.withColumn('Is_Even',col('rand_number').cast('int') % 2 == 0 )


In [9]:
from random import randint
randNum=str(randint(0,10000))
q1name = "queryNumber"+randNum
q2name = "queryCheckEven"+randNum

stream_writer1 = (streamDF.writeStream.queryName(q1name).trigger(processingTime="5 seconds").outputMode("append").format("memory"))
stream_writer2 = (checkEvenDF.writeStream.queryName(q2name).trigger(processingTime="5 seconds").outputMode("append").format("memory"))

query1 = stream_writer1.start()
query2 = stream_writer2.start()


In [10]:
for x in range(0, 100):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        result1 = spark.sql(f"SELECT * from {query1.name}")
        result2 = spark.sql(f"SELECT * from {query2.name}")
        display(result1.toPandas())
        display(result2.toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Live view ended...
