In [5]:
# Bronze Layer - Ingest JSON from ADLS

df_bronze = spark.read.parquet("abfss://retailrevenue@tjnewstorage71.dfs.core.windows.net/bronze/manish040596/azure-data-engineer-projects/refs/heads/main/retail_transactions_bronze.parquet")
df_bronze.show()



StatementMeta(tjsparkpool71, 0, 6, Finished, Available, Finished)

+--------+----------+-----------+--------------------+--------------+----------+----------------+-------+---------+---------+
|event_id|event_type|customer_id|     event_timestamp|payment_method|product_id|product_category| amount| location|   status|
+--------+----------+-----------+--------------------+--------------+----------+----------------+-------+---------+---------+
|   E1000|    cancel|        C11|2024-05-04T15:00:00Z|   Credit Card|     P1016|     Electronics|    0.0|Bangalore|Cancelled|
|   E1001|    cancel|         C1|2024-05-20T17:00:00Z|   Credit Card|     P1018|       Groceries|    0.0|  Kolkata|Cancelled|
|   E1002|    cancel|        C10|2024-05-24T20:00:00Z|          Cash|     P1002|     Electronics|    0.0|   Mumbai|Cancelled|
|   E1003|    refund|        C15|2024-05-23T02:00:00Z|   Credit Card|     P1019|     Electronics|-139.21|  Chennai| Refunded|
|   E1004|    refund|        C14|2024-05-28T00:00:00Z|           UPI|     P1018|     Electronics| -397.0|  Kolkata| Re

In [None]:
# Silver Layer - Filter purchase & clean data

from pyspark.sql.functions import col, to_date, lower

df_silver = (
    df_bronze
    .filter(col("event_type") == "purchase")
    .dropna(subset=["customer_id", "amount"])
    .withColumn("event_date", to_date(col("event_timestamp")))
    .withColumn("payment_method", lower(col("payment_method")))
    .withColumn("amount", col("amount").cast("float"))
    .select(
        "event_id", "customer_id", "event_date", "product_id",
        "product_category", "payment_method", "amount", "location"
    )
)
df_silver.write.mode("overwrite").parquet("abfss://retailrevenue@tjnewstorage71.dfs.core.windows.net/silver/")

In [7]:
# Gold Layer - Aggregates

from pyspark.sql.functions import sum, count, col

df_silver = spark.read.parquet("abfss://retailrevenue@tjnewstorage71.dfs.core.windows.net/silver/")

df_silver.show()


StatementMeta(tjsparkpool71, 0, 8, Finished, Available, Finished)

+--------+-----------+----------+----------+----------------+--------------+------+---------+
|event_id|customer_id|event_date|product_id|product_category|payment_method|amount| location|
+--------+-----------+----------+----------+----------------+--------------+------+---------+
|   E1005|        C16|2024-05-27|     P1003|     Electronics|          cash|249.18|  Kolkata|
|   E1007|           |2024-05-28|     P1003|      Home Decor|   net banking|379.66|   Mumbai|
|   E1011|        C14|2024-05-24|     P1002|        Clothing|    debit card|162.41|  Kolkata|
|   E1016|         C6|2024-05-21|     P1007|      Home Decor|   net banking|164.06|Bangalore|
|   E1020|        C18|2024-05-06|     P1006|     Electronics|          cash|213.54|  Chennai|
|   E1021|         C9|2024-05-23|     P1017|           Books|   net banking|311.96|  Kolkata|
|   E1023|         C3|2024-05-25|     P1007|      Home Decor|          cash|130.08|    Delhi|
|   E1026|         C7|2024-05-24|     P1010|       Groceries

In [9]:
df_daily_revenue = (
    df_silver.groupBy("event_date")
    .agg(sum("amount").alias("daily_revenue"), count("*").alias("total_purchases"))
)

df_top_categories = (
    df_silver.groupBy("product_category")
    .agg(sum("amount").alias("Total Sales"))
    .orderBy(col("Total Sales").desc())
)


df_daily_revenue.write.mode("overwrite").parquet("abfss://retailrevenue@tjnewstorage71.dfs.core.windows.net/gold/daily_revenue")
df_top_categories.write.mode("overwrite").parquet("abfss://retailrevenue@tjnewstorage71.dfs.core.windows.net/gold/top_categories")

StatementMeta(tjsparkpool71, 0, 10, Finished, Available, Finished)

In [10]:
df_daily_revenue.show()

StatementMeta(tjsparkpool71, 0, 11, Finished, Available, Finished)

+----------+------------------+---------------+
|event_date|     daily_revenue|total_purchases|
+----------+------------------+---------------+
|2024-05-30|            242.75|              1|
|2024-05-25| 130.0800018310547|              1|
|2024-05-19| 450.4300079345703|              2|
|2024-05-05|197.07000732421875|              1|
|2024-05-29| 779.0400085449219|              2|
|2024-05-23| 427.9599914550781|              2|
|2024-05-21|1066.2099914550781|              3|
|2024-05-15| 391.1600036621094|              1|
|2024-05-10| 811.1199951171875|              2|
|2024-05-09| 412.4599914550781|              1|
|2024-05-31| 480.9599914550781|              1|
|2024-05-16| 1078.739990234375|              4|
|2024-05-20| 298.6199951171875|              1|
|2024-05-26| 740.5499954223633|              3|
|2024-05-17| 559.2900085449219|              2|
|2024-05-24| 808.2399978637695|              4|
|2024-05-04|  320.260009765625|              1|
|2024-05-27| 322.5899963378906|         

In [11]:
df_top_categories.show()

StatementMeta(tjsparkpool71, 0, 12, Finished, Available, Finished)

+----------------+------------------+
|product_category|       Total Sales|
+----------------+------------------+
|     Electronics|3116.0300064086914|
|       Groceries| 3107.269989013672|
|           Books|2129.0799865722656|
|      Home Decor|1949.0900192260742|
|        Clothing|1617.3899841308594|
+----------------+------------------+

