In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col

In [0]:
# Bronze (source) column name -> silver (target) column name.
RENAME_MAP = {
    "cst_id": "customer_id",
    "cst_key": "customer_key",
    "cst_firstname": "firstname",
    "cst_lastname": "lastname",
    "cst_marital_status": "marital_status",
    "cst_gndr": "gender",
    "cst_create_date": "create_date",
}

Read the data from the bronze table

In [0]:
# Load the CRM customer table from the bronze schema.
df = spark.read.table("workspace.bronze.crm_cust_info")

Data transformation

In [0]:
# Keep only rows that carry a customer ID.
df = df.where(F.col("cst_id").isNotNull())

In [0]:
# Preview the rows that remain after dropping null customer IDs.
display(df)

In [0]:
# Strip leading/trailing whitespace from every string-typed column.
# This is done in a single select() so the plan gets one projection; calling
# withColumn() once per column inside a loop stacks a projection per column,
# which the PySpark docs warn can produce very large plans.
# Column order and names are preserved; non-string columns pass through as-is.
df = df.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
)


In [0]:
# Preview the data after whitespace has been trimmed from string columns.
display(df)

Normalize the marital status and gender codes

In [0]:
# Expand single-letter codes into readable labels. The comparison is
# case-insensitive (codes are upper-cased first); anything that does not
# match a known code falls through to "n/a".
marital_code = F.upper(F.col("cst_marital_status"))
gender_code = F.upper(F.col("cst_gndr"))

marital_label = (
    F.when(marital_code == "S", "Single")
    .when(marital_code == "M", "Married")
    .otherwise("n/a")
)
gender_label = (
    F.when(gender_code == "M", "Male")
    .when(gender_code == "F", "Female")
    .otherwise("n/a")
)

df = df.withColumn("cst_marital_status", marital_label)
df = df.withColumn("cst_gndr", gender_label)

Renaming the columns

In [0]:
# Apply RENAME_MAP to the column list in one pass; columns without an
# entry in the map keep their current name.
df = df.toDF(*[RENAME_MAP.get(name, name) for name in df.columns])


In [0]:
# Preview the final frame with normalized values and renamed columns.
display(df)

In [0]:
# Persist the transformed data as a Delta table, replacing any existing contents.
# The table name is fully qualified with the `workspace` catalog so the write
# lands in the same catalog that the bronze read and the verification query
# reference, instead of depending on the session's current catalog.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_customers")
)

In [0]:
%sql
-- Inspect the contents of the silver customers table.
SELECT *
FROM workspace.silver.crm_customers