# Read the Source Data

In [0]:
# Load the bronze CRM sales table and preview its first two rows.
df = spark.table('workspace.bronze_pyspark.crm_sales')
df.limit(2).display()

# Import methods & libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import col, when, concat, coalesce, trim, lit, substring, regexp_replace,lead,abs, nullif
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Explore the source table

## Check for invalid dates

In [0]:
# Invalid-date scan for sls_order_dt. Expectation: no result.
# A row is flagged when the value is 0, when its string form is not exactly
# 8 characters long, or when it lies outside the 19000101..20500101 range.
# Finding: 0s and values shorter than 8 characters exist and need handling.
(
    df.select('sls_order_dt')
    .where(
        (col('sls_order_dt') == 0)
        # Cast the column itself to string, then measure its length
        # (same expression the transformation cell uses).
        | (length(col('sls_order_dt').cast('string')) != 8)
        | (col('sls_order_dt') > 20500101)
        | (col('sls_order_dt') < 19000101)
    )
).display()

In [0]:
# Invalid-date scan for sls_ship_dt. Expectation: no result.
# A row is flagged when the value is 0, when its string form is not exactly
# 8 characters long, or when it lies outside the 19000101..20500101 range.
(
    df.select('sls_ship_dt')
    .where(
        (col('sls_ship_dt') == 0)
        # Cast the column itself to string, then measure its length
        # (same expression the transformation cell uses).
        | (length(col('sls_ship_dt').cast('string')) != 8)
        | (col('sls_ship_dt') > 20500101)
        | (col('sls_ship_dt') < 19000101)
    )
).display()

In [0]:
# Invalid-date scan for sls_due_dt. Expectation: no result.
# A row is flagged when the value is 0, when its string form is not exactly
# 8 characters long, or when it lies outside the 19000101..20500101 range.
(
    df.select('sls_due_dt')
    .where(
        (col('sls_due_dt') == 0)
        # Cast the column itself to string, then measure its length
        # (same expression the transformation cell uses).
        | (length(col('sls_due_dt').cast('string')) != 8)
        | (col('sls_due_dt') > 20500101)
        | (col('sls_due_dt') < 19000101)
    )
).display()

## Check that order date is not later than ship date or due date


In [0]:
# Expectation: No result
# Lists rows whose order date value is greater than the ship date or the due date.
(
    df.select('sls_order_dt', 'sls_ship_dt', 'sls_due_dt')
    .filter(
        (col('sls_order_dt') > col('sls_ship_dt'))
        | (col('sls_order_dt') > col('sls_due_dt'))
    )
).display()

## Check for Data Consistency

In [0]:
# Quantity check: rows where sls_quantity is NULL, zero or negative.
# Finding: no NULLs and no negative quantities.
(
    df.select('sls_quantity')
    .filter(col('sls_quantity').isNull() | (col('sls_quantity') <= 0))
).display()

In [0]:
# Price check: rows where sls_price is NULL, zero or negative.
# Finding: negative values exist (take the absolute value before use) and NULLs must be handled.
(
    df.select('sls_price')
    .filter(col('sls_price').isNull() | (col('sls_price') <= 0))
).display()

In [0]:
# Sales check: rows where sls_sales is NULL or negative.
# Finding: NULLs and negative values exist in sales and need handling.
(
    df.select('sls_sales')
    .filter(col('sls_sales').isNull() | (col('sls_sales') < 0))
).display()

In [0]:
# Consistency check: rows where sls_sales differs from quantity * |price|.
# Finding: many records where sales != quantity * price.
(
    df.select('sls_sales')
    .filter(
        col('sls_sales') != col('sls_quantity') * abs(col('sls_price'))
    )
).display()

# Transformations

In [0]:
# Build the silver-layer `sales` DataFrame from the bronze CRM sales table.
df = spark.table("workspace.bronze_pyspark.crm_sales")


def _yyyymmdd_to_date(column_name):
    """Return a date column parsed from a yyyyMMdd-encoded column of `df`.

    The order/ship/due date columns are integers, so the value is cast to
    string before parsing. A value equal to 0, or whose string form is not
    exactly 8 characters long, yields a NULL date instead of being parsed.
    """
    raw = col(column_name)
    as_text = raw.cast("string")
    return when(
        (length(as_text) != 8) | (raw == 0),
        lit(None).cast("date"),
    ).otherwise(to_date(as_text, "yyyyMMdd"))


# Sales amount implied by quantity and price. `abs` is the pyspark.sql.functions
# version imported by this notebook, so a negative price cannot produce a
# negative amount.
expected_sales = col("sls_quantity") * abs(col("sls_price"))

sales = df.select(
    col("sls_ord_num").alias("order_number"),
    col("sls_prd_key").alias("product_number"),
    col("sls_cust_id").alias("customer_number"),
    _yyyymmdd_to_date("sls_order_dt").alias("order_date"),
    _yyyymmdd_to_date("sls_ship_dt").alias("ship_date"),
    _yyyymmdd_to_date("sls_due_dt").alias("due_date"),
    # Sales is recomputed when it is NULL, zero/negative, or disagrees with
    # quantity * |price|. The replacement uses the same quantity * |price|
    # expression as the comparison, so the derived value is never negative
    # just because the source price was negative.
    when(
        (col("sls_sales") != expected_sales)
        | (col("sls_sales") <= 0)
        | (col("sls_sales").isNull()),
        expected_sales,
    )
    .otherwise(col("sls_sales"))
    .alias("sales_amount"),
    col("sls_quantity").alias("quantity"),
    # When price is NULL, zero or negative it is derived by dividing the source
    # sales by quantity; a quantity of 0 yields NULL instead of a division by zero.
    when(
        (col("sls_price") <= 0) | (col("sls_price").isNull()),
        when(
            col("sls_quantity") != 0, col("sls_sales") / col("sls_quantity")
        ).otherwise(None),
    )
    .otherwise(col("sls_price"))
    .alias("price"),
)

# Drop Target table if it already exists

In [0]:
# Remove any previous copy of the silver table before it is recreated.
# NOTE(review): dropping the table presumably also discards its Delta history,
# so a later DESCRIBE HISTORY would only show the new load - confirm this is intended.
spark.sql("DROP TABLE IF EXISTS workspace.silver_pyspark.crm_sales")

# Create Target Table & Load the transformed data to it

In [0]:
# Persist the transformed DataFrame as a Delta table, overwriting existing data.
(
    sales.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('workspace.silver_pyspark.crm_sales')
)

# Sanity Checks of Target Table

In [0]:
# Point `df` at the silver table; the sanity-check cells below read from it.
df = spark.table('workspace.silver_pyspark.crm_sales')
display(df)

In [0]:
# Row count of the silver table.
spark.sql("SELECT COUNT(*) FROM workspace.silver_pyspark.crm_sales").display()

## Check for invalid dates

In [0]:
# Expectation No Result
# Lists order_date values that are NULL or outside the 1900-01-01..2050-01-01 range.
# NOTE(review): the transformation cell writes NULL for source values that were 0
# or not 8 characters long, so NULL rows here may be expected - confirm.
(
    df.select('order_date')
    .filter(
        col('order_date').isNull()
        | (col('order_date') > '2050-01-01')
        | (col('order_date') < '1900-01-01')
    )
).display()

In [0]:
# Expectation No Result
# Lists ship_date values that are NULL or outside the 1900-01-01..2050-01-01 range.
(
    df.select('ship_date')
    .filter(
        col('ship_date').isNull()
        | (col('ship_date') > '2050-01-01')
        | (col('ship_date') < '1900-01-01')
    )
).display()

In [0]:
# Expectation No Result
# Lists due_date values that are NULL or outside the 1900-01-01..2050-01-01 range.
(
    df.select('due_date')
    .filter(
        col('due_date').isNull()
        | (col('due_date') > '2050-01-01')
        | (col('due_date') < '1900-01-01')
    )
).display()

## Check that order date is not later than ship date or due date

In [0]:
# Expectation: No result
# Lists rows whose order_date is later than ship_date or due_date.
(
    df.select('order_date', 'ship_date', 'due_date')
    .filter(
        (col('order_date') > col('ship_date'))
        | (col('order_date') > col('due_date'))
    )
).display()

## Check for Data Consistency

In [0]:
# Expectation: No result
# Lists quantity values that are NULL, zero or negative.
(
    df.select('quantity')
    .filter(col('quantity').isNull() | (col('quantity') <= 0))
).display()

In [0]:
# Expectation: No result
# Lists price values that are NULL, zero or negative.
(
    df.select('price')
    .filter(col('price').isNull() | (col('price') <= 0))
).display()

In [0]:
# Expectation: No result
# Lists sales_amount values that are NULL or negative.
(
    df.select('sales_amount')
    .filter(col('sales_amount').isNull() | (col('sales_amount') < 0))
).display()

In [0]:
# Expectation: No result
# Lists rows where sales_amount differs from quantity * |price|.
# Finding: no records.
(
    df.select('sales_amount')
    .filter(
        col('sales_amount') != col('quantity') * abs(col('price'))
    )
).display()

# View table changes

In [0]:
# Show the Delta change history recorded for the silver table.
spark.sql("DESCRIBE HISTORY workspace.silver_pyspark.crm_sales ").display()