In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, ArrayType, DoubleType, LongType, IntegerType
from pyspark.sql.functions import from_json, col
from pyspark.sql.streaming import StreamingQueryListener

In [2]:
# Listener for tracking spark batches status and rows numbers
class SparkListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        progress = event.progress
        batch_id = progress['batchId']
        num_input_rows = progress['numInputRows']
        print(f"Batch {batch_id} processed, rows: {num_input_rows}")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")


In [3]:
# New spark session
spark_inst = SparkSession.builder.appName("KafkaSparkConsumer")\
    .master("local[*]").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0")\
    .getOrCreate()
spark_inst.streams.addListener(SparkListener())

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/15 17:03:47 WARN Utils: Your hostname, DESKTOP-5C2PQBK, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/06/15 17:03:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/yagel/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/yagel/.ivy2.5.2/cache
The jars for the packages stored in: /home/yagel/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8b8f54c1-076f-4b88-88bc-414ee99ac73a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.0 in central
	found org.apache.kafka#kafka-clients;3.9.0 in central
	found org.l

Query started: 1acc8e2f-8737-43a5-80ba-3d545de5be44


In [6]:

# Read webstock stream from kafka broker for my stock_stream topic
df = spark_inst.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "172.20.15.243:9092") \
    .option("subscribe", "stock_stream") \
    .option("startingOffsets", "earliest") \
    .load()

In [7]:
# Convert binary value to string and parse JSON
schema = StructType()\
    .add("c", ArrayType(StringType())) \
    .add("p", DoubleType()) \
    .add("s", StringType()) \
    .add("t", LongType()) \
    .add("v", IntegerType())

In [8]:
# Convert kafka binary data to json/str
json_df = df.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*")

In [9]:
# Remove unncessary columns and renaming columns
transformed_df = json_df.select(
    col("s").alias("stock_symbol"),
    col("p").alias("last_price"),
    (col("t") / 1000).cast("timestamp").alias("timestamp"),
    col("v").alias("volume")
)

In [10]:
# Print streaming data to console  
# query = transformed_df.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .start()
#query.awaitTermination()
query = transformed_df.writeStream \
    .format("parquet") \
    .option("path", "./data/stock_parquet/") \
    .option("checkpointLocation", "./data/checkpoints/") \
    .outputMode("append") \
    .start()

25/06/15 17:05:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [11]:
df= spark_inst.read.parquet("./data/stock_parquet/")
df.show(2)

                                                                                

+---------------+----------+--------------------+------+
|   stock_symbol|last_price|           timestamp|volume|
+---------------+----------+--------------------+------+
|           AAPL|    201.77|2025-06-09 21:18:...|   100|
|BINANCE:BTCUSDT| 108479.98|2025-06-09 21:18:...|  NULL|
+---------------+----------+--------------------+------+
only showing top 2 rows


In [None]:
#df = df.repartition(10)  # Set reasonable partition count for large-scale
df.groupBy("stock_symbol").count().orderBy(col('count').desc()).show()

In [None]:
df.rdd.getNumPartitions()