# Clean sales tables

## Import necessary functions

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.window import Window
from pyspark.sql.functions import col, trim, current_timestamp

## Load the data

In [0]:
# Read the CRM sales details table from the bronze schema into a DataFrame.
bronze_table = "dev_project.bronze.crm_sales_details"
df = spark.read.table(bronze_table)

## Remove extra spaces in string columns

In [0]:
# Strip leading/trailing whitespace from every string-typed column in a single
# projection; non-string columns pass through unchanged and column order is kept.
df = df.select(
    [
        trim(col(f.name)).alias(f.name)
        if isinstance(f.dataType, StringType)
        else col(f.name)
        for f in df.schema.fields
    ]
)
    

## Date Cleaning

In [0]:
def _clean_yyyymmdd(column_name):
    """Return a date Column built from *column_name*.

    Values equal to 0, or whose text is not exactly 8 characters long, become
    null; every other value is cast to string and parsed as ``yyyyMMdd``.
    """
    raw = col(column_name)
    is_invalid = (raw == 0) | (F.length(raw) != 8)
    parsed = F.to_date(raw.cast("string"), "yyyyMMdd")
    return F.when(is_invalid, None).otherwise(parsed)


# Apply the same rule to the order, ship and due date columns, in that order.
for date_column in ("sls_order_dt", "sls_ship_dt", "sls_due_dt"):
    df = df.withColumn(date_column, _clean_yyyymmdd(date_column))
    

## Sales & Price Correction

In [0]:
# Reconcile the sales amount and the unit price so that
#     sls_sales == sls_quantity * |sls_price|
#
# The previous version tested `length(sls_sales) != 8` (a check copied from the
# date cleaning), so nearly every row had its sales amount overwritten with
# sales / quantity -- a unit price -- and sls_price itself was never corrected.
#
# NOTE(review): the rule below is inferred from the section title
# ("Sales & Price Correction"); confirm it with the data owner.
# NOTE(review): sls_price becomes a double because it can now be derived by
# division -- confirm the silver table schema accepts this on overwrite.
quantity = col("sls_quantity")
raw_sales = col("sls_sales")
raw_price = col("sls_price")
expected_sales = quantity * F.abs(raw_price)

# A price can only be used to rebuild sales when it is present and non-zero
# (negative prices are used via their absolute value).
price_is_usable = raw_price.isNotNull() & (raw_price != 0)

# Sales needs fixing when it is missing, non-positive, or disagrees with a
# usable price.
sales_needs_fix = (
    raw_sales.isNull()
    | (raw_sales <= 0)
    | (price_is_usable & (raw_sales != expected_sales))
)

df = (
    df
    .withColumn(
        "sls_sales",
        # The inner `when` has no `otherwise`, so sales becomes null when it is
        # bad and there is no usable price to rebuild it from.
        F.when(sales_needs_fix, F.when(price_is_usable, expected_sales))
        .otherwise(raw_sales),
    )
    .withColumn(
        "sls_price",
        F.when(
            raw_price.isNull() | (raw_price <= 0),
            # Derive price from the already-corrected sales; the inner `when`
            # yields null for quantity == 0, which prevents a divide by zero.
            F.when(quantity != 0, col("sls_sales") / quantity),
        ).otherwise(raw_price),
    )
)


## Rename Columns

In [0]:
# Mapping of source column name -> name used in the silver table.
RENAME_MAP = dict(
    sls_ord_num="order_number",
    sls_prd_key="product_number",
    sls_cust_id="customer_id",
    sls_order_dt="order_date",
    sls_ship_dt="ship_date",
    sls_due_dt="due_date",
    sls_sales="sales_amount",
    sls_quantity="quantity",
    sls_price="price",
)

# Rename one column at a time; withColumnRenamed is a no-op for a column that
# is not present in the DataFrame.
for source_name, target_name in RENAME_MAP.items():
    df = df.withColumnRenamed(source_name, target_name)

# Add a column holding the timestamp at which this run processed the rows.
df = df.withColumn("_ingest_time", F.current_timestamp())

## Writing Silver Table

In [0]:
# Persist the cleaned DataFrame as the silver table, replacing its contents.
silver_writer = df.write.mode("overwrite")
silver_writer.saveAsTable("dev_project.silver.crm_sales_details")

In [0]:
%sql
-- Preview up to 10 rows of the silver table written above.
SELECT *
FROM dev_project.silver.crm_sales_details
LIMIT 10;

In [0]:
# Show the transformed DataFrame (the in-memory `df`, not a re-read of the
# saved table) in the notebook output.
df.display()