## **Initialization**

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, lit, trim
from pyspark.sql.window import Window


## **Reading From Bronze Layer**

In [0]:
df = spark.table("workspace.bronze.crm_prd_info")

## **Data Transformation**

### 1-Trimming String Type Data

In [0]:
for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df = df.withColumn(field.name, trim(col(field.name)))

### 2-Product Key Normalization and creation of new column "cat_id"

In [0]:
# Creation of the new column "cat_id" from prd_key column
df = df.withColumn("cat_id", F.regexp_replace(F.substring(col("prd_key"), 1, 5),"-", "_"))

# Product_key Parcing
df = df.withColumn("prd_key", F.substring(col("prd_key"), 7, F.length(col("prd_key"))))

### 3-Hundle Nulls in Cost column

In [0]:
df = df.withColumn("prd_cost", F.coalesce(col("prd_cost"), F.lit(0)))

### 4-Product Line Normalization

In [0]:
df = (
    df
    .withColumn(
        "prd_line",
        F.when(F.upper(F.col("prd_line")) == "R", "Road")
        .when(F.upper(F.col("prd_line")) == "S", "Other Sales")
        .when(F.upper(F.col("prd_line")) == "M", "Mountain")
        .when(F.upper(F.col("prd_line")) == "T", "Touring")
        .otherwise("Unknown")
        )
)

### 5-Converting/Casting all data in date column to Date type

In [0]:
df = df.withColumn("prd_start_dt", col("prd_start_dt").cast(DateType()))
df = df.withColumn("prd_end_dt", col("prd_end_dt").cast(DateType()))

### 6-Renaming Columns name

In [0]:
# Creating new column names in a dictionary
RENAME_MAP = {
    "prd_id": "product_id",
    "cat_id": "category_id",
    "prd_key": "product_number",
    "prd_nm": "product_name",
    "prd_cost": "product_cost",
    "prd_line": "product_line",
    "prd_start_dt": "start_date",
    "prd_end_dt": "end_date"
}

# Looping for all columns names and rename them
for old_name, new_name in RENAME_MAP.items():
    df = df.withColumnRenamed(old_name, new_name)

## **Write Into Silver Layer**

In [0]:
df.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.crm_products")

## **Sanity Check Of Data Frame**

In [0]:
df.display()

In [0]:
%sql
select * from workspace.bronze.erp_px_cat_g1v2