In [0]:
from pyspark.sql import SparkSession

# Bronze -> silver stage: obtain the Spark session (reuses an existing one if the
# kernel already has it).
spark = (
    SparkSession.builder
    .appName("transactions-bronze-to-silver")
    .getOrCreate()
)

In [0]:
# Bronze layer: raw transaction records, schema inferred by Spark from the JSON file.
bronze_path = "/Volumes/workspace/data/datasets/transactions.json"
df = spark.read.json(bronze_path)
display(df)

customer_id,items,payment_type,store_id,timestamp,total_amount,transaction_id
C1,"[{""qty"": 2, ""price"": 25, ""product_id"": ""P1""}]",card,S1,2025-11-01T15:33:40.748Z,50.0,2


In [0]:
from pyspark.sql.functions import col, explode, from_json, to_timestamp
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

# Shape of the JSON text stored in the string column `items`: a list of line items.
item_fields = [
    StructField("product_id", StringType(), True),
    StructField("qty", DoubleType(), True),
    StructField("price", DoubleType(), True),
]
items_schema = ArrayType(StructType(item_fields))

# Parse `items` into a typed array column. `df` is rebound here, so any later cell
# that reads `df` sees the extra `items_array` column.
# NOTE(review): from_json does not raise on text that doesn't fit items_schema; it
# can produce NULLs instead (the df.show() output in this notebook has an
# items_array of {NULL, NULL, 25.0}). Compare `items` vs `items_array` when in doubt.
df = df.withColumn("items_array", from_json(col("items"), items_schema))

# One row per line item. `explode` emits nothing for a NULL or empty array, so a
# transaction whose items are missing or unparseable disappears from the output.
df_exploded = df.withColumn("item", explode(col("items_array")))

# Silver shape: transaction/store keys, parsed event time, and flattened item fields.
item_columns = [col(f"item.{name}").alias(name) for name in ("product_id", "qty", "price")]
df_final = df_exploded.select(
    "transaction_id",
    "store_id",
    to_timestamp("timestamp").alias("ts"),
    *item_columns,
)

# NOTE(review): df_final is not written anywhere in this notebook, yet the
# silver-to-gold section reads silver-retail.parquet — confirm that file is
# produced from this frame elsewhere.

In [0]:
# Preview the flattened silver rows: first 20 rows, long values truncated (e.g. `ts`).
df_final.show(n=20, truncate=True)

+--------------+--------+--------------------+----------+---+-----+
|transaction_id|store_id|                  ts|product_id|qty|price|
+--------------+--------+--------------------+----------+---+-----+
|             2|      S1|2025-11-01 15:33:...|        P1|2.0| 25.0|
+--------------+--------+--------------------+----------+---+-----+



In [0]:
# Inspect the transactions frame including its parsed `items_array` column, to check
# that every item field came through from the raw `items` JSON text.
df.show(n=20, truncate=True)

+-----------+--------------------+------------+--------+--------------------+------------+--------------+--------------------+
|customer_id|               items|payment_type|store_id|           timestamp|total_amount|transaction_id|         items_array|
+-----------+--------------------+------------+--------+--------------------+------------+--------------+--------------------+
|         C1|[{"qty": 2, "pric...|        card|      S1|2025-11-01T15:33:...|        50.0|             2|[{NULL, NULL, 25.0}]|
+-----------+--------------------+------------+--------+--------------------+------------+--------------+--------------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, sum as _sum, to_date

# Silver -> gold stage. getOrCreate() returns the already-running session when there
# is one; NOTE(review): in that case this app name may not take effect.
spark = (
    SparkSession.builder
    .appName("silver-to-gold")
    .getOrCreate()
)

# Silver input: item-level transaction rows (transaction_id, store_id, ts, product_id, qty, price).
silver_path = '/Volumes/workspace/data/datasets/silver-retail.parquet'
silver = spark.read.parquet(silver_path)


date,store_id,total_sales,total_transactions
2025-11-01,S1,50.0,1


In [0]:
# Add per-line revenue (price x qty) and the calendar date of each event, ready for grouping.
silver_agg = (
    silver
    .withColumn("sales_amount", col("price") * col("qty"))
    .withColumn("date", to_date(col("ts")))
)
display(silver)

transaction_id,store_id,ts,product_id,qty,price
2,S1,2025-11-01T15:33:40.748Z,P1,2.0,25.0


In [0]:
# Aggregation-1: daily sales summary — revenue and number of distinct transactions
# per store per day.
# NOTE(review): `date` comes from to_date(ts), so day boundaries presumably follow the
# Spark session time zone — confirm that matches the intended business day.
daily_sales = (
    silver_agg
    .groupBy("date", "store_id")
    .agg(
        _sum(col("sales_amount")).alias("total_sales"),
        countDistinct(col("transaction_id")).alias("total_transactions"),
    )
)
display(daily_sales)

date,store_id,total_sales,total_transactions
2025-11-01,S1,50.0,1


In [0]:
# Aggregation-2: Product Performance — units sold and revenue per product, summed
# across all stores and days.
prod_perf = silver_agg.groupBy("product_id").agg(
    _sum("qty").alias("total_sold"),
    _sum("sales_amount").alias("total_sales"),
)

display(prod_perf)

product_id,total_sold,total_sales
P1,2.0,50.0
