# Initialization

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DataType, IntegerType
from pyspark.sql.functions import col, trim, length
from pyspark.sql.window import Window

# Read bronze table

In [0]:
# Load the bronze source table that the silver transformations below start from.
BRONZE_TABLE = "workspace.bronze.erp_cust_az12"
df = spark.table(BRONZE_TABLE)

# Silver Transformation

## Trim

In [0]:
# Trim surrounding whitespace from every string-typed column.
# The list of column names is captured from the schema once, up front,
# before df is rebound inside the loop.
string_columns = [
    f.name for f in df.schema.fields if isinstance(f.dataType, StringType)
]
for column_name in string_columns:
    df = df.withColumn(column_name, trim(col(column_name)))

## Clean gender

In [0]:
# Normalize the gender code (case-insensitively):
#   F / FEMALE -> "Female", M / MALE -> "Male", anything else -> "n/a".
gender_code = F.upper(F.col("gen"))
gender_label = (
    F.when(gender_code.isin("F", "FEMALE"), "Female")
    .when(gender_code.isin("M", "MALE"), "Male")
    .otherwise("n/a")
)
df = df.withColumn("GEN", gender_label)

## Correct date

In [0]:
# Null out birth dates that are missing or later than today; keep all others.
birth_date = F.col("BDATE")
is_invalid_birth_date = birth_date.isNull() | (birth_date > F.current_date())
df = df.withColumn(
    "BDATE",
    F.when(is_invalid_birth_date, None).otherwise(birth_date),
)

# Rename

In [0]:
# Map source column names to their silver-layer names.
# The transformations above read and write the columns "GEN" and "BDATE", so
# those are the names that have to be renamed. withColumnRenamed silently does
# nothing when the old name is not in the schema, which is why the previous
# keys ("BIRTH_DATE", "GENDER") never took effect; they are kept here as
# harmless aliases.
# NOTE(review): confirm the bronze ID column is really named CUST_ID - the
# later prefix-cleanup step expects a "customer_id" column to exist.
RENAME_MAP = {
    "CUST_ID": "customer_id",
    "BDATE": "birth_date",
    "BIRTH_DATE": "birth_date",
    "GEN": "gender",
    "GENDER": "gender",
}

for old_name, new_name in RENAME_MAP.items():
    df = df.withColumnRenamed(old_name, new_name)

# Write to silver erp table

In [0]:
# Save the cleaned DataFrame as a Delta table, overwriting existing contents.
SILVER_TABLE = "workspace.silver.erp_customers"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(SILVER_TABLE)
)

## Sanity check

In [0]:
%sql
-- Inspect the rows just written to the silver table
SELECT *
FROM workspace.silver.erp_customers

In [0]:
# Drop the "NAS" prefix from customer IDs that carry it, so the ID is
# consistent with the customer id on other tables. IDs without the prefix
# (and NULL IDs) are left unchanged.
# NOTE(review): this reads and then overwrites the same table; consider
# applying this step before the first write so the table is written only once.

silver_df = spark.table("workspace.silver.erp_customers")

# An anchored regexp_replace removes the prefix only when it is present,
# replacing the when/startswith/substring combination. It also avoids passing
# a Column as the length argument of F.substring.
# NOTE(review): as far as I know F.substring only accepts Column arguments
# for pos/len from PySpark 4.0 onwards - confirm against the runtime in use.
silver_df = silver_df.withColumn(
    "customer_id",
    F.regexp_replace(F.col("customer_id"), "^NAS", ""),
)

silver_df.write.format("delta").mode("overwrite").saveAsTable("workspace.silver.erp_customers")

In [0]:
%sql
-- Final check

SELECT *
FROM workspace.silver.erp_customers