In [1]:
import os
import findspark

# findspark.init() locates the local Spark installation and makes its Python
# libraries importable, so it has to run BEFORE the first `pyspark` import.
# Calling it afterwards only works on machines where pyspark already happens
# to be importable on its own.
findspark.init()

from pyspark.sql import SparkSession  # noqa: E402  (must follow findspark.init)
from pyspark.sql.functions import explode, split  # noqa: E402

In [2]:
# Extra JVM package that Spark resolves through Ivy when the session starts.
# Only the Kafka source connector is requested: it brings in the matching
# kafka-clients release transitively.  The previous explicit pin to
# kafka-clients 2.8.1 was never honoured -- the resolution log of this
# notebook reports kafka-clients 3.4.1 -- so the pin was dropped to keep the
# declared and the effective dependency set identical.
SPARK_KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"

# The trailing "pyspark-shell" token is required by the PySpark launcher.
SUBMIT_ARGS = f"--packages {SPARK_KAFKA_PACKAGE} pyspark-shell"

# Has to be exported before the SparkSession is created in the next cell;
# changing it afterwards has no effect on an already running session.
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

In [3]:
# Create the Spark session for this notebook, or reuse the one that already
# exists in the kernel.  Runs against the "local" master under the
# application name "KafkaIntegration".
spark = (
    SparkSession.builder
    .master("local")
    .appName("KafkaIntegration")
    .getOrCreate()
)


:: loading settings :: url = jar:file:/opt/spark-3.5.0/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/hso/.ivy2/cache
The jars for the packages stored in: /Users/hso/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1ba103f2-56b3-4930-81a0-3f131b530350;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution

In [4]:
# Connection settings for the Kafka source.
kafka_bootstrap_server = "localhost:9092"
kafka_topic = "video-stream-event"

# Streaming DataFrame backed by the Kafka topic above.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_server)
    .option("subscribe", kafka_topic)
    # Begin at the oldest offset still available in the topic.
    .option("startingOffsets", "earliest")
    # Also request the Kafka record headers.
    .option("includeHeaders", "true")
    .load()
)

In [None]:
# The Kafka source exposes `value` as raw bytes.  split() operates on strings;
# leaving the cast out also works because Spark then inserts the
# binary -> string cast implicitly, but spelling it out keeps the schema
# obvious to the reader.
lines = kafka_stream_df.selectExpr("CAST(value AS STRING) AS value")

# split(...)   -> array of tokens, cut on a single space character
# explode(...) -> one output row per array element
# alias(...)   -> names the resulting column "word"
# NOTE: because the separator is exactly one space, consecutive spaces in a
# message produce empty-string tokens, which show up as a blank "word".
words = lines.select(explode(split(lines.value, " ")).alias("word"))

# Running word count over everything received so far.
word_counts = words.groupBy("word").count()

# Output modes:
#   complete - every batch prints the full aggregated result (all word counts).
#   update   - every batch prints only the rows that are new or whose count
#              changed since the previous batch.
#   append   - not supported with streaming aggregations unless a watermark
#              is defined.
# Conversely, "complete" is not supported when the query has no streaming
# aggregation at all.
query = (
    word_counts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Keep the StreamingQuery handle in `query` (awaitTermination() returns None,
# so chaining it onto start() would discard the handle), then block this cell
# until the query stops or fails.
query.awaitTermination()

24/03/26 10:12:53 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/v4/r34w0mtd5xzfzyxp6hnbp8y00000gn/T/temporary-cf2b49c2-0ff4-46ef-a36d-f081ebbf1acd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/26 10:12:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/26 10:12:54 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|   word|count|
+-------+-----+
|      3|    1|
|chetori|    1|
|   baba|    4|
|khoobi?|    1|
|  salam|    1|
|      1|    5|
|    alo|    1|
|       |    1|
|      2|    1|
+-------+-----+

