Task 3

Overview: Create the gold layer as a standard star schema based on the
specifications below.
- Consider only type 2 dimension fields when calculating history for a dimension.
- Each row should have effective start and effective end dates, which
represent the period during which the row was active. Base them on the silver
layer consume_datetime.
- Make sure referential integrity between the fact and the dimensions is kept.
- No duplicates should be kept, and history should be tracked only for the
specified SCD2 fields.
- Use the xxhash64 function for hashing, where applicable.
- Dimension details:
dim_customers
Field name Description Dimension Type
cust_sk surrogate key - hashed cust_nk
cust_nk natural key (customer_id)
cust_first_name 1
cust_last_name 1
cust_address_country_id 2
cust_address_state_province 2
cust_address_city 2
cust_address_postal_code 2
cust_address_street_address 2
cust_phone_number 1
cust_email 1
account_mgr_id 1
date_of_birth 1
marital_status 2
Gender 1
effective_from effective from date for type
2 dimensions based on the
consume_datetime in silver
layer
effective_to effective to date for type 2
dimensions based on the
consume_datetime in silver
layer
Inserted_datetime when row was inserted
Updated_datetime when row was last updated
dim_products
Field name Description Dimension Type
product_sk surrogate key - hashed product_id
product_nk natural key (product_id)
product_name 1
category_name 2
weight_class 1
product_status 2
list_price 2
min_price 1
effective_from effective from date for type
2 dimensions based on the
consume_timestamp in
silver layer
effective_to effective to date for type 2
dimensions based on the
consume_timestamp in
silver layer
Inserted_datetime when row was inserted
Updated_datetime when row was last updated
Fact orders
Field name Description
order_sk surrogate key - hashed order_nk
customer_sk dimension surrogate key
product_sk dimension surrogate key
order_nk natural key - concatenation of
order_id, line_item_id, customer_id, product_id
separated by pipes (|)
customer_nk dimension natural key
product_nk dimension natural key
order_id
line_item_id
order_date
order_mode
order_status
unit_price
quantity
Inserted_datetime when row was inserted

In [0]:
#################
##### Gold ######
#################


###################
## dim_customers ##
###################

from functools import reduce

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, current_timestamp, lit, xxhash64

SILVER_CUSTOMERS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_silver_customers_exam"
GOLD_DIM_CUSTOMERS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_gold_dim_customers_exam"

# effective_to value of the currently active version of every natural key.
HIGH_DATE = "9999-12-31"

# Every descriptive column of the dimension, in table order.
CUST_ATTRIBUTE_COLS = [
    "cust_first_name",
    "cust_last_name",
    "cust_address_country_id",
    "cust_address_state_province",
    "cust_address_city",
    "cust_address_postal_code",
    "cust_address_street_address",
    "cust_phone_number",
    "cust_email",
    "account_mgr_id",
    "date_of_birth",
    "marital_status",
    "gender",
]
# SCD type 2: a change in any of these opens a new version.
CUST_TYPE2_COLS = [
    "cust_address_country_id",
    "cust_address_state_province",
    "cust_address_city",
    "cust_address_postal_code",
    "cust_address_street_address",
    "marital_status",
]
# SCD type 1: overwritten in place on every version.
CUST_TYPE1_COLS = [c for c in CUST_ATTRIBUTE_COLS if c not in CUST_TYPE2_COLS]


def _build_scd2_history(df, nk, type1_cols, type2_cols, ts_col="consume_datetime"):
    """Collapse silver snapshots into one row per SCD2 version.

    Snapshots of each natural key ``nk`` are ordered by ``ts_col``. A version
    starts at the first snapshot and whenever any ``type2_cols`` value differs
    (null-safely) from the previous snapshot; snapshots that only change type 1
    columns create no history. Every version carries the type 1 values of the
    latest snapshot.

    Adds ``effective_from`` (the version's first ``ts_col``) and
    ``effective_to`` (the next version's ``effective_from``, or HIGH_DATE for
    the current version), giving contiguous half-open ranges [from, to).
    """
    by_time = Window.partitionBy(nk).orderBy(ts_col)
    whole_key = by_time.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    # Keep one snapshot per key and timestamp so (nk, effective_from) - and the
    # surrogate key derived from it - is unique. Which of several conflicting
    # snapshots with an identical timestamp survives is arbitrary.
    snapshots = df.dropDuplicates([nk, ts_col])
    for c in type1_cols:
        snapshots = snapshots.withColumn(c, F.last(c).over(whole_key))

    starts_version = reduce(
        lambda acc, c: acc | ~col(c).eqNullSafe(F.lag(c).over(by_time)),
        type2_cols,
        F.row_number().over(by_time) == 1,
    )
    return (
        snapshots.withColumn("_starts_version", starts_version)
        # Window expressions are not allowed inside filter(), hence the helper column.
        .filter(col("_starts_version"))
        .drop("_starts_version")
        .withColumn("effective_from", col(ts_col))
        .withColumn(
            "effective_to",
            F.coalesce(F.lead(ts_col).over(by_time), lit(HIGH_DATE).cast("timestamp")),
        )
    )


# Load silver customers
silver_customers_df = spark.table(SILVER_CUSTOMERS_TABLE).select(
    col("customer_id").cast("string").alias("cust_nk"),
    "cust_first_name",
    "cust_last_name",
    "cust_address_country_id",
    "cust_address_state_province",
    "cust_address_city",
    "cust_address_postal_code",
    "cust_address_street_address",
    col("phone_number").alias("cust_phone_number"),
    "cust_email",
    "account_mgr_id",
    "date_of_birth",
    "marital_status",
    "gender",
    col("consume_datetime").cast("timestamp").alias("consume_datetime"),
)

# Build the full version history; the surrogate key hashes the natural key
# together with effective_from so every version gets its own key.
staging_customers_df = (
    _build_scd2_history(silver_customers_df, "cust_nk", CUST_TYPE1_COLS, CUST_TYPE2_COLS)
    .withColumn("cust_sk", xxhash64("cust_nk", "effective_from"))
    .select("cust_sk", "cust_nk", *CUST_ATTRIBUTE_COLS, "effective_from", "effective_to")
)
staging_customers_df.createOrReplaceTempView("staging_customers")

# Create gold dimension table if not exists. effective_to is a TIMESTAMP like
# effective_from so consecutive versions meet exactly. A table created earlier
# with effective_to DATE is left unchanged by IF NOT EXISTS and must be
# recreated for that to take effect.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {GOLD_DIM_CUSTOMERS_TABLE} (
        cust_sk BIGINT,
        cust_nk STRING,
        cust_first_name STRING,
        cust_last_name STRING,
        cust_address_country_id STRING,
        cust_address_state_province STRING,
        cust_address_city STRING,
        cust_address_postal_code STRING,
        cust_address_street_address STRING,
        cust_phone_number STRING,
        cust_email STRING,
        account_mgr_id STRING,
        date_of_birth DATE,
        marital_status STRING,
        gender STRING,
        effective_from TIMESTAMP,
        effective_to TIMESTAMP,
        inserted_datetime TIMESTAMP,
        updated_datetime TIMESTAMP
    )
    USING DELTA
""")

# On an existing version a re-run may change: the surrogate key (rows written
# by the previous logic were hashed from cust_nk alone), any attribute (type 1
# values are refreshed on all versions) and effective_to (closed when a newer
# version appears).
merge_update_cols = ["cust_sk", *CUST_ATTRIBUTE_COLS, "effective_to"]
merge_insert_cols = ["cust_sk", "cust_nk", *CUST_ATTRIBUTE_COLS, "effective_from", "effective_to"]

# <=> is null-safe, so changes to or from NULL are detected too.
row_changed = " OR ".join(f"NOT (target.{c} <=> source.{c})" for c in merge_update_cols)
update_set = ", ".join(f"target.{c} = source.{c}" for c in merge_update_cols)
insert_cols = ", ".join([*merge_insert_cols, "inserted_datetime", "updated_datetime"])
insert_values = ", ".join(
    [*(f"source.{c}" for c in merge_insert_cols), "current_timestamp()", "current_timestamp()"]
)

# Versions are identified by (cust_nk, effective_from): new versions are
# inserted, existing ones are updated only when something changed, so re-runs
# on the same silver data are idempotent and never duplicate a version.
spark.sql(f"""
    MERGE INTO {GOLD_DIM_CUSTOMERS_TABLE} AS target
    USING staging_customers AS source
    ON target.cust_nk = source.cust_nk AND target.effective_from = source.effective_from
    WHEN MATCHED AND ({row_changed}) THEN
      UPDATE SET {update_set}, target.updated_datetime = current_timestamp()
    WHEN NOT MATCHED THEN
      INSERT ({insert_cols})
      VALUES ({insert_values})
""")



In [0]:
#################
##### Gold ######
#################

###################
## dim_products ##
###################

from functools import reduce

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, current_timestamp, lit, xxhash64

SILVER_PRODUCTS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_silver_products_exam"
GOLD_DIM_PRODUCTS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_gold_dim_products_exam"

# effective_to value of the currently active version of every natural key.
HIGH_DATE = "9999-12-31"

# Every descriptive column of the dimension, in table order.
PROD_ATTRIBUTE_COLS = [
    "product_name",
    "category_name",
    "weight_class",
    "product_status",
    "list_price",
    "min_price",
]
# SCD type 2: a change in any of these opens a new version.
PROD_TYPE2_COLS = ["category_name", "product_status", "list_price"]
# SCD type 1: overwritten in place on every version.
PROD_TYPE1_COLS = [c for c in PROD_ATTRIBUTE_COLS if c not in PROD_TYPE2_COLS]


def _build_scd2_history(df, nk, type1_cols, type2_cols, ts_col="consume_datetime"):
    """Collapse silver snapshots into one row per SCD2 version.

    Snapshots of each natural key ``nk`` are ordered by ``ts_col``. A version
    starts at the first snapshot and whenever any ``type2_cols`` value differs
    (null-safely) from the previous snapshot; snapshots that only change type 1
    columns create no history. Every version carries the type 1 values of the
    latest snapshot.

    Adds ``effective_from`` (the version's first ``ts_col``) and
    ``effective_to`` (the next version's ``effective_from``, or HIGH_DATE for
    the current version), giving contiguous half-open ranges [from, to).
    """
    by_time = Window.partitionBy(nk).orderBy(ts_col)
    whole_key = by_time.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    # Keep one snapshot per key and timestamp so (nk, effective_from) - and the
    # surrogate key derived from it - is unique. Which of several conflicting
    # snapshots with an identical timestamp survives is arbitrary.
    snapshots = df.dropDuplicates([nk, ts_col])
    for c in type1_cols:
        snapshots = snapshots.withColumn(c, F.last(c).over(whole_key))

    starts_version = reduce(
        lambda acc, c: acc | ~col(c).eqNullSafe(F.lag(c).over(by_time)),
        type2_cols,
        F.row_number().over(by_time) == 1,
    )
    return (
        snapshots.withColumn("_starts_version", starts_version)
        # Window expressions are not allowed inside filter(), hence the helper column.
        .filter(col("_starts_version"))
        .drop("_starts_version")
        .withColumn("effective_from", col(ts_col))
        .withColumn(
            "effective_to",
            F.coalesce(F.lead(ts_col).over(by_time), lit(HIGH_DATE).cast("timestamp")),
        )
    )


# Load silver products
silver_products_df = spark.table(SILVER_PRODUCTS_TABLE).select(
    col("product_id").cast("string").alias("product_nk"),
    "product_name",
    "category_name",
    "weight_class",
    "product_status",
    "list_price",
    "min_price",
    col("consume_datetime").cast("timestamp").alias("consume_datetime"),
)

# Build the full version history; the surrogate key hashes the natural key
# together with effective_from so every version gets its own key.
staging_products_df = (
    _build_scd2_history(silver_products_df, "product_nk", PROD_TYPE1_COLS, PROD_TYPE2_COLS)
    .withColumn("product_sk", xxhash64("product_nk", "effective_from"))
    .select("product_sk", "product_nk", *PROD_ATTRIBUTE_COLS, "effective_from", "effective_to")
)
staging_products_df.createOrReplaceTempView("staging_products")

# Create gold dimension table if not exists. The columns follow the
# dim_products specification (no consume_datetime); effective_to is a
# TIMESTAMP like effective_from so consecutive versions meet exactly. A table
# created earlier with a different schema is left unchanged by IF NOT EXISTS.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {GOLD_DIM_PRODUCTS_TABLE} (
        product_sk BIGINT,
        product_nk STRING,
        product_name STRING,
        category_name STRING,
        weight_class STRING,
        product_status STRING,
        list_price DOUBLE,
        min_price DOUBLE,
        effective_from TIMESTAMP,
        effective_to TIMESTAMP,
        inserted_datetime TIMESTAMP,
        updated_datetime TIMESTAMP
    )
    USING DELTA
""")

# On an existing version a re-run may change: the surrogate key (rows written
# by the previous logic were hashed from product_nk alone), any attribute
# (type 1 values are refreshed on all versions) and effective_to (closed when a
# newer version appears).
merge_update_cols = ["product_sk", *PROD_ATTRIBUTE_COLS, "effective_to"]
merge_insert_cols = ["product_sk", "product_nk", *PROD_ATTRIBUTE_COLS, "effective_from", "effective_to"]

# <=> is null-safe, so changes to or from NULL are detected too.
row_changed = " OR ".join(f"NOT (target.{c} <=> source.{c})" for c in merge_update_cols)
update_set = ", ".join(f"target.{c} = source.{c}" for c in merge_update_cols)
insert_cols = ", ".join([*merge_insert_cols, "inserted_datetime", "updated_datetime"])
insert_values = ", ".join(
    [*(f"source.{c}" for c in merge_insert_cols), "current_timestamp()", "current_timestamp()"]
)

# Versions are identified by (product_nk, effective_from): new versions are
# inserted, existing ones are updated only when something changed, so re-runs
# on the same silver data are idempotent and never duplicate a version.
spark.sql(f"""
    MERGE INTO {GOLD_DIM_PRODUCTS_TABLE} AS target
    USING staging_products AS source
    ON target.product_nk = source.product_nk AND target.effective_from = source.effective_from
    WHEN MATCHED AND ({row_changed}) THEN
      UPDATE SET {update_set}, target.updated_datetime = current_timestamp()
    WHEN NOT MATCHED THEN
      INSERT ({insert_cols})
      VALUES ({insert_values})
""")

In [0]:
#################
##### Gold ######
#################

###################
### fact_orders ###
###################

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat_ws, current_timestamp, lit, xxhash64

SILVER_ORDER_ITEMS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_silver_order_items_exam"
SILVER_ORDERS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_silver_orders_exam"
GOLD_DIM_CUSTOMERS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_gold_dim_customers_exam"
GOLD_DIM_PRODUCTS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_gold_dim_products_exam"
GOLD_FACT_ORDERS_TABLE = "de_pyspark_training_catalog.buddy_group_1.amanolov_gold_fact_orders_exam"

# Lower validity bound used for the earliest version of every dimension key.
LOW_DATE = "1900-01-01"


def _dim_versions(dim_df, nk_col, sk_col, prefix):
    """Project an SCD2 dimension to its keys and half-open validity range.

    Returns columns ``_{prefix}_nk``, ``{prefix}_sk``, ``_{prefix}_from`` and
    ``_{prefix}_to``. The earliest version of each natural key is opened back
    to LOW_DATE: effective_from is the silver load time, so facts dated before
    a key's first load would otherwise match no version and lose their
    surrogate key (breaking referential integrity).
    """
    is_first = F.row_number().over(Window.partitionBy(nk_col).orderBy("effective_from")) == 1
    return dim_df.select(
        col(nk_col).alias(f"_{prefix}_nk"),
        col(sk_col).alias(f"{prefix}_sk"),
        F.when(is_first, lit(LOW_DATE).cast("timestamp"))
        .otherwise(col("effective_from"))
        .alias(f"_{prefix}_from"),
        # The cast also covers a dimension whose effective_to is stored as DATE.
        col("effective_to").cast("timestamp").alias(f"_{prefix}_to"),
    )


def _join_dim_version(fact_df, versions_df, fact_nk_col, prefix):
    """Left-join the dimension version of ``fact_nk_col`` valid at ``_lookup_ts``."""
    return fact_df.join(
        versions_df,
        (col(fact_nk_col) == col(f"_{prefix}_nk"))
        & (col("_lookup_ts") >= col(f"_{prefix}_from"))
        & (col("_lookup_ts") < col(f"_{prefix}_to")),
        how="left",
    )


# Load silver order_items table containing line_item_id, product_id
silver_order_items_df = spark.table(SILVER_ORDER_ITEMS_TABLE)

# Load silver orders table containing customer_id, order details
silver_orders_df = spark.table(SILVER_ORDERS_TABLE)

# Load gold dims
dim_customers_df = spark.table(GOLD_DIM_CUSTOMERS_TABLE)
dim_products_df = spark.table(GOLD_DIM_PRODUCTS_TABLE)

# Join order_items with orders to get customer_id and order details
fact_orders_df = silver_order_items_df.join(
    silver_orders_df.drop("inserted_datetime"),  # Drop to avoid column name duplication
    on="order_id",
    how="left",
)

# order_nk: order_id|line_item_id|customer_id|product_id. Null parts become ""
# because concat_ws would otherwise skip them and shift the remaining fields.
order_nk = concat_ws(
    "|",
    *(
        F.coalesce(col(c).cast("string"), lit(""))
        for c in ("order_id", "line_item_id", "customer_id", "product_id")
    ),
)
fact_orders_df = (
    fact_orders_df.withColumn("order_nk", order_nk)
    .withColumn("order_sk", xxhash64("order_nk"))
    # Point in time used to pick the dimension version; orders without a date
    # resolve to the currently active version.
    .withColumn("_lookup_ts", F.coalesce(col("order_date").cast("timestamp"), current_timestamp()))
)

# SCD2 joins (time-aware). The dimension columns are renamed first, so no
# ambiguous effective_from/effective_to columns end up in the joined frame.
fact_orders_df = _join_dim_version(
    fact_orders_df,
    _dim_versions(dim_customers_df, "cust_nk", "cust_sk", "customer"),
    "customer_id",
    "customer",
)
fact_orders_df = _join_dim_version(
    fact_orders_df,
    _dim_versions(dim_products_df, "product_nk", "product_sk", "product"),
    "product_id",
    "product",
)

# Select final columns per the specification and drop exact duplicates.
fact_orders_df = (
    fact_orders_df.select(
        "order_sk",
        "customer_sk",
        "product_sk",
        "order_nk",
        col("customer_id").alias("customer_nk"),
        col("product_id").alias("product_nk"),
        "order_id",
        "line_item_id",
        "order_date",
        "order_mode",
        "order_status",
        "unit_price",
        "quantity",
    )
    .dropDuplicates()
    .withColumn("inserted_datetime", current_timestamp())
)

# Save as gold fact table. overwriteSchema allows replacing a table written
# with the former cust_sk column name (the specification calls it customer_sk).
(
    fact_orders_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(GOLD_FACT_ORDERS_TABLE)
)

