In [0]:
#INIT
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col

In [0]:
# Load the source table from the bronze layer.
BRONZE_TABLE = "workspace.bronze.erp_cust_az12"
df = spark.table(BRONZE_TABLE)

In [0]:
# Trim leading/trailing whitespace from every string-typed column.
# A single select() is used instead of one withColumn() call per column:
# every withColumn() adds another projection to the query plan, and the Spark
# documentation advises against calling it repeatedly in a loop.
# Non-string columns pass through unchanged and column order is preserved.
df = df.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
)

In [0]:
# Clean the customer ID: strip a leading "NAS" prefix, leave other values as-is.
# An anchored regexp_replace is used instead of
# F.substring(cid, 4, F.length(cid)) because F.substring only accepts Column
# arguments for pos/len on newer PySpark releases (older ones require ints).
# The regex form yields the same result for matching ids (prefix removed once),
# non-matching ids (unchanged) and NULL ids (stay NULL).
df = df.withColumn("cid", F.regexp_replace(col("cid"), r"^NAS", ""))

In [0]:
# Validating birth dates: a date later than today is replaced with NULL;
# every other value (including NULL) is kept unchanged.
birth_date = col("bdate")
is_future_date = birth_date > F.current_date()
df = df.withColumn(
    "bdate",
    F.when(is_future_date, F.lit(None)).otherwise(birth_date),
)
     


In [0]:
# Gender normalization: map F/FEMALE -> "Female" and M/MALE -> "Male"
# (case-insensitive); anything else, including NULL, becomes "n/a".
gender_upper = F.upper(col("gen"))
normalized_gender = (
    F.when(gender_upper.isin("F", "FEMALE"), "Female")
    .when(gender_upper.isin("M", "MALE"), "Male")
    .otherwise("n/a")
)
df = df.withColumn("gen", normalized_gender)

In [0]:
# Rename the source columns to their silver-layer names.
# Keys are the existing column names, values are the new names.
RENAME_MAP = dict(
    [
        ("cid", "customer_number"),
        ("bdate", "birth_date"),
        ("gen", "gender"),
    ]
)
for source_column in RENAME_MAP:
    target_column = RENAME_MAP[source_column]
    df = df.withColumnRenamed(source_column, target_column)
     


In [0]:
# Save the transformed frame as a Delta table in the silver layer,
# overwriting whatever the table currently holds.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.erp_customers")
)