In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split, length
from pyspark.sql.types import StringType
from delta import *
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import explode, from_json,size
from pyspark.sql.types import *
import json
from functools import reduce
import os
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import year, month, dayofmonth, to_date

In [2]:
# spark-submit arguments used when PySpark launches the JVM: Kafka source,
# Avro support and Delta Lake jars, plus a default session time zone.
# NOTE(review): presumably this must run before the SparkSession below is
# created, since it only affects JVM launch — verify.
# NOTE(review): the GMT time zone given here is overridden later by
# spark_session.conf.set("spark.sql.session.timeZone", "UTC+7").
os.environ['PYSPARK_SUBMIT_ARGS'] = " ".join([
    "\n--packages",
    ",".join([
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
        "org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1",
        "org.apache.spark:spark-avro_2.12:3.4.1",
        "io.delta:delta-core_2.12:2.4.0",
    ]),
    "--conf spark.sql.session.timeZone=GMT",
    "pyspark-shell",
])

In [15]:
# Build (or reuse, via getOrCreate) a local-mode SparkSession with Delta Lake's
# SQL extension and catalog registered, so Delta reads/writes work.
# NOTE(review): spark.jars.packages duplicates the delta-core jar already listed
# in PYSPARK_SUBMIT_ARGS --packages; presumably redundant — verify before removing.
spark_session = (
    SparkSession.builder
    .appName("read_hdfs")
    .master("local")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [16]:
# Spark UDF: decode a big-endian byte string into an integer column.
# Presumably meant for the 4-byte schema-ID field of Confluent-framed Kafka
# values — TODO confirm; it is not referenced in the cells shown.
# A null input yields null instead of int.from_bytes(None, ...) raising TypeError
# in the Python worker and failing the Spark task.
binary_to_int = udf(lambda x: None if x is None else int.from_bytes(x, 'big'), IntegerType())

In [17]:
from confluent_kafka.schema_registry import SchemaRegistryClient
def get_schema_from_id(id: int, url: str = "http://localhost:8081") -> str:
    """Fetch a schema definition string from a Confluent Schema Registry.

    Args:
        id: Schema ID to look up.
        url: Base URL of the Schema Registry. Defaults to the local
            registry this notebook was written against.

    Returns:
        The schema text (``schema_str``) as returned by the registry; here it
        is passed to ``from_avro`` as the Avro reader schema.

    Raises:
        Any exception raised by ``SchemaRegistryClient`` (e.g. connection
        failure or unknown ID) propagates to the caller unchanged.
    """
    registry = SchemaRegistryClient({'url': url})
    return registry.get_schema(id).schema_str


# NOTE(review): schema ID 1 is assumed to be the writer schema for every message
# on the "stock" topic; the stream does not check the ID embedded in each
# message header — TODO confirm.
schema = get_schema_from_id(1)
schema

'{"type":"record","name":"Quote","namespace":"Quote","fields":[{"name":"symbol","type":"string"},{"name":"exchange_name","type":"string"},{"name":"trade_time","type":{"type":"long","logicalType":"timestamp-micros"}},{"name":"ref_price","type":["null","double"],"default":null},{"name":"ceil_price","type":["null","double"],"default":null},{"name":"floor_price","type":["null","double"],"default":null},{"name":"b_qtty_3","type":["null","double"],"default":null},{"name":"b_price_3","type":["null","double"],"default":null},{"name":"b_qtty_2","type":["null","double"],"default":null},{"name":"b_price_2","type":["null","double"],"default":null},{"name":"b_qtty_1","type":["null","double"],"default":null},{"name":"b_price_1","type":["null","double"],"default":null},{"name":"pct_change","type":["null","double"],"default":null},{"name":"last_price","type":["null","double"],"default":null},{"name":"last_qtty","type":["null","double"],"default":null},{"name":"traded_quantity","type":["null","double"]

In [18]:
# Use UTC+7 as the session time zone from here on. This overrides the GMT value
# passed through PYSPARK_SUBMIT_ARGS, and it is the zone in which date_format
# renders the year/month/day/hour partition values in the streaming cell.
spark_session.conf.set(key="spark.sql.session.timeZone", value="UTC+7")

In [20]:

# Streaming ETL: Kafka topic "stock" (Confluent-framed Avro) -> Delta table at
# /delta/stock1, partitioned by symbol and trade date.
# NOTE: `df` holds the StreamingQuery returned by `.start()`, not a DataFrame.
df = (
    spark_session
    .readStream
    .format("kafka")
    .option("minPartitions", 1)
    .option("startingOffsets", "earliest")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("failOnDataLoss", False)
    .option("subscribe", "stock")
    .load()
    # Confluent wire format: a 1-byte magic byte and a 4-byte schema ID precede
    # the Avro payload, so keep bytes 6..end (SQL substring is 1-based).
    .withColumn("fixedValue", expr("substring(value, 6, length(value)-5)"))
    .select(col("fixedValue"))
    # PERMISSIVE: payloads that fail to decode become a null struct (all-null
    # columns) instead of failing the query.
    .withColumn(
        "res",
        from_avro(
            col("fixedValue"),
            schema,
            {"datetimeRebaseMode": "CORRECTED", "mode": "PERMISSIVE"},
        ),
    )
    .select("res.*")
    # `trade_time` is declared as Avro timestamp-micros, which from_avro already
    # yields as a Spark timestamp. The old to_timestamp(..., 'yyyyMMddHHmm') step
    # ignored its format for timestamp input, so the partition columns are derived
    # from trade_time directly. date_format renders in the session time zone and
    # produces zero-padded strings, matching the existing partition values.
    .withColumn("year", date_format(col("trade_time"), "yyyy"))
    .withColumn("month", date_format(col("trade_time"), "MM"))
    .withColumn("day", date_format(col("trade_time"), "dd"))
    .withColumn("hour", date_format(col("trade_time"), "HH"))
    .writeStream
    .format("delta")
    .partitionBy("symbol", "year", "month", "day")
    .option("checkpointLocation", "/delta/stock1/_checkpoints/")
    .start("/delta/stock1")
)

23/11/25 19:08:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/11/25 19:08:11 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 1:>                                                          (0 + 1) / 1]

In [9]:
# Stop every streaming query still active on this session (such as the
# Kafka -> Delta stream) before stopping the session itself. This looks the
# queries up on the session rather than relying on the `df` variable, so the
# cell also works if the stream cell was never run.
for active_query in spark_session.streams.active:
    active_query.stop()
spark_session.stop()