In [0]:
import dlt
from pyspark.sql.functions import col, regexp_replace, to_date, year

# -------------------------------------------------
# 1. Bronze: Streaming Table (Auto Loader ingestion)
# -------------------------------------------------
@dlt.table(
    name="bronze_sales",
    comment="Raw sales data ingested via Auto Loader",
    # No Z-order property on this table: the previous setting pointed at
    # "order_date", a column this function never produces (it is derived
    # later, in silver_sales), so there was no valid column to cluster on.
    table_properties={"quality": "bronze"},
)
def bronze():
    """Incrementally ingest the raw sales CSV files as a streaming table.

    Reads the volume directory with Auto Loader (the ``cloudFiles`` source),
    treating the first line of every file as a header row, and renames the
    two headers that contain a space to snake_case. All other columns are
    passed through exactly as they appear in the files.

    Returns:
        A streaming DataFrame over the files found under the volume path.
    """
    raw_stream = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .load("/Volumes/sivaadbuc/default/batch16v1/")
    )
    return (
        raw_stream
        .withColumnRenamed("Sales Person", "sales_person")
        .withColumnRenamed("Boxes Shipped", "boxes_shipped")
    )
    

# -------------------------------------------------
# 2. Silver: Table with Data Quality Expectations
# -------------------------------------------------
@dlt.table(
    name="silver_sales",
    comment="Cleaned sales data with enforced schema and rules",
    table_properties={"quality": "silver"},
)
@dlt.expect("valid_amount", "amount > 0")  # violation is recorded, row is kept
@dlt.expect_or_drop("non_null_country", "Country IS NOT NULL")  # violating row is dropped
@dlt.expect_or_fail("valid_year", "year(order_date) >= 2000")  # violation fails the update
def silver():
    """Type and clean the bronze stream, applying the expectations above.

    Derives ``order_date`` by parsing ``Date`` with the ``dd-MMM-yy`` pattern,
    strips ``$`` and ``,`` from ``Amount`` before casting it to double, casts
    ``boxes_shipped`` to int, and finally discards rows whose amount is null.

    Returns:
        A streaming DataFrame read from ``bronze_sales`` with the typed
        columns added or replaced.
    """
    raw = dlt.read_stream("bronze_sales")

    # Column expressions are built up front; each is resolved only when it is
    # attached to the DataFrame below, in the same order as before.
    parsed_date = to_date(col("Date"), "dd-MMM-yy")
    numeric_amount = regexp_replace(col("Amount"), "[$,]", "").cast("double")
    boxes_as_int = col("boxes_shipped").cast("int")

    typed = raw.withColumn("order_date", parsed_date)
    typed = typed.withColumn("amount", numeric_amount)
    typed = typed.withColumn("boxes_shipped", boxes_as_int)

    return typed.na.drop(subset=["amount"])


# -------------------------------------------------
# 3. Gold: Batch Table (Aggregations)
# -------------------------------------------------
@dlt.table(
    name="gold_sales",
    comment="Aggregated sales KPIs by country and year",
    table_properties={"quality": "gold"},
)
def gold():
    """Aggregate silver sales into per-country, per-year totals.

    Groups ``silver_sales`` by ``Country`` and the year of ``order_date``,
    sums ``amount`` and ``boxes_shipped``, and gives the two generated
    aggregate columns the names ``total_sales`` and ``total_boxes``.

    Returns:
        A batch DataFrame with one row per (Country, year) group.
    """
    order_year = year(col("order_date")).alias("year")
    grouped = dlt.read("silver_sales").groupBy("Country", order_year)
    totals = grouped.agg({"amount": "sum", "boxes_shipped": "sum"})

    # Dict-style aggregation names its outputs "<func>(<column>)"; map those
    # generated names onto the KPI names, one rename at a time.
    kpi_names = {
        "sum(amount)": "total_sales",
        "sum(boxes_shipped)": "total_boxes",
    }
    for generated, kpi in kpi_names.items():
        totals = totals.withColumnRenamed(generated, kpi)
    return totals


# -------------------------------------------------
# 4. View: Logical-only layer (not materialized)
# -------------------------------------------------
@dlt.view(
    name="gold_sales_view",
    comment="Logical view of gold sales table for ad-hoc queries",
)
def gold_view():
    """Expose the rows of ``gold_sales`` whose ``total_sales`` exceeds 10000.

    Returns:
        A batch DataFrame read from ``gold_sales`` with that filter applied.
    """
    kpis = dlt.read("gold_sales")
    above_threshold = col("total_sales") > 10000
    return kpis.where(above_threshold)


# -------------------------------------------------
# 5. Streaming table: incremental aggregation (continuous query)
# -------------------------------------------------
@dlt.table(
    name="gold_sales_streaming_mv",
    comment="Streaming aggregation of silver sales by country for near real-time queries",
    temporary=False   # default is persistent
)
def gold_streaming_mv():
    """Maintain a running sales total per country from the silver stream.

    Reads ``silver_sales`` as a stream (not the gold table), groups by
    ``Country`` and sums ``amount``. The aggregate column is renamed to
    ``total_sales``, the same name the ``gold_sales`` table uses for this
    measure.

    Returns:
        A streaming DataFrame with columns ``Country`` and ``total_sales``.
    """
    per_country = (
        dlt.read_stream("silver_sales")
        .groupBy("Country")
        .agg({"amount": "sum"})
    )
    # The generated name "sum(amount)" contains parentheses, which Delta
    # tables reject in column names unless column mapping is enabled, so the
    # column is given a plain identifier before it is written.
    return per_country.withColumnRenamed("sum(amount)", "total_sales")
