In [0]:
from pyspark.sql.functions import col, from_json, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, IntegerType

# ---- Define schemas ----
# One JSON payload schema per Kafka topic. Timestamps stay as strings here;
# only numeric fields get numeric types.

# Orders: flat string attributes plus a nested array of line items.
orders_schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in (
            "order_id",
            "customer_id",
            "payment_method",
            "order_status",
            "event_time",
            "shipping_city",
            "shipping_pincode",
            "ip_address",
        )
    ]
    + [
        StructField(
            "items",
            ArrayType(
                StructType([
                    StructField("product_id", StringType()),
                    StructField("quantity", IntegerType()),
                    StructField("unit_price", DoubleType()),
                    StructField("amount", DoubleType()),
                ])
            ),
        )
    ]
)

# Payments: identifiers and timestamp as strings, amount as a double.
payments_schema = StructType([
    *(
        StructField(field_name, StringType())
        for field_name in ("payment_id", "order_id", "customer_id", "payment_timestamp")
    ),
    StructField("amount", DoubleType()),
    StructField("payment_status", StringType()),
    StructField("method", StringType()),
])

# Shipments: every field is a string; the two keys are declared non-nullable,
# everything else (including delivered_at, which is empty until delivery) may be null.
# NOTE(review): from_json is believed to treat its schema as nullable, so the
# nullable=False flags may not actually be enforced at parse time — confirm.
shipments_schema = StructType(
    [
        StructField(field_name, StringType(), nullable=False)
        for field_name in ("shipment_id", "order_id")
    ]
    + [
        StructField(field_name, StringType(), nullable=True)
        for field_name in ("courier", "dispatched_at", "delivered_at", "status", "tracking_url")
    ]
)

# Kafka topic name -> payload schema; the topic name is also the bronze table name.
schemas = dict(
    orders=orders_schema,
    payments=payments_schema,
    shipments=shipments_schema,
)

# ---- Dynamic ingestion function ----
def process_topic(
    topic,
    schema,
    bootstrap_servers="",  # your kafka ip address, e.g. "10.0.0.5:9092"
    catalog="e-commerce-project",
    schema_name="bronze",
    starting_offsets="earliest",
    time_columns=("event_time", "payment_timestamp", "dispatched_at"),
):
    """Ingest one Kafka topic into a Delta bronze table.

    Reads the topic, parses each message ``value`` as JSON with ``schema``,
    adds an ``event_date`` column derived from the first column of
    ``time_columns`` present in the parsed data, and appends the rows to
    ``catalog.schema_name.topic`` partitioned by ``event_date``. The query
    processes the data available when it starts and then stops.

    Args:
        topic: Kafka topic name; also used as the table name and as the
            checkpoint sub-directory.
        schema: StructType describing the JSON payload.
        bootstrap_servers: Kafka broker list (``host:port[,host:port]``).
            Must be non-empty.
        catalog: catalog holding the target table and the checkpoint volume.
        schema_name: schema (database) inside ``catalog``.
        starting_offsets: Kafka ``startingOffsets`` option; per the Spark
            Kafka docs it only applies when no checkpoint exists yet.
        time_columns: candidate timestamp columns, in priority order.

    Returns:
        The started StreamingQuery, so callers can ``awaitTermination()``.

    Raises:
        ValueError: if ``bootstrap_servers`` is empty, or the parsed data has
            none of ``time_columns`` to derive ``event_date`` from.
    """
    # Fail fast here instead of inside the background stream thread.
    if not bootstrap_servers:
        raise ValueError(
            "bootstrap_servers is empty; pass your Kafka broker address "
            "(e.g. 'host:9092') to process_topic"
        )

    raw_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", starting_offsets)
        # Keep going instead of failing if Kafka already dropped unread offsets.
        .option("failOnDataLoss", "false")
        .load()
    )

    parsed_df = (
        raw_df
        .selectExpr("CAST(value AS STRING)")
        # from_json (PERMISSIVE by default) yields null for unparseable JSON,
        # which shows up below as a row with every column null.
        .select(from_json(col("value"), schema).alias("data"))
        .select("data.*")
    )

    # Partition by the calendar date of the highest-priority timestamp column.
    ts_col = next((c for c in time_columns if c in parsed_df.columns), None)
    if ts_col is None:
        raise ValueError(
            f"topic {topic!r}: schema has none of the timestamp columns "
            f"{list(time_columns)} needed to derive event_date"
        )
    dated_df = parsed_df.withColumn("event_date", to_date(col(ts_col)))

    checkpoint_path = f"/Volumes/{catalog}/{schema_name}/checkpoint/{topic}"
    # Quote every identifier part: catalog and topic names may contain '-'.
    table_name = f"`{catalog}`.`{schema_name}`.`{topic}`"

    # ✅ Write to Bronze tables
    return (
        dated_df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .outputMode("append")
        .partitionBy("event_date")
        # availableNow replaces the deprecated once=True: process everything
        # available at start (possibly in several batches), then stop.
        .trigger(availableNow=True)
        .toTable(table_name)
    )


# ---- Loop over topics ----
# Start one bronze ingestion stream per topic.
for topic, schema in schemas.items():
    process_topic(topic, schema)

# The streams above run in the background, so without this wait a following
# cell could query the bronze tables before their batches have committed.
# Waiting also re-raises the error of any query that fails while still active.
for query in spark.streams.active:
    query.awaitTermination()


In [0]:
%sql
-- Inspect every row currently ingested into the bronze shipments table.
SELECT *
FROM `e-commerce-project`.bronze.shipments

shipment_id,order_id,courier,dispatched_at,delivered_at,status,tracking_url,event_date
SHIP_ORD008_3,ORD008,EcomExpress,2025-10-16T07:30:00Z,2025-10-20T10:20:00Z,DELIVERED,https://track.ecom/ORD008,2025-10-16
SHIP_ORD008_2,ORD008,EcomExpress,2025-10-16T07:30:00Z,,DISPATCHED,https://track.ecom/ORD008,2025-10-16
SHIP_ORD006_3,ORD006,Delhivery,2025-10-16T18:00:00Z,2025-10-18T11:30:00Z,DELIVERED,https://track.delhivery/ORD006,2025-10-16
SHIP_ORD005_3,ORD005,BlueDart,2025-10-13T09:00:00Z,2025-10-14T17:45:00Z,DELIVERED,https://track.bluedart/ORD005,2025-10-13
SHIP_ORD005_2,ORD005,BlueDart,2025-10-13T09:00:00Z,,DISPATCHED,https://track.bluedart/ORD005,2025-10-13
SHIP_ORD004_3,ORD004,EcomExpress,2025-10-13T10:15:00Z,2025-10-14T13:20:00Z,DELIVERED,https://track.ecom/ORD004,2025-10-13
SHIP_ORD005_3,ORD005,BlueDart,2025-10-13T09:00:00Z,2025-10-14T17:45:00Z,DELIVERED,https://track.bluedart/ORD005,2025-10-13
SHIP_ORD005_2,ORD005,BlueDart,2025-10-13T09:00:00Z,,DISPATCHED,https://track.bluedart/ORD005,2025-10-13
SHIP_ORD004_3,ORD004,EcomExpress,2025-10-13T10:15:00Z,2025-10-14T13:20:00Z,DELIVERED,https://track.ecom/ORD004,2025-10-13
SHIP_ORD009_3,ORD009,BlueDart,2025-10-17T09:30:00Z,2025-10-19T12:00:00Z,DELIVERED,https://track.bluedart/ORD009,2025-10-17
