**Initialization**

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType,DateType
from pyspark.sql.functions import col, trim,upper
from pyspark.sql.window import Window





**Read Bronze Table**

In [0]:
df = spark.read.table("workspace.bronze.crm_prd_info")


**Silver Transformations**

**Trim all string columns**

In [0]:
for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df = df.withColumn(field.name, trim(col(field.name)))



**Product Key Parsing**

In [0]:
df = df.withColumn(
    "cat_id",
    F.regexp_replace(F.substring(col("prd_key"), 1, 5), "-", "_")
)

df = df.withColumn(
    "prd_key",
    F.substring(col("prd_key"), 7, F.length(col("prd_key")))
)


**Cost Cleanup**

In [0]:
df = df.withColumn("prd_cost", F.coalesce(col("prd_cost"), F.lit(0)))

**Product Line Normalization**

In [0]:
df = (
    df
   
    .withColumn(
        "prd_line",
        F.when(upper(trim(col("prd_line"))) == "M", "Mountain")
         .when(upper(trim(col("prd_line"))) == "R", "Road")
         .when(upper(trim(col("prd_line"))) == "S", "Other Sales")
         .when(upper(trim(col("prd_line"))) == "T", "Touring")
         .otherwise("n/a")
    )  
)



***Date Casting***

In [0]:
df = df.withColumn("prd_start_dt",col("prd_start_dt").cast(DateType()))


**Rename Columns**

In [0]:
df = (
    df
    .withColumnRenamed("prd_id", "product_id")
    .withColumnRenamed("cat_id", "category_id")
    .withColumnRenamed("prd_key", "product_number")
    .withColumnRenamed("prd_nm", "product_name")
    .withColumnRenamed("prd_cost", "product_cost")
    .withColumnRenamed("prd_line", "product_line")
    .withColumnRenamed("prd_start_dt", "start_date")
    .withColumnRenamed("prd_end_dt", "end_date")
   
)



**Writing Silver Table**

In [0]:
df.write \
  .mode("overwrite") \
  .format("delta") \
  .saveAsTable("workspace.silver.crm_products")


**Sanity checks of Silver Table**

In [0]:
%sql
SELECT * FROM workspace.silver.crm_products;