In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

#Read it from bronze layer

In [0]:
df = spark.table("workspace.bronze.prd_info_info")

# data cleaning 


#Triming

In [0]:
for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df = df.withColumn(field.name, trim(col(field.name)))


#Product Key Parsing


In [0]:
df = df.withColumn("cat_id", F.regexp_replace(F.substring(col("prd_key"), 1, 5), "-", "_"))
df = df.withColumn("prd_key", F.substring(col("prd_key"), 7, F.length(col("prd_key"))))

# Cost Cleanup the null 


In [0]:
df = df.withColumn("prd_cost", F.coalesce(col("prd_cost"), F.lit(0)))

#Normalization|

In [0]:

df = (
    df
    # Normalize product line
    .withColumn(
        "prd_line",
        F.when(F.upper(col("prd_line")) == "M", "Mountain")
         .when(F.upper(col("prd_line")) == "R", "Road")
         .when(F.upper(col("prd_line")) == "S", "Other Sales")
         .when(F.upper(col("prd_line")) == "T", "Touring")
         .otherwise("n/a")
    )
)
     

#Renaming Columns


In [0]:

RENAME_MAP = {
    "prd_id": "product_id",
    "cat_id": "category_id",
    "prd_key": "product_number",
    "prd_nm": "product_name",
    "prd_cost": "product_cost",
    "prd_line": "product_line",
    "prd_start_dt": "start_date",
    "prd_end_dt": "end_date"
}

for old_name , new_name in RENAME_MAP.items():
    df = df.withColumnRenamed(old_name , new_name)

In [0]:
df = df.withColumn("start_date", col("end_date").cast(DateType()))

In [0]:
df.display()

product_id,product_number,product_name,product_cost,product_line,start_date,end_date,category_id
210,FR-R92B-58,HL Road Frame - Black- 58,0,Road,,,CO_RF
211,FR-R92R-58,HL Road Frame - Red- 58,0,Road,,,CO_RF
212,HL-U509-R,Sport-100 Helmet- Red,12,Other Sales,2007-12-28,2007-12-28,AC_HE
213,HL-U509-R,Sport-100 Helmet- Red,14,Other Sales,2008-12-27,2008-12-27,AC_HE
214,HL-U509-R,Sport-100 Helmet- Red,13,Other Sales,,,AC_HE
215,HL-U509,Sport-100 Helmet- Black,12,Other Sales,2007-12-28,2007-12-28,AC_HE
216,HL-U509,Sport-100 Helmet- Black,14,Other Sales,2008-12-27,2008-12-27,AC_HE
217,HL-U509,Sport-100 Helmet- Black,13,Other Sales,,,AC_HE
218,SO-B909-M,Mountain Bike Socks- M,3,Mountain,2007-12-28,2007-12-28,CL_SO
219,SO-B909-L,Mountain Bike Socks- L,3,Mountain,2007-12-28,2007-12-28,CL_SO


In [0]:
df.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.crm_products")

In [0]:
%sql 
select * from workspace.silver.crm_products

product_id,product_number,product_name,product_cost,product_line,start_date,end_date,category_id
210,FR-R92B-58,HL Road Frame - Black- 58,0,Road,,,CO_RF
211,FR-R92R-58,HL Road Frame - Red- 58,0,Road,,,CO_RF
212,HL-U509-R,Sport-100 Helmet- Red,12,Other Sales,2007-12-28,2007-12-28,AC_HE
213,HL-U509-R,Sport-100 Helmet- Red,14,Other Sales,2008-12-27,2008-12-27,AC_HE
214,HL-U509-R,Sport-100 Helmet- Red,13,Other Sales,,,AC_HE
215,HL-U509,Sport-100 Helmet- Black,12,Other Sales,2007-12-28,2007-12-28,AC_HE
216,HL-U509,Sport-100 Helmet- Black,14,Other Sales,2008-12-27,2008-12-27,AC_HE
217,HL-U509,Sport-100 Helmet- Black,13,Other Sales,,,AC_HE
218,SO-B909-M,Mountain Bike Socks- M,3,Mountain,2007-12-28,2007-12-28,CL_SO
219,SO-B909-L,Mountain Bike Socks- L,3,Mountain,2007-12-28,2007-12-28,CL_SO
