## Initialization

In [0]:

import pyspark.sql.functions as F
from pyspark.sql.functions import col, to_date, trim, when
from pyspark.sql.types import StringType

## Let's read the Bronze table and validate the data

In [0]:
# Load the raw CRM sales data from the bronze layer and preview it.
df_bronze = spark.read.table("salesdb.bronze.crm_sales_data")
display(df_bronze)


## Let's do the Silver transformations (fix the schema and column names)

In [0]:
# Bronze column name -> (silver column name, Spark SQL type the column is cast to).
# NOTE(review): "price" is cast to int; confirm the source never carries fractional prices.
RENAME_MAP = {
    "Cust_ID": ("customer_id", "string"),
    "Cust_Name": ("customer_name", "string"),
    "Email": ("email", "string"),
    "Phone": ("phone", "string"),
    "Country": ("country", "string"),
    "State": ("state", "string"),
    "City": ("city", "string"),
    "Prodct_ID": ("product_id", "string"),
    "Prodct_Name": ("product_name", "string"),
    "Cat": ("category", "string"),
    "Price": ("price", "int"),
    "Qty": ("quantity", "int"),
    "Ord_ID": ("order_id", "string"),
    "Ord_Date": ("order_date", "string"),
    "SalesRep_ID": ("sales_rep_id", "string"),
    "SalesRep_Name": ("sales_rep_name", "string"),
    "Region": ("region", "string"),
}

# Walk the mapping once: give each column its silver name, then immediately
# cast the renamed column to its target type. `df_bronze` is rebound on every
# step, so after this cell it holds the renamed and typed frame.
for bronze_name, (silver_name, silver_type) in RENAME_MAP.items():
    df_bronze = df_bronze.withColumnRenamed(bronze_name, silver_name).withColumn(
        silver_name, F.col(silver_name).cast(silver_type)
    )

# Preview the transformed frame to validate the new column names and types.
display(df_bronze)



## Looks like there are some invalid rows with a null price; let's drop these invalid rows

In [0]:
# Keep only the rows whose price is populated; rows with a NULL price are discarded.
df_with_not_null = df_bronze.where(F.col("price").isNotNull())
display(df_with_not_null)


## Let's check whether any string columns have values with leading or trailing spaces

In [0]:
# Names of every string-typed column in the frame being checked.
string_columns = [
    field.name
    for field in df_with_not_null.schema.fields
    if isinstance(field.dataType, StringType)
]

if string_columns:
    # Count, for all string columns in a single aggregation pass, how many
    # values differ from their trimmed form. `when` without `otherwise` yields
    # NULL for non-matching rows and `count` ignores NULLs, so each result is
    # the number of untrimmed values in that column.
    untrimmed_counts = df_with_not_null.select(
        [
            F.count(F.when(col(name) != trim(col(name)), 1)).alias(name)
            for name in string_columns
        ]
    ).first()

    # Show the distinct offending values only for columns that actually have some.
    for name in string_columns:
        if untrimmed_counts[name] > 0:
            display(
                df_with_not_null.filter(col(name) != trim(col(name)))
                .select(name)
                .distinct()
            )

## Trim all the string column values

In [0]:
# Trim every string column in one projection instead of one `withColumn` call
# per column: each `withColumn` adds its own projection to the query plan,
# whereas a single `select` produces the same columns, in the same order, in
# one step. Non-string columns are passed through unchanged.
df_with_not_null = df_with_not_null.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df_with_not_null.schema.fields
    ]
)

## The order date format is inconsistent; let's parse all the date values into a proper date column (shown as yyyy-MM-dd)

In [0]:
# Parse order_date into a date. Each supported textual layout is recognised by
# a regex and parsed with the matching pattern; the regex guard means to_date
# is only applied to strings that have the expected shape.
raw_order_date = col("order_date")
parsed_order_date = (
    when(raw_order_date.rlike("^\\d{2}/\\d{2}/\\d{2}$"), to_date(raw_order_date, "MM/dd/yy"))
    .when(raw_order_date.rlike("^\\d{4}-\\d{2}-\\d{2}$"), to_date(raw_order_date, "yyyy-MM-dd"))
    .when(raw_order_date.rlike("^\\d{4}/\\d{2}/\\d{2}$"), to_date(raw_order_date, "yyyy/MM/dd"))
)

# The `when` chain has no `otherwise`, so a value that matches none of the
# layouts above becomes NULL. Surface those values here so they are noticed
# before the write to silver rather than being lost silently.
unparsed_order_dates = (
    df_with_not_null
    .filter(raw_order_date.isNotNull() & parsed_order_date.isNull())
    .select("order_date")
    .distinct()
)
if unparsed_order_dates.limit(1).count() > 0:
    display(unparsed_order_dates)

df_fixed_dt = df_with_not_null.withColumn("order_date", parsed_order_date)

display(df_fixed_dt)

## Looks like the data is clean now; let's write it into the silver layer table


In [0]:
# Persist the cleaned frame as a Delta table in the silver layer, replacing
# whatever the table currently holds.
(
    df_fixed_dt.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("salesdb.silver.crm_sales_data")
)

## Let's check the data in the silver layer table

In [0]:
%sql
-- Read back everything that was written to the silver table.
SELECT *
FROM salesdb.silver.crm_sales_data

## Check that column values are trimmed correctly


In [0]:
%sql
-- Show each customer_name with its length before and after trim(); the two
-- lengths are equal when the stored value has no surrounding spaces.
SELECT
  customer_name,
  length(customer_name),
  length(trim(customer_name))
FROM salesdb.silver.crm_sales_data