In [0]:
#Initialization

import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import trim, col
     

In [0]:
#Read Bronze table

df = spark.table("workspace.bronze.erp_loc_a101")
     

In [0]:
#Silver Transformations
#Trimming

for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df = df.withColumn(field.name, trim(col(field.name)))

In [0]:
#Customer ID Cleanup

df = df.withColumn("cid", F.regexp_replace(col("cid"), "-", ""))

In [0]:
#Country Normalization

df = df.withColumn(
    "cntry",
    F.when(col("cntry") == "DE", "Germany")
     .when(col("cntry").isin("US", "USA"), "United States")
     .when((col("cntry") == "") | col("cntry").isNull(), "n/a")
     .otherwise(col("cntry"))
)

In [0]:
#Renaming Columns

RENAME_MAP = {
    "cid": "customer_number",
    "cntry": "country"
}
for old_name, new_name in RENAME_MAP.items():
    df = df.withColumnRenamed(old_name, new_name)

In [0]:
#Sanity checks of dataframe

df.limit(10).display()

In [0]:
#Writing Silver Table

df.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.erp_customer_location")

In [0]:
%sql
---Sanity checks of silver table---
SELECT * FROM workspace.silver.erp_customer_location LIMIT 10