In [1]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("KafkaReadExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

25/01/05 13:23:08 WARN Utils: Your hostname, hadoop resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/01/05 13:23:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-088fafc6-4f8d-4de5-8186-cf69f9ee63c1;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/sp

In [2]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

In [3]:
# Kafka configuration
kafka_brokers = "localhost:9092"  # Replace with your Kafka broker address
topic = "model-topic"

# Define the schema of the JSON data
schema = StructType() \
    .add("symbol", StringType()) \
    .add("timestamp", LongType()) \
    .add("source", StringType()) \
    .add("data_type", StringType()) \
    .add("bid", DoubleType()) \
    .add("ask", DoubleType()) \
    .add("price", DoubleType()) \
    .add("volume", DoubleType()) \
    .add("spread_raw", DoubleType()) \
    .add("spread_table", DoubleType()) \
    .add("volatility", DoubleType()) \
    .add("market_sentiment", DoubleType()) \
    .add("trading_activity", DoubleType())

In [4]:
# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

In [5]:
parsed_df = df.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")  # Flatten the nested structure

# Example: Filter and display rows where price is greater than 0
filtered_df = parsed_df.filter(col("price") > 0)

In [9]:
query = filtered_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

25/01/05 13:27:28 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f1f43028-402b-4c9a-ac67-cee587966858. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/01/05 13:27:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 5
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083644099|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.03|1.2892229E7|-1.0      |-1.0        |0.56      |-0.318          |25.16           |
|SHEL  |1736083644761|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.92 |4691606.0  |-1.0      |-1.0        |0.471     |0.53            |31.96           |
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 32
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083644761|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.92|4691606.0|-1.0      |-1.0        |0.471     |0.53            |31.96           |
|BP    |1736083646735|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.09|6559706.0|-1.0      |-1.0        |0.61      |-0.94           |74.22           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+



25/01/05 13:27:30 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083648481|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.98|1.2627013E7|-1.0      |-1.0        |0.557     |-0.351          |26.83           |
|COP   |1736083647727|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.22|3570777.0  |-1.0      |-1.0        |0.424     |0.402           |27.68           |
|BP    |1736083646735|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.09 |6559706.0  |-1.0      |-1.0        |0.61      |-0.94           |74.22           |
+------+-------------+----------+------

                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083648481|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.98|1.2627013E7|-1.0      |-1.0        |0.557     |-0.351          |26.83           |
|COP   |1736083647727|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.22|3570777.0  |-1.0      |-1.0        |0.424     |0.402           |27.68           |
|BP    |1736083646735|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.09 |6559706.0  |-1.0      |-1.0        |0.61      |-0.94           |74.22           |
|COP   |1736083649834|YLIFE_FEED|MARKET

[Stage 47:>                                                         (0 + 3) / 3]

-------------------------------------------
Batch: 33
-------------------------------------------


                                                                                

+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083648481|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.98|1.2627013E7|-1.0      |-1.0        |0.557     |-0.351          |26.83           |
|COP   |1736083647727|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.22|3570777.0  |-1.0      |-1.0        |0.424     |0.402           |27.68           |
|COP   |1736083649834|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.24|3400598.0  |-1.0      |-1.0        |0.422     |0.446           |24.69           |
|SHEL  |1736083651009|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.83 |4518691.0  |-1.0      |-1.0        |0.475     |0.459           |34.61     

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|SHEL  |1736081410598|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.34 |5021535.0  |-1.0      |-1.0        |0.402     |-0.313          |60.21           |
|SHEL  |1736081410738|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.52 |5138018.0  |-1.0      |-1.0        |0.397     |-0.293          |63.04           |
|COP   |1736081412351|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|100.72|3197362.0  |-1.0      |-1.0        |1.487     |0.197           |30.91           |
|XOM   |1736081420723|YLIFE_FEED|MARKET

                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083651009|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.83 |4518691.0  |-1.0      |-1.0        |0.475     |0.459           |34.61           |
|XOM   |1736083652905|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.98|1.2191945E7|-1.0      |-1.0        |0.553     |-0.416          |31.03           |
|BP    |1736083653570|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.03 |6864695.0  |-1.0      |-1.0        |0.6       |-1.0            |71.89           |
+------+-------------+----------+------

                                                                                

-------------------------------------------
Batch: 34
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083654227|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.56|1.1796834E7|-1.0      |-1.0        |0.553     |-0.427          |29.31           |
|SHEL  |1736083654465|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.87 |4413580.0  |-1.0      |-1.0        |0.479     |0.481           |36.68           |
|XOM   |1736083652905|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.98|1.2191945E7|-1.0      |-1.0        |0.553     |-0.416          |31.03           |
|BP    |1736083653570|YLIFE_FEED|MARKE

ERROR:root:KeyboardInterrupt while sending command.                 (0 + 2) / 2]
Traceback (most recent call last):
  File "/home/hadoop/miniconda3/envs/kafka_stream_preprocessing/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/hadoop/miniconda3/envs/kafka_stream_preprocessing/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/hadoop/miniconda3/envs/kafka_stream_preprocessing/lib/python3.9/socket.py", line 716, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083654227|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.56|1.1796834E7|-1.0      |-1.0        |0.553     |-0.427          |29.31           |
|SHEL  |1736083654465|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.87 |4413580.0  |-1.0      |-1.0        |0.479     |0.481           |36.68           |
|XOM   |1736083655517|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.03|1.2153447E7|-1.0      |-1.0        |0.549     |-0.404          |25.94           |
|XOM   |1736083652905|YLIFE_FEED|MARKET

KeyboardInterrupt: 

-------------------------------------------
Batch: 8
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083654227|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.56|1.1796834E7|-1.0      |-1.0        |0.553     |-0.427          |29.31           |
|SHEL  |1736083654465|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.87 |4413580.0  |-1.0      |-1.0        |0.479     |0.481           |36.68           |
|XOM   |1736083655517|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.03|1.2153447E7|-1.0      |-1.0        |0.549     |-0.404          |25.94           |
|BP    |1736083653570|YLIFE_FEED|MARKET

                                                                                

-------------------------------------------
Batch: 35
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083655517|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.03|1.2153447E7|-1.0      |-1.0        |0.549     |-0.404          |25.94           |
|BP    |1736083657635|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.12 |7081121.0  |-1.0      |-1.0        |0.6       |-0.928          |74.73           |
|SHEL  |1736083656464|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.71 |4507277.0  |-1.0      |-1.0        |0.48      |0.483           |36.01           |
|XOM   |1736083657077|YLIFE_FEED|MARKE

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|BP    |1736083657635|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.12 |7081121.0  |-1.0      |-1.0        |0.6       |-0.928          |74.73           |
|COP   |1736083659113|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.18|3334304.0  |-1.0      |-1.0        |0.428     |0.513           |25.14           |
|SHEL  |1736083656464|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.71 |4507277.0  |-1.0      |-1.0        |0.48      |0.483           |36.01           |
|XOM   |1736083657077|YLIFE_FEED|MARKET

                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083657635|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.12 |7081121.0|-1.0      |-1.0        |0.6       |-0.928          |74.73           |
|COP   |1736083659113|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.18|3334304.0|-1.0      |-1.0        |0.428     |0.513           |25.14           |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 36
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083661204|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.09 |7343366.0|-1.0      |-1.0        |0.605     |-1.0            |73.82           |
|COP   |1736083659113|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.18|3334304.0|-1.0      |-1.0        |0.428     |0.513           |25.14           |
|COP   |1736083660822|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|102.82|3496233.0|-1.0      |-1.0        |0.423     |0.479           |23.51           |
+------+-------------+----------+-----------+----+



-------------------------------------------
Batch: 3
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083661204|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.09 |7343366.0|-1.0      |-1.0        |0.605     |-1.0            |73.82           |
|COP   |1736083660822|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|102.82|3496233.0|-1.0      |-1.0        |0.423     |0.479           |23.51           |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083661204|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.09 |7343366.0|-1.0      |-1.0        |0.605     |-1.0            |73.82           |
|BP    |1736083663046|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.97 |7540458.0|-1.0      |-1.0        |0.616     |-1.0            |78.01           |
|COP   |1736083660822|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|102.82|3496233.0|-1.0      |-1.0        |0.423     |0.479           |23.51           |
+------+-------------+----------+-----------+----+



-------------------------------------------
Batch: 4
-------------------------------------------


                                                                                

+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083663046|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.97 |7540458.0|-1.0      |-1.0        |0.616     |-1.0            |78.01           |
|COP   |1736083663870|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|102.93|3647774.0|-1.0      |-1.0        |0.426     |0.507           |21.1            |
|BP    |1736083663610|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.07 |7237780.0|-1.0      |-1.0        |0.621     |-0.953          |76.04           |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|COP   |1736083663870|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|102.93|3647774.0  |-1.0      |-1.0        |0.426     |0.507           |21.1            |
|XOM   |1736083665490|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.3 |1.1907401E7|-1.0      |-1.0        |0.549     |-0.396          |23.5            |
|BP    |1736083663610|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.07 |7237780.0  |-1.0      |-1.0        |0.621     |-0.953          |76.04           |
+------+-------------+----------+-----

                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|COP   |1736083663870|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|102.93|3647774.0  |-1.0      |-1.0        |0.426     |0.507           |21.1            |
|XOM   |1736083665490|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.3 |1.1907401E7|-1.0      |-1.0        |0.549     |-0.396          |23.5            |
|BP    |1736083663610|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.07 |7237780.0  |-1.0      |-1.0        |0.621     |-0.953          |76.04           |
|COP   |1736083666313|YLIFE_FEED|MARKE

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083665490|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.3 |1.1907401E7|-1.0      |-1.0        |0.549     |-0.396          |23.5            |
|COP   |1736083666313|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.33|3795828.0  |-1.0      |-1.0        |0.427     |0.568           |18.8            |
|BP    |1736083666520|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.14 |7166698.0  |-1.0      |-1.0        |0.632     |-1.0            |79.88           |
|BP    |1736083668229|YLIFE_FEED|MARKET



-------------------------------------------
Batch: 12
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083666313|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.33|3795828.0|-1.0      |-1.0        |0.427     |0.568           |18.8            |
|BP    |1736083666520|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.14 |7166698.0|-1.0      |-1.0        |0.632     |-1.0            |79.88           |
|BP    |1736083668229|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.0  |6831533.0|-1.0      |-1.0        |0.626     |-1.0            |80.91           |
+------+-------------+----------+-----------+----+

                                                                                

-------------------------------------------
Batch: 12
-------------------------------------------
-------------------------------------------
Batch: 39
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083666520|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.14|7166698.0|-1.0      |-1.0        |0.632     |-1.0            |79.88           |
|BP    |1736083668229|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.0 |6831533.0|-1.0      |-1.0        |0.626     |-1.0            |80.91           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----

                                                                                

-------------------------------------------
Batch: 13
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083669911|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.87|6546943.0|-1.0      |-1.0        |0.617     |-1.0            |82.99           |
|BP    |1736083671574|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|31.03|6635440.0|-1.0      |-1.0        |0.618     |-0.961          |83.98           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch: 13
-

                                                                                

-------------------------------------------
Batch: 14
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083674778|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.54|3980459.0|-1.0      |-1.0        |0.422     |0.647           |16.45           |
|BP    |1736083674583|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.93 |6591958.0|-1.0      |-1.0        |0.626     |-0.94           |81.7            |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083674778|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.54|3980459.0|-1.0      |-1.0        |0.422     |0.647           |16.45           |
|BP    |1736083674583|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.93 |6591958.0|-1.0      |-1.0        |0.626     |-0.94           |81.7            |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch:

                                                                                

-------------------------------------------
Batch: 15
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083676778|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.56|4436033.0|-1.0      |-1.0        |0.473     |0.394           |32.81           |
|BP    |1736083677468|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.86|6657648.0|-1.0      |-1.0        |0.629     |-0.924          |86.01           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083676778|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.56|4436033.0|-1.0      |-1.0        |0.473     |0.394           |32.81           |
|BP    |1736083677468|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.86|6657648.0|-1.0      |-1.0        |0.629     |-0.924          |86.01           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch: 42
--

[Stage 86:>                                                         (0 + 1) / 1]

-------------------------------------------
Batch: 16
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083679114|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.9 |6731741.0|-1.0      |-1.0        |0.619     |-0.847          |85.73           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+



[Stage 87:>   (0 + 1) / 1][Stage 88:>   (0 + 1) / 1][Stage 89:>   (0 + 1) / 1]1]

In [10]:
query.stop()


25/01/05 13:28:03 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 10, writer: ConsoleWriter[numRows=20, truncate=false]] is aborting.
25/01/05 13:28:03 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 10, writer: ConsoleWriter[numRows=20, truncate=false]] aborted.
                                                                                

-------------------------------------------
Batch: 17
-------------------------------------------
-------------------------------------------
Batch: 43
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083679114|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.9 |6731741.0|-1.0      |-1.0        |0.619     |-0.847          |85.73           |
|BP    |1736083680232|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.75|6569666.0|-1.0      |-1.0        |0.607     |-0.911          |88.34           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----

+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083679114|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.9 |6731741.0|-1.0      |-1.0        |0.619     |-0.847          |85.73           |
|BP    |1736083680232|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.75|6569666.0|-1.0      |-1.0        |0.607     |-0.911          |88.34           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 17
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083682235|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.81|4152847.0|-1.0      |-1.0        |0.429     |0.722           |15.49           |
|BP    |1736083680232|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.75 |6569666.0|-1.0      |-1.0        |0.607     |-0.911          |88.34           |
|SHEL  |1736083681982|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.55 |4458097.0|-1.0      |-1.0        |0.476     |0.471           |35.74           |
+------+-------------+----------+-----------+----+

                                                                                

-------------------------------------------
Batch: 18
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083682235|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.81|4152847.0|-1.0      |-1.0        |0.429     |0.722           |15.49           |
|SHEL  |1736083684030|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.38 |4349685.0|-1.0      |-1.0        |0.474     |0.464           |31.07           |
|SHEL  |1736083681982|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.55 |4458097.0|-1.0      |-1.0        |0.476     |0.471           |35.74           |
+------+-------------+----------+-----------+----+

                                                                                

-------------------------------------------
Batch: 18
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume    |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+----------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083684030|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.38 |4349685.0 |-1.0      |-1.0        |0.474     |0.464           |31.07           |
|XOM   |1736083685317|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.69|1.175601E7|-1.0      |-1.0        |0.55      |-0.327          |22.19           |
|SHEL  |1736083685578|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.67 |4139533.0 |-1.0      |-1.0        |0.483     |0.547           |34.63           |
+------+-------------+----------+-----------

                                                                                

-------------------------------------------
Batch: 19
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume    |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+----------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083686845|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.74 |4215046.0 |-1.0      |-1.0        |0.483     |0.559           |36.42           |
|XOM   |1736083685317|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|111.69|1.175601E7|-1.0      |-1.0        |0.55      |-0.327          |22.19           |
|SHEL  |1736083685578|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.67 |4139533.0 |-1.0      |-1.0        |0.483     |0.547           |34.63           |
|BP    |1736083687036|YLIFE_FEED|MARKET_DATA

                                                                                

-------------------------------------------
Batch: 46
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083689034|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.94|4074377.0|-1.0      |-1.0        |0.433     |0.721           |16.88           |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch: 20
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+----------

                                                                                

-------------------------------------------
Batch: 47
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083690819|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|104.21|3872310.0|-1.0      |-1.0        |0.432     |0.678           |14.48           |
|COP   |1736083690968|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|104.01|3972033.0|-1.0      |-1.0        |0.435     |0.775           |13.48           |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch

                                                                                

-------------------------------------------
Batch: 48
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|BP    |1736083692217|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.69 |6814781.0  |-1.0      |-1.0        |0.596     |-0.915          |83.63           |
|BP    |1736083693405|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.55 |7092098.0  |-1.0      |-1.0        |0.597     |-1.0            |79.18           |
|COP   |1736083693100|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.71|4048214.0  |-1.0      |-1.0        |0.437     |0.75            |12.29           |
|XOM   |1736083693952|YLIFE_FEED|MARKE

                                                                                

-------------------------------------------
Batch: 23
-------------------------------------------
-------------------------------------------
Batch: 24
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083695717|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.57 |4043034.0  |-1.0      |-1.0        |0.489     |0.646           |38.57           |
|XOM   |1736083696284|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.25|1.2093181E7|-1.0      |-1.0        |0.546     |-0.419          |25.58           |
+------+-------------+----------+-----------+----+----+------+-----------+----------

                                                                                

-------------------------------------------
Batch: 50
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083696284|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.25|1.2093181E7|-1.0      |-1.0        |0.546     |-0.419          |25.58           |
|COP   |1736083697633|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.85|4144550.0  |-1.0      |-1.0        |0.433     |0.757           |14.37           |
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 24
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|COP   |1736083697633|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.85|4144550.0|-1.0      |-1.0        |0.433     |0.757           |14.37           |
|SHEL  |1736083697916|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.36 |3999415.0|-1.0      |-1.0        |0.492     |0.572           |41.2            |
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch

                                                                                

-------------------------------------------
Batch: 25
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083700071|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.93|1.2107616E7|-1.0      |-1.0        |0.541     |-0.434          |19.49           |
|XOM   |1736083699213|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.45|1.2230219E7|-1.0      |-1.0        |0.549     |-0.435          |22.89           |
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 26
-------------------------------------------
-------------------------------------------
Batch: 52
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083700071|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.93|1.2107616E7|-1.0      |-1.0        |0.541     |-0.434          |19.49           |
|XOM   |1736083699213|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.45|1.2230219E7|-1.0      |-1.0        |0.549     |-0.435          |22.89           |
+------+-------------+----------+-----------+----+----+------+-----------+----------

                                                                                

-------------------------------------------
Batch: 27
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083703315|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|64.13 |3884693.0  |-1.0      |-1.0        |0.483     |0.757           |40.89           |
|XOM   |1736083703766|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.51|1.1709951E7|-1.0      |-1.0        |0.534     |-0.42           |24.15           |
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+

-------------------------------------

                                                                                

-------------------------------------------
Batch: 28
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume     |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083705426|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.62|1.093833E7 |-1.0      |-1.0        |0.535     |-0.444          |24.82           |
|XOM   |1736083704638|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|113.03|1.1253232E7|-1.0      |-1.0        |0.539     |-0.446          |25.4            |
+------+-------------+----------+-----------+----+----+------+-----------+----------+------------+----------+----------------+----------------+



                                                                                

-------------------------------------------
Batch: 29
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+----------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume    |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+----------+----------+------------+----------+----------------+----------------+
|XOM   |1736083705426|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|112.62|1.093833E7|-1.0      |-1.0        |0.535     |-0.444          |24.82           |
+------+-------------+----------+-----------+----+----+------+----------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch: 55
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+----------+----------+----

                                                                                

-------------------------------------------
Batch: 29
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|BP    |1736083707223|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.47|6905611.0|-1.0      |-1.0        |0.597     |-0.938          |78.83           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+



In [11]:
for active_query in spark.streams.active:
    active_query.stop()


25/01/05 13:28:30 WARN Shell: Interrupted while joining on: Thread[Thread-2701,5,]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1300)
	at java.base/java.lang.Thread.join(Thread.java:1375)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutp

-------------------------------------------
Batch: 30
-------------------------------------------
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price|volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083709032|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|63.91|3868376.0|-1.0      |-1.0        |0.489     |0.754           |39.31           |
|BP    |1736083707223|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|30.47|6905611.0|-1.0      |-1.0        |0.597     |-0.938          |78.83           |
+------+-------------+----------+-----------+----+----+-----+---------+----------+------------+----------+----------------+----------------+

-------------------------------------------
Batch: 56
-

                                                                                

-------------------------------------------
Batch: 30
-------------------------------------------
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|symbol|timestamp    |source    |data_type  |bid |ask |price |volume   |spread_raw|spread_table|volatility|market_sentiment|trading_activity|
+------+-------------+----------+-----------+----+----+------+---------+----------+------------+----------+----------------+----------------+
|SHEL  |1736083709032|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|63.91 |3868376.0|-1.0      |-1.0        |0.489     |0.754           |39.31           |
|COP   |1736083710416|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.17|4131844.0|-1.0      |-1.0        |0.422     |0.794           |20.85           |
|COP   |1736083710032|YLIFE_FEED|MARKET_DATA|-1.0|-1.0|103.44|4211020.0|-1.0      |-1.0        |0.425     |0.837           |16.88           |
+------+-------------+----------+-----------+----+

25/01/05 13:28:32 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 57, writer: ConsoleWriter[numRows=20, truncate=false]] is aborting.
25/01/05 13:28:32 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 57, writer: ConsoleWriter[numRows=20, truncate=false]] aborted.
25/01/05 13:28:32 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 31, writer: ConsoleWriter[numRows=20, truncate=false]] is aborting.
25/01/05 13:28:32 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 31, writer: ConsoleWriter[numRows=20, truncate=false]] aborted.
25/01/05 13:28:32 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException
	at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:267)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Itera

25/01/05 13:28:32 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException
	at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:267)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally

In [None]:
# Kafka configuration
kafka_brokers = "localhost:9092"  # Replace with your Kafka broker address
topic = "model-topic"

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

# Select the key and value from Kafka messages
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Start the streaming query to console for testing
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()