### Clean orders from silver and load it to Silver

1. Spaces around values = 1005
2. Missing `customer_id` = 332
3. Missing or invalid `order_id` = 239
4. Mixed date formats = 4120
5. Amount has `N/A` (non-numeric) = 482
6. Data types stored as strings.



In [0]:
# imports
from pyspark.sql.functions import trim, col, when, monotonically_increasing_id, lit, coalesce, length, try_to_date, round, expr

In [0]:
bronze_df = spark.table('retailx.bronze.orders')
bronze_df.printSchema()
bronze_df.show(5)

root
 |-- order_id: long (nullable = true)
 |-- customer_id: double (nullable = true)
 |-- order_date: string (nullable = true)
 |-- amount: string (nullable = true)

+--------+-----------+----------+-------+
|order_id|customer_id|order_date| amount|
+--------+-----------+----------+-------+
|       1|     9572.0|02/17/2025|3770.03|
|       2|     2028.0|25-08-2025| 8249.9|
|       3|     9187.0|01/28/2025|5127.71|
|       4|     1550.0|16-03-2025|2600.61|
|       5|     7174.0|12/01/2024|6445.43|
+--------+-----------+----------+-------+
only showing top 5 rows


In [0]:
# Clean order_id column

df_order_id_clean = bronze_df.withColumn('order_id', trim(col('order_id')))\
                            .withColumn("order_sk", monotonically_increasing_id() +1)\
                            .drop('order_id')\
                            .withColumn('order_sk', col('order_sk').cast('int'))

df_order_id_clean.show(5)
df_order_id_clean.printSchema()

+-----------+----------+-------+--------+
|customer_id|order_date| amount|order_sk|
+-----------+----------+-------+--------+
|     9572.0|02/17/2025|3770.03|       1|
|     2028.0|25-08-2025| 8249.9|       2|
|     9187.0|01/28/2025|5127.71|       3|
|     1550.0|16-03-2025|2600.61|       4|
|     7174.0|12/01/2024|6445.43|       5|
+-----------+----------+-------+--------+
only showing top 5 rows
root
 |-- customer_id: double (nullable = true)
 |-- order_date: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- order_sk: integer (nullable = false)



In [0]:
# clean customer_id column

customer_fix_df = df_order_id_clean.withColumn('customer_id', trim(col('customer_id')))\
                                    .withColumn('customer_id', col('customer_id').cast('float').cast('int'))\
                                    .withColumn('customer_id', when(col('customer_id').isNull(), 1).otherwise(col('customer_id')))
                                    
customer_fix_df.printSchema()
#customer_fix_df.filter(col('customer_id').isNull()).show(5)
customer_fix_df.show(5)


root
 |-- customer_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- order_sk: integer (nullable = false)

+-----------+----------+-------+--------+
|customer_id|order_date| amount|order_sk|
+-----------+----------+-------+--------+
|       9572|02/17/2025|3770.03|       1|
|       2028|25-08-2025| 8249.9|       2|
|       9187|01/28/2025|5127.71|       3|
|       1550|16-03-2025|2600.61|       4|
|       7174|12/01/2024|6445.43|       5|
+-----------+----------+-------+--------+
only showing top 5 rows


In [0]:
# Clean order_date

order_date_fix_df = customer_fix_df.withColumn('order_date', trim(col('order_date')))\
                                    .withColumn('order_date',
                                                coalesce(
                                                    try_to_date(col("order_date"), "MM/dd/yyyy"),
                                                    try_to_date(col("order_date"), "dd-MM-yyyy"), 
                                                    try_to_date(col("order_date"), "MM-dd-yyyy"),
                                                    try_to_date(col("order_date"), "dd/MM/yyyy"),
                                                    try_to_date(col("order_date"), "yyyy-MM-dd")
                                                    )
                                    )
order_date_fix_df.printSchema()
#order_date_fix_df.filter(col('order_date').isNull()).show(5)
order_date_fix_df.show(5)

root
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- amount: string (nullable = true)
 |-- order_sk: integer (nullable = false)

+-----------+----------+-------+--------+
|customer_id|order_date| amount|order_sk|
+-----------+----------+-------+--------+
|       9572|2025-02-17|3770.03|       1|
|       2028|2025-08-25| 8249.9|       2|
|       9187|2025-01-28|5127.71|       3|
|       1550|2025-03-16|2600.61|       4|
|       7174|2024-12-01|6445.43|       5|
+-----------+----------+-------+--------+
only showing top 5 rows


In [0]:
# Clean amount column

amount_fix_df = (
    order_date_fix_df
        .withColumn("amount", trim(col("amount")))
        .withColumn("amount", when(col("amount") == "N/A", None).otherwise(col("amount")))
        .withColumn("amount", expr("try_cast(amount as double)"))
        .withColumn("amount", round(col("amount"), 2))
)


                                
amount_fix_df.printSchema()
amount_fix_df.show(5)




root
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- amount: double (nullable = true)
 |-- order_sk: integer (nullable = false)

+-----------+----------+-------+--------+
|customer_id|order_date| amount|order_sk|
+-----------+----------+-------+--------+
|       9572|2025-02-17|3770.03|       1|
|       2028|2025-08-25| 8249.9|       2|
|       9187|2025-01-28|5127.71|       3|
|       1550|2025-03-16|2600.61|       4|
|       7174|2024-12-01|6445.43|       5|
+-----------+----------+-------+--------+
only showing top 5 rows


In [0]:
silver_df = amount_fix_df.select('order_sk', 'customer_id', 'order_date', 'amount')
silver_df.show(5)
display(silver_df, 5)

+--------+-----------+----------+-------+
|order_sk|customer_id|order_date| amount|
+--------+-----------+----------+-------+
|       1|       9572|2025-02-17|3770.03|
|       2|       2028|2025-08-25| 8249.9|
|       3|       9187|2025-01-28|5127.71|
|       4|       1550|2025-03-16|2600.61|
|       5|       7174|2024-12-01|6445.43|
+--------+-----------+----------+-------+
only showing top 5 rows


order_sk,customer_id,order_date,amount
1,9572,2025-02-17,3770.03
2,2028,2025-08-25,8249.9
3,9187,2025-01-28,5127.71
4,1550,2025-03-16,2600.61
5,7174,2024-12-01,6445.43
6,8037,2025-01-25,4454.87
7,5935,2024-12-08,8522.81
8,4163,2025-08-01,
9,7235,2025-06-08,5595.78
10,9632,2025-09-20,914.95


In [0]:
# Loading as Delta Silver
silver_df.write\
        .format('delta')\
        .mode('overwrite')\
        .saveAsTable('retailx.silver.orders')

In [0]:
%sql
select * from retailx.silver.orders limit 5;

order_sk,customer_id,order_date,amount
1,9572,2025-02-17,3770.03
2,2028,2025-08-25,8249.9
3,9187,2025-01-28,5127.71
4,1550,2025-03-16,2600.61
5,7174,2024-12-01,6445.43
