In [0]:

from pyspark.sql.functions import (
    col, when, substring, regexp_replace
)

# 1. Ensure schema exists
# IF NOT EXISTS keeps this statement safe to re-run when the schema is present.
spark.sql("CREATE DATABASE IF NOT EXISTS lakehouse.silver")

# 2. Read Bronze table
bronze_df = spark.table("lakehouse.bronze.crm_prd")

# 3. Clean data (equivalent to DELETE WHERE prd_id IS NULL)
# Only rows that carry a product id are kept for the Silver transformations.
has_product_id = col("prd_id").isNotNull()
bronze_clean_df = bronze_df.where(has_product_id)

# 4. Transformations
# NOTE(review): prd_name is added next to its source column prd_nm, which is
# never dropped, so both columns end up in silver_df - confirm this is intended.

# 4a. Type casts, applied one withColumn at a time in the listed order.
#     Each entry is (output column, source column, target type).
column_casts = (
    ("prd_id", "prd_id", "int"),
    ("prd_key", "prd_key", "string"),
    ("prd_cost", "prd_cost", "double"),
    ("prd_name", "prd_nm", "string"),
    ("prd_start_dt", "prd_start_dt", "date"),
    ("prd_end_dt", "prd_end_dt", "date"),
)
typed_df = bronze_clean_df
for target_name, source_name, target_type in column_casts:
    typed_df = typed_df.withColumn(
        target_name, col(source_name).cast(target_type)
    )

# 4b. Columns derived from prd_key (resolved after the string cast above):
#     - first 5 characters, with every "-" replaced by "_"
#     - the part starting at character 7, at most 100 characters long
category_prefix = substring(regexp_replace(col("prd_key"), "-", "_"), 1, 5)
sales_key = substring(col("prd_key"), 7, 100)

# 4c. Expand single-letter prd_line codes into labels; any value that is not
#     one of the listed codes passes through unchanged.
product_line_labels = (
    ("S", "Sport"),
    ("R", "Racing"),
    ("M", "Medium"),
    ("T", "Turbo/Tech/Touring"),
)
product_line = col("prd_line")
line_label = None
for line_code, label_text in product_line_labels:
    matches_code = product_line == line_code
    if line_label is None:
        # The first branch starts the CASE expression.
        line_label = when(matches_code, label_text)
    else:
        line_label = line_label.when(matches_code, label_text)
line_label = line_label.otherwise(product_line)

silver_df = (
    typed_df
    .withColumn("prd_key_px_cat", category_prefix)
    .withColumn("prd_key_sales", sales_key)
    .withColumn("prd_line", line_label)
)

# 5. Write to Silver layer
# The table is saved in Delta format with "overwrite" mode, so existing
# contents of the target table are replaced on every run.
# NOTE(review): the target is named "crm_prdd" while the Bronze source is
# "crm_prd" - possibly a typo; confirm against downstream readers before
# renaming.
silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("lakehouse.silver.crm_prdd")

print("Silver CRM product table created successfully")
