# Initialization

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col
from pyspark.sql.window import Window

# Read from bronze table

In [0]:
# Load the raw CRM product table from the bronze layer; every later cell
# transforms this DataFrame step by step.
bronze_table_name = "workspace.bronze.crm_prd_info"
df = spark.table(bronze_table_name)

# Silver Transformations

**_Renaming fields_**

In [0]:
# Bronze column name -> descriptive silver column name.
RENAME_MAP = dict(
    prd_id="product_id",
    prd_key="product_key",
    prd_nm="product_name",
    prd_cost="product_cost",
    prd_line="product_line",
    prd_start_dt="product_start_date",
    prd_end_dt="product_end_date",
)

# Apply the renames one at a time, in the order listed above.
for bronze_name, silver_name in RENAME_MAP.items():
    df = df.withColumnRenamed(bronze_name, silver_name)

**_Trim string columns_**

In [0]:
# Strip leading/trailing whitespace from every string column.
#
# This is done in a single select() instead of calling withColumn() once per
# column: each withColumn() adds another projection to the query plan, and
# the PySpark documentation recommends one select() when many columns are
# rewritten. Column order and names are unchanged; non-string columns are
# passed through as-is.
#
# isinstance() is used rather than `== StringType()` so the check is a type
# test and does not depend on how StringType instances compare for equality.
df = df.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
)


**_Clearing nulls_**

In [0]:
# Replace missing product costs with 0; non-null costs are kept unchanged.
cost = col("product_cost")
cost_or_zero = F.when(cost.isNull(), 0).otherwise(cost)
df = df.withColumn("product_cost", cost_or_zero)

**_product key parsing_**

In [0]:
# Split the raw product key into two parts:
#   * characters 1-5   -> cat_id, with every "-" replaced by "_"
#   * characters 7..end -> the new product_key
# cat_id is derived first, so it still reads the original, unsplit key
# before product_key is overwritten in the next step.
#
# Column.substr() is used for the tail because it accepts Column arguments
# for both start and length; F.substring() only accepts a Column for `len`
# in newer PySpark releases. Asking for length(product_key) characters from
# position 7 simply returns everything up to the end of the string.
raw_key = col("product_key")
df = (
    df
    .withColumn("cat_id", F.regexp_replace(raw_key.substr(1, 5), "-", "_"))
    .withColumn("product_key", raw_key.substr(F.lit(7), F.length(raw_key)))
)

# Preview only a handful of rows instead of rendering the whole DataFrame.
df.limit(10).display()

**_Product line normalization_**

In [0]:

# Normalize product line: map the single-letter code (compared in upper case)
# to a readable label; any value that matches none of the codes becomes "n/a".
product_line_labels = {
    "M": "Mountain",
    "R": "Road",
    "S": "Other Sales",
    "T": "Touring",
}
line_code = F.upper(col("product_line"))

# Build the WHEN ... WHEN ... chain in the order the codes are listed above.
label_expr = None
for code, label in product_line_labels.items():
    is_code = line_code == code
    if label_expr is None:
        label_expr = F.when(is_code, label)
    else:
        label_expr = label_expr.when(is_code, label)

df = df.withColumn("product_line", label_expr.otherwise("n/a"))

**_Sanity checks of dataframe_**

In [0]:
# Show the first 10 transformed rows for a quick visual check before writing.
preview_rows = df.limit(10)
preview_rows.display()

**_write to silver table_**

In [0]:
# Persist the transformed products to the silver layer, replacing any
# existing contents of the target table.
(
    df.write
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_products")
)

**_sanity checks of silver table_**

In [0]:
%sql
-- Read back a few rows from the silver table to confirm the write succeeded.
SELECT *
FROM workspace.silver.crm_products
LIMIT 10;