# Setup and imports

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim, col

# Read from Bronze Layer  

In [0]:
# Load the CRM sales details table from the bronze schema as the starting DataFrame.
df = spark.table(
    "workspace.bronze.crm_sales_details"
)

# Data Transformations


In [0]:
# Preview the data exactly as read from bronze, before any transformation.
display(df)

## Trim columns

In [0]:
# Strip leading/trailing whitespace from every string-typed column.
# The replacements are collected first and applied with a single withColumns()
# call, so the plan gains one projection instead of one per column (calling
# withColumn() repeatedly in a loop grows the plan with every iteration).
trimmed_columns = {
    field.name: trim(col(field.name))
    for field in df.schema.fields
    if isinstance(field.dataType, StringType)
}

# Leave df untouched when the table has no string columns.
if trimmed_columns:
    df = df.withColumns(trimmed_columns)

In [0]:
# Preview the DataFrame after the string columns have been trimmed.
display(df)

## Cleaning dates


In [0]:
def _parse_yyyymmdd(column_name):
    """Build a Column that converts a yyyyMMdd-encoded value into a date.

    The result is NULL when the raw value equals 0, when its string form is
    not exactly 8 characters long, or when those 8 characters do not form a
    valid calendar date.

    Args:
        column_name: Name of the column holding the raw yyyyMMdd value.

    Returns:
        A pyspark Column of date type.
    """
    raw_value = f.col(column_name)
    as_text = raw_value.cast("string")
    is_malformed = (raw_value == 0) | (f.length(as_text) != 8)

    # try_to_timestamp yields NULL for unparseable input, whereas to_date
    # raises when ANSI mode is enabled; one bad value (e.g. month 13) would
    # otherwise fail the whole job.
    parsed_date = f.try_to_timestamp(as_text, f.lit("yyyyMMdd")).cast("date")

    return f.when(is_malformed, None).otherwise(parsed_date)


# Apply the same cleaning rule to every date column of the sales table.
for date_column in ("sls_order_dt", "sls_ship_dt", "sls_due_dt"):
    df = df.withColumn(date_column, _parse_yyyymmdd(date_column))

In [0]:
# Preview the DataFrame after the order/ship/due date columns were converted.
display(df)

## Sales and price correction 

In [0]:
# Repair sls_sales using quantity * |price|.
#
# Sales is recomputed when it is missing, non-positive, or disagrees with
# quantity * |price|. The disagreement check is only trusted when the price
# itself is usable (non-null and non-zero): a zero price would otherwise
# overwrite a valid sales amount with 0, and the price correction that runs
# afterwards (price = sales / quantity) could no longer recover either value.
sales_value = col("sls_sales")
price_value = col("sls_price")
expected_sales = col("sls_quantity") * f.abs(price_value)

price_is_usable = price_value.isNotNull() & (price_value != 0)
sales_needs_fix = (
    sales_value.isNull()
    | (sales_value <= 0)
    | (price_is_usable & (sales_value != expected_sales))
)

df = df.withColumn(
    "sls_sales",
    f.when(sales_needs_fix, expected_sales).otherwise(sales_value),
)

In [0]:
# Preview the DataFrame after the sls_sales correction.
display(df)

In [0]:
# Repair sls_price: a missing or non-positive price is replaced by
# sales / quantity. A zero quantity is turned into NULL first so the
# division yields NULL instead of dividing by zero.
unit_price = col("sls_price")
price_is_invalid = (unit_price.isNull()) | (unit_price <= 0)
non_zero_quantity = f.nullif(col("sls_quantity"), f.lit(0))
price_from_sales = col("sls_sales") / non_zero_quantity

df = df.withColumn(
    "sls_price",
    f.when(price_is_invalid, price_from_sales).otherwise(unit_price),
)

In [0]:
# Preview the DataFrame after the sls_price correction.
display(df)

## Rename columns

In [0]:
# Source column name -> name used in the silver table.
RENAME_MAP = {
    # identifiers
    "sls_ord_num": "order_number",
    "sls_prd_key": "product_number",
    "sls_cust_id": "customer_id",
    # dates
    "sls_order_dt": "order_date",
    "sls_ship_dt": "ship_date",
    "sls_due_dt": "due_date",
    # measures
    "sls_sales": "sales_amount",
    "sls_quantity": "quantity",
    "sls_price": "price",
}

# Rename everything in one call; names absent from the DataFrame are ignored,
# just as with repeated withColumnRenamed calls.
df = df.withColumnsRenamed(RENAME_MAP)

In [0]:
# Preview the DataFrame with its final column names.
display(df)

# Write into Silver Table


In [0]:
# Overwrite the silver Delta table with the cleaned data.
# The table name is fully qualified with the catalog so the write always lands
# in the same table that is read back for verification, regardless of the
# session's current catalog.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_sales_details")
)

In [0]:
%sql
-- Sanity check: return up to 10 rows from the silver table.
SELECT * FROM workspace.silver.crm_sales_details LIMIT 10