# Initialization

In [0]:
%run "/Workspace/Users/amberasad0299@gmail.com/databricks_data_lakehouse_project/scripts/silver/silver_util"

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim, length

# Read Bronze Table

In [0]:
# Bronze-layer source table processed by this notebook.
bronze_table_name = "crm_sales_details"
# read_bronze is not defined in this notebook; presumably it is provided by the
# silver_util notebook loaded via %run and returns a Spark DataFrame — TODO confirm.
df = read_bronze(spark, bronze_table_name)

# Silver Transformations

## Trimming

In [0]:
# Rebinds df to the output of trim_all_strings (helper not defined in this notebook).
# Judging by its name it strips whitespace from string columns — TODO confirm in silver_util.
df = trim_all_strings(df)

## Cleaning Dates

In [0]:
# Convert the raw yyyyMMdd order/ship/due columns to DateType. Any value that is
# not exactly eight digits (including the 0 placeholder) becomes NULL.
DATE_FORMAT = "yyyyMMdd"
for date_col in ["sls_order_dt", "sls_ship_dt", "sls_due_dt"]:
    # Skip columns that are already dates so re-running this cell is a no-op;
    # a date rendered as a string is 10 characters long and would otherwise be
    # rejected by the eight-digit check below, nulling every row.
    if isinstance(df.schema[date_col].dataType, DateType):
        continue

    # Validate against one explicit string view of the column instead of relying
    # on implicit casts (comparing a possibly-string column to the integer 0).
    raw_date = col(date_col).cast("string")

    # Only all-digit, eight-character values are handed to to_date; the all-zero
    # value is excluded explicitly because it is numerically equal to 0.
    # NOTE(review): an eight-digit value that is not a real calendar date still
    # reaches to_date; how that is handled depends on the session's ANSI setting.
    is_valid = raw_date.rlike("^[0-9]{8}$") & (raw_date != "00000000")

    # when() without otherwise() evaluates to NULL for invalid or NULL inputs.
    df = df.withColumn(
        date_col,
        F.when(is_valid, F.to_date(raw_date, DATE_FORMAT))
    )

## Sales and Price Corrections

In [0]:
# Back-fill unusable prices: when sls_price is NULL or not positive, replace it
# with sls_sales / sls_quantity; every other price passes through unchanged.
# NOTE(review): only sls_price is adjusted in this cell — sls_sales is left as read.
price_unusable = col("sls_price").isNull() | (col("sls_price") <= 0)

# A when() with no otherwise() evaluates to NULL, so a zero (or NULL) quantity
# yields a NULL price instead of attempting the division.
price_from_sales = F.when(
    col("sls_quantity") != 0,
    col("sls_sales") / col("sls_quantity"),
)

df = df.withColumn(
    "sls_price",
    F.when(price_unusable, price_from_sales).otherwise(col("sls_price")),
)

## Renaming Columns

In [0]:
# Maps the raw CRM column names (sls_ prefix) to the names used from here on.
# Keys are the columns as they exist in df at this point; values are the new names.
RENAME_MAP = {
    "sls_ord_num": "order_number",
    "sls_prd_key": "product_number",
    "sls_cust_id": "customer_id",
    "sls_order_dt": "order_date",
    "sls_ship_dt": "ship_date",
    "sls_due_dt": "due_date",
    "sls_sales": "sales_amount",
    "sls_quantity": "quantity",
    "sls_price": "price"
}
# rename_columns is not defined in this notebook (presumably from silver_util);
# how it treats keys that are missing from df is not visible here — TODO confirm.
df = rename_columns(df, RENAME_MAP)

## Sanity Check before Writing

In [0]:
# Render at most 10 rows of the transformed DataFrame for a visual check before writing.
df.limit(10).display()

# Writing Silver Table

In [0]:
# Name handed to write_silver as the target table.
silver_table_name = "crm_sales" 
# write_silver is not defined in this notebook (presumably from silver_util); the
# catalog/schema it writes to and its write mode are not visible here — TODO confirm.
write_silver(df, silver_table_name)

## Sanity Check after Writing

In [0]:
%sql
-- Spot-check: read back up to 10 rows from workspace.silver.crm_sales.
-- NOTE(review): assumes this is the table written by write_silver above; confirm the catalog/schema.
SELECT * FROM workspace.silver.crm_sales LIMIT 10