## Ingest Fact Data into Bronze Layer

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, BooleanType
import pyspark.sql.functions as F

In [0]:
# Catalog part of the three-level table name used when the bronze table is saved.
catalog_name = "ecommerce"

In [0]:
# Explicit schema for the order-items CSV files.
# Every column is declared as a nullable string, so no type casting happens at
# read time; the tuple order below is the column order of the schema.
order_items_schema = StructType([
    StructField(column_name, StringType(), nullable=True)
    for column_name in (
        "dt",
        "order_ts",
        "customer_id",
        "order_id",
        "item_seq",
        "product_id",
        "quantity",
        "unit_price_currency",
        "unit_price",
        "discount_pct",
        "tax_amount",
        "channel",
        "coupon_code",
    )
])

In [0]:
# Read every CSV file in the landing folder with the explicit schema, then add
# two lineage columns: the path of the file each row came from and the
# timestamp of this ingestion.
# NOTE(review): the catalog segment of this path is hard-coded rather than
# derived from catalog_name - confirm that is intentional.
raw_data_path = "/Volumes/ecommerce/source_data/raw/order_items/landing/*.csv"

df = (
    spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .schema(order_items_schema)
    .csv(raw_data_path)
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingest_timestamp", F.current_timestamp())
)

In [0]:
# Show a five-row sample to sanity-check parsing and the added lineage columns.
sample_rows = df.limit(5)
display(sample_rows)

dt,order_ts,customer_id,order_id,item_seq,product_id,quantity,unit_price_currency,unit_price,discount_pct,tax_amount,channel,coupon_code,file_name,ingest_timestamp
2025-08-30,2025-08-30 12:46:50,CUST000000114495,676395,1,2000000136295,1,GBP,13,16%,1,app,NEW10,dbfs:/Volumes/ecommerce/source_data/raw/order_items/landing/order_items_2025-08-30.csv,2025-12-26T14:02:14.553Z
2025-08-30,2025-08-30 19:00:46,CUST000000167574,676396,1,2000000125442,1,AUD,23,7%,3,web,PRIME5,dbfs:/Volumes/ecommerce/source_data/raw/order_items/landing/order_items_2025-08-30.csv,2025-12-26T14:02:14.553Z
2025-08-30,2025-08-30 19:00:46,CUST000000167574,676396,2,2000000319490,1,AUD,487,4%,24,web,PRIME5,dbfs:/Volumes/ecommerce/source_data/raw/order_items/landing/order_items_2025-08-30.csv,2025-12-26T14:02:14.553Z
2025-08-30,2025-08-30 21:50:52,CUST000000091703,676397,1,2000000213705,1,INR,3199,2%,158,web,FEST20,dbfs:/Volumes/ecommerce/source_data/raw/order_items/landing/order_items_2025-08-30.csv,2025-12-26T14:02:14.553Z
2025-08-30,2025-08-30 23:23:09,CUST000000027113,676398,1,2000000383569,1,INR,1696,16%,72,app,,dbfs:/Volumes/ecommerce/source_data/raw/order_items/landing/order_items_2025-08-30.csv,2025-12-26T14:02:14.553Z


In [0]:
# Save the ingested rows as a Delta table in the bronze schema.
# Write mode is "overwrite" and the "mergeSchema" option is enabled.
bronze_table_name = f"{catalog_name}.bronze.brz_order_items"

(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(bronze_table_name)
)