# Read raw data from tables in Unity Catalog
# Flatten and Transform the DataFrame

## Prepare Products

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import BooleanType, DoubleType

# Load the bronze Shopify product tables, one per storefront (US and AU)
raw_product_us_df = spark.read.table("shopify.bronze.products_quayusa")
raw_product_au_df = spark.read.table("shopify.bronze.products_quayaustralia")

# Transform US and AU products: explode variants, extract fields, flatten metafields.
# Both storefronts go through the same steps, so one builder is parameterised by
# region instead of maintaining two copy-pasted ~400-line chains.

# Metafield-derived output columns, listed in final output order:
# (column suffix, metafield level, namespace, key).
#   "product" -> read from the exploded product metafield, ownerType == "PRODUCT"
#   "variant" -> read from the exploded variant metafield, ownerType == "PRODUCTVARIANT"
_PRODUCT_METAFIELD_COLUMNS = [
    ("gender", "product", "features", "gender"),
    ("size", "product", "fit", "size"),
    ("shape", "product", "fit", "shape"),
    ("lens_color", "variant", "lens", "color"),
    ("lens_technology", "variant", "lens", "technology"),
    ("lens_category", "variant", "lens", "category"),
    ("frame_color", "variant", "frame", "color"),
    ("faceshapeoval", "product", "faceshape", "oval"),
    ("faceshapeheart", "product", "faceshape", "heart"),
    ("faceshapesquare", "product", "faceshape", "square"),
    ("faceshaperound", "product", "faceshape", "round"),
    ("frameshapeaviator", "product", "frameshape", "aviator"),
    ("frameshapecateye", "product", "frameshape", "cateye"),
    ("frameshaperound", "product", "frameshape", "round"),
    ("frameshapesquare", "product", "frameshape", "square"),
    ("frameshapeshield", "product", "frameshape", "shield"),
    ("framesizewide", "product", "framesize", "wide"),
    ("framesizemedium", "product", "framesize", "medium"),
    ("framesizenarrow", "product", "framesize", "narrow"),
    ("framefeaturenosepad", "product", "framefeature", "nosepads"),
    ("framefeaturelightweight", "product", "framefeature", "lightweight"),
    ("framefeaturewhatever", "product", "framefeature", "whatever"),
    ("vibeclassic", "variant", "vibe", "classic"),
    ("vibeedgy", "variant", "vibe", "edgy"),
    ("vibeglam", "variant", "vibe", "glam"),
    ("vibecasual", "variant", "vibe", "casual"),
    ("vibesporty", "variant", "vibe", "sporty"),
    ("colorneutral", "variant", "color", "neutral"),
    ("colorbright", "variant", "color", "bright"),
    ("colorblack", "variant", "color", "black"),
    ("colortortoise", "variant", "color", "tortoise"),
    ("colormetallic", "variant", "color", "metallic"),
]


def _metafield_value(edge_col, owner_type, namespace, key):
    """Return the metafield value when the exploded edge matches, else NULL.

    Args:
        edge_col: Name of the column holding one exploded metafield edge.
        owner_type: Required ``node.ownerType`` ("PRODUCT" or "PRODUCTVARIANT").
        namespace: Required ``node.namespace``.
        key: Required ``node.key``.

    Returns:
        A Column equal to ``node.value`` on a match and NULL otherwise.
    """
    node = f"{edge_col}.node"
    return F.when(
        (F.col(f"{node}.namespace") == namespace)
        & (F.col(f"{node}.key") == key)
        & (F.col(f"{node}.ownerType") == owner_type),
        F.col(f"{node}.value"),
    )


def _build_regional_products(raw_df, region, products_base_url, zero_price_fallback=None):
    """Flatten a raw Shopify product table into one row per SKU.

    Args:
        raw_df: Raw product DataFrame; the code reads ``title``, ``productType``,
            ``handle``, ``publishedAt``, ``variants.edges`` and ``metafields.edges``.
        region: Prefix for every output column except ``sku`` (e.g. "us", "au").
        products_base_url: Storefront URL prefix that the product handle is appended to.
        zero_price_fallback: When given, replaces a NULL or zero variant price.
            When None, the variant price is passed through unchanged.

    Returns:
        DataFrame with ``sku`` followed by ``<region>_*`` columns, one row per SKU.
    """
    variant_edge = f"{region}_variant"
    product_mf_edge = f"{region}_metafield"
    variant_mf_edge = f"{region}_variant_metafield"

    # Column expressions are built once and reused by the filters and the projection.
    title = F.col("title")
    handle = F.col("handle")
    sku = F.col(f"{variant_edge}.node.sku")
    # The variant ID is the last path segment of the full ID.
    variant_id = F.element_at(F.split(F.col(f"{variant_edge}.node.id"), "/"), -1)
    image_url = F.col(f"{variant_edge}.node.image.url")

    # A variant counts as on sale when compareAtPrice >= 1.
    on_sale = F.when(
        F.col(f"{variant_edge}.node.compareAtPrice") >= F.lit(1),
        F.lit(1),
    ).otherwise(F.lit(0))

    # Products whose title ends in " CASE" are grouped per SKU; the rest by handle.
    group_id = F.when(
        title.like("% CASE"),
        F.concat(sku, F.lit("-"), handle),
    ).otherwise(handle)

    url = F.concat(F.lit(products_base_url), handle, F.lit("?variant="), variant_id)

    raw_price = F.col(f"{variant_edge}.node.price")
    if zero_price_fallback is None:
        price = raw_price
    else:
        # Coalesce NULL to 0 so that missing and zero prices both get the fallback.
        price = F.when(
            F.coalesce(raw_price, F.lit(0)) == F.lit(0),
            zero_price_fallback,
        ).otherwise(raw_price)

    # explode_outer keeps the input row even when the array is NULL or empty.
    # NOTE(review): exploding product and variant metafields one after the other
    # yields a row for every (variant, product metafield, variant metafield)
    # combination; the max() aggregation below collapses them again.
    exploded_df = (
        raw_df
        .withColumn(variant_edge, F.explode_outer("variants.edges"))
        .withColumn(product_mf_edge, F.explode_outer("metafields.edges"))
        .withColumn(variant_mf_edge, F.explode_outer(f"{variant_edge}.node.metafields.edges"))
    )

    # Filters: valid SKUs, published products, image present, no gift cards or donations
    filtered_df = (
        exploded_df
        .filter(sku.isNotNull() & (F.trim(sku) != ""))
        .filter(F.col("publishedAt").isNotNull() & ~title.like("%E-GIFT CARD%"))
        .filter(~sku.like("%DONATION%"))
        .filter(image_url.isNotNull())
    )

    value_columns = [
        title.alias(f"{region}_name"),
        F.col("productType").alias(f"{region}_product_type"),
        on_sale.alias(f"{region}_onsale"),
        group_id.alias(f"{region}_group_id"),
        F.col(f"{variant_edge}.node.title").alias(f"{region}_variant_title"),
        variant_id.alias(f"{region}_variant_id"),
        url.alias(f"{region}_url"),
        image_url.alias(f"{region}_image_url"),
        price.alias(f"{region}_price"),
    ]
    value_names = [
        f"{region}_name", f"{region}_product_type", f"{region}_onsale",
        f"{region}_group_id", f"{region}_variant_title", f"{region}_variant_id",
        f"{region}_url", f"{region}_image_url", f"{region}_price",
    ]
    for suffix, level, namespace, key in _PRODUCT_METAFIELD_COLUMNS:
        if level == "product":
            matched = _metafield_value(product_mf_edge, "PRODUCT", namespace, key)
        else:
            matched = _metafield_value(variant_mf_edge, "PRODUCTVARIANT", namespace, key)
        value_columns.append(matched.alias(f"{region}_{suffix}"))
        value_names.append(f"{region}_{suffix}")

    # A single select adds every derived column in one projection rather than
    # through dozens of chained withColumn calls.
    flat_df = filtered_df.select(sku.alias("sku"), *value_columns)

    # Aggregate to one row per SKU by taking max on each column.
    # NOTE(review): max is taken per column independently, so when a SKU occurs
    # on several source rows the output values may come from different rows.
    return flat_df.groupBy("sku").agg(*[F.max(name).alias(name) for name in value_names])


# US: a missing (NULL) or zero variant price defaults to 0.1
product_us_df = _build_regional_products(
    raw_product_us_df,
    region="us",
    products_base_url="https://www.quayaustralia.com/products/",
    zero_price_fallback=0.1,
)

# AU: the variant price is used as-is
product_au_df = _build_regional_products(
    raw_product_au_df,
    region="au",
    products_base_url="https://www.quayaustralia.com.au/products/",
)

## Prepare Handles

In [0]:
# Build one "|"-delimited, alphabetically sorted string of collection handles per US SKU
us_product_allHandles_df = (
    raw_product_us_df
    # One row per variant edge (explode_outer keeps products whose variant array is null or empty),
    # carrying the product's collection edges along for the next step
    .select(
        F.explode_outer("variants.edges").alias("us_variant"),
        F.col("collections.edges").alias("us_collection_edges"),
    )
    # One row per (variant SKU, collection edge) pair
    .select(
        F.col("us_variant.node.sku").alias("sku"),
        F.explode_outer("us_collection_edges").alias("us_collection"),
    )
    # Only the SKU and the collection handle are needed from here on
    .select("sku", F.col("us_collection.node.handle").alias("handle"))
    .groupBy("sku")
    .agg(
        # collect_set drops duplicate handles, sort_array orders them, concat_ws joins them with "|"
        F.concat_ws("|", F.sort_array(F.collect_set("handle"))).alias("us_categories")
    )
)

# Build one "|"-delimited, alphabetically sorted string of collection handles per AU SKU
au_product_allHandles_df = (
    raw_product_au_df
    # One row per variant edge (explode_outer keeps products whose variant array is null or empty),
    # carrying the product's collection edges along for the next step
    .select(
        F.explode_outer("variants.edges").alias("au_variant"),
        F.col("collections.edges").alias("au_collection_edges"),
    )
    # One row per (variant SKU, collection edge) pair
    .select(
        F.col("au_variant.node.sku").alias("sku"),
        F.explode_outer("au_collection_edges").alias("au_collection"),
    )
    # Only the SKU and the collection handle are needed from here on
    .select("sku", F.col("au_collection.node.handle").alias("handle"))
    .groupBy("sku")
    .agg(
        # collect_set drops duplicate handles, sort_array orders them, concat_ws joins them with "|"
        F.concat_ws("|", F.sort_array(F.collect_set("handle"))).alias("au_categories")
    )
)

## Prepare Inventories

In [0]:
# US inventory: per SKU, the largest quantity reported at the "PT DC" location
inventory_us_df = (
    spark.read.table("shopify.bronze.inventoryitems_quayusa")  # Raw US inventory items
    # One row per inventory-level edge
    .select("sku", F.explode_outer("inventoryLevels.edges").alias("us_level"))
    # Only the PT DC location counts for the US (per Kelli's request on 01/23/2023).
    # The predicate only looks at the level, so it can run before the quantities are exploded.
    .where(F.col("us_level.node.location.name") == "PT DC")
    # One row per entry of the level's quantities array
    .select("sku", F.explode_outer("us_level.node.quantities").alias("us_level_quantities"))
    .select("sku", F.col("us_level_quantities.quantity").alias("us_level_quantity"))
    .groupBy("sku")
    # NOTE(review): the max is taken over every entry of the quantities array; presumably the
    # bronze extract only carries the "available" quantity - confirm against the ingestion query.
    .agg(F.max("us_level_quantity").alias("us_maxAvailable"))
)

# AU inventory: per SKU, the largest quantity reported at the "CEVA 3PL" location
inventory_au_df = (
    spark.read.table("shopify.bronze.inventoryitems_quayaustralia")  # Raw AU inventory items
    # One row per inventory-level edge
    .select("sku", F.explode_outer("inventoryLevels.edges").alias("au_level"))
    # Only the CEVA 3PL location counts for AU.
    # The predicate only looks at the level, so it can run before the quantities are exploded.
    .where(F.col("au_level.node.location.name") == "CEVA 3PL")
    # One row per entry of the level's quantities array
    .select("sku", F.explode_outer("au_level.node.quantities").alias("au_level_quantities"))
    .select("sku", F.col("au_level_quantities.quantity").alias("au_level_quantity"))
    .groupBy("sku")
    # NOTE(review): the max is taken over every entry of the quantities array; presumably the
    # bronze extract only carries the "available" quantity - confirm against the ingestion query.
    .agg(F.max("au_level_quantity").alias("au_maxAvailable"))
)

# Join tables to get an integrated Product DataFrame

## Combine product attributes, categories, and inventory into final DataFrame

In [0]:
# US final DataFrame = product attributes + category handles + in-stock flag
product_final_us_df = (
    product_us_df.alias("p")  # Base US product details, one row per SKU
    .join(us_product_allHandles_df.alias("cat"), on="sku", how="left")  # Category handles
    .join(inventory_us_df.alias("inv"), on="sku", how="left")  # Inventory availability
    .select(
        "p.*",
        "cat.us_categories",
        # In stock means an inventory row exists AND it reports at least 20 units.
        # The explicit isNotNull() makes SKUs without inventory evaluate to False instead of null.
        (
            F.col("inv.us_maxAvailable").isNotNull()
            & (F.col("inv.us_maxAvailable") >= 20)
        )
        .cast(BooleanType())
        .alias("us_in_stock"),
    )
)

# AU final DataFrame = product attributes + category handles + in-stock flag
product_final_au_df = (
    product_au_df.alias("p")  # Base AU product details, one row per SKU
    .join(au_product_allHandles_df.alias("cat"), on="sku", how="left")  # Category handles
    .join(inventory_au_df.alias("inv"), on="sku", how="left")  # Inventory availability
    .select(
        "p.*",
        "cat.au_categories",
        # In stock means an inventory row exists AND it reports at least 20 units.
        # The explicit isNotNull() makes SKUs without inventory evaluate to False instead of null.
        (
            F.col("inv.au_maxAvailable").isNotNull()
            & (F.col("inv.au_maxAvailable") >= 20)
        )
        .cast(BooleanType())
        .alias("au_in_stock"),
    )
)

## Combine US and AU final product DataFrames

In [0]:
# Join US and AU final product DataFrames into a single unified DataFrame
def _channel_value(field, alias=None):
    """Pick the US or AU version of an attribute based on the row's ``channel``.

    Args:
        field: Attribute name without region prefix; the columns ``us_<field>``
            and ``au_<field>`` are read.
        alias: Name of the output column. Defaults to ``field``.

    Returns:
        A Column equal to ``us_<field>`` when ``channel`` is "US" and to
        ``au_<field>`` otherwise, named ``alias``.
    """
    return (
        F.when(F.col("channel") == "US", F.col(f"us_{field}"))
        .otherwise(F.col(f"au_{field}"))
        .alias(alias or field)
    )


# Attributes resolved per channel. Order matters: it is the column order of the output feed.
_identity_fields = ("name", "product_type", "onsale", "group_id", "variant_title", "variant_id")
_lens_frame_fields = ("lens_color", "lens_technology", "lens_category", "frame_color", "categories")
_flag_fields = (
    "faceshapeoval", "faceshapeheart", "faceshapesquare", "faceshaperound",
    "frameshapeaviator", "frameshapecateye", "frameshaperound", "frameshapesquare", "frameshapeshield",
    "framesizewide", "framesizemedium", "framesizenarrow",
    "framefeaturenosepad", "framefeaturelightweight", "framefeaturewhatever",
    "vibeclassic", "vibeedgy", "vibeglam", "vibecasual", "vibesporty",
    "colorneutral", "colorbright", "colorblack", "colortortoise", "colormetallic",
)

product_df = (
    product_final_us_df  # US products
    .join(product_final_au_df, on="sku", how="full")  # Full outer join keeps SKUs that exist in only one region
    .withColumn(
        "channel",
        # US is the main channel: a SKU with a US variant is "US" even if it is also sold in AU
        F.when(F.col("us_variant_id").isNotNull(), F.lit("US")).otherwise(F.lit("AU"))
    )
    .select(
        "channel",
        "sku",
        *[_channel_value(field) for field in _identity_fields],
        # URL, price and stock are NOT resolved per channel: the unprefixed column always holds
        # the US value and the "lng:en_AU:" column always holds the AU value, each with a default.
        F.coalesce(F.col("us_url"), F.lit("https://www.quayaustralia.com/")).alias("url"),  # US URL only
        F.coalesce(F.col("au_url"), F.lit("https://www.quayaustralia.com.au/")).alias("lng:en_AU:url"),  # AU localized URL
        _channel_value("image_url"),
        F.coalesce(F.col("us_price"), F.lit(0.1)).cast(DoubleType()).alias("price"),  # US price only. Can't be 0 or null.
        F.coalesce(F.col("au_price"), F.lit(0.1)).cast(DoubleType()).alias("lng:en_AU:price"),  # AU localized price
        F.coalesce(F.col("us_in_stock"), F.lit(False)).cast(BooleanType()).alias("in_stock"),  # US in stock only. False if null.
        F.coalesce(F.col("au_in_stock"), F.lit(False)).cast(BooleanType()).alias("lng:en_AU:in_stock"),  # AU localized in stock. False if null.
        # These three keep their capitalised output names
        _channel_value("gender", "Gender"),
        _channel_value("size", "Size"),
        _channel_value("shape", "Shape"),
        *[_channel_value(field) for field in _lens_frame_fields],
        *[_channel_value(field) for field in _flag_fields],
    )
)

# Save Unified Product DataFrame to ADLS

In [0]:
# Define Azure Data Lake paths for Silver layer
delta_silver_path = "abfss://silver@qydatalake.dfs.core.windows.net/dynamicYeild/productfeed_delta/"
csv_silver_path = "abfss://silver@qydatalake.dfs.core.windows.net/dynamicYeild/productfeed_csv/"

# Write the unified product DataFrame as Delta for versioned storage and time travel
(
    product_df
    .coalesce(1)  # Reduce to a single output file for simplicity (small dataset)
    .write
    .format("delta")  # Delta format supports ACID and time travel
    .option("overwriteSchema", "true")  # Let the stored schema follow changes in the feed's columns
    .mode("overwrite")  # Replace any existing data at the path
    .save(delta_silver_path)  # Persist to Silver Delta path
)

# Write the same data as CSV for later uploading and for users to easily review.
# The CSV is produced from the Delta snapshot written above rather than from product_df:
# product_df is lazy, so writing it a second time would re-run the whole pipeline against the
# bronze tables and the two outputs could differ if those tables changed in between.
(
    spark
    .read
    .format("delta")
    .load(delta_silver_path)
    .coalesce(1)  # Single file output so ADF can easily pick it up
    .write
    .format("csv")  # Standard CSV format
    .option("header", "true")  # Include column names in the first row
    .mode("overwrite")  # Overwrite existing CSV files
    .save(csv_silver_path)  # Persist to Silver CSV path
)
# Note: ADF uses lower-level Delta protocols and does not have S3 as a sink.
# Hence, Databricks is used to upload the CSV to S3.

# Downgrade Protocols at Databricks (did not work) to match ADF version
# Vacuum ADLS

In [0]:
# from delta.tables import DeltaTable

# deltaTable = DeltaTable.forPath(spark, delta_silver_path)
# spark.sql(f"ALTER TABLE delta.`{delta_silver_path}` SET TBLPROPERTIES ('delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5')")

# # Delete the unrelated files from ADLS to save space. It is fine to run this code every time because the table is small.
# deltaTable.vacuum(168)

In [0]:
# # Read every JSON in the log
# logDf = (
#   spark
#     .read
#     .json(f"{delta_silver_path}/_delta_log/*.json")
#     .filter("protocol is not null")                # keep only the protocol entries
#     .select(
#       "protocol.minReaderVersion",
#       "protocol.minWriterVersion"
#     )
#     .distinct()
# )

# logDf.show(truncate=False)

# Read Version History for Time Travel

In [0]:
# from delta.tables import DeltaTable

# deltaTable = DeltaTable.forPath(spark, delta_silver_path)
# display(deltaTable.history())

# Time Travel by Version or Timestamp

In [0]:
# df_v0 = (
#     spark
#     .read
#     .format("delta")
#     # .option("versionAsOf", 0)
#     # .option("timestampAsOf", "2025-07-02T17:25:44.000+00:00")
#     .load(delta_silver_path)
# )

# display(df_v0.limit(10))