In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import dlt
import re


# Bronze Tables from raw data

In [0]:
# 1️⃣ Your ADLS Gen2 container details
storage_account = "ankurdataenggstorage"
container       = "shopify"
adls_base_path  = f"abfss://{container}@{storage_account}.dfs.core.windows.net"

# 2️⃣ List all folders under the container root
folder_infos = dbutils.fs.ls(adls_base_path)
folders      = [fi.name.rstrip("/") for fi in folder_infos if fi.isDir()]

# 3️⃣ For each folder, emit & exec a @dlt.table function
for folder in folders:
    csv_path = f"{adls_base_path}/{folder}/*.csv"

    code = f'''
@dlt.table(
    name="{folder}",
    comment="Auto-generated table for folder `{folder}`",
    table_properties={{
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "bronze"
    }}
)
def {folder}():
    return (
        spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv("{csv_path}")
    )
    '''

    exec(code, globals())


Name,Type
Product_Id,string
Title,string
Body_Html,string
Vendor,string
Product_Type,string
Handle,string
Created_At,timestamp
Updated_At,timestamp
Published_At,timestamp
Template_Suffix,string


# Silver tables after cleaning and transformation


### Customers

In [0]:
@dlt.table(
    name="customer_silver",
    comment="Silver customers table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "silver"
    }
    
)
def customer_silver():
    return (
        # 1️⃣ Read the bronze table
        dlt.read("Customers")
        
        # 2️⃣ Select only the columns we need
        .select(
            F.col("Customer_Id").alias("customer_id"),
            F.to_date("Created_At").alias("created_date"),
            F.trim(F.lower("Email")).alias("email"),
            F.col("First_Name").alias("first_name"),
            F.col("Last_Name").alias("last_name"),
            F.col("Orders_Count").cast(IntegerType()).alias("orders_count"),
            F.col("Total_Spent").cast(DoubleType()).alias("total_spent"),
            F.col("State").alias("state"),
            F.col("Tags").alias("tags")
        )
        
        # 3️⃣ Derive avg_order_value, guard against zero orders
        .withColumn(
            "avg_order_value",
            F.when(F.col("orders_count") > 0,
                   F.col("total_spent") / F.col("orders_count")
            ).otherwise(F.lit(None))
        )
        
        # 4️⃣ Optional: filter out test or disabled customers
        .filter(F.col("State") != "disabled")
        
        # 5️⃣ Add an ingestion timestamp
        .withColumn("ingested_at", F.current_timestamp())
    )

Name,Type
customer_id,bigint
created_date,date
email,string
first_name,string
last_name,string
orders_count,int
total_spent,double
state,string
tags,string
avg_order_value,double


In [0]:
@dlt.table(
    name="customer_addresses_silver",
    comment="Silver customer addresses table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "bronze"
    }
)
def customer_addresses_silver():
    return (
        dlt.read("Customer_Addresses")
        
        # Select only analytics-relevant fields, skipping any that are null/blank in the source
        .select(
            "Customer_Address_Id",
            "Customer_Id",
            "Address1",
            "Address2",
            "City",
            "Province",
            "Country",
            "Zip"
        )
        .withColumn("customer_address_id", F.col("Customer_Address_Id").cast("integer"))
        .withColumn("customer_id",F.col("Customer_Id").cast("integer"))
        
        # Add ingestion timestamp
        .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
customer_address_id,int
customer_id,int
Address1,string
Address2,string
City,string
Province,string
Country,string
Zip,string
ingested_at,timestamp


### Orders

In [0]:
@dlt.table(
    name="orders_silver",
    comment="Silver orders table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "silver"
    }
)
def orders_silver():
    return (
        dlt.read("Orders")
            # select only analytics-relevant fields
            .select(
                F.col("Order_Id").alias("order_id"),
                F.to_date("Created_At").alias("order_date"),
                F.col("Customer_Id").alias("customer_id"),
                F.col("Email").alias("customer_email"),
                F.col("Financial_Status").alias("financial_status"),
                F.col("Fulfillment_Status").alias("fulfillment_status"),
                F.col("Subtotal_Price").cast("double").alias("subtotal_price"),
                F.col("Total_Discounts").cast("double").alias("total_discounts"),
                F.col("Total_Tax").cast("double").alias("total_tax"),
                F.col("Total_Price").cast("double").alias("total_price"),
                F.col("Currency").alias("currency"),
                F.col("Source_Name").alias("order_source"),
                F.to_timestamp("Processed_At").alias("processed_at")
            )
            # remove any rows without an order_id
            .filter(F.col("order_id").isNotNull())
            # derive a simple order value metric
            .withColumn(
                "net_order_value",
                F.col("subtotal_price") - F.col("total_discounts") + F.col("total_tax")
            )
            # add ingestion timestamp
            .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
order_id,bigint
order_date,date
customer_id,bigint
customer_email,string
financial_status,string
fulfillment_status,string
subtotal_price,double
total_discounts,double
total_tax,double
total_price,double


In [0]:
@dlt.table(
    name="order_line_items_silver",
    comment="Silver order line items table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "silver"
    }
)
def order_line_items_silver():
    return (
        dlt.read("Order_Line_Items")
            # pick only analytics-relevant, non-null columns
            .select(
                F.col("Order_Id").alias("order_id"),
                F.col("Order_Line_Items_Id").alias("line_item_id"),
                F.col("Order_Line_Items_Product_Id").alias("product_id"),
                F.col("Order_Line_Items_Variant_Id").alias("variant_id"),
                F.col("Order_Line_Items_Quantity").cast("int").alias("quantity"),
                F.col("Order_Line_Items_Price").cast("double").alias("unit_price"),
                F.col("Order_Line_Items_Total_Discount").cast("double").alias("total_discount"),
                F.col("Order_Line_Items_Taxable").alias("taxable"),
                F.col("Order_Line_Items_Requires_Shipping").alias("requires_shipping")
            )
            # drop any rows missing the primary key
            .filter(F.col("line_item_id").isNotNull())
            # derive total line value after discount
            .withColumn(
                "line_total",
                F.expr("quantity * unit_price - total_discount")
            )
            # stamp ingestion time
            .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
order_id,string
line_item_id,string
product_id,string
variant_id,bigint
quantity,int
unit_price,double
total_discount,double
taxable,boolean
requires_shipping,boolean
line_total,double


### Products

In [0]:
@dlt.table(
    name="products_silver",
    comment="Silver products table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "silver"
    }
)
def products_silver():
    return (
        dlt.read("Products")
            # select only analytics-relevant, non-null columns
            .select(
                F.col("Product_Id").alias("product_id"),
                F.col("Title").alias("title"),
                F.col("Body_Html").alias("description"),
                F.col("Vendor").alias("vendor"),
                F.col("Product_Type").alias("product_type"),
                F.col("Handle").alias("handle"),
                F.to_timestamp("Created_At").alias("created_at"),
                F.to_timestamp("Updated_At").alias("updated_at"),
                F.to_timestamp("Published_At").alias("published_at"),
                F.col("Tags").alias("tags"),
                F.col("Image_Source").alias("image_url"),
                F.to_timestamp("Image_Created_At").alias("image_created_at"),
                F.to_timestamp("Image_Updated_At").alias("image_updated_at")
            )
            # drop rows without a product_id or title
            .filter(
                F.col("product_id").isNotNull() & 
                F.col("title").isNotNull()
            )
            # add ingestion timestamp
            .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
product_id,string
title,string
description,string
vendor,string
product_type,string
handle,string
created_at,timestamp
updated_at,timestamp
published_at,timestamp
tags,string


In [0]:
@dlt.table(
    name="product_options_silver",
    comment="Silver product options table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "silver"
    }
)
def product_options_silver():
    return (
        dlt.read("Product_Options")
            # pick only the key fields for analytics
            .select(
                F.col("Product_Id").cast("long").alias("product_id"),
                F.col("Product_Option_Id").cast("long").alias("option_id"),
                F.col("Product_Option_Name").alias("option_name"),
                F.col("Product_Option_Position").cast("int").alias("position")
            )
            # drop rows missing critical identifiers
            .filter(
                F.col("product_id").isNotNull() &
                F.col("option_id").isNotNull() &
                F.col("option_name").isNotNull()
            )
            # add ingestion timestamp
            .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
product_id,bigint
option_id,bigint
option_name,string
position,int
ingested_at,timestamp


In [0]:
@dlt.table(
    name="product_option_values_silver",
    comment="Silver product option values table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "silver"
    }
)
def product_option_values_silver():
    return (
        dlt.read("Product_Option_Values")
            # select only the key fields for analytics
            .select(
                F.col("Product_Id").cast("long").alias("product_id"),
                F.col("Options_Index").cast("int").alias("option_index"),
                F.col("Product_Option_Value_Index").cast("int").alias("value_index"),
                F.col("Product_Option_Values").alias("value")
            )
            # drop rows missing critical identifiers or values
            .filter(
                F.col("product_id").isNotNull() &
                F.col("value_index").isNotNull() &
                F.col("value").isNotNull()
            )
            # add ingestion timestamp
            .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
product_id,bigint
option_index,int
value_index,int
value,string
ingested_at,timestamp


In [0]:
@dlt.table(
    name="product_variants_silver",
    comment="Silver product variants table: cleaned and enriched for analytics",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "silver"
    }
)
def product_variants_silver():
    return (
        dlt.read("Product_Variants")
            # select only analytics-relevant, non-null columns
            .select(
                F.col("Product_Variant_Id").cast("long").alias("variant_id"),
                F.col("Product_Id").cast("long").alias("product_id"),
                F.col("Price").cast("double").alias("price"),
                F.col("Compare_At_Price").cast("double").alias("compare_at_price"),
                F.col("Inventory_Quantity").cast("int").alias("inventory_quantity"),
                F.col("Old_Inventory_Quantity").cast("int").alias("old_inventory_quantity"),
                F.col("Inventory_Quantity_Adjustment").cast("int").alias("inventory_adjustment"),
                F.col("Inventory_Policy").alias("inventory_policy"),
                F.col("Requires_Shipping").alias("requires_shipping"),
                F.col("Taxable").alias("taxable"),
                F.col("Sku").alias("sku"),
                F.col("Grams").cast("int").alias("grams"),
                F.col("Weight").cast("double").alias("weight"),
                F.col("Weight_Unit").alias("weight_unit"),
                F.to_timestamp("Created_At").alias("created_at"),
                F.to_timestamp("Updated_At").alias("updated_at"),
                F.col("Position").cast("int").alias("position"),
                F.col("Title").alias("variant_title")
            )
            # drop any rows missing the primary key
            .filter(F.col("variant_id").isNotNull())
            # derive a simple price per gram metric
            .withColumn(
                "price_per_gram",
                F.when(F.col("grams") > 0, F.col("price") / F.col("grams")).otherwise(None)
            )
            # stamp ingestion time
            .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
variant_id,bigint
product_id,bigint
price,double
compare_at_price,double
inventory_quantity,int
old_inventory_quantity,int
inventory_adjustment,int
inventory_policy,string
requires_shipping,boolean
taxable,boolean


# Gold layer tables for business use case & analytics

In [0]:
@dlt.table(
    name="customer_rfm_gold",
    comment="Customer Recency, Frequency, Monetary segmentation with descriptive segments",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def customer_rfm_gold():
    orders = dlt.read("orders_silver")
    cust   = dlt.read("customer_silver")

    # base RFM calculation
    rfm = (
        orders
            .groupBy("customer_id")
            .agg(
                F.max("order_date").alias("last_order_date"),
                F.count("*").alias("frequency"),
                F.sum("total_price").alias("monetary")
            )
            .join(cust.select("customer_id"), "customer_id")
            .withColumn("recency_days", F.datediff(F.current_date(), F.col("last_order_date")))
    )

    # compute quintile scores
    window_desc = Window.orderBy(F.col("recency_days").desc())
    window_asc  = Window.orderBy(F.col("frequency").desc())
    window_mon  = Window.orderBy(F.col("monetary").desc())

    scored = (
        rfm
            .withColumn("R_Score",    F.ntile(5).over(window_desc))
            .withColumn("F_Score",    F.ntile(5).over(window_asc))
            .withColumn("M_Score",    F.ntile(5).over(window_mon))
    )

    # assign human-readable segment names
    with_segments = scored.withColumn(
        "segment_name",
        F.when(
            (F.col("R_Score") >= 4) & (F.col("F_Score") >= 4) & (F.col("M_Score") >= 4),
            "Champions"
        ).when(
            (F.col("R_Score") >= 4) & (F.col("F_Score") >= 4),
            "Loyal Customers"
        ).when(
            (F.col("R_Score") >= 4),
            "Recent Customers"
        ).when(
            (F.col("M_Score") >= 4),
            "Big Spenders"
        ).when(
            (F.col("F_Score") >= 4),
            "Frequent Buyers"
        ).otherwise("Others")
    )

    return (
        with_segments
            .withColumn("RFM_Segment", F.concat_ws("", "R_Score", "F_Score", "M_Score"))
            .withColumn("ingested_at",  F.current_timestamp())
    )




Name,Type
customer_id,bigint
last_order_date,date
frequency,bigint
monetary,double
recency_days,int
R_Score,int
F_Score,int
M_Score,int
segment_name,string
RFM_Segment,string


In [0]:
@dlt.table(
    name="customer_segmentation_summary_gold",
    comment="Summary of customer counts and RFM averages by segment",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def customer_segmentation_summary_gold():
    rfm = dlt.read("customer_rfm_gold")
    return (
        rfm
            .groupBy("segment_name")
            .agg(
                F.countDistinct("customer_id").alias("customer_count"),
                F.avg("recency_days").alias("avg_recency_days"),
                F.avg("frequency").alias("avg_frequency"),
                F.avg("monetary").alias("avg_monetary")
            )
            .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
segment_name,string
customer_count,bigint
avg_recency_days,double
avg_frequency,double
avg_monetary,double
ingested_at,timestamp


In [0]:
@dlt.table(
    name="sales_summary_gold",
    comment="Daily and monthly sales KPIs by currency",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def sales_summary_gold():
    orders = dlt.read("orders_silver")
    lines  = dlt.read("order_line_items_silver")

    # Join orders to line items, compute line_total then aggregate daily
    sales = (
        lines
          .join(orders, "order_id")
          .groupBy(
              F.to_date("order_date").alias("date"),
              "currency"
          )
          .agg(
              F.sum("line_total").alias("daily_revenue"),
              F.countDistinct("order_id").alias("daily_orders"),
              F.avg("line_total").alias("avg_line_value")
          )
    )

    # Add month and run time
    return (
        sales
          .withColumn("month", F.date_format("date","yyyy-MM"))
          .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
date,date
currency,string
daily_revenue,double
daily_orders,bigint
avg_line_value,double
month,string
ingested_at,timestamp


In [0]:
@dlt.table(
    name="order_analytics_gold",
    comment="Order analytics: performance by source, financial & fulfillment status",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def order_analytics_gold():
    orders = dlt.read("orders_silver")

    return (
        orders
          .groupBy(
              F.col("order_source"),
              F.col("financial_status"),
              F.col("fulfillment_status"),
              F.col("currency")
          )
          .agg(
              F.count(F.col("order_id")).alias("order_count"),
              F.sum(F.col("net_order_value")).alias("total_net_revenue"),
              F.avg(F.col("net_order_value")).alias("avg_net_order_value"),
              F.min(F.col("order_date")).alias("first_order_date"),
              F.max(F.col("order_date")).alias("last_order_date")
          )
          .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
order_source,int
financial_status,string
fulfillment_status,string
currency,string
order_count,bigint
total_net_revenue,double
avg_net_order_value,double
first_order_date,date
last_order_date,date
ingested_at,timestamp


# Dimensional Data Modeling

In [0]:
# Dimension: Customer
@dlt.table(
    name="dim_customer",
    comment="Customer dimension with RFM segmentation",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def dim_customer():
    return (
        dlt.read("customer_rfm_gold")
          .select(
              F.col("customer_id"),
              F.col("segment_name"),
              F.col("R_Score").alias("rfm_recency_score"),
              F.col("F_Score").alias("rfm_frequency_score"),
              F.col("M_Score").alias("rfm_monetary_score"),
              F.col("RFM_Segment").alias("rfm_segment_code")
          )
          .dropDuplicates(["customer_id"])
    )







Name,Type
customer_id,bigint
segment_name,string
rfm_recency_score,int
rfm_frequency_score,int
rfm_monetary_score,int
rfm_segment_code,string


In [0]:
# Dimension: Date
@dlt.table(
    name="dim_date",
    comment="Date dimension for reporting",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def dim_date():
    return (
        dlt.read("sales_summary_gold")
          .select(F.col("date"))
          .dropDuplicates(["date"])
          .withColumn("day",     F.dayofmonth("date"))
          .withColumn("month",   F.month("date"))
          .withColumn("year",    F.year("date"))
          .withColumn("weekday", F.date_format("date","E"))
    )



Name,Type
date,date
day,int
month,int
year,int
weekday,string


In [0]:
# Dimension: Channel
@dlt.table(
    name="dim_channel",
    comment="Sales channel/source dimension",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def dim_channel():
    return (
        dlt.read("order_analytics_gold")
          .select(F.col("order_source").alias("channel"))
          .dropDuplicates(["channel"])
    )



Name,Type
channel,int


In [0]:
# Fact: Sales Summary (grain = date × currency)
@dlt.table(
    name="fact_sales_summary",
    comment="Fact table of daily sales KPIs",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def fact_sales_summary():
    return (
        dlt.read("sales_summary_gold")
          .select(
              F.col("date"),
              F.col("currency"),
              F.col("daily_revenue"),
              F.col("daily_orders"),
              F.col("avg_line_value")
          )
          .withColumn("ingested_at", F.current_timestamp())
    )



Name,Type
date,date
currency,string
daily_revenue,double
daily_orders,bigint
avg_line_value,double
ingested_at,timestamp


In [0]:
# Fact: Order Analytics (grain = source × financial_status × fulfillment_status × currency)
@dlt.table(
    name="fact_order_metrics",
    comment="Fact table of order counts & revenue by channel and status",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "quality": "gold"
    }
)
def fact_order_metrics():
    return (
        dlt.read("order_analytics_gold")
          .select(
              F.col("order_source").alias("channel"),
              F.col("financial_status"),
              F.col("fulfillment_status"),
              F.col("currency"),
              F.col("order_count"),
              F.col("total_net_revenue"),
              F.col("avg_net_order_value"),
              F.col("first_order_date"),
              F.col("last_order_date")
          )
          .withColumn("ingested_at", F.current_timestamp())
    )


Name,Type
channel,int
financial_status,string
fulfillment_status,string
currency,string
order_count,bigint
total_net_revenue,double
avg_net_order_value,double
first_order_date,date
last_order_date,date
ingested_at,timestamp
