
# Calendars

In [0]:
import dlt
from pyspark.sql.functions import col, year, month, quarter, dayofmonth, to_date, trim
@dlt.table(
    name="maven_uc.silver_dlt.calenders_silver",
    comment="Conformed calendar dimension"
)
# NOTE: date-validity expectation is currently disabled; re-enabling it would
# drop rows whose parsed `date` is NULL.
#@dlt.expect_or_drop("valid_date", "date IS NOT NULL")
def calenders():
    """Build the conformed calendar dimension from the bronze calendar table.

    The bronze `date` string is trimmed and parsed with the `M/d/yyyy`
    pattern into a DATE. Day-of-month, month, quarter and year are then
    derived from that parsed date.

    Returns:
        A DataFrame with columns: date, day, month, quarter, year.
    """
    # Output column name -> Spark function that extracts it from `date`.
    # Insertion order is the output column order.
    date_parts = {
        "day": dayofmonth,
        "month": month,
        "quarter": quarter,
        "year": year,
    }

    calendar = dlt.read("maven_uc.bronze_dlt.calenders").withColumn(
        "date", to_date(trim(col("date")), "M/d/yyyy")
    )
    for part_name, extract in date_parts.items():
        calendar = calendar.withColumn(part_name, extract(col("date")))

    return calendar.select("date", *date_parts)


# Inventory events

In [0]:
@dlt.table(
    name="maven_uc.silver_dlt.inventory_events_silver",
    comment="Inventory restock events"
)
@dlt.expect_or_drop("valid_product", "product_id IS NOT NULL")
@dlt.expect_or_drop("valid_store", "store_id IS NOT NULL")
@dlt.expect_or_drop("valid_qty", "restock_qty >= 0")
def inventory_events():
    """Stream restock events from bronze into the silver inventory table.

    Reads the bronze inventory streaming table as a stream and parses
    `restock_date` from an `M/d/yyyy` string into a DATE. Only the event
    columns listed in `event_columns` are kept.

    Rows are dropped by the `expect_or_drop` expectations above when
    product_id or store_id is NULL, or when restock_qty >= 0 does not hold.
    """
    event_columns = (
        "product_id",
        "store_id",
        "restock_qty",
        "quantity_remaining",
        "restock_date",
        "event_ts",
    )

    events = dlt.read_stream("maven_uc.bronze_dlt.inventory_streaming_table")
    events = events.withColumn(
        "restock_date", to_date(col("restock_date"), "M/d/yyyy")
    )
    return events.select(*event_columns)



# Order Events

In [0]:
import dlt
from pyspark.sql.functions import col, current_timestamp
@dlt.table(
    name="maven_uc.silver_dlt.orders_events_silver",
    comment="Cleaned order events from Kafka"
)
#@dlt.expect_or_drop("valid_qty", "quantity > 0")
#@dlt.expect("valid_price", "unit_price >= 0")
def orders_events_silver():
    """Clean the streaming order events read from the bronze orders table.

    Transformations:
    - `order_date` and `stock_date` are parsed from `M/d/yyyy` strings to DATE.
    - `quantity` is cast to INT.
    - `price` is cast to DECIMAL(12,2) and renamed to `unit_price`.
    - `order_ts` is cast to TIMESTAMP.

    The quantity/price expectations above are commented out, so this table
    applies no row-level validation.
    """
    orders = dlt.read_stream("maven_uc.bronze_dlt.orders_streaming_table")
    for date_field in ("order_date", "stock_date"):
        orders = orders.withColumn(date_field, to_date(col(date_field), "M/d/yyyy"))

    # to_date already yields DATE, so the date casts below are no-ops that
    # just make the output schema explicit.
    return orders.select(
        "order_id",
        "customer_id",
        "product_id",
        "store_id",
        col("order_date").cast("date"),
        col("quantity").cast("int"),
        col("stock_date").cast("date"),
        col("payment_type"),
        col("price").cast("decimal(12,2)").alias("unit_price"),
        col("order_ts").cast("timestamp"),
    )


# Customers

In [0]:
@dlt.table(
    name="maven_uc.silver_dlt.customers_silver",
    comment="Current customer snapshot"
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
def customers():
    """Build the silver customer snapshot from the `mongo_customers` dataset.

    Transformations:
    - `homeowner` / `member_card`: "Yes" becomes True and any other non-NULL
      value becomes False. NULL stays NULL, because comparing NULL yields NULL.
    - `ingestion_ts`: set to the current timestamp when the table is computed.
    - `birthdate` / `acct_open_date`: parsed from `M/d/yyyy` strings to DATE.

    Rows with a NULL customer_id are dropped by the expectation above.
    """
    df = dlt.read("mongo_customers")
    return (
        df
        .withColumn("homeowner", col("homeowner") == "Yes")
        .withColumn("member_card", col("member_card") == "Yes")
        .withColumn("ingestion_ts", current_timestamp())
        # Four-digit year pattern, consistent with the other silver tables.
        .withColumn("birthdate", to_date(col("birthdate"), "M/d/yyyy"))
        .withColumn("acct_open_date", to_date(col("acct_open_date"), "M/d/yyyy"))
    )


# Products

In [0]:
import dlt
from pyspark.sql.functions import col
@dlt.table(
    name="maven_uc.silver_dlt.products_silver",
    comment="Cleaned and standardized product snapshot"
)
@dlt.expect_or_drop(
    "valid_product_id",
    "product_id IS NOT NULL"
)
def products_silver():
    """Build the cleaned product snapshot from the `mongo_products` dataset.

    Transformations:
    - Numeric attributes are cast to the Spark SQL types listed in
      `numeric_types`.
    - `low_fat` / `recyclable` become booleans (`value == 1`; NULL stays NULL).
    - Identity, brand/name, ingestion and source columns pass through unchanged.

    Rows with a NULL product_id are dropped by the expectation above.
    """
    # Numeric attribute -> Spark SQL type it is cast to.
    numeric_types = {
        "product_sku": "bigint",
        "product_retail_price": "double",
        "product_cost": "double",
        "product_weight": "double",
    }
    flag_columns = ("low_fat", "recyclable")

    projection = [col("product_id"), col("product_brand"), col("product_name")]
    projection += [col(name).cast(sql_type) for name, sql_type in numeric_types.items()]
    projection += [(col(flag) == 1).alias(flag) for flag in flag_columns]
    projection += [col("ingestion_ts"), col("source_system")]

    return dlt.read("mongo_products").select(*projection)


# Regions

In [0]:
@dlt.table(
    name="maven_uc.silver_dlt.regions_silver",
    comment="Sales regions dimension"
)
@dlt.expect_or_drop("valid_region_id", "region_id IS NOT NULL")
def regions():
    """Project the bronze regions table onto the silver sales-region dimension.

    Keeps region_id, sales_region, sales_district and ingestion_timestamp,
    with no type conversion. Rows with a NULL region_id are dropped by the
    expectation above.
    """
    region_columns = [
        "region_id",
        "sales_region",
        "sales_district",
        "ingestion_timestamp",
    ]
    return dlt.read("maven_uc.bronze_dlt.regions").select(*region_columns)


# Returns


## DLT

In [0]:
import dlt
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType
@dlt.table(
    name="maven_uc.silver_dlt.returns_silver",
    comment="Cleaned product returns"
)
@dlt.expect("valid_return_date", "return_date IS NOT NULL")
@dlt.expect("valid_quantity", "quantity IS NOT NULL AND CAST(quantity AS INT) > 0")
def returns():
    """Clean product returns from the bronze returns table.

    `return_date` is parsed from an `M/d/yyyy` string to DATE, and `quantity`
    is cast to INT.

    Both expectations use `dlt.expect`, so violating rows are recorded but
    kept rather than dropped. The column must stay named `quantity`, because
    the `valid_quantity` expectation and the select both reference it.

    The bronze `ingestion_timestamp` is not carried into this table.
    """
    df = dlt.read("maven_uc.bronze_dlt.returns")
    return (
        df
        # Four-digit year pattern, consistent with the other silver tables.
        .withColumn("return_date", to_date(col("return_date"), "M/d/yyyy"))
        .withColumn("quantity", col("quantity").cast(IntegerType()))
        .select(
            "return_date",
            "product_id",
            "store_id",
            "quantity",
        )
    )
# NOTE: keep this column named `quantity`. An earlier version renamed it to
# `return_quantity` before the .select(), which made `quantity` unavailable to
# both the `valid_quantity` expectation and the .select() and caused an error.


# Stores


## For DLT

In [0]:
import dlt
from pyspark.sql.functions import col, to_date, trim
from pyspark.sql.types import IntegerType
@dlt.table(
    name="maven_uc.silver_dlt.stores_silver",
    comment="Clean store dimension"
)
@dlt.expect("valid_store_id", "store_id IS NOT NULL")
@dlt.expect(
    "valid_sqft",
    """
    (total_sqft IS NULL OR CAST(total_sqft AS INT) >= 0)
    AND
    (grocery_sqft IS NULL OR CAST(grocery_sqft AS INT) >= 0)
    """
)
def stores():
    """Build the clean store dimension from the bronze stores table.

    Transformations:
    - `first_opened_date` / `last_remodel_date` are trimmed and parsed from
      `M/d/yyyy` strings to DATE.
    - `total_sqft` / `grocery_sqft` are cast to INT.

    The expectations use `dlt.expect`, so violating rows are recorded but
    kept. Only the store attributes listed in the select are output.
    """
    df = dlt.read("maven_uc.bronze_dlt.stores")
    return (
        df
        # Four-digit year pattern, consistent with the other silver tables.
        .withColumn("first_opened_date", to_date(trim(col("first_opened_date")), "M/d/yyyy"))
        .withColumn("last_remodel_date", to_date(trim(col("last_remodel_date")), "M/d/yyyy"))
        .withColumn("total_sqft", col("total_sqft").cast(IntegerType()))
        .withColumn("grocery_sqft", col("grocery_sqft").cast(IntegerType()))
        .select(
            "store_id",
            "region_id",
            "store_type",
            "store_name",
            "store_street_address",
            "store_city",
            "store_state",
            "store_country",
            "store_phone",
            "first_opened_date",
            "last_remodel_date",
            "total_sqft",
            "grocery_sqft",
        )
    )


# Transactions

## DLT

In [0]:
import dlt
from pyspark.sql.functions import col, to_date, trim
from pyspark.sql.types import IntegerType
@dlt.table(
    name="maven_uc.silver_dlt.transactions_silver",
    comment="Cleaned transaction facts"
)
@dlt.expect("valid_transaction_date", "transaction_date IS NOT NULL")
@dlt.expect("valid_quantity", "quantity IS NOT NULL AND CAST(quantity AS INT) > 0")
def transactions():
    """Build the cleaned transaction fact table from the bronze transactions.

    Transformations:
    - `transaction_date` / `stock_date` are trimmed and parsed from
      `M/d/yyyy` strings to DATE.
    - `quantity` is cast to INT.

    The expectations use `dlt.expect`, so violating rows are recorded but
    kept. The bronze `ingestion_timestamp` is not carried into this table.
    """
    df = dlt.read("maven_uc.bronze_dlt.transactions")
    return (
        df
        # Four-digit year pattern, consistent with the other silver tables.
        .withColumn("transaction_date", to_date(trim(col("transaction_date")), "M/d/yyyy"))
        .withColumn("stock_date", to_date(trim(col("stock_date")), "M/d/yyyy"))
        .withColumn("quantity", col("quantity").cast(IntegerType()))
        .select(
            "transaction_date",
            "stock_date",
            "product_id",
            "customer_id",
            "store_id",
            "quantity",
        )
    )