# Load

- Import libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType
from pymongo import MongoClient

- Constants

In [2]:
APP_NAME = "KafkaToMongoDB"
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "btc-price-zscore"
MONGO_URI = "mongodb://localhost:27017/"
MONGO_DB_NAME = "btc_data"
SPARK_KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2"

- Spark Session

In [3]:
spark = SparkSession.builder \
    .appName(APP_NAME) \
    .config("spark.jars.packages", SPARK_KAFKA_PACKAGE) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

25/05/01 14:46:51 WARN Utils: Your hostname, acer resolves to a loopback address: 127.0.1.1; using 192.168.1.13 instead (on interface wlp0s20f3)
25/05/01 14:46:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/dang/.ivy2/cache
The jars for the packages stored in: /home/dang/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c18f40e6-c57e-41ad-8213-4255029c2841;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 379ms :: artifacts dl 9ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central in [

- Schema

In [4]:
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("zscore_data", ArrayType(StructType([
        StructField("window", StringType(), True),
        StructField("zscore_price", FloatType(), True)
    ])), True)
])

- Read from Kafka

In [5]:
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "latest") \
    .load()

- Data Processing

In [6]:
# parse the JSON data from Kafka
json_df = kafka_df.selectExpr("CAST(value AS STRING) as value") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# explode the zscore_data array and convert timestamp to a proper format
expanded_df = json_df.withColumn("zscore", explode("zscore_data")) \
    .withColumn("event_time", to_timestamp("timestamp")) \
    .select(
        col("timestamp"),
        col("symbol"),
        col("zscore.window").alias("window"),
        col("zscore.zscore_price").alias("zscore_price"),
        col("event_time")
    )

- Handling late data (10 second)

In [7]:
watermarked_df = expanded_df.withWatermark("event_time", "10 seconds")

- Write to MongoDB

In [None]:
def write_to_mongo(df, epoch_id):
    client = MongoClient(MONGO_URI)
    db = client[MONGO_DB_NAME]
    for row in df.collect():
        collection_name = f"{KAFKA_TOPIC}-{row['window']}"
        document = {
            "timestamp": row["timestamp"],
            "symbol": row["symbol"],
            "window": row["window"],
            "zscore_price": row["zscore_price"]
        }
        db[collection_name].insert_one(document)
        print(f"[Epoch {epoch_id}] Ghi vào {collection_name}: {document}")

- Run

In [None]:
query = watermarked_df.writeStream \
    .foreachBatch(write_to_mongo) \
    .outputMode("append") \
    .start()

query.awaitTermination()


25/05/01 14:46:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a44b267e-e35b-47bc-80f8-f6bf2cabbe4f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/01 14:46:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/01 14:46:56 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
25/05/01 14:46:56 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
25/05/01 14:46:56 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
25/05/01 14:46:56 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known con

[Epoch 1] Ghi vào btc-price-zscore-30s: {'timestamp': '2025-05-01T07:47:03.941445', 'symbol': 'BTCUSDT', 'window': '30s', 'zscore_price': -0.8019999861717224, '_id': ObjectId('681326f94019f532f7e288af')}
[Epoch 1] Ghi vào btc-price-zscore-1m: {'timestamp': '2025-05-01T07:47:03.941445', 'symbol': 'BTCUSDT', 'window': '1m', 'zscore_price': 1.8880000114440918, '_id': ObjectId('681326f94019f532f7e288b0')}
[Epoch 1] Ghi vào btc-price-zscore-5m: {'timestamp': '2025-05-01T07:47:03.941445', 'symbol': 'BTCUSDT', 'window': '5m', 'zscore_price': -0.006000000052154064, '_id': ObjectId('681326f94019f532f7e288b1')}
[Epoch 1] Ghi vào btc-price-zscore-15m: {'timestamp': '2025-05-01T07:47:03.941445', 'symbol': 'BTCUSDT', 'window': '15m', 'zscore_price': 2.2809998989105225, '_id': ObjectId('681326f94019f532f7e288b2')}
[Epoch 1] Ghi vào btc-price-zscore-30m: {'timestamp': '2025-05-01T07:47:03.941445', 'symbol': 'BTCUSDT', 'window': '30m', 'zscore_price': -0.6499999761581421, '_id': ObjectId('681326f94019

                                                                                

[Epoch 3] Ghi vào btc-price-zscore-30s: {'timestamp': '2025-05-01T07:47:23.942580', 'symbol': 'BTCUSDT', 'window': '30s', 'zscore_price': 2.359999895095825, '_id': ObjectId('6813270c4019f532f7e288bd')}
[Epoch 3] Ghi vào btc-price-zscore-1m: {'timestamp': '2025-05-01T07:47:23.942580', 'symbol': 'BTCUSDT', 'window': '1m', 'zscore_price': 1.4859999418258667, '_id': ObjectId('6813270c4019f532f7e288be')}
[Epoch 3] Ghi vào btc-price-zscore-5m: {'timestamp': '2025-05-01T07:47:23.942580', 'symbol': 'BTCUSDT', 'window': '5m', 'zscore_price': 1.0800000429153442, '_id': ObjectId('6813270c4019f532f7e288bf')}
[Epoch 3] Ghi vào btc-price-zscore-15m: {'timestamp': '2025-05-01T07:47:23.942580', 'symbol': 'BTCUSDT', 'window': '15m', 'zscore_price': -0.8679999709129333, '_id': ObjectId('6813270c4019f532f7e288c0')}
[Epoch 3] Ghi vào btc-price-zscore-30m: {'timestamp': '2025-05-01T07:47:23.942580', 'symbol': 'BTCUSDT', 'window': '30m', 'zscore_price': 1.2239999771118164, '_id': ObjectId('6813270c4019f532f