In [0]:
%pip install dlt
%pip install confluent_kafka
%pip install requests
%pip install confluent_kafka
%pip install azure-eventhub

In [0]:
import dlt
from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition
import json  # JSON module to deserialize the data
import pandas as pd  # Pandas for DataFrame
import time
import datetime
import requests
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, ArrayType
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, ArrayType
import dlt
import pyspark.sql.functions as F
from pyspark.sql.functions import explode


In [0]:
# Schema of a single element of the "results" array in the incoming JSON.
inner_schema = (
    StructType()
    .add("v", DoubleType(), True)   # volume traded
    .add("vw", DoubleType(), True)  # volume weighted avg price (not used)
    .add("o", DoubleType(), True)   # open price
    .add("c", DoubleType(), True)   # close price
    .add("h", DoubleType(), True)   # high price
    .add("l", DoubleType(), True)   # low price
    .add("t", LongType(), True)     # trade date (Unix timestamp)
    .add("n", LongType(), True)     # number of transactions (not used)
)

# Schema of the whole JSON message: two identifiers plus the array of
# result records described by inner_schema.
schema = (
    StructType()
    .add("ticker", StringType(), True)
    .add("request_id", StringType(), True)
    .add("results", ArrayType(inner_schema), True)
)

# ------------------- Bronze Layer -------------------
@dlt.table(
    comment="Bronze layer - raw stock data from Azure Event Hub via Kafka",
    table_properties={"quality": "bronze"}
)
def bronze_stock_data_test_Sultaan():
    """Stream stock messages from Kafka and flatten them into bronze columns.

    Subscribes to ``topic_name`` over SASL_SSL with the PLAIN mechanism,
    casts each message value to a string, parses it as JSON using the
    module-level ``schema``, and projects the ticker, the request id and the
    fields of the first ``results`` element alongside the Kafka record
    timestamp.

    Returns:
        A streaming DataFrame with the columns stock_symbol, request_id,
        trade_date, open_price, high_price, low_price, close_price,
        volume_traded and kafka_timestamp.
    """
    # NOTE(review): these settings are empty strings in the notebook
    # (presumably redacted). Prefer loading the connection string from a
    # secret scope or pipeline configuration instead of hardcoding it here.
    kafka_bootstrap_servers = ""
    topic_name = ""
    connection_string = ""

    # Databricks runtimes bundle a shaded Kafka client, so the login module
    # has to be referenced through the "kafkashaded." package prefix; the
    # unshaded class name is not resolvable there.
    jaas_config = (
        'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="$ConnectionString" password="{connection_string}";'
    )

    kafka_df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("subscribe", topic_name)
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", jaas_config)
        .load()
    )

    # Keep only the message payload (as text) and the Kafka record timestamp.
    kafka_df = kafka_df.selectExpr("CAST(value AS STRING) as json_value", "timestamp as kafka_timestamp")

    # Parse the JSON payload and flatten it.
    # NOTE(review): only results[0] is projected; any further elements of the
    # results array are not carried into this table - confirm that each
    # message is expected to hold a single result.
    json_df = kafka_df.select(from_json(col("json_value"), schema).alias("data"), "kafka_timestamp") \
        .selectExpr(
            "data.ticker as stock_symbol",
            "data.request_id as request_id",
            "data.results[0].t as trade_date",
            "data.results[0].o as open_price",
            "data.results[0].h as high_price",
            "data.results[0].l as low_price",
            "data.results[0].c as close_price",
            "data.results[0].v as volume_traded",
            "kafka_timestamp"
        )

    return json_df


In [0]:
# Silver Layer
@dlt.table(
    comment="Silver layer - cleansed stock data",
    table_properties={"quality": "silver"}
)
def silver_stock_data_testing_Sultaan():
    """Return the bronze rows that pass every completeness check.

    Reads the bronze table and keeps a row only when the symbol, trade date,
    all four prices and the Kafka timestamp are non-null and the traded
    volume is greater than zero. The output has the same nine columns as the
    bronze table, in the same order.
    """
    # Row-level predicates, applied one after another in this order.
    row_checks = (
        "stock_symbol IS NOT NULL AND trade_date IS NOT NULL",
        "open_price IS NOT NULL AND high_price IS NOT NULL AND low_price IS NOT NULL AND close_price IS NOT NULL",
        "volume_traded > 0",
        "kafka_timestamp IS NOT NULL",
    )
    output_columns = [
        "stock_symbol",
        "request_id",
        "trade_date",
        "open_price",
        "high_price",
        "low_price",
        "close_price",
        "volume_traded",
        "kafka_timestamp",
    ]

    cleansed = dlt.read("bronze_stock_data_test_Sultaan")
    for check in row_checks:
        cleansed = cleansed.filter(check)

    return cleansed.select(*output_columns)


In [0]:
# Gold Layer
@dlt.table(
    comment="Gold layer - aggregated stock data for reporting",
    table_properties={"quality": "gold"}
)
def gold_stock_data_testing_Sultaan():
    """Aggregate the silver table into one summary row per stock symbol.

    Returns:
        A DataFrame grouped by stock_symbol with total_volume_traded (sum),
        avg_close_price (mean), max_high_price, min_low_price,
        latest_request_id (the request id of the row with the greatest
        kafka_timestamp) and latest_timestamp (the greatest kafka_timestamp).
    """
    # Taking max() of request_id on its own would return the
    # lexicographically largest id, not the most recent one. A struct is
    # ordered field by field, so the max of (kafka_timestamp, request_id)
    # belongs to the newest record; ties on the timestamp resolve to the
    # largest request_id, which keeps the result deterministic.
    latest_record = F.max(F.struct("kafka_timestamp", "request_id"))

    return (
        dlt.read("silver_stock_data_testing_Sultaan")
        .groupBy("stock_symbol")
        .agg(
            F.sum("volume_traded").alias("total_volume_traded"),
            F.avg("close_price").alias("avg_close_price"),
            F.max("high_price").alias("max_high_price"),
            F.min("low_price").alias("min_low_price"),
            latest_record.getField("request_id").alias("latest_request_id"),
            F.max("kafka_timestamp").alias("latest_timestamp")
        )
    )