In [0]:
bronze_df = spark.read.table("workspace.default.bronze_table")

In [0]:
bronze_df.show(10)

+--------------+-----------+----------------+------+----------------+------+
|transaction_id|customer_id|transaction_date|amount|product_category|region|
+--------------+-----------+----------------+------+----------------+------+
|          T001|      C1001|      01/15/2023|150.75|     Electronics|    US|
|          T002|      C1002|      01/16/2023|  NULL|        Clothing|    UK|
|          T003|      C1001|      01/17/2023| -25.0|     electronics|    US|
|          T004|      C1003|      01/18/2023| 300.5|           Books|  NULL|
|          T005|      C1002|      01/18/2023| 75.25|        CLOTHING|    UK|
|          T006|      C1004|      01/19/2023| 200.0|           Books|    US|
|          T007|      C1001|      01/15/2023|150.75|     Electronics|    US|
|          T008|      C1005|      01/20/2023| 450.0| Home Appliances|    US|
|          T009|      C1006|      01/21/2023| 89.99|     electronics|    UK|
|          T010|      C1003|      01/22/2023| 120.0|           Books|    US|

In [0]:
bronze_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- product_category: string (nullable = true)
 |-- region: string (nullable = true)



In [0]:
from pyspark.sql.functions import *
bronze_df.groupBy(bronze_df.columns).agg(count("*").alias("count")).filter("count > 1").show()


+--------------+-----------+----------------+------+----------------+------+-----+
|transaction_id|customer_id|transaction_date|amount|product_category|region|count|
+--------------+-----------+----------------+------+----------------+------+-----+
+--------------+-----------+----------------+------+----------------+------+-----+



In [0]:
bronze_df = bronze_df.fillna({"amount": 0})


In [0]:
bronze_df.filter(col("amount").isNull()).count()

0

In [0]:
bronze_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- amount: double (nullable = false)
 |-- product_category: string (nullable = true)
 |-- region: string (nullable = true)



In [0]:
bronze_df = bronze_df.withColumn("amount", when(col('amount') > 0, col('amount')).otherwise(0))



In [0]:
bronze_df = bronze_df.withColumn("transaction_date", regexp_replace("transaction_date", "-", "/"))
bronze_df.orderBy("transaction_id",ascending=False).show()


+--------------+-----------+----------------+------+----------------+------+
|transaction_id|customer_id|transaction_date|amount|product_category|region|
+--------------+-----------+----------------+------+----------------+------+
|          T050|      C1005|      03/03/2023| 500.0|     Electronics|    UK|
|          T049|      C1004|      03/02/2023| 230.0| Home Appliances|    US|
|          T048|      C1003|      03/01/2023| 340.0|           Books|    US|
|          T047|      C1002|      02/28/2023| 70.25|        CLOTHING|    UK|
|          T046|      C1001|      02/27/2023| 190.0|     Electronics|    US|
|          T045|      C1010|      02/26/2023| 150.0|        Clothing|    US|
|          T044|      C1009|      02/25/2023|   0.0| Home Appliances|    UK|
|          T043|      C1008|      02/24/2023| 420.0|           Books|    US|
|          T042|      C1007|      02/23/2023|  95.0|     electronics|    UK|
|          T041|      C1006|      02/22/2023| 110.0|        Clothing|    US|

In [0]:
bronze_df = bronze_df.withColumn('transaction_date', to_date(col('transaction_date'), 'MM/dd/yyyy'))

In [0]:
bronze_df.show()


+--------------+-----------+----------------+------+----------------+------+
|transaction_id|customer_id|transaction_date|amount|product_category|region|
+--------------+-----------+----------------+------+----------------+------+
|          T001|      C1001|      2023-01-15|150.75|     Electronics|    US|
|          T002|      C1002|      2023-01-16|   0.0|        Clothing|    UK|
|          T003|      C1001|      2023-01-17|   0.0|     electronics|    US|
|          T004|      C1003|      2023-01-18| 300.5|           Books|  NULL|
|          T005|      C1002|      2023-01-18| 75.25|        CLOTHING|    UK|
|          T006|      C1004|      2023-01-19| 200.0|           Books|    US|
|          T007|      C1001|      2023-01-15|150.75|     Electronics|    US|
|          T008|      C1005|      2023-01-20| 450.0| Home Appliances|    US|
|          T009|      C1006|      2023-01-21| 89.99|     electronics|    UK|
|          T010|      C1003|      2023-01-22| 120.0|           Books|    US|

In [0]:
bronze_df = bronze_df.withColumn('product_category', lower(col('product_category')))
bronze_df = bronze_df.withColumn('region', when(col('region').isNull(), 'Unknown' ).otherwise(col('region')))
bronze_df.show()

+--------------+-----------+----------------+------+----------------+-------+
|transaction_id|customer_id|transaction_date|amount|product_category| region|
+--------------+-----------+----------------+------+----------------+-------+
|          T001|      C1001|      2023-01-15|150.75|     electronics|     US|
|          T002|      C1002|      2023-01-16|   0.0|        clothing|     UK|
|          T003|      C1001|      2023-01-17|   0.0|     electronics|     US|
|          T004|      C1003|      2023-01-18| 300.5|           books|Unknown|
|          T005|      C1002|      2023-01-18| 75.25|        clothing|     UK|
|          T006|      C1004|      2023-01-19| 200.0|           books|     US|
|          T007|      C1001|      2023-01-15|150.75|     electronics|     US|
|          T008|      C1005|      2023-01-20| 450.0| home appliances|     US|
|          T009|      C1006|      2023-01-21| 89.99|     electronics|     UK|
|          T010|      C1003|      2023-01-22| 120.0|           b

In [0]:
bronze_df.write.format('delta').mode("overwrite").saveAsTable("workspace.default.silver_table")