## Silver Sales Details Table

##### Spark Read Sales details from Bronze Table and Create a new DataFrame

##### Initialization

In [0]:
from pyspark.sql.functions import col, when, row_number
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

##### Spark Read Table and Create a New DataFrame

In [0]:
# Load the Bronze sales-details table as the starting DataFrame.
df = spark.table("dlh.bronze_db.bronze_sales_details")

##### Trimmer Function

In [0]:
def trimmer(df: DataFrame) -> DataFrame:
    """Strip leading and trailing whitespace from every string column.

    Args:
        df: Input DataFrame. Columns whose type is not ``StringType`` are
            left untouched.

    Returns:
        A DataFrame in which each top-level ``StringType`` column has been
        replaced, under its own name, by its ``F.trim``-ed value.
    """
    for field in df.schema.fields:
        if isinstance(field.dataType, StringType):
            # Overwrite the column under its own name. Passing the literal
            # string "field.name" would instead append a single extra column
            # with that name and leave the real columns untrimmed.
            df = df.withColumn(field.name, F.trim(F.col(field.name)))
    return df

In [0]:
# Run the trimmer over the Bronze DataFrame.
df1 = trimmer(df=df)

##### Data Exploration on Duplicates and Nulls

In [0]:
# Order numbers that appear on more than one row.
order_counts = df1.groupBy("sls_ord_num").agg(
    F.count("sls_ord_num").alias("ord_count")
)
df_check = order_counts.where(F.col("ord_count") > 1)

In [0]:
# Rows that have no order number at all.
df_check = df1.where(F.col("sls_ord_num").isNull())

##### Cleaning Dates

In [0]:
# Turn the raw yyyyMMdd order / ship / due values into dates.
# A value whose length is 0, or anything other than 8, becomes NULL instead
# of being parsed.
df2 = df1
for date_col in ("sls_order_dt", "sls_ship_dt", "sls_due_dt"):
    raw_len = F.length(F.col(date_col))
    parsed = F.to_date(F.col(date_col).cast("string"), "yyyyMMdd")
    df2 = df2.withColumn(
        date_col,
        F.when((raw_len == 0) | (raw_len != 8), None).otherwise(parsed),
    )

##### Business Logic:
- If `sls_sales` is NULL, negative, or zero, recompute it as price × quantity.
- If the price is zero or NULL, calculate it as sales ÷ quantity.
- If the price is negative, convert it to a positive value.
- If the price does not equal sales ÷ quantity, replace it with sales ÷ quantity.

In [0]:
# Show the rows that currently break the sales / quantity / price rules.
sales_c, qty_c, price_c = col("sls_sales"), col("sls_quantity"), col("sls_price")
is_suspect = (
    (sales_c != qty_c * price_c)
    | sales_c.isNull()
    | (sales_c <= 0)
    | qty_c.isNull()
    | (qty_c <= 0)
    | price_c.isNull()
    | (price_c <= 0)
)
df2.filter(is_suspect).select(sales_c, qty_c, price_c).display()

sls_sales,sls_quantity,sls_price
10.0,2,
25.0,5,
70.0,2,
9.0,1,
35.0,1,
100.0,10,
16.0,2,
769.0,1,-769.0
30.0,1,-30.0
22.0,1,-22.0


In [0]:
# Business rule: a sales amount that is NULL, zero or negative is rebuilt
# from quantity and price; any other sales amount is kept as is.
# The price itself is only cleaned in the next step, so it can still be
# negative here. F.abs stops a negative price from producing a negative
# sales amount.
needs_recalc = col("sls_sales").isNull() | (col("sls_sales") <= 0)
df3 = df2.withColumn(
    "sls_sales",
    F.when(needs_recalc, col("sls_quantity") * F.abs(col("sls_price")))
     .otherwise(col("sls_sales")),
)

In [0]:
# Price rules, evaluated in this order:
#   1. price is NULL or zero        -> sales / quantity
#   2. price is negative            -> absolute value of the price
#   3. price != sales / quantity    -> sales / quantity
#   4. anything else                -> price unchanged
# A zero quantity is mapped to NULL before dividing, so the derived price is
# NULL rather than a divide-by-zero error when Spark runs in ANSI mode.
# With ANSI mode off the result is the same as a plain division.
nonzero_qty = F.when(col("sls_quantity") != 0, col("sls_quantity"))
derived_price = col("sls_sales").cast("double") / nonzero_qty
df4 = df3.withColumn(
    "sls_price",
    F.when(col("sls_price").isNull() | (col("sls_price") == 0), derived_price)
     .when(col("sls_price") < 0, F.abs(col("sls_price")))
     .when(col("sls_price") != derived_price, derived_price)
     .otherwise(col("sls_price")),
)

In [0]:
# Re-run the rule check on the cleaned data; an empty result means every row
# now satisfies the sales / quantity / price rules.
sales_c, qty_c, price_c = col("sls_sales"), col("sls_quantity"), col("sls_price")
still_suspect = (
    (sales_c != qty_c * price_c)
    | sales_c.isNull()
    | (sales_c <= 0)
    | qty_c.isNull()
    | (qty_c <= 0)
    | price_c.isNull()
    | (price_c <= 0)
)
df4.filter(still_suspect).select(sales_c, qty_c, price_c).display()

sls_sales,sls_quantity,sls_price


##### Rename Function

In [0]:
# Source column name -> column name used in the Silver table.
RENAME_MAP = {
    # identifiers
    "sls_ord_num": "order_number",
    "sls_prd_key": "product_number",
    "sls_cust_id": "customer_id",
    # dates
    "sls_order_dt": "order_date",
    "sls_ship_dt": "ship_date",
    "sls_due_dt": "due_date",
    # measures
    "sls_sales": "sales_amount",
    "sls_quantity": "quantity",
    "sls_price": "price",
}

def renamed(df: DataFrame) -> DataFrame:
    """Rename columns according to ``RENAME_MAP``.

    Args:
        df: DataFrame to rename.

    Returns:
        The DataFrame after ``withColumnRenamed`` has been applied once for
        each (old name, new name) pair in ``RENAME_MAP``.
    """
    result = df
    for source_name, target_name in RENAME_MAP.items():
        result = result.withColumnRenamed(source_name, target_name)
    return result

In [0]:
# Apply the rename function to the cleaned DataFrame.
df5 = renamed(df=df4)

In [0]:
# Keep only the renamed columns, in the order they should appear.
silver_columns = [
    "order_number",
    "product_number",
    "customer_id",
    "order_date",
    "ship_date",
    "due_date",
    "sales_amount",
    "quantity",
    "price",
]
df6 = df5.select(*silver_columns)

In [0]:
# Add an ingest_ts column holding the current timestamp.
df7 = df6.withColumn(
    "ingest_ts",
    F.current_timestamp(),
)

##### Write Sales Details from the DataFrame to the Silver Sales Details Delta Table

In [0]:
# Replace the Silver table with a single Delta overwrite. There is no
# DROP TABLE first: dropping would discard the table's Delta history and
# leave a window in which the table does not exist. overwriteSchema lets the
# overwrite also replace the stored schema when the DataFrame's columns or
# types have changed.
(
    df7.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dlh.silver_db.silver_sales_details")
)