# Initialization

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

# import trimming function
from script.silver.silver_utils import trim_string_columns

# Read Bronze table

In [0]:
df = spark.table("workspace.bronze.crm_prd_info")

# Silver Transformations

## Trimming

In [0]:
# silver_utils.py trimming function
df = trim_string_columns(df)

## Product Key Parsing

In [0]:
df = df.withColumn("cat_id", 
                    F.regexp_replace(F.substring(
                            col("prd_key"),1, 5)
                            , "-", "_"))

df = df.withColumn("prd_key", 
                   F.substring(
                    col("prd_key"), 7, 
                    F.length(col("prd_key"))))

## Cost Cleanup

In [0]:
# replace nulls with 0s
df = df.withColumn("prd_cost", F.coalesce(col("prd_cost"), F.lit(0)))

## Product Line Normalization

In [0]:
df = (
    df
    .withColumn(
        "prd_line",
        F.when(F.upper(col("prd_line")) == "M",
               "Mountain")
         .when(F.upper(col("prd_line")) == "R",
               "Road")
         .when(F.upper(col("prd_line")) == "S",
               "Other Sales")
         .when(F.upper(col("prd_line")) == "T",
               "Touring")
         .otherwise("n/a")
    )
)

## Date Casting

In [0]:
df = df.withColumn("prd_start_dt", col("prd_start_dt").cast(DateType()))

## Renaming Columns

In [0]:
rename_map = {
    "prd_id": "product_id",
    "cat_id": "category_id",
    "prd_key": "product_number",
    "prd_nm": "product_name",
    "prd_cost": "product_cost",
    "prd_line": "product_line",
    "prd_start_dt": "start_date",
    "prd_end_dt": "end_date"
    }

for old_name, new_name in rename_map.items():
    df = df.withColumnRenamed(old_name, new_name)

## Sanity checks of dataframe

In [0]:
df.limit(10).display()

In [0]:
# should return no Nulls
df.filter(col("product_cost").isNull()).display()

In [0]:
# value counts for normalised attributes
display(
    df.groupBy("category_id", "product_line")
      .count()
      .orderBy("count"), ascending=False
)

# Writing Silver Table

In [0]:
df.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.crm_products")

## Sanity checks of silver table

In [0]:
%sql
SELECT *
FROM workspace.silver.crm_products
LIMIT 10;