# Initialization

In [0]:
from functools import reduce

import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType , IntegerType
from pyspark.sql.functions import trim, col

# Read Bronze table

In [0]:
# Load the Bronze sales-details table; every later cell works on `df`.
df = spark.read.table("bronze.crm_sales_details")

In [0]:
# Render the raw Bronze rows for a first visual inspection.
display(df)

In [0]:
# Column names paired with their Spark type strings, as read from Bronze.
df.dtypes

# Silver Transformations

## Checking if any column has NULLs

In [0]:
# Count the NULLs in each column.
# F.when(...) without an otherwise yields NULL for non-matching rows, and
# F.count skips NULLs, so each aggregate counts only the rows where the
# column itself is NULL.
null_counters = []
for name in df.columns:
    is_missing = F.col(name).isNull()
    null_counters.append(F.count(F.when(is_missing, name)).alias(name))

null_stats = df.select(null_counters)

display(null_stats)


## Trimming

In [0]:
# Trim leading/trailing whitespace from every string column.
# All trims are applied in one withColumns projection instead of one
# withColumn call per column, which keeps the query plan flat.
# isinstance matches string columns regardless of how their StringType
# instance was constructed.
string_cols = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, StringType)
]
if string_cols:  # nothing to project when the frame has no string columns
    df = df.withColumns({name: trim(col(name)) for name in string_cols})
df.display()

In [0]:
# Count, for each date column, how many rows hold the value 0.
cols = ["sls_due_dt", "sls_order_dt", "sls_ship_dt"]

zero_counters = []
for date_col in cols:
    is_zero = F.col(date_col) == 0
    zero_counters.append(F.count(F.when(is_zero, date_col)).alias(date_col))

zero_stats = df.select(zero_counters)

display(zero_stats)

## Cleaning Dates

In [0]:
# Convert the raw yyyyMMdd date columns to real dates.
# A value of 0, or any value whose text form is not exactly 8 characters,
# becomes NULL; everything else is parsed with the "yyyyMMdd" pattern.
cols = ["sls_due_dt", "sls_order_dt", "sls_ship_dt"]

# Snapshot of the current column types, used to keep this cell idempotent:
# once a column is a date its text form is 10 characters long, so running the
# conversion a second time would fail the length check and null every value.
current_types = dict(df.dtypes)

for date_col in cols:
    if current_types.get(date_col) == "date":
        continue  # already converted by an earlier run of this cell

    raw_text = col(date_col).cast("string")
    is_unusable = (col(date_col) == 0) | (F.length(raw_text) != 8)
    df = df.withColumn(
        date_col,
        F.when(is_unusable, F.lit(None).cast("date"))
         .otherwise(F.to_date(raw_text, "yyyyMMdd")),
    )

In [0]:
# Show the frame after the date columns have been cleaned.
display(df)

In [0]:
# Column types after date cleaning; the three *_dt columns are expected to
# report "date" here.
df.dtypes

## Sales and Price Corrections

In [0]:
# Show every row that has a NULL in at least one column.
# The per-column isNull() tests are OR-ed together into a single predicate.
# `reduce` comes from the notebook's import cell at the top.
condition = reduce(lambda a, b: a | b, (F.col(c).isNull() for c in df.columns))
df.filter(condition).display()

In [0]:
# Repair unusable prices: a price that is NULL or not positive is replaced by
# sales / quantity. When the quantity is 0 the replacement is NULL, because a
# when() without an otherwise() evaluates to NULL.
# NOTE(review): sls_sales has not been corrected yet at this point, so the
# derived price can itself be NULL or negative - confirm this is intended.
price = col("sls_price")
quantity = col("sls_quantity")

price_is_invalid = price.isNull() | (price <= 0)
derived_price = F.when(quantity != 0, col("sls_sales") / quantity)

df = df.withColumn(
    "sls_price",
    F.when(price_is_invalid, derived_price).otherwise(price),
)


### Sales = quantity × price

In [0]:
# Count the NULLs in each column after the price repair.
null_counters = (
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
)
null_stats = df.select(*null_counters)

display(null_stats)

In [0]:
# Rows whose sales amount is missing.
display(df.where(F.col("sls_sales").isNull()))


In [0]:
# Rows whose sales amount is negative.
display(df.where(F.col("sls_sales") < 0))

In [0]:
# Repair unusable sales amounts: a NULL or negative value is replaced by
# quantity * price; every other value is kept as it is.
# NOTE(review): a sales amount of exactly 0 is left untouched here, whereas the
# price rule treats 0 as invalid (<= 0) - confirm the asymmetry is intended.
sales = col("sls_sales")
sales_is_invalid = sales.isNull() | (sales < 0)
recomputed_sales = col("sls_quantity") * col("sls_price")

df = df.withColumn(
    "sls_sales",
    F.when(sales_is_invalid, recomputed_sales).otherwise(sales),
)

In [0]:
# Re-check for negative sales amounts after the repair above.
display(df.where(F.col("sls_sales") < 0))

In [0]:

# Count the NULLs in each column after the sales repair.
# One NULL-marker expression per column, keyed by column name.
missing_markers = {
    name: F.when(F.col(name).isNull(), name)
    for name in df.columns
}
null_stats = df.select(
    [F.count(marker).alias(name) for name, marker in missing_markers.items()]
)

display(null_stats)

## Renaming the Columns

In [0]:
# Source column name -> Silver column name.
RENAME_MAP = dict(
    # identifiers
    sls_ord_num="order_number",
    sls_prd_key="product_number",
    sls_cust_id="customer_id",
    # dates
    sls_order_dt="order_date",
    sls_ship_dt="ship_date",
    sls_due_dt="due_date",
    # measures
    sls_sales="sales_amount",
    sls_quantity="quantity",
    sls_price="price",
)

In [0]:
# Apply RENAME_MAP one entry at a time. withColumnRenamed leaves the frame
# unchanged when the source name is not present, so unknown keys are ignored.
for source_name in RENAME_MAP:
    df = df.withColumnRenamed(source_name, RENAME_MAP[source_name])
display(df.limit(5))

## Sanity checks of dataframe

In [0]:
# Check NULLs in the renamed columns.
null_counters = list(
    map(
        lambda name: F.count(F.when(F.col(name).isNull(), name)).alias(name),
        df.columns,
    )
)
null_stats = df.select(null_counters)

display(null_stats)

In [0]:
# Preview ten rows of the final frame before it is written.
display(df.limit(10))

In [0]:
# Final column names and Spark type strings of the frame about to be written.
df.dtypes

# Write Into Silver Table

In [0]:
# Persist the cleaned frame as a Delta table; "overwrite" replaces whatever
# the table held before.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_sales")
)

## Sanity checks of Silver table

In [0]:
%sql
-- Preview ten rows of the Silver table.
SELECT *
FROM workspace.silver.crm_sales
LIMIT 10

In [0]:
%sql
-- Rows whose order date falls after the ship date or after the due date.
SELECT *
FROM workspace.silver.crm_sales
WHERE order_date > ship_date
   OR order_date > due_date