In [65]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, window, col
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.dataframe import DataFrame
from time import sleep

In [66]:
TOPIC = "market"
KAFKA_SERVER = "localhost"
KAFKA_PORT = 9094
KAFKA_CLIENT_VERSION = "3.7.0"

SCALA_VERSION = '2.12'
SPARK_VERSION = '3.5.1'
SPARK_MASTER = "local[*]"
SHUFFLE_PARTITIONS = 5

APP_NAME = "BigDataStreaming"

In [67]:
from pyspark.sql import SparkSession

packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}',
    f'org.apache.kafka:kafka-clients:{KAFKA_CLIENT_VERSION}',
    f'org.apache.spark:spark-avro_{SCALA_VERSION}:{SPARK_VERSION}'
]

spark = SparkSession.builder\
   .master(SPARK_MASTER)\
   .appName(APP_NAME)\
   .config("spark.jars.packages", ",".join(packages))\
   .config("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)\
   .getOrCreate()
spark

In [68]:
market_stream: DataFrame = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", f"{KAFKA_SERVER}:{KAFKA_PORT}")\
    .option("subscribe", "market")\
    .option("startingOffsets", "earliest")\
    .load()
market_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Dataframe transformation to extract AVRO data into corresponding fields, and format the timestamp correctly

In [69]:
avro_schema = open("trade.avsc", "r").read()

trades_stream = market_stream\
    .withColumn("trade_data", from_avro("value", avro_schema))\
    .select("trade_data")\
    .select("trade_data.*")\
    .select(explode("data"),"type")\
    .select("col.*")\
    .selectExpr("p as price", "s as symbol", "v as volume", "cast(cast(t as double) / 1000 as timestamp) as event_time")    

query for the trades table

In [70]:
trades_query = trades_stream\
    .writeStream\
    .queryName("trades")\
    .format("memory")\
    .outputMode("append")\
    .start()

24/04/17 00:36:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2c9a4cc3-671a-4afc-8366-9ea8788ad599. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/17 00:36:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [71]:
while 1:
    spark.sql("SELECT * FROM trades").show()
    sleep(1)

+-----+------+------+----------+
|price|symbol|volume|event_time|
+-----+------+------+----------+
+-----+------+------+----------+

+--------+---------------+-------+--------------------+
|   price|         symbol| volume|          event_time|
+--------+---------------+-------+--------------------+
|63019.33|BINANCE:BTCUSDT|0.00318|2024-04-16 13:13:...|
|63019.33|BINANCE:BTCUSDT|0.02947|2024-04-16 13:13:...|
|63019.33|BINANCE:BTCUSDT| 0.0268|2024-04-16 13:13:...|
|63019.34|BINANCE:BTCUSDT|0.00199|2024-04-16 13:13:...|
|63019.34|BINANCE:BTCUSDT|0.00176|2024-04-16 13:13:...|
|63019.34|BINANCE:BTCUSDT| 1.6E-4|2024-04-16 13:13:...|
|63019.35|BINANCE:BTCUSDT|0.00154|2024-04-16 13:13:...|
| 63019.4|BINANCE:BTCUSDT| 9.0E-5|2024-04-16 13:13:...|
|63020.22|BINANCE:BTCUSDT| 9.0E-5|2024-04-16 13:13:...|
|63021.04|BINANCE:BTCUSDT| 9.0E-5|2024-04-16 13:13:...|
|63019.33|BINANCE:BTCUSDT|0.00318|2024-04-16 13:13:...|
|63019.33|BINANCE:BTCUSDT|0.02947|2024-04-16 13:13:...|
|63019.33|BINANCE:BTCUSDT| 

KeyboardInterrupt: 

query for the minute_trades table to calculate the count and average price of trades on a window of 1 minute

In [72]:
minute_trades_query = trades_stream\
    .withWatermark("event_time", "2 minutes")\
    .groupby(window(col("event_time"), "1 minute"))\
    .agg({"*" : "count", "price" : "avg"})\
    .writeStream\
    .queryName("minute_trades")\
    .format("memory")\
    .outputMode("complete")\
    .start("")

24/04/17 00:37:03 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-79a23478-10ba-4330-bb9e-d7058a42a8cd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/17 00:37:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

In [74]:
while 1:
    spark.sql("SELECT * FROM minute_trades").show()
    sleep(1)

+--------------------+------------------+--------+
|              window|        avg(price)|count(1)|
+--------------------+------------------+--------+
|{2024-04-16 13:17...|  62902.6971428569|     329|
|{2024-04-16 13:16...| 62976.46799999992|     120|
|{2024-04-17 00:16...|63735.570000000254|     680|
|{2024-04-17 00:33...| 63780.57999999937|     497|
|{2024-04-16 13:13...|63019.602000000006|      30|
|{2024-04-16 20:46...|          62729.94|       2|
|{2024-04-17 00:09...| 63711.96999999992|     132|
+--------------------+------------------+--------+

+--------------------+------------------+--------+
|              window|        avg(price)|count(1)|
+--------------------+------------------+--------+
|{2024-04-16 13:17...|  62902.6971428569|     329|
|{2024-04-16 13:16...| 62976.46799999992|     120|
|{2024-04-17 00:16...|63735.570000000254|     680|
|{2024-04-17 00:33...| 63780.57999999937|     502|
|{2024-04-16 13:13...|63019.602000000006|      30|
|{2024-04-16 20:46...|        

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/furypirate/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/furypirate/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [75]:
spark.stop()

24/04/17 00:41:04 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:632)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:610)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:453)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.