In [0]:


bronze_df = spark.read.table("workspace.default.customer_transactions_large")

bronze_df.write.format("delta").mode("overwrite").saveAsTable("default.customer_transactions_bronze")

spark.sql("SELECT * FROM default.customer_transactions_bronze").show()



+--------------+-----------+----------------+------+----------------+------+
|transaction_id|customer_id|transaction_date|amount|product_category|region|
+--------------+-----------+----------------+------+----------------+------+
|          T001|      C1001|      2023-01-15|150.75|     Electronics|    US|
|          T002|      C1002|      2023-01-16|  NULL|        Clothing|    UK|
|          T003|      C1001|      2023-01-17| -25.0|     electronics|    US|
|          T004|      C1003|      2023-01-18| 300.5|           Books|  NULL|
|          T005|      C1002|      2023-01-18| 75.25|        CLOTHING|    UK|
|          T006|      C1004|      2023-01-19| 200.0|           Books|    US|
|          T007|      C1001|      2023-01-15|150.75|     Electronics|    US|
|          T008|      C1005|      2023-01-20| 450.0| Home Appliances|    US|
|          T009|      C1006|      2023-01-21| 89.99|     electronics|    UK|
|          T010|      C1003|      2023-01-22| 120.0|           Books|    US|

In [0]:
from pyspark.sql.functions import col, to_date, regexp_replace, when

bronze_df = spark.read.table("workspace.default.customer_transactions_bronze")

silver_df = (
    bronze_df
    .withColumn("transaction_Date", regexp_replace(col("transaction_date"), "-", "/"))
    .withColumn("transaction_Date", to_date(col("transaction_Date"), "yyyy/MM/dd"))
    .dropna()
    .dropDuplicates()
    .withColumn("amount", when(col("amount") < 0, -col("amount")).otherwise(col("amount")))
)

silver_df.write.format("delta").mode("overwrite").saveAsTable("default.customer_transactions_silver")

spark.sql("SELECT * FROM default.customer_transactions_silver").show()



+--------------+-----------+----------------+------+----------------+------+
|transaction_id|customer_id|transaction_Date|amount|product_category|region|
+--------------+-----------+----------------+------+----------------+------+
|          T032|      C1007|      2023-02-13|  30.0|           Books|    UK|
|          T048|      C1003|      2023-03-01| 340.0|           Books|    US|
|          T044|      C1009|      2023-02-25|  20.0| Home Appliances|    UK|
|          T007|      C1001|      2023-01-15|150.75|     Electronics|    US|
|          T016|      C1010|      2023-01-28| 99.95|     Electronics|    US|
|          T013|      C1008|      2023-01-25|750.25|     Electronics|    US|
|          T042|      C1007|      2023-02-23|  95.0|     electronics|    UK|
|          T015|      C1009|      2023-01-27| 220.0| Home Appliances|    UK|
|          T001|      C1001|      2023-01-15|150.75|     Electronics|    US|
|          T019|      C1003|      2023-01-31|280.75|           Books|    US|

In [0]:
from pyspark.sql.functions import count, sum, avg, max

silver_df = spark.read.table("default.customer_transactions_silver")

gold_df = (
    silver_df.groupBy("customer_id")
    .agg(
        count("*").alias("total_transactions"),
        sum("amount").alias("total_amount_spent"),
        avg("amount").alias("avg_transaction_value"),
        max("transaction_Date").alias("last_transaction_date")
    )
)

gold_df.write.format("delta").mode("overwrite").saveAsTable("default.customer_transactions_gold")

spark.sql("SELECT * FROM default.customer_transactions_gold").show()



+-----------+------------------+------------------+---------------------+---------------------+
|customer_id|total_transactions|total_amount_spent|avg_transaction_value|last_transaction_date|
+-----------+------------------+------------------+---------------------+---------------------+
|      C1003|                 4|           1050.75|             262.6875|           2023-03-01|
|      C1010|                 4|            509.95|             127.4875|           2023-02-26|
|      C1005|                 4|            1730.0|                432.5|           2023-03-03|
|      C1002|                 5|             340.8|                68.16|           2023-02-28|
|      C1007|                 3|            220.25|    73.41666666666667|           2023-02-23|
|      C1006|                 4|            290.49|              72.6225|           2023-02-22|
|      C1004|                 3|             555.0|                185.0|           2023-03-02|
|      C1009|                 4|        