In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## SILVER LAYER SCRIPT

# COMMAND ----------

# MAGIC %md
# MAGIC ### DATA LOADING

# COMMAND ----------

# Define storage account and SAS token details
storage_account_name = "instacartdatalake1"
sas_token = "sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2025-04-30T22:47:32Z&st=2025-04-09T14:47:32Z&spr=https&sig=rujTRHKQfCbM9Dks7pD4lKPq2XBnaxho8j3DhtPs7nA%3D"
container_name = "bronze"

# Unmount the existing mount point if it exists
try:
    dbutils.fs.unmount(f"/mnt/{container_name}")
except:
    pass

# Set up configuration
dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
    mount_point=f"/mnt/{container_name}",
    extra_configs={"fs.azure.sas."+container_name+"."+storage_account_name+".blob.core.windows.net": sas_token}
)

# Verify the mount
display(dbutils.fs.ls(f"/mnt/{container_name}"))

/mnt/bronze has been unmounted.


path,name,size,modificationTime
dbfs:/mnt/bronze/InstacartOnlineGroceryBasketDataset/,InstacartOnlineGroceryBasketDataset/,0,0


In [0]:
# COMMAND ----------

bronze_base_path = "/mnt/bronze/InstacartOnlineGroceryBasketDataset/"

aisles_df = spark.read.option("header", "true").csv(bronze_base_path + "aisles.csv")
departments_df = spark.read.option("header", "true").csv(bronze_base_path + "departments.csv")
order_products_prior_df = spark.read.option("header", "true").csv(bronze_base_path + "order_products__prior.csv")
order_products_train_df = spark.read.option("header", "true").csv(bronze_base_path + "order_products__train.csv")
orders_df = spark.read.option("header", "true").csv(bronze_base_path + "orders.csv")
products_df = spark.read.option("header", "true").csv(bronze_base_path + "products.csv")

In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ### Transformation

# COMMAND ----------

# Define storage account and SAS token details
storage_account_name = "instacartdatalake1"
sas_token = "sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2025-04-30T22:47:32Z&st=2025-04-09T14:47:32Z&spr=https&sig=rujTRHKQfCbM9Dks7pD4lKPq2XBnaxho8j3DhtPs7nA%3D"
container_name = "silver"

# Unmount the existing mount point if it exists
try:
    dbutils.fs.unmount(f"/mnt/{container_name}")
except:
    pass

# Set up configuration
dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
    mount_point=f"/mnt/{container_name}",
    extra_configs={"fs.azure.sas."+container_name+"."+storage_account_name+".blob.core.windows.net": sas_token}
)

/mnt/silver has been unmounted.


True

In [0]:
orders_df.limit(10).write.mode("overwrite").option("header", "true").csv("/mnt/silver/test_orders")


In [0]:
display(dbutils.fs.ls("/mnt/silver"))


path,name,size,modificationTime
dbfs:/mnt/silver/test_orders/,test_orders/,0,0


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ### Save Cleaned Raw Tables to Silver (No Merges)

# COMMAND ----------
# Remove rows where eval_set is 'test'
from pyspark.sql.functions import col
orders_df = orders_df.filter(col("eval_set") != "test")

# Drop duplicate or conflicting columns if present in products_df
columns_to_drop = ["aisle", "department", "aisle_name", "department_name"]
products_df = products_df.drop(*[c for c in columns_to_drop if c in products_df.columns])

# Save all raw datasets individually to Silver
orders_df.write.format("delta").mode("overwrite").save("/mnt/silver/orders")
products_df.write.format("delta").mode("overwrite").save("/mnt/silver/products")
order_products_prior_df.write.format("delta").mode("overwrite").save("/mnt/silver/order_products_prior")
order_products_train_df.write.format("delta").mode("overwrite").save("/mnt/silver/order_products_train")
aisles_df.write.format("delta").mode("overwrite").save("/mnt/silver/aisles")
departments_df.write.format("delta").mode("overwrite").save("/mnt/silver/departments")

# COMMAND ----------
# MAGIC %md
# MAGIC ✅ Done! All raw Bronze tables are now written as clean, separate Delta tables in the Silver layer. Power BI or SQL tools can join them on-demand for reporting.


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ### Explore Silver Data (Query Examples)

# COMMAND ----------
# Load Silver Delta Tables
orders = spark.read.format("delta").load("/mnt/silver/orders")
products = spark.read.format("delta").load("/mnt/silver/products")
order_prior = spark.read.format("delta").load("/mnt/silver/order_products_prior")
order_train = spark.read.format("delta").load("/mnt/silver/order_products_train")
aisles = spark.read.format("delta").load("/mnt/silver/aisles")
departments = spark.read.format("delta").load("/mnt/silver/departments")

In [0]:
# COMMAND ----------
# Show some orders from a specific user (example: user_id = 1)
orders.filter(col("user_id") == 1).show(5)


+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329|      1|   prior|           1|        2|               08|                  NULL|
| 2398795|      1|   prior|           2|        3|               07|                  15.0|
|  473747|      1|   prior|           3|        3|               12|                  21.0|
| 2254736|      1|   prior|           4|        4|               07|                  29.0|
|  431534|      1|   prior|           5|        4|               15|                  28.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+
only showing top 5 rows



In [0]:
# COMMAND ----------
# Count total number of orders
print(f"Total Orders: {orders.count()}")


Total Orders: 3346083


In [0]:
# COMMAND ----------
# Most frequently ordered product_ids in prior orders
from pyspark.sql.functions import desc

order_prior.groupBy("product_id") \
    .count() \
    .orderBy(desc("count")) \
    .show(10)


+----------+------+
|product_id| count|
+----------+------+
|     24852|472565|
|     13176|379450|
|     21137|264683|
|     21903|241921|
|     47209|213584|
|     47766|176815|
|     47626|152657|
|     16797|142951|
|     26209|140627|
|     27845|137905|
+----------+------+
only showing top 10 rows



In [0]:
# COMMAND ----------
# List distinct departments
departments.select("department_id", "department_name").distinct().show()


+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|           10|           bulk|
|           21|        missing|
|           13|         pantry|
|           12|   meat seafood|
|           17|      household|
|            1|         frozen|
|            3|         bakery|
|           18|         babies|
|           14|      breakfast|
|           15|   canned goods|
|            2|          other|
|            6|  international|
|            5|        alcohol|
|            4|        produce|
|            8|           pets|
|            9|dry goods pasta|
|           16|     dairy eggs|
|           11|  personal care|
|           19|         snacks|
|            7|      beverages|
+-------------+---------------+
only showing top 20 rows



In [0]:
# COMMAND ----------
# Top users by number of orders
orders.groupBy("user_id").count().orderBy(desc("count")).show(10)


+-------+-----+
|user_id|count|
+-------+-----+
|  17884|  100|
|  31310|  100|
|  10502|  100|
|  22802|  100|
|   1958|  100|
|   7658|  100|
|   9911|  100|
|  50817|  100|
|   3742|  100|
|  15124|  100|
+-------+-----+
only showing top 10 rows



In [0]:
# COMMAND ----------
# Number of orders by hour of day
orders.groupBy("order_hour_of_day").count().orderBy("order_hour_of_day").show()


+-----------------+------+
|order_hour_of_day| count|
+-----------------+------+
|               00| 22224|
|               01| 12103|
|               02|  7375|
|               03|  5343|
|               04|  5393|
|               05|  9374|
|               06| 29913|
|               07| 90032|
|               08|174664|
|               09|252529|
|               10|282470|
|               11|278616|
|               12|266828|
|               13|271885|
|               14|276659|
|               15|277207|
|               16|266444|
|               17|223433|
|               18|178556|
|               19|137341|
+-----------------+------+
only showing top 20 rows



In [0]:
# COMMAND ----------
# Number of orders by day of week
orders.groupBy("order_dow").count().orderBy("order_dow").show()


+---------+------+
|order_dow| count|
+---------+------+
|        0|585237|
|        1|576377|
|        2|458074|
|        3|428087|
|        4|417171|
|        5|443388|
|        6|437749|
+---------+------+



In [0]:
# COMMAND ----------
# Top 10 products by total orders in training set
order_train.groupBy("product_id").count().orderBy(desc("count")).show(10)


+----------+-----+
|product_id|count|
+----------+-----+
|     24852|18726|
|     13176|15480|
|     21137|10894|
|     21903| 9784|
|     47626| 8135|
|     47766| 7409|
|     47209| 7293|
|     16797| 6494|
|     26209| 6033|
|     27966| 5546|
+----------+-----+
only showing top 10 rows

