# Imports and bronze table reading

In [0]:
import pyspark.sql.functions as func
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col

In [0]:
df = spark.table("workspace.bronze.erp_px_cat_g1v2")
df.display()

# Transformations

### Trimming

In [0]:
for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df = df.withColumn(field.name, trim(col(field.name)))

### Normalizing "Maintenance" Data Type to Boolean

In [0]:
df = df.withColumn(
    "maintenance",
    func.when(func.upper(col("maintenance")) == "YES", func.lit(True))
     .when(func.upper(col("maintenance")) == "NO", func.lit(False))
     .otherwise(None)
)

### Renaming Columns

In [0]:
RENAME_MAP = {
    "id": "category_id",
    "cat": "category",
    "subcat": "subcategory",
    "maintenance": "maintenance_flag"
}
for old_name, new_name in RENAME_MAP.items():
    df = df.withColumnRenamed(old_name, new_name)

### Sanity checks

In [0]:
df.limit(10).display()

# Writing Silver Table

In [0]:
df.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.erp_product_category")

### Sanity checks of silver table

In [0]:
%sql
SELECT * FROM workspace.silver.erp_product_category LIMIT 10