## Reading From Bronze Layer

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col

In [0]:
customer_info = spark.table("workspace.bronze.crm_customer_info")

## Transformation

In [0]:
# Trimming the string column's value

for field in customer_info.schema.fields:
  if isinstance(field.dataType, StringType):
      customer_info = customer_info.withColumn(field.name, trim(col(field.name)))


In [0]:
# Normalization

customer_info = (
  customer_info.withColumn("cst_marital_status",
                           F.when(F.upper(F.col("cst_marital_status")) == "S", "Single")
                            .when(F.upper(F.col("cst_marital_status")) == "M", "Married")
                            .otherwise("n/a")

  )
  .withColumn("cst_gndr",
              F.when(F.upper(F.col("cst_gndr")) == "F", "Female")
               .when(F.upper(F.col("cst_gndr")) == "M", "Male")
               .otherwise("n/a")
  
  )
)


In [0]:
# Renaming the columns

column_map = {
  "cst_id": "customer_id",
  "cst_key": "customer_key",
  "cst_firstname": "first_name",
  "cst_lastname": "last_name",
  "cst_marital_status": "marital_status",
  "cst_gndr": "gender",
  "cst_create_date": "customer_creation_date"
}


In [0]:
for old_column_name, new_column_name in column_map.items():
    customer_info = customer_info.withColumnRenamed(old_column_name, new_column_name)


## Writing to Silver Table

In [0]:
customer_info.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.crm_customer_info")