In [0]:
%sql
-- Preview every column and row of the bronze CRM sales details table.
SELECT *
FROM analytics.bronze.crm_sales_details


In [0]:
%sql
-- Set the session defaults: current catalog `analytics`, current schema `silver`.
-- Table names that are not fully qualified resolve against these defaults.
USE CATALOG analytics;
USE SCHEMA silver;

## Read Bronze table

In [0]:
# Load the bronze CRM sales details table; later cells transform `df` step by step.
bronze_table_name = "analytics.bronze.crm_sales_details"
df = spark.table(bronze_table_name)
df.display()

In [0]:
from pyspark.sql.functions import trim, col
from pyspark.sql.types import StringType

def _trimmed_if_string(field):
    """Build the output column expression for one top-level schema field.

    String fields are wrapped in ``trim`` and aliased back to their original
    name; every other field is passed through unchanged.

    Args:
        field: A ``StructField`` taken from ``df.schema.fields``.

    Returns:
        A ``Column`` expression that keeps the field's original name.
    """
    # Backtick-quote the name so it is always read as one column name,
    # even if it contains dots or spaces.
    source = col("`" + field.name.replace("`", "``") + "`")
    if isinstance(field.dataType, StringType):
        return trim(source).alias(field.name)
    return source


# A single select() produces one projection for all columns. Calling
# withColumn() once per string column inside a loop stacks one projection per
# call, which the PySpark documentation warns can generate very large plans.
# Column order and names are unchanged.
df = df.select(*[_trimmed_if_string(field) for field in df.schema.fields])

df.display()

## Convert dates

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, length

# Columns that are converted to dates using the "yyyyMMdd" pattern.
DATE_COLUMNS = ("sls_order_dt", "sls_ship_dt", "sls_due_dt")


def _parse_yyyymmdd(column_name):
    """Build an expression that converts one column to a date.

    The value becomes NULL when it equals 0 or when its length is not exactly
    8 characters; otherwise it is cast to string and parsed with the
    "yyyyMMdd" pattern.

    Args:
        column_name: Name of the column to convert.

    Returns:
        A ``Column`` expression yielding the parsed date or NULL.
    """
    raw = col(column_name)
    is_unusable = (raw == 0) | (length(raw) != 8)
    return F.when(is_unusable, None).otherwise(
        F.to_date(raw.cast("string"), "yyyyMMdd")
    )


# Each expression reads only its own column, so all three can be replaced in
# one projection instead of three separate, copy-pasted withColumn() calls.
df = df.withColumns({name: _parse_yyyymmdd(name) for name in DATE_COLUMNS})

df.display()

## Numeric checks: backfill missing or non-positive prices

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# A price is unusable when it is NULL or not strictly positive.
price_is_unusable = col("sls_price").isNull() | (col("sls_price") <= 0)

# Derive the price from sales / quantity, but only when quantity is non-zero.
# A when() without otherwise() yields NULL for rows that do not match.
derived_price = F.when(
    col("sls_quantity") != 0,
    col("sls_sales") / col("sls_quantity"),
)

# Keep usable prices as they are; replace unusable ones with the derived value.
repaired_price = F.when(price_is_unusable, derived_price).otherwise(col("sls_price"))

df = df.withColumn("sls_price", repaired_price)
df.display()

## Rename columns

In [0]:
# Source column name -> name used in the silver table, applied in this order.
column_renames = {
    "sls_ord_num": "order_number",
    "sls_prd_key": "product_number",
    "sls_cust_id": "customer_id",
    "sls_order_dt": "order_date",
    "sls_ship_dt": "ship_date",
    "sls_due_dt": "due_date",
    "sls_sales": "sales_amount",
    "sls_quantity": "quantity",
    "sls_price": "price",
}

for source_name, target_name in column_renames.items():
    df = df.withColumnRenamed(source_name, target_name)

df.display()

## Write Silver table

In [0]:
# Persist the transformed frame as a Delta table, replacing any existing contents.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("analytics.silver.crm_sales_details")
)

In [0]:
# Read the table back by its fully qualified name, the same identifier the
# write used, so this check does not depend on the session's current catalog.
spark.table("analytics.silver.crm_sales_details").display()