In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import trim, col

## Reading From Bronze Layer

In [0]:
# Load the CRM product info table from the bronze layer; every later cell
# transforms this DataFrame.
bronze_table_name = "workspace.bronze.crm_product_info"
product_info = spark.table(bronze_table_name)


## Transformation

In [0]:
# Bronze column name -> silver column name, applied by the rename cell below.
# NOTE(review): "prd_nm" is mapped to "product_number"; if the source column
# actually holds a product name, "product_name" may be the intended target —
# confirm against the bronze schema before changing it.

column_map = dict([
    ("prd_id", "product_id"),
    ("prd_key", "product_key"),
    ("prd_nm", "product_number"),
    ("prd_cost", "product_cost"),
    ("prd_line", "product_line"),
    ("prd_start_dt", "start_date"),
    ("prd_end_dt", "end_date"),
])

In [0]:
# Apply every bronze -> silver rename listed in column_map, one column per pass.
for source_name in column_map:
    target_name = column_map[source_name]
    product_info = product_info.withColumnRenamed(source_name, target_name)

In [0]:
# Split product_key into two columns:
#   category_id - characters 1-5 of the key, with "-" replaced by "_"
#   product_key - the rest of the key, from character 7 onward
#
# Whitespace is stripped from the key before slicing so the fixed character
# positions are not shifted by padding in the bronze data. The same r"\s+"
# rule is used by the string clean-up cell further down.
# NOTE(review): assumes the positions are meant to apply to the whitespace-free
# key — confirm against the bronze data.
#
# The tail is taken with Column.substr and Column arguments because
# F.substring only accepts a Column length from PySpark 4.0 onward.

clean_key = F.regexp_replace(col("product_key"), r"\s+", "")

product_info = (
    product_info
    # category_id is derived first, while product_key still holds the full key.
    .withColumn("category_id", F.regexp_replace(clean_key.substr(1, 5), "-", "_"))
    # Using the full key length as the substring length simply means "to the end".
    .withColumn("product_key", clean_key.substr(F.lit(7), F.length(clean_key)))
)


In [0]:
# Remove all whitespace (leading, trailing and internal) from every string column.
#
# All string columns are rewritten in one withColumns projection rather than
# one withColumn call per column: each withColumn call adds another projection
# to the query plan, which the Spark documentation warns against doing in loops.
# The former outer F.trim is dropped because it has nothing left to trim once
# every whitespace run has been removed.
# NOTE(review): this also joins multi-word values ("a b" -> "ab") in every
# string column — confirm that is intended for descriptive columns.

string_columns = [
    field.name
    for field in product_info.schema.fields
    if isinstance(field.dataType, StringType)
]

# Skip the projection entirely when the DataFrame has no string columns.
if string_columns:
    product_info = product_info.withColumns({
        name: F.regexp_replace(F.col(name), r"\s+", "")
        for name in string_columns
    })

In [0]:
# Expand the single-letter product_line codes into readable labels.
# The comparison is made on the upper-cased value, so "m" and "M" both match;
# anything that matches none of the four codes becomes "n/a".

line_code = F.upper(F.col("product_line"))

product_line_label = (
    F.when(line_code == "M", "Mountain")
    .when(line_code == "R", "Road")
    .when(line_code == "S", "Other Sales")
    .when(line_code == "T", "Touring")
    .otherwise("n/a")
)

product_info = product_info.withColumn("product_line", product_line_label)

In [0]:
# Cast the start/end date columns to Spark's date type.
# Columns that are not present in the DataFrame are skipped rather than failing.

date_cols_to_fix = ["start_date", "end_date"]

present_date_cols = [name for name in date_cols_to_fix if name in product_info.columns]

for date_col in present_date_cols:
    product_info = product_info.withColumn(date_col, F.col(date_col).cast("date"))

## Writing to Silver Table

In [0]:
# Persist the transformed DataFrame as a Delta table in the silver layer,
# replacing whatever the table currently contains.
silver_table_name = "workspace.silver.crm_product_info"

(
    product_info.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(silver_table_name)
)