In [0]:
from pyspark.sql.functions import col, sum, count, date_format, countDistinct, avg, expr
import dlt

In [0]:
@dlt.table(
    name="maven_uc.gold_dlt.agg_store_performance",
    comment="Daily revenue and volume by store."
)
def agg_store_performance():
    """Aggregate fact_sales to one row per (store_id, sales_date).

    Output columns:
        daily_revenue: sum of ``revenue`` after casting it to double.
        units_sold: sum of ``quantity``.
        transaction_count: number of distinct ``order_id`` values.
    """
    fact_sales = dlt.read("maven_uc.gold_dlt.fact_sales")
    per_store_day = fact_sales.groupBy("store_id", "sales_date")

    # Build each aggregate as a named expression so the final call reads
    # like the list of output columns.
    daily_revenue = sum(col("revenue").cast("double")).alias("daily_revenue")
    units_sold = sum("quantity").alias("units_sold")
    transaction_count = countDistinct("order_id").alias("transaction_count")

    return per_store_day.agg(daily_revenue, units_sold, transaction_count)



In [0]:
@dlt.table(
    name="maven_uc.gold_dlt.agg_product_profitability",
    comment="Analysis of product performance vs cost."
)
def agg_product_profitability():
    """Aggregate revenue, cost and profit per product.

    Joins fact_sales to dim_products on ``product_id`` (default join type,
    so sales rows without a matching product are dropped) and groups by
    product_id, product_name and product_brand.

    Output columns:
        total_revenue: sum of ``revenue`` cast to double.
        total_cost: sum of ``quantity * product_cost``.
        total_profit: ``total_revenue - total_cost``.
    """
    sales = dlt.read("maven_uc.gold_dlt.fact_sales")
    prods = dlt.read("maven_uc.gold_dlt.dim_products")

    return (
        sales.join(prods, "product_id")
        .groupBy("product_id", "product_name", "product_brand")
        .agg(
            sum(col("revenue").cast("double")).alias("total_revenue"),
            sum(col("quantity") * col("product_cost")).alias("total_cost"),
        )
        # Derive profit from the two published columns so it always equals
        # total_revenue - total_cost. Previously profit used an uncast
        # sum("revenue"), which could differ in type/precision from
        # total_revenue.
        .withColumn("total_profit", col("total_revenue") - col("total_cost"))
    )

In [0]:
@dlt.table(name="maven_uc.gold_dlt.agg_region_monthly_profitability")
def agg_region_monthly_profitability():
    """Aggregate revenue, cost and profit per sales region, district and month.

    Joins fact_sales to dim_stores (``store_id``), dim_regions
    (``region_id``) and dim_products (``product_id``) using the default join
    type, then groups by sales_region, sales_district and sales_month.

    Output columns:
        sales_month: ``sales_date`` formatted as ``yyyy-MM``.
        total_revenue: sum of ``revenue`` cast to double.
        total_cost: sum of ``quantity * product_cost``.
        total_profit: ``total_revenue - total_cost``.

    NOTE(review): this table previously grouped by the raw ``sales_date``,
    producing daily rows despite the "monthly" table name. Consumers that
    read a ``sales_date`` column from this table must switch to
    ``sales_month`` — confirm downstream usage before deploying.
    """
    sales = dlt.read("maven_uc.gold_dlt.fact_sales")
    stores = dlt.read("maven_uc.gold_dlt.dim_stores")
    regions = dlt.read("maven_uc.gold_dlt.dim_regions")
    prods = dlt.read("maven_uc.gold_dlt.dim_products")

    return (
        sales.join(stores, "store_id")
             .join(regions, "region_id")
             .join(prods, "product_id")
             # Collapse each sale onto its calendar month so the grain
             # matches the table name. Assumes sales_date is a date or
             # timestamp (or a string Spark can parse as one) — TODO confirm.
             .withColumn("sales_month", date_format(col("sales_date"), "yyyy-MM"))
             .groupBy("sales_region", "sales_district", "sales_month")
             .agg(
                 # Cast before summing, consistent with the other gold
                 # aggregates in this pipeline.
                 sum(col("revenue").cast("double")).alias("total_revenue"),
                 sum(col("quantity") * col("product_cost")).alias("total_cost"),
             )
             .withColumn("total_profit", col("total_revenue") - col("total_cost"))
    )

In [0]:
@dlt.table(
    name="maven_uc.gold_dlt.agg_customer_value",
    comment="Aggregate customer value: total revenue, order count, and average order value per customer."
)
def agg_customer_value():
    """Summarise fact_sales to one row per customer_id.

    Output columns:
        total_revenue: sum of ``revenue`` cast to double.
        order_count: number of distinct ``order_id`` values.
        avg_order_value: total revenue divided by the distinct order count.
    """
    fact_sales = dlt.read("maven_uc.gold_dlt.fact_sales")

    # Define the two base aggregates once and reuse them for the ratio.
    revenue_sum = sum(col("revenue").cast("double"))
    distinct_orders = countDistinct("order_id")

    per_customer = fact_sales.groupBy("customer_id")
    return per_customer.agg(
        revenue_sum.alias("total_revenue"),
        distinct_orders.alias("order_count"),
        (revenue_sum / distinct_orders).alias("avg_order_value"),
    )

In [0]:
@dlt.table(
    name="maven_uc.gold_dlt.inventory_health",
    comment="Inventory health per product: total quantity remaining and number of inventory records below the low-stock threshold (fewer than 10 units)."
)
def inventory_health():
    """Summarise inventory events per product.

    Joins fact_inventory_events to dim_products on ``product_id`` (default
    join type) and groups by product_id, product_name and product_brand.

    Output columns:
        total_stock: sum of ``quantity_remaining`` over the product's rows.
        below_threshold_count: number of rows whose ``quantity_remaining``
            is below the low-stock threshold.

    The table comment previously advertised "days of inventory remaining",
    which this query does not compute; the comment now describes only the
    columns actually produced.

    NOTE(review): total_stock sums every row of fact_inventory_events for a
    product. If that table holds several snapshots/events per product, this
    overstates current stock — confirm the grain of the source table.
    """
    low_stock_threshold = 10  # rows with fewer units than this count as low stock

    inventory = dlt.read("maven_uc.gold_dlt.fact_inventory_events")
    prods = dlt.read("maven_uc.gold_dlt.dim_products")

    return (
        inventory.join(prods, "product_id")
        .groupBy("product_id", "product_name", "product_brand")
        .agg(
            sum(col("quantity_remaining")).alias("total_stock"),
            # Summing a 1/0 flag counts the rows that fall below the threshold.
            sum(
                expr(
                    f"CASE WHEN quantity_remaining < {low_stock_threshold} "
                    "THEN 1 ELSE 0 END"
                )
            ).alias("below_threshold_count"),
        )
    )