In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import os
import logging
from dotenv import load_dotenv
import datetime

load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# spark = SparkSession.builder \
#     .appName("FoodDeliveryETL") \
#     .config("spark.jars.packages", 
#             "org.apache.hadoop:hadoop-aws:3.3.6,net.snowflake:spark-snowflake_2.13:3.10") \
#     .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
#     .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY")) \
#     .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_KEY")) \
#     .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
#     .getOrCreate()

spark = SparkSession.builder \
    .appName("FoodDeliveryETL") \
    .config("spark.jars.packages", 
            "org.apache.hadoop:hadoop-aws:3.3.6,net.snowflake:spark-snowflake_2.13:3.1.0") \
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_KEY")) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-2.amazonaws.com") \
    .getOrCreate()

# spark = SparkSession.bui

ModuleNotFoundError: No module named 'py4j'

In [13]:
df.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- service_type: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |    |    |-- unit_price: string (nullable = true)
 |    |    |-- total_price: string (nullable = true)
 |-- delivery_location: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- address: string (nullable = true)
 |-- delivery_fee: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- estimated_delivery_time: string (nullable = true)
 |-- payment_method: string (nullab

In [21]:
df = df.withColumn("event_timestamp", F.to_timestamp(df["event_timestamp"], "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("delivery_fee",  df["delivery_fee"].cast(FloatType()))\
        .withColumn("total_amount", df["total_amount"].cast(DoubleType()))
df = df.withColumn("estimated_delivery_time", F.to_timestamp(df["estimated_delivery_time"], "yyyy-MM-dd HH:mm:ss"))

In [22]:
df.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- order_id: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- service_type: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |    |    |-- unit_price: string (nullable = true)
 |    |    |-- total_price: string (nullable = true)
 |-- delivery_location: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- address: string (nullable = true)
 |-- delivery_fee: float (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- estimated_delivery_time: timestamp (nullable = true)
 |-- payment_method: string (n

In [19]:
df_exploded = df.select("order_id", F.explode("items").alias("item"))

# Now, select specific nested fields from the exploded 'item' struct
df_exploded.select(
    "order_id", 
    "item.item_id", 
    "item.name", 
    "item.quantity", 
    "item.unit_price", 
    "item.total_price"
).show(5, truncate=False)

[Stage 3:>                                                          (0 + 1) / 1]

+------------------------------------+-------+-----------+--------+----------+-----------+
|order_id                            |item_id|name       |quantity|unit_price|total_price|
+------------------------------------+-------+-----------+--------+----------+-----------+
|8d090ee5-a8c3-405f-aee0-afc7b28f162f|item_3 |Menu Item 3|3       |4985.0    |14955.0    |
|8d090ee5-a8c3-405f-aee0-afc7b28f162f|item_4 |Menu Item 4|1       |1479.0    |1479.0     |
|b17c605f-a93c-4c23-a7e9-9771e962bf08|item_5 |Menu Item 5|1       |4533.0    |4533.0     |
|b17c605f-a93c-4c23-a7e9-9771e962bf08|item_4 |Menu Item 4|3       |3833.0    |11499.0    |
|b17c605f-a93c-4c23-a7e9-9771e962bf08|item_2 |Menu Item 2|2       |2076.0    |4152.0     |
+------------------------------------+-------+-----------+--------+----------+-----------+
only showing top 5 rows



                                                                                

In [20]:
df_exploded.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- item: struct (nullable = true)
 |    |-- item_id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- quantity: long (nullable = true)
 |    |-- unit_price: string (nullable = true)
 |    |-- total_price: string (nullable = true)



In [25]:
df = df.filter(F.col("order_id").isNotNull() & F.col("merchant_id").isNotNull() & (F.col("total_amount") > 0))
df = df.withColumn("event_date", F.to_date("event_timestamp", "yyyyMMdd"))
df = df.withColumn("delivery_latitude", F.col("delivery_location.latitude")) \
        .withColumn("delivery_longitude", F.col("delivery_location.longitude")) \
        .drop("delivery_location")
df = df.withColumn("delivery_time_minutes", 
                    F.round((F.unix_timestamp("estimated_delivery_time") - 
                            F.unix_timestamp("event_timestamp")) / 60))
df = df.withColumn("is_delayed", F.when(F.col("delivery_time_minutes") > 45, True).otherwise(False))


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `delivery_location`.`latitude` cannot be resolved. Did you mean one of the following? [`delivery_latitude`, `delivery_longitude`, `delivery_time_minutes`, `delivery_fee`, `event_timestamp`].;
'Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, event_date#422, 'delivery_location.latitude AS delivery_latitude#441, delivery_longitude#277, delivery_time_minutes#312, is_delayed#330]
+- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, to_date(event_timestamp#183, Some(yyyy-MM-dd), Some(Etc/UTC), false) AS event_date#422, delivery_latitude#259, delivery_longitude#277, delivery_time_minutes#312, is_delayed#330]
   +- Filter ((isnotnull(order_id#2) AND isnotnull(merchant_id#3)) AND (total_amount#213 > cast(0 as double)))
      +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, event_date#243, delivery_latitude#259, delivery_longitude#277, delivery_time_minutes#312, CASE WHEN (delivery_time_minutes#312 > cast(45 as double)) THEN true ELSE false END AS is_delayed#330]
         +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, event_date#243, delivery_latitude#259, delivery_longitude#277, round((cast((unix_timestamp(estimated_delivery_time#228, yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false) - unix_timestamp(event_timestamp#183, yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false)) as double) / cast(60 as double)), 0) AS delivery_time_minutes#312]
            +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, event_date#243, delivery_latitude#259, delivery_longitude#277]
               +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_location#8, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, event_date#243, delivery_latitude#259, delivery_location#8.longitude AS delivery_longitude#277]
                  +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_location#8, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, event_date#243, delivery_location#8.latitude AS delivery_latitude#259]
                     +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_location#8, delivery_fee#198, total_amount#213, estimated_delivery_time#228, payment_method#12, payment_status#13, date_format(event_timestamp#183, yyyy-MM-dd, Some(Etc/UTC)) AS event_date#243]
                        +- Filter ((isnotnull(order_id#2) AND isnotnull(merchant_id#3)) AND (total_amount#213 > cast(0 as double)))
                           +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_location#8, delivery_fee#198, total_amount#213, to_timestamp(estimated_delivery_time#11, Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Etc/UTC), false) AS estimated_delivery_time#228, payment_method#12, payment_status#13]
                              +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_location#8, delivery_fee#198, cast(total_amount#10 as double) AS total_amount#213, estimated_delivery_time#11, payment_method#12, payment_status#13]
                                 +- Project [event_id#0, event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_location#8, cast(delivery_fee#9 as float) AS delivery_fee#198, total_amount#10, estimated_delivery_time#11, payment_method#12, payment_status#13]
                                    +- Project [event_id#0, to_timestamp(event_timestamp#1, Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Etc/UTC), false) AS event_timestamp#183, order_id#2, merchant_id#3, customer_id#4, service_type#5, order_status#6, items#7, delivery_location#8, delivery_fee#9, total_amount#10, estimated_delivery_time#11, payment_method#12, payment_status#13]
                                       +- Relation [event_id#0,event_timestamp#1,order_id#2,merchant_id#3,customer_id#4,service_type#5,order_status#6,items#7,delivery_location#8,delivery_fee#9,total_amount#10,estimated_delivery_time#11,payment_method#12,payment_status#13] parquet


In [35]:
# df = df.withColumn("event_date", F.to_date("event_timestamp", "yyyyMMdd"))
df=df.withColumn('event_date', df['event_timestamp'].cast(DateType()))


In [40]:
df = df.withColumn("event_date", F.to_date("event_timestamp", "yyyyMMdd"))


In [51]:
df = spark.read.parquet("s3a://numa-delivery/bronze/food-delivery-orders-raw/2025/02/05/07/5254.parquet")
df.printSchema() 

[Stage 17:>                                                         (0 + 1) / 1]

root
 |-- event_id: string (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- service_type: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |    |    |-- unit_price: string (nullable = true)
 |    |    |-- total_price: string (nullable = true)
 |-- delivery_location: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- address: string (nullable = true)
 |-- delivery_fee: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- estimated_delivery_time: string (nullable = true)
 |-- payment_method: string (nullab

                                                                                

In [55]:
df = df.withColumn("event_timestamp", F.to_timestamp("event_timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS"))
df = df.withColumn("event_date_parsed", F.to_date("event_timestamp_parsed"))

In [64]:
df = df.withColumn("event_date", F.to_date("event_timestamp"))

In [57]:
df = df.withColumn("estimated_delivery_time", F.to_timestamp("estimated_delivery_time", "yyyy-MM-dd HH:mm:ss.SSSSSS"))


In [59]:
df = df.withColumn("delivery_time_minutes", 
                    F.round((F.unix_timestamp("estimated_delivery_time") - 
                            F.unix_timestamp("event_timestamp")) / 60))

In [65]:
df.select('event_date').show(1)

[Stage 25:>                                                         (0 + 1) / 1]

+----------+
|event_date|
+----------+
|2025-02-05|
+----------+
only showing top 1 row



                                                                                

In [2]:
order_schema = StructType([
        StructField("event_id", StringType()),
        StructField("event_timestamp", TimestampType()),
        StructField("order_id", StringType()),
        StructField("merchant_id", StringType()),
        StructField("customer_id", StringType()),
        StructField("service_type", StringType()),
        StructField("order_status", StringType()),
        StructField("items", ArrayType(StructType([
            StructField("item_id", StringType()),
            StructField("name", StringType()),
            StructField("quantity", IntegerType()),
            StructField("unit_price", FloatType()),
            StructField("total_price", FloatType())
        ]))),
        StructField("delivery_location", StructType([
            StructField("latitude", DoubleType()),
            StructField("longitude", DoubleType()),
            StructField("address", StringType())
        ])),
        StructField("delivery_fee", FloatType()),
        StructField("total_amount", FloatType()),
        StructField("estimated_delivery_time", TimestampType()),
        StructField("payment_method", StringType()),
        StructField("payment_status", StringType())
    ])

In [3]:
def get_dates_in_bronze():
    """
    Function to generate list of available dates in the bronze layer
    """
    start_date = datetime.date(2025, 2, 1)  # Change to your start date
    end_date = datetime.date.today()
    
    date_list = []
    while start_date <= end_date:
        date_list.append(start_date.strftime("%Y/%m/%d"))
        start_date += datetime.timedelta(days=1)

    return date_list

In [None]:
base_s3_bronze_path = "s3a://numa-delivery/bronze/food-delivery-orders-raw"
s3_silver_orders_path = "s3a://numa-delivery/silver/food-delivery-orders/"
s3_silver_items_path = "s3a://numa-delivery/silver/food-delivery-order_items/"

date_list = get_dates_in_bronze()

for date in date_list:
    s3_bronze_path = f"{base_s3_bronze_path}/{date}/*"
    
    try:
        logger.info(f"Processing data for date: {date}")
        bronze_df = read_bronze_data(s3_bronze_path)
        
        if bronze_df.count() == 0:
            logger.info(f"No data found for {date}, skipping.")
            continue
        
        silver_df = transform_to_silver(bronze_df)
        order_items_df = explode_order_items(silver_df)
        
        write_silver_data(silver_df, s3_silver_orders_path)
        write_order_items(order_items_df, s3_silver_items_path)
        
        write_to_snowflake(silver_df, "ORDER")
        write_to_snowflake(order_items_df, "ORDER_ITEMS")
    
    except Exception as e:
        logger.error(f"Error processing {date}: {e}")

In [4]:
df = spark.read.parquet("s3a://numa-delivery/bronze/food-delivery-orders-raw/2025/02/05/07/5254.parquet")
df.printSchema() 

Py4JJavaError: An error occurred while calling o36.parquet.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
	at net.snowflake.spark.snowflake.DefaultSource.shortName(DefaultSource.scala:40)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$2(DataSource.scala:629)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$2$adapted(DataSource.scala:629)
	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:629)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	... 32 more


In [None]:
base_s3_bronze_path = "s3a://numa-delivery/bronze/food-delivery-orders-raw"
s3_silver_orders_path = "s3a://numa-delivery/silver/food-delivery-orders/"
s3_silver_items_path = "s3a://numa-delivery/silver/food-delivery-order_items/"

date_list = get_dates_in_bronze()

for date in date_list:
    s3_bronze_path = f"{base_s3_bronze_path}/{date}/*"
    
    logger.info(f"Processing data for date: {date}")
    bronze_df = spark.parquet()

In [None]:
def get_orders_schema():
    return StructType([
        StructField("event_id", StringType()),
        StructField("event_timestamp", TimestampType()),
        StructField("order_id", StringType()),
        StructField("merchant_id", StringType()),
        StructField("customer_id", StringType()),
        StructField("service_type", StringType()),
        StructField("order_status", StringType()),
        StructField("items", ArrayType(StructType([
            StructField("item_id", StringType()),
            StructField("name", StringType()),
            StructField("quantity", IntegerType()),
            StructField("unit_price", FloatType()),
            StructField("total_price", FloatType())
        ]))),
        StructField("delivery_location", StructType([
            StructField("latitude", DoubleType()),
            StructField("longitude", DoubleType()),
            StructField("address", StringType())
        ])),
        StructField("delivery_fee", FloatType()),
        StructField("total_amount", FloatType()),
        StructField("estimated_delivery_time", TimestampType()),
        StructField("payment_method", StringType()),
        StructField("payment_status", StringType())
    ])

def read_bronze_data(s3_path):
    logger.info(f"Reading data from {s3_path}")
    return spark.read.schema(get_orders_schema()).parquet(s3_path)

def transform_to_silver(df):
    df = df.filter(F.col("order_id").isNotNull() & F.col("merchant_id").isNotNull() & (F.col("total_amount") > 0))
    df = df.withColumn("event_date", F.date_format("event_timestamp", "yyyy-MM-dd"))
    df = df.withColumn("delivery_latitude", F.col("delivery_location.latitude")) \
           .withColumn("delivery_longitude", F.col("delivery_location.longitude")) \
           .drop("delivery_location")
    df = df.withColumn("delivery_time_minutes", 
                       F.round((F.unix_timestamp("estimated_delivery_time") - 
                                F.unix_timestamp("event_timestamp")) / 60))
    df = df.withColumn("is_delayed", F.when(F.col("delivery_time_minutes") > 45, True).otherwise(False))
    return df

def explode_order_items(df):
    return df.select("order_id", F.explode("items").alias("item"))\
        .select("order_id", "item.item_id", "item.name", "item.quantity", "item.unit_price", "item.total_price")

def write_silver_data(df, s3_path):
    logger.info(f"Writing data to {s3_path}")
    df.write.partitionBy("event_date").mode("append").parquet(s3_path)

def write_order_items(df, s3_path):
    df.write.mode("append").parquet(s3_path)

def write_to_snowflake(df, table_name):
    sf_options = {
        "sfUrl": "https://fg00255.switzerland-north.azure.snowflakecomputing.com",
        "sfUser": os.getenv("SNOWFLAKE_USER"),
        "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"),
        "sfDatabase": "DELIVERY",
        "sfSchema": "SILVER",
        "sfWarehouse": "COMPUTE_WH"
    }
    logger.info(f"Writing data to Snowflake table: {table_name}")
    df.write.format("net.snowflake.spark.snowflake").options(**sf_options).option("dbtable", table_name).mode("append").save()

def get_dates_in_bronze():
    """
    Function to generate list of available dates in the bronze layer
    """
    start_date = datetime.date(2025, 2, 1)  # Change to your start date
    end_date = datetime.date.today()
    
    date_list = []
    while start_date <= end_date:
        date_list.append(start_date.strftime("%Y/%m/%d"))
        start_date += datetime.timedelta(days=1)

    return date_list

def process_data_for_each_day():
    base_s3_bronze_path = "s3a://numa-delivery/bronze/food-delivery-orders-raw"
    s3_silver_orders_path = "s3a://numa-delivery/silver/food-delivery-orders/"
    s3_silver_items_path = "s3a://numa-delivery/silver/food-delivery-order_items/"
    
    date_list = get_dates_in_bronze()
    
    for date in date_list:
        s3_bronze_path = f"{base_s3_bronze_path}/{date}/*"
        
        try:
            logger.info(f"Processing data for date: {date}")
            bronze_df = read_bronze_data(s3_bronze_path)
            
            if bronze_df.count() == 0:
                logger.info(f"No data found for {date}, skipping.")
                continue
            
            silver_df = transform_to_silver(bronze_df)
            order_items_df = explode_order_items(silver_df)
            
            write_silver_data(silver_df, s3_silver_orders_path)
            write_order_items(order_items_df, s3_silver_items_path)
            
            write_to_snowflake(silver_df, "ORDER")
            write_to_snowflake(order_items_df, "ORDER_ITEMS")
        
        except Exception as e:
            logger.error(f"Error processing {date}: {e}")

def main():
    process_data_for_each_day()
    spark.stop()

if __name__ == "__main__":
    main()
