In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

## Explore head of the customer data

In [0]:
%sql
-- Preview the first 20 rows of the bronze CRM customer table.
SELECT *
FROM bike_lakehouse.bronze.crm_cust_info
LIMIT 20;

### Transformation Logic
1. Trim whitespace from string columns
2. Normalize marital status and gender
3. Rename the columns


In [0]:
# Load the bronze CRM customer table; every later cell transforms `df`.
bronze_table = "bike_lakehouse.bronze.crm_cust_info"
df = spark.table(bronze_table)

In [0]:
# Trim leading/trailing whitespace from every StringType column.
# This is built as a single select() instead of one withColumn() per column:
# each withColumn() call adds another projection to the query plan, which the
# Spark documentation warns against doing in a loop. Column names and column
# order are unchanged; non-string columns pass through untouched.
df = df.select([
    F.trim(F.col(field.name)).alias(field.name)
    if isinstance(field.dataType, StringType)
    else F.col(field.name)
    for field in df.schema.fields
])

In [0]:
# Show a 20-row sample to check the result of the trimming step.
trimmed_preview = df.limit(20)
display(trimmed_preview)

In [0]:
# Normalize the marital-status code to a readable label.
# The code is trimmed and upper-cased before comparison so that variants such
# as "m" or " S " are not silently bucketed into "n/a". NULL and any
# unrecognized code still map to "n/a".
marital_code = F.upper(F.trim(F.col("cst_marital_status")))
df = df.withColumn(
    "cst_marital_status",
    F.when(marital_code == "M", "Married")
    .when(marital_code == "S", "Single")
    .otherwise("n/a"),
)

In [0]:
# Normalize the gender code to a readable label.
# The code is trimmed and upper-cased before comparison so that variants such
# as "f" or " M " are not silently bucketed into "n/a". NULL and any
# unrecognized code still map to "n/a".
gender_code = F.upper(F.trim(F.col("cst_gndr")))
df = df.withColumn(
    "cst_gndr",
    F.when(gender_code == "M", "Male")
    .when(gender_code == "F", "Female")
    .otherwise("n/a"),
)

Define the column-name mapping

In [0]:
# Bronze (source-system) column name -> silver (business-friendly) column name.
COL_MAPPING = dict(
    cst_id="customer_id",
    cst_key="customer_key",
    cst_firstname="firstname",
    cst_lastname="lastname",
    cst_marital_status="marital_status",
    cst_gndr="gender",
    cst_create_date="create_date",
)

In [0]:
# Apply all renames from COL_MAPPING in a single call.
df = df.withColumnsRenamed(colsMap=COL_MAPPING)

In [0]:
# Show a 20-row sample of the renamed frame before it is written out.
renamed_preview = df.limit(20)
display(renamed_preview)

In [0]:
# Save the cleaned frame as the silver Delta table, overwriting existing data.
silver_writer = df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("bike_lakehouse.silver.crm_customers")

In [0]:
%sql
-- Preview the first 20 rows of the silver customer table that was just written.
SELECT *
FROM bike_lakehouse.silver.crm_customers
LIMIT 20;