## 環境準備

In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

## Bronze Layer

In [0]:
@dlt.table(table_properties={"quality": "bronze"})
def customers_bronze():
    """Incrementally ingest raw customer CSV files with Auto Loader.

    Files are read as a stream in ``cloudFiles`` (Auto Loader) mode. The CSV
    header row supplies column names, and Auto Loader infers column types.
    The table is tagged ``quality=bronze`` through the decorator's
    ``table_properties``. It is not a function default argument, because DLT
    calls this function with no arguments and so ignores such a default.
    """
    # NOTE(review): this is the same directory that sales_bronze reads, so both
    # bronze tables ingest the same files. Confirm the intended customers source
    # (e.g. a customers-specific subdirectory, as sales_orders_bronze uses).
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/databricks_simulated_retail_customer_data/v01/source_files/")
    )

In [0]:
@dlt.table(table_properties={"quality": "bronze"})
def sales_bronze():
    """Stream raw sales CSV files into the bronze layer via Auto Loader.

    The header row provides column names and Auto Loader infers column types.
    """
    # NOTE(review): same directory as customers_bronze, so both tables read the
    # same files. Confirm the intended sales source path.
    source_dir = "/Volumes/databricks_simulated_retail_customer_data/v01/source_files/"
    csv_stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.inferColumnTypes", "true")
    )
    return csv_stream.load(source_dir)

In [0]:
@dlt.table(table_properties={"quality": "bronze"})
def sales_orders_bronze():
    """Stream raw sales-order CSV files into the bronze layer via Auto Loader.

    Files come from the ``sales_orders/`` subdirectory of the source volume.
    The header row provides column names and column types are inferred.
    """
    orders_dir = (
        "/Volumes/databricks_simulated_retail_customer_data/v01/source_files/sales_orders/"
    )
    reader_options = (
        ("cloudFiles.format", "csv"),
        ("header", "true"),
        ("cloudFiles.inferColumnTypes", "true"),
    )
    reader = spark.readStream.format("cloudFiles")
    for key, value in reader_options:
        reader = reader.option(key, value)
    return reader.load(orders_dir)

## Silver Layer

In [0]:
# Shared preprocessing helper
def not_null_and_not_blank(col_name: str):
    """Build a Column predicate that is true only when *col_name* has content.

    The predicate requires the column to be non-null and to differ from the
    empty string once ``F.trim`` has been applied.
    """
    target = F.col(col_name)
    has_value = target.isNotNull()
    has_text = F.trim(target) != ""
    return has_value & has_text

In [0]:
@dlt.table(table_properties={"quality": "silver"})
@dlt.expect_or_fail("pk_must_exist", "customer_id IS NOT NULL AND trim(customer_id) != ''")
@dlt.expect_all_or_drop({
    "state_present": "state IS NOT NULL AND trim(state) != ''",
    "city_present": "city IS NOT NULL AND trim(city) != ''",
    "units_present": "units_purchased IS NOT NULL AND trim(units_purchased) != ''",
    "loyalty_present": "loyalty_segment IS NOT NULL AND trim(loyalty_segment) != ''"
})
def customers_silver():
    """Project the customer attributes used downstream; quality is enforced by expectations.

    * ``pk_must_exist`` fails the update when ``customer_id`` is null or blank.
    * The ``*_present`` rules drop rows whose state, city, units_purchased or
      loyalty_segment is null or blank.

    Rows are deliberately NOT pre-filtered with an equivalent ``.where``.
    Such a filter would remove offending rows before the expectations
    evaluate them, so the fail-on-missing-key rule could never trigger and
    no violations would ever be recorded.
    """
    # `region` is selected because sales_orders_customer_join_min reads it from
    # this table.
    # NOTE(review): assumes customers_bronze exposes a `region` column; confirm
    # against the source CSV header.
    return spark.readStream.table("customers_bronze").select(
        "customer_id", "state", "city", "region", "units_purchased", "loyalty_segment"
    )

In [0]:
@dlt.table(table_properties={"quality": "silver"})
@dlt.expect_all_or_drop({
    "customer_id_present": "customer_id IS NOT NULL AND trim(customer_id) != ''",
    "product_name_present": "product_name IS NOT NULL AND trim(product_name) != ''",
    "order_date_present": "order_date IS NOT NULL AND trim(order_date) != ''",
    "product_category_present": "product_category IS NOT NULL AND trim(product_category) != ''",
    "product_present": "product IS NOT NULL AND trim(product) != ''",
    "total_price_present": "total_price IS NOT NULL AND trim(total_price) != ''"
})
def sales_silver():
    """Project the sales columns used downstream; rows missing any of them are dropped.

    Filtering is done only by the ``expect_all_or_drop`` rules above, which
    drop every row where one of the selected columns is null or blank. A
    duplicate ``.where`` would drop the same rows earlier. The output would be
    identical, but the expectations would never record a violation.
    """
    return spark.readStream.table("sales_bronze").select(
        "customer_id", "product_name", "order_date", "product_category", "product", "total_price"
    )

In [0]:
@dlt.table(table_properties={"quality": "silver"})
@dlt.expect_or_fail("pk_must_exist", "order_number IS NOT NULL AND trim(order_number) != ''")
@dlt.expect_all_or_drop({
    "clicked_items_present": "clicked_items IS NOT NULL AND trim(clicked_items) != ''",
    "customer_id_present": "customer_id IS NOT NULL AND trim(customer_id) != ''",
    "number_of_line_items_present": "number_of_line_items IS NOT NULL AND trim(number_of_line_items) != ''",
    "order_datetime_present": "order_datetime IS NOT NULL AND trim(order_datetime) != ''",
    "ordered_products_present": "ordered_products IS NOT NULL AND trim(ordered_products) != ''",
    "promo_info_present": "promo_info IS NOT NULL AND trim(promo_info) != ''"
})
def sales_orders_silver():
    """Project the sales-order columns used downstream; quality is enforced by expectations.

    * ``pk_must_exist`` fails the update when ``order_number`` is null or blank.
    * The ``*_present`` rules drop rows where any other selected column is
      null or blank.

    No equivalent ``.where`` is applied here, because it would hide the
    offending rows from the expectations and make the fail rule unreachable.
    """
    return spark.readStream.table("sales_orders_bronze").select(
        "clicked_items", "customer_id", "number_of_line_items", "order_datetime",
        "order_number", "ordered_products", "promo_info",
    )

In [0]:
@dlt.table(table_properties={"quality": "silver"})
def sales_orders_customer_join_min():
    """Enrich streaming sales orders with customer attributes.

    Streams ``sales_orders_silver`` and left-joins it on ``customer_id`` to a
    batch read of ``customers_silver``. Orders without a matching customer
    are kept with null customer attributes.
    """
    order_cols = [
        "clicked_items", "customer_id", "number_of_line_items",
        "order_datetime", "order_number", "ordered_products", "promo_info",
    ]
    customer_cols = ["loyalty_segment", "region", "state", "city"]

    orders = dlt.read_stream("sales_orders_silver").select(*order_cols)
    customers = dlt.read("customers_silver")

    # NOTE(review): `region` must exist in customers_silver for this select to
    # resolve. If customers_silver can hold several rows per customer_id, each
    # order is repeated once per match; confirm uniqueness.
    enriched = orders.join(customers, "customer_id", "left")
    return enriched.select(*order_cols, *customer_cols)

## Gold Layer

In [0]:
# Schema used to parse the JSON array in `ordered_products`. Each element is a
# struct of nullable string fields; consumers cast price/qty as needed.
ITEMS_SCHEMA = T.ArrayType(
    T.StructType([
        T.StructField(field_name, T.StringType())
        for field_name in ("curr", "id", "name", "price", "promotion_info", "qty", "unit")
    ])
)

@dlt.table(table_properties={"quality": "gold"})
def fact_interactions():
    """Aggregate purchased quantity per (customer_id, product_id) pair.

    Each order's ``ordered_products`` JSON array is parsed with ``ITEMS_SCHEMA``
    and exploded to one row per line item. Line items whose ``qty`` (cast to
    int) is null or not positive are ignored.

    Output columns:
        customer_id, product_id
        total_qty           -- sum of line-item quantities for the pair
        interaction_weight  -- log1p(total_qty)
    """
    line_items = (
        dlt.read("sales_orders_customer_join_min")
        .withColumn("items", F.from_json("ordered_products", ITEMS_SCHEMA))
        .withColumn("p", F.explode("items"))
    )

    # Only the columns that feed the aggregation are projected. The order
    # timestamp was previously derived here and then discarded by the groupBy.
    return (
        line_items.select(
            "customer_id",
            F.col("p.id").alias("product_id"),
            F.col("p.qty").cast("int").alias("qty"),
        )
        .where(F.col("qty").isNotNull() & (F.col("qty") > 0))
        .groupBy("customer_id", "product_id")
        .agg(F.sum("qty").alias("total_qty"))
        .withColumn("interaction_weight", F.log1p(F.col("total_qty")))
    )

In [0]:
@dlt.table(name="features_user_min", table_properties={"quality": "gold"})
def features_user_min():
    """Per-customer purchase features over the trailing 30 days.

    Orders from ``sales_orders_customer_join_min`` are exploded into line
    items (``ordered_products`` parsed with ``ITEMS_SCHEMA``). Only items
    whose order time is within 30 days of now are kept, then they are
    aggregated per ``customer_id``:

        n_items_30d -- number of distinct product ids
        qty_30d     -- sum of line-item quantities
        sales_30d   -- sum of qty * price
    """
    # The chain is parenthesised; previously the bare line continuations were
    # a SyntaxError.
    t = (
        dlt.read("sales_orders_customer_join_min")
        .withColumn("items", F.from_json("ordered_products", ITEMS_SCHEMA))
        .withColumn("p", F.explode("items"))
        # The source table has no `order_ts`; derive it from `order_datetime`.
        # NOTE(review): assumes order_datetime holds Unix epoch seconds; confirm.
        .withColumn(
            "order_ts",
            F.to_timestamp(F.from_unixtime(F.col("order_datetime").cast("bigint"))),
        )
    )
    w30 = F.col("order_ts") >= F.date_sub(F.current_timestamp(), 30)
    return (
        t.where(w30)
        .select(
            "customer_id",
            F.col("p.id").alias("product_id"),
            F.col("p.qty").cast("int").alias("qty"),
            F.col("p.price").cast("double").alias("price"),
        )
        .groupBy("customer_id")
        .agg(
            F.countDistinct("product_id").alias("n_items_30d"),
            F.sum("qty").alias("qty_30d"),
            F.sum(F.col("qty") * F.col("price")).alias("sales_30d"),
        )
    )

In [0]:




@dlt.table(name="features_item_min", table_properties={"quality": "gold"})
def features_item_min():
    """Per-product purchase features over the trailing 30 days.

    Orders from ``sales_orders_customer_join_min`` are exploded into line
    items (``ordered_products`` parsed with ``ITEMS_SCHEMA``). Only items
    whose order time is within 30 days of now are kept, then they are
    aggregated per ``product_id``:

        n_buyers_30d -- number of distinct customers
        qty_30d      -- sum of line-item quantities
        sales_30d    -- sum of qty * price
    """
    # Read the joined orders table defined in this pipeline. `tx_joined_min` is
    # not defined here, and it was the only source of `order_ts`.
    t = (
        dlt.read("sales_orders_customer_join_min")
        .withColumn("items", F.from_json("ordered_products", ITEMS_SCHEMA))
        .withColumn("p", F.explode("items"))
        # NOTE(review): assumes order_datetime holds Unix epoch seconds; confirm.
        .withColumn(
            "order_ts",
            F.to_timestamp(F.from_unixtime(F.col("order_datetime").cast("bigint"))),
        )
    )
    w30 = F.col("order_ts") >= F.date_sub(F.current_timestamp(), 30)
    return (
        t.where(w30)
        .select(
            F.col("p.id").alias("product_id"),
            F.col("customer_id"),
            F.col("p.qty").cast("int").alias("qty"),
            F.col("p.price").cast("double").alias("price"),
        )
        .groupBy("product_id")
        .agg(
            F.countDistinct("customer_id").alias("n_buyers_30d"),
            F.sum("qty").alias("qty_30d"),
            F.sum(F.col("qty") * F.col("price")).alias("sales_30d"),
        )
    )

@dlt.table(name="labels_next_purchase_14d", table_properties={"quality": "gold"})
def labels_next_purchase_14d():
    """Binary repurchase labels for (customer_id, product_id) pairs.

    The cutoff is 14 days before today. A pair is in the universe when the
    customer bought the product on a date before the cutoff. Its ``label`` is
    1 when the pair was also bought in [cutoff, today), otherwise 0.
    """
    keys = ["customer_id", "product_id"]
    t = (
        dlt.read("sales_orders_customer_join_min")
        .withColumn("items", F.from_json("ordered_products", ITEMS_SCHEMA))
        .withColumn("p", F.explode("items"))
        .select(
            "customer_id",
            F.col("p.id").alias("product_id"),
            # The source table has no `order_date`; derive it from `order_datetime`.
            # NOTE(review): assumes order_datetime holds Unix epoch seconds; confirm.
            F.to_date(F.from_unixtime(F.col("order_datetime").cast("bigint"))).alias("order_date"),
        )
    )
    today = F.current_date()
    cutoff = F.date_sub(today, 14)

    # Positives carry an explicit marker column. This avoids relying on
    # qualified access to the right-hand side's hidden join column after a
    # USING-style join.
    positives = (
        t.where((F.col("order_date") >= cutoff) & (F.col("order_date") < today))
        .select(*keys)
        .distinct()
        .withColumn("label", F.lit(1))
    )
    universe = t.where(F.col("order_date") < cutoff).select(*keys).distinct()

    return (
        universe.join(positives, keys, "left")
        .select(*keys, F.coalesce(F.col("label"), F.lit(0)).alias("label"))
    )

@dlt.table(name="batch_recommendations_topn", table_properties={"quality": "gold"})
def batch_recommendations_topn():
    """Top-10 popularity-based product recommendations per customer.

    * Each product's score is its total quantity sold in the last 30 days.
    * Every customer in the joined orders is crossed with every scored product.
    * Pairs the customer bought in the last 90 days are removed (left anti join).
    * The remaining products are ranked per customer by score, descending.
      Ties are broken by product_id so the ranking is deterministic.
    """
    top_n = 10
    t = (
        dlt.read("sales_orders_customer_join_min")
        .withColumn("items", F.from_json("ordered_products", ITEMS_SCHEMA))
        .withColumn("p", F.explode("items"))
        .select(
            "customer_id",
            F.col("p.id").alias("product_id"),
            F.col("p.qty").cast("int").alias("qty"),
            # The source table has no `order_ts`; derive it from `order_datetime`.
            # NOTE(review): assumes order_datetime holds Unix epoch seconds; confirm.
            F.to_timestamp(F.from_unixtime(F.col("order_datetime").cast("bigint"))).alias("order_ts"),
        )
    )
    scores = (
        t.where(F.col("order_ts") >= F.date_sub(F.current_timestamp(), 30))
        .groupBy("product_id")
        .agg(F.sum("qty").alias("pop_score_30d"))
    )
    recent = (
        t.where(F.col("order_ts") >= F.date_sub(F.current_timestamp(), 90))
        .select("customer_id", "product_id")
        .distinct()
    )
    users = t.select("customer_id").distinct()

    rank_window = Window.partitionBy("customer_id").orderBy(
        F.col("pop_score_30d").desc(), F.col("product_id")
    )
    return (
        users.crossJoin(scores)
        .join(recent, ["customer_id", "product_id"], "left_anti")
        .withColumn("rk", F.row_number().over(rank_window))
        .where(F.col("rk") <= top_n)
        .select("customer_id", "product_id", "pop_score_30d", "rk")
    )