# DLT pipeline

These Delta Live Tables (DLT) definitions are executed by a DLT pipeline rather than run directly in the notebook.

In [0]:
import dlt
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, col, to_date, trim, lower

# Bronze Layer
@dlt.table(
    name="bronze.bakery_sales_raw",
    comment="Raw Bakery Sales data ingested from Parquet",
)
def bakery_sales_raw():
    """Bronze table: raw bakery sales, loaded unchanged from Parquet files.

    Uses the notebook-global ``spark`` session (it is not defined in this file).
    """
    source_dir = "/FileStore/tables/parq_bakery_sales"
    return spark.read.format("parquet").load(source_dir)

@dlt.table(
    name="bronze.bakery_price_raw",
    comment="Raw Bakery Prices data ingested from Delta Table",
)
def bakery_price_raw():
    """Bronze table: raw bakery price list, loaded unchanged from a Delta path.

    Uses the notebook-global ``spark`` session (it is not defined in this file).
    """
    return spark.read.load("/FileStore/tables/delta_bakery_price", format="delta")





In [0]:
# -------------------------
# Silver Layer - Cleaned with expectations
# -------------------------

@dlt.table(
    comment="Silver layer for validated Bakery Sales",
    name="silver.silver_sales",
)
# NOTE: the "valid_price" expectation actually checks ``total``; the name is kept
# unchanged so existing expectation metrics keep their identity.
@dlt.expect_or_drop("valid_price", "total IS NOT NULL AND total > 0")
@dlt.expect_or_drop("valid_date", "datetime IS NOT NULL")
def silver_sales():
    """Validated bakery sales.

    Reads the bronze sales table. The expectations above drop rows with a
    missing or non-positive ``total`` or a missing ``datetime``. A ``Date``
    column is added, parsed from ``datetime`` with the ``MM/dd/yyyy`` pattern.
    """
    # Read the bronze dataset by the qualified name it is published under
    # (``name=`` on bakery_sales_raw), the same way the gold layer reads silver.
    df = dlt.read("bronze.bakery_sales_raw")
    # NOTE(review): confirm ``datetime`` values match ``MM/dd/yyyy``; values with
    # a different layout may not parse into ``Date``.
    return df.withColumn("Date", to_date("datetime", "MM/dd/yyyy"))

@dlt.table(
    comment="Silver layer for validated Bakery Price",
    name="silver.silver_price",
)
@dlt.expect_or_drop("valid_price", "price IS NOT NULL AND price > 0")
def silver_price():
    """Validated bakery prices.

    Reads the bronze price table unchanged. The expectation above drops rows
    whose ``price`` is missing or not positive.
    """
    # Read the bronze dataset by the qualified name it is published under
    # (``name=`` on bakery_price_raw), the same way the gold layer reads silver.
    return dlt.read("bronze.bakery_price_raw")

# -------------------------
# Capture Dropped Rows (Invalids)
# -------------------------

# Numeric view of the raw ``total`` column, used by the invalid-row filter below.
total_col = col("total").cast("double")
@dlt.table(
    comment="Invalid Bakery Sales rows",
    name="invalid_data.invalid_sales",
)
def invalid_sales():
    """Raw sales rows that fail the silver_sales checks.

    A row is kept here when ``total`` is NULL, when ``total`` cast to double is
    <= 0, or when ``datetime`` is NULL.

    NOTE(review): if ``total`` holds text that does not parse as a double, the
    comparison does not evaluate to true, so such rows are not captured here.
    Confirm this matches what the silver expectation drops.
    """
    # Same qualified bronze name the dataset is published under.
    return dlt.read("bronze.bakery_sales_raw").filter(
        col("total").isNull() | (total_col <= 0) | col("datetime").isNull()
    )

# Numeric view of the raw ``price`` column, used by the invalid-row filter below.
price_col = col("price").cast("double")
@dlt.table(
    comment="Invalid Bakery Price rows",
    name="invalid_data.invalid_price",
)
def invalid_price():
    """Raw price rows that fail the silver_price check.

    A row is kept here when ``price`` is NULL or when ``price`` cast to double
    is <= 0.

    NOTE(review): if ``price`` holds text that does not parse as a double, the
    comparison does not evaluate to true, so such rows are not captured here.
    Confirm this matches what the silver expectation drops.
    """
    # Same qualified bronze name the dataset is published under.
    return dlt.read("bronze.bakery_price_raw").filter(
        col("price").isNull() | (price_col <= 0)
    )

# -------------------------
# Gold Layer - Joined & Aggregated
# -------------------------

@dlt.table(
    name="gold.angbutter_sales",
    comment="Join only angbutter sales with price",
)
def angbutter_sales():
    """Gold table: angbutter sales rows enriched with the angbutter unit price.

    Keeps the silver sales rows whose ``angbutter`` quantity is non-null. Each
    one is paired with the price of the product whose ``Name`` (trimmed,
    lower-cased) equals "angbutter". The output adds
    ``revenue = angbutter * price``.
    """
    sold = dlt.read("silver.silver_sales").where(col("angbutter").isNotNull())

    unit_price = (
        dlt.read("silver.silver_price")
        .where(lower(trim(col("Name"))) == "angbutter")
        .select("price")
    )

    # NOTE(review): crossJoin assumes exactly one matching price row. Zero rows
    # empties the result; several rows duplicate every sale. Confirm.
    # NOTE(review): cast("int") drops any fractional part of the price.
    joined = sold.crossJoin(unit_price)
    joined = joined.withColumn("price", col("price").cast("int"))
    joined = joined.withColumn("revenue", col("angbutter") * col("price"))

    output_columns = ["datetime", "day_of_week", "total", "angbutter", "price", "revenue"]
    return joined.select(*output_columns)