#Import

In [0]:
from pyspark.sql.types import (IntegerType, StringType, DateType, TimestampType)
from pyspark.sql.functions import trim, col, when, upper

#Column Mapping

In [0]:
COLUMN_MAPPING = {
    "cst_id": {
        "name": "customer_id",
        "type": IntegerType()
    },
    "cst_key": {
        "name": "customer_key",
        "type": StringType()
    },
    "cst_firstname": {
        "name": "first_name",
        "type": StringType()
    },
    "cst_lastname": {
        "name": "last_name",
        "type": StringType()
    },
    "cst_marital_status": {
        "name": "marital_status",
        "type": StringType()
    },
    "cst_gndr": {
        "name": "gender",
        "type": StringType()
    },
    "cst_create_date": {
        "name": "created_date",
        "type": DateType()
    }
}

#Read Bronze Layer

In [0]:
df = spark.table("workspace.bronze.crm_cust_info")

In [0]:
df.display()

#Issues
#String data is not trimmed
#cst_marital_status and cst_gndr is in single character
#column names are not mapped

#Transformation

#String trimming

In [0]:
for field in df.schema.fields:
    if isinstance(field.dataType, StringType): #trim string only
        df = df.withColumn(field.name, trim(col(field.name)))

#Data Transformation (Marital Status / Gender) - Single Character to Full

In [0]:
df = (
    df
    .withColumn(
        "cst_marital_status",
        when(upper(col("cst_marital_status")) == "S", "Single")
         .when(upper(col("cst_marital_status")) == "M", "Married")
         .otherwise("n/a")
    )
    .withColumn(
        "cst_gndr",
        when(upper(col("cst_gndr")) == "F", "Female")
         .when(upper(col("cst_gndr")) == "M", "Male")
         .otherwise("n/a")
    )
)

#Remove null values

In [0]:
df = df.filter(col('customer_id').isNotNull())

#Column Mapping (Column Name)

In [0]:
for old_name, meta in COLUMN_MAPPING.items():
    df = df.withColumnRenamed(old_name, meta['name'])

#Write to Silver Layer

In [0]:
df.write.mode("overwrite").format("delta").saveAsTable("workspace.silver.crm_customers")

#Sanity Check in Silver Layer

In [0]:
%sql
SELECT * FROM workspace.silver.crm_customers LIMIT 5;