In [None]:
import dlt
import pyspark.pandas as ps
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Configure OAuth (service-principal client-credentials) access to the ADLS Gen2
# account used for every table path below. The client secret is pulled from the
# Databricks secret scope; the client id and token endpoint are set inline.
service_credential = dbutils.secrets.get(scope="databricksscopename", key="databricksservicekey")

# Spark conf key -> value, applied in this order.
_adls_oauth_settings = {
    "fs.azure.account.auth.type.adlsx5u224gv3xgd2.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.adlsx5u224gv3xgd2.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.adlsx5u224gv3xgd2.dfs.core.windows.net": "5c962712-e111-48b0-b9af-41f03175e216",
    "fs.azure.account.oauth2.client.secret.adlsx5u224gv3xgd2.dfs.core.windows.net": service_credential,
    "fs.azure.account.oauth2.client.endpoint.adlsx5u224gv3xgd2.dfs.core.windows.net": "https://login.microsoftonline.com/72f988bf-86f1-41af-91ab-2d7cd011db47/oauth2/token",
}
for _conf_key, _conf_value in _adls_oauth_settings.items():
    spark.conf.set(_conf_key, _conf_value)

In [None]:
# Connection settings for the Event Hubs namespace's Kafka-compatible endpoint.
EH_NS_NAME = "streamdata-84d2c3-ns"
BOOTSTRAP_SERVERS = f"{EH_NS_NAME}.servicebus.windows.net:9093"

# The shared access key is a credential, so it is read from the secret scope rather
# than committed in the notebook.
# NOTE(review): the secret name below is a placeholder. Store the SAS key under it
# in the scope, and rotate the key that was previously hardcoded here, since it has
# already been exposed in source.
EH_SECRET_SCOPE = "databricksscopename"
EH_SAKEY_SECRET_NAME = "eventhubsakey"
SAKEY = dbutils.secrets.get(scope=EH_SECRET_SCOPE, key=EH_SAKEY_SECRET_NAME)
SANAME = "rule"

# JAAS config for SASL/PLAIN: Event Hubs expects the literal username
# "$ConnectionString" and the namespace connection string as the password.
EH_SASL = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://{EH_NS_NAME}.servicebus.windows.net/;SharedAccessKeyName={SANAME};SharedAccessKey={SAKEY}\";"

In [None]:
@dlt.table(
    path="abfss://data@adlsx5u224gv3xgd2.dfs.core.windows.net/retail_org_dlt/Bronze/sales_order_stream"
)
def bronze_sales_order_stream():
    """Bronze layer: raw Kafka records from Event Hubs, payload decoded to text.

    Streams from the Event Hubs Kafka endpoint, starting at the earliest offset and
    not failing on data loss. Only the record ``value`` is kept, cast from binary
    to a string column that is still named ``value``.
    """
    kafka_options = {
        "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
        # NOTE(review): the topic subscribed to is the namespace name; on Event Hubs
        # the Kafka topic is normally the event hub name. Confirm they coincide here.
        "subscribe": EH_NS_NAME,
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": EH_SASL,
        "kafka.request.timeout.ms": "60000",
        "kafka.session.timeout.ms": "60000",
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    }

    reader = spark.readStream.format("kafka")
    for option_name, option_value in kafka_options.items():
        reader = reader.option(option_name, option_value)
    raw_events = reader.load()

    return raw_events.select(col("value").cast("string").alias("value"))

In [None]:
def _promotion_struct():
    """Build the promotion record type.

    The same shape is used for each ordered product's ``promotion_info`` and for
    the elements of the order-level ``promo_info`` array.
    """
    return StructType(
        [
            StructField("promo_id", IntegerType()),
            StructField("promo_qty", IntegerType()),
            StructField("promo_disc", DecimalType(3, 2)),
            StructField("promo_item", StringType()),
        ]
    )


# Schema of one decoded sales-order JSON payload, consumed by from_json in the silver
# layer. Every field and array element is nullable; that is PySpark's default for
# StructField(nullable=True) and ArrayType(containsNull=True), so it is left implicit.
sales_orders_schema = StructType(
    [
        StructField("clicked_items", ArrayType(ArrayType(StringType()))),
        StructField("customer_id", LongType()),
        StructField("customer_name", StringType()),
        StructField("number_of_line_items", LongType()),
        # Kept as text here; the silver layer converts it with from_unixtime.
        StructField("order_datetime", StringType()),
        StructField("order_number", LongType()),
        StructField(
            "ordered_products",
            ArrayType(
                StructType(
                    [
                        StructField("id", StringType()),
                        StructField("qty", IntegerType()),
                        StructField("curr", StringType()),
                        StructField("name", StringType()),
                        StructField("unit", StringType()),
                        StructField("price", IntegerType()),
                        StructField("promotion_info", _promotion_struct()),
                    ]
                )
            ),
        ),
        StructField("promo_info", ArrayType(_promotion_struct())),
    ]
)


@dlt.table(
    path="abfss://data@adlsx5u224gv3xgd2.dfs.core.windows.net/retail_org_dlt/Silver/sales_order_stream"
)
def silver_sales_order_stream():
    """Silver layer: parse bronze payloads into one row per ordered product.

    Steps:
    1. Apply textual fix-ups to the raw ``value`` string, then trim one character
       from each end.
    2. Parse the result with ``sales_orders_schema`` and flatten the top-level fields.
    3. Explode ``ordered_products`` into one row per product.
    4. Convert ``order_datetime`` with ``from_unixtime``.
    5. Lift product ``id``, ``price`` and ``qty`` into ``product_id``,
       ``unit_price`` and ``quantity``.
    """
    # (regex pattern, replacement) pairs, applied in order. Together they look like
    # un-escaping a JSON payload that was itself string-encoded.
    # NOTE(review): stripping *every* backslash would also corrupt legitimately escaped
    # characters inside string values. Confirm the producer never emits those.
    unescape_rules = [
        ('"\\[', "["),  # "[  ->  [   (arrays serialized as quoted strings)
        ('\\]"', "]"),  # ]"  ->  ]
        ("\\\\", ""),   # drop all backslashes
    ]

    cleaned = dlt.read_stream("bronze_sales_order_stream")
    for pattern, replacement in unescape_rules:
        cleaned = cleaned.withColumn("value", regexp_replace("value", pattern, replacement))

    # Drop the first and last character (presumably the outer quotes of the encoded
    # string; confirm). A non-positive length yields "", so short inputs are safe.
    cleaned = cleaned.withColumn("value", expr("substring(value, 2, length(value) - 2)"))

    orders = (
        cleaned
        .select(from_json(col("value"), sales_orders_schema).alias("row"))
        .select("row.*")
    )

    product = col("ordered_products")
    return (
        orders
        .withColumn("ordered_products", explode("ordered_products"))
        .withColumn("order_datetime", from_unixtime("order_datetime"))
        .withColumn("product_id", product["id"])
        .withColumn("unit_price", product["price"])
        .withColumn("quantity", product["qty"])
    )

In [None]:
# TODO: merge sales_order_cleansed_stream_dlt with sales_order_cleansed_dlt (this cell has no implementation yet).

In [None]:
@dlt.table(
    path="abfss://data@adlsx5u224gv3xgd2.dfs.core.windows.net/retail_org_dlt/Gold/fact_sales_orders_stream"
)
def fact_sales_orders_stream():
    """Gold fact: silver order lines resolved to dimension surrogate keys.

    The silver stream is inner-joined to ``retail_org_dlt.dim_products`` on
    ``product_id`` and to ``retail_org_dlt.dim_customers`` on ``customer_id``.
    Lines with no matching product or customer are therefore dropped. Each output
    row has the order number, both surrogate keys, the order date, unit price,
    quantity and ``total_price = unit_price * quantity``.
    """
    sales = dlt.read_stream("silver_sales_order_stream").alias("s")
    # Static Delta reads: the stream is joined against the dimensions' current contents.
    products = spark.read.format("delta").table("retail_org_dlt.dim_products").alias("p")
    customers = spark.read.format("delta").table("retail_org_dlt.dim_customers").alias("c")

    enriched = (
        sales
        .join(products, sales.product_id == products.product_id, "inner")
        .join(customers, sales.customer_id == customers.customer_id, "inner")
    )

    return enriched.select(
        col("s.order_number"),
        col("c.customer_key"),
        col("p.product_key"),
        col("s.order_datetime").cast("date").alias("order_date"),
        col("s.unit_price"),
        col("s.quantity"),
        (col("s.unit_price") * col("s.quantity")).alias("total_price"),
    )

In [None]:
@dlt.table(
    path="abfss://data@adlsx5u224gv3xgd2.dfs.core.windows.net/retail_org_dlt/Gold/fact_customer_sales_stream"
)
def fact_customer_sales_stream():
    """Gold aggregate: total quantity and sale amount per (customer_key, product_key).

    Silver order lines are inner-joined to the product and customer dimensions, then
    aggregated:
    - ``total_quantity = sum(quantity)``
    - ``sale_amount = sum(unit_price * quantity)``

    Silver is read in *batch* (``dlt.read``), not as a stream, so DLT maintains this
    dataset as a materialized view that recomputes the aggregate. A streaming
    groupBy/agg without a watermark cannot be emitted in append output mode.
    NOTE(review): the original used ``dlt.read_stream``; DLT streaming tables are
    append-only, so that form fails at analysis time.
    """
    sales = dlt.read("silver_sales_order_stream").alias("s")
    products = spark.read.format("delta").table("retail_org_dlt.dim_products").alias("p")
    customers = spark.read.format("delta").table("retail_org_dlt.dim_customers").alias("c")

    line_amount = col("s.unit_price") * col("s.quantity")

    # `sum` here is pyspark.sql.functions.sum (brought in by the star import), not
    # the Python builtin. Columns are alias-qualified to avoid ambiguity after the joins.
    return (
        sales
        .join(products, sales.product_id == products.product_id, "inner")
        .join(customers, sales.customer_id == customers.customer_id, "inner")
        .groupBy("c.customer_key", "p.product_key")
        .agg(
            sum("s.quantity").alias("total_quantity"),
            sum(line_amount).alias("sale_amount"),
        )
    )