## Data cleaning

Loads the Instacart CSV files (orders, order_products, products, aisles, departments) into Spark DataFrames.
Prints each table’s schema (column names + data types) and shows a few sample rows.

In [2]:
# Fabric / PySpark
from pyspark.sql import functions as F

# Update these paths based on where you put the CSVs in your Lakehouse
base_path = "Files/Bronze_raw/"   # example

orders_path = base_path + "orders.csv"
prior_path  = base_path + "order_products__prior.csv"
train_path  = base_path + "order_products__train.csv"
products_path = base_path + "products.csv"
aisles_path = base_path + "aisles.csv"
departments_path = base_path + "departments.csv"

orders = (spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(orders_path))

order_products_prior = (spark.read.option("header", True).option("inferSchema", True).csv(prior_path))
order_products_train = (spark.read.option("header", True).option("inferSchema", True).csv(train_path))
products = (spark.read.option("header", True).option("inferSchema", True).csv(products_path))
aisles = (spark.read.option("header", True).option("inferSchema", True).csv(aisles_path))
departments = (spark.read.option("header", True).option("inferSchema", True).csv(departments_path))

# Quick schema checks
for name, df in [
    ("orders", orders),
    ("order_products__prior", order_products_prior),
    ("order_products__train", order_products_train),
    ("products", products),
    ("aisles", aisles),
    ("departments", departments)
]:
    print("\n===", name, "===")
    df.printSchema()
    display(df.limit(5))


StatementMeta(, dccab2a8-399b-488c-9600-073a14ad5267, 4, Finished, Available, Finished)


=== orders ===
root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: double (nullable = true)



SynapseWidget(Synapse.DataFrame, f1cee9dc-0265-41db-a978-9bf06d24861f)


=== order_products__prior ===
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)



SynapseWidget(Synapse.DataFrame, 9ea0374f-b215-4d3b-9b4c-a8c08f089fee)


=== order_products__train ===
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)



SynapseWidget(Synapse.DataFrame, 075c42a9-f7c6-4190-82d0-ee439e1424d7)


=== products ===
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 7b05d30c-5cf4-48d9-a9f9-f919deb922c9)


=== aisles ===
root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 13d14400-8df2-400b-ac3b-d272749faefd)


=== departments ===
root
 |-- department_id: integer (nullable = true)
 |-- department: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 8ecfc0f4-c046-46fd-9b91-63826ab181f2)

--




Builds a synthetic timeline per customer:

orders are sorted by order_number.
computes cumulative days since first order using days_since_prior_order.
adds those days to a fixed anchor date (e.g., 2017-01-01) to create order_date.
derives order_month as the first day of the month of order_date.





--

In [3]:
from pyspark.sql import Window
from pyspark.sql import functions as F

# 1) Drop missing customer_id (user_id)
orders_clean = orders.filter(F.col("user_id").isNotNull())

# 2) De-duplicate on order_id (defensive)
# If duplicates exist, keep the one with max order_number (or any stable rule)
w_dedup = Window.partitionBy("order_id").orderBy(F.col("order_number").desc_nulls_last())
orders_clean = (orders_clean
    .withColumn("rn", F.row_number().over(w_dedup))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

# 3) Fix invalid days_since_prior_order (NULL for first order, negative values)
orders_clean = (orders_clean
    .withColumn(
        "days_since_prior_order_fixed",
        F.when(F.col("days_since_prior_order").isNull(), F.lit(0.0))
         .when(F.col("days_since_prior_order") < 0, F.lit(0.0))
         .otherwise(F.col("days_since_prior_order"))
    )
)

# 4) Create synthetic order_date using cumulative days since first order per customer
w_user_order = Window.partitionBy("user_id").orderBy(F.col("order_number").asc())
orders_clean = (orders_clean
    .withColumn(
        "days_since_first_order",
        F.sum(F.col("days_since_prior_order_fixed")).over(w_user_order)
    )
)

anchor_date = F.to_date(F.lit("2017-01-01"))

clean_orders = (orders_clean
    .withColumn("order_date", F.date_add(anchor_date, F.col("days_since_first_order").cast("int")))
    .withColumn("order_month", F.date_trunc("month", F.col("order_date")).cast("date"))
    .select(
        F.col("user_id").alias("customer_id"),
        F.col("order_id"),
        F.col("order_date"),
        F.col("order_month")
    )
)

display(clean_orders.limit(10))


StatementMeta(, dccab2a8-399b-488c-9600-073a14ad5267, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 11684e02-30db-4fc4-b166-6e18c67243bf)

--



Filters out rows where user_id is NULL (missing customer). Ensures there is only one row per order_id. If duplicates exist, keeps the row with a stable rule (ex: highest order_number).

Fix invalid days_since_prior_order Converts: NULL days_since_prior_order (usually first order) → 0 negative values (defensive) → 0 We later build a synthetic order timeline from this field. Null/negative values would break cumulative time calculations.

Finally, Produces a clean order-level dataset with only the columns needed for cohorts:

customer_id 
order_id 
order_date 
order_month



--

A clean table clean_orders stored in Lakehouse (Delta).

In [4]:
# Save as a managed Delta table in your Lakehouse
(clean_orders
  .write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("clean_orders")
)

# Quick validation
spark.sql("SELECT COUNT(*) AS rows, COUNT(DISTINCT order_id) AS distinct_orders, COUNT(DISTINCT customer_id) AS distinct_customers FROM clean_orders").show()
spark.sql("SELECT * FROM clean_orders ORDER BY customer_id, order_date LIMIT 20").show()


StatementMeta(, dccab2a8-399b-488c-9600-073a14ad5267, 6, Finished, Available, Finished)

+-------+---------------+------------------+
|   rows|distinct_orders|distinct_customers|
+-------+---------------+------------------+
|3421083|        3421083|            206209|
+-------+---------------+------------------+

+-----------+--------+----------+-----------+
|customer_id|order_id|order_date|order_month|
+-----------+--------+----------+-----------+
|          1| 2539329|2017-01-01| 2017-01-01|
|          1| 2398795|2017-01-16| 2017-01-01|
|          1|  473747|2017-02-06| 2017-02-01|
|          1| 2254736|2017-03-07| 2017-03-01|
|          1|  431534|2017-04-04| 2017-04-01|
|          1| 3367565|2017-04-23| 2017-04-01|
|          1|  550135|2017-05-13| 2017-05-01|
|          1| 3108588|2017-05-27| 2017-05-01|
|          1| 2295261|2017-05-27| 2017-05-01|
|          1| 2550362|2017-06-26| 2017-06-01|
|          1| 1187899|2017-07-10| 2017-07-01|
|          2| 2168274|2017-01-01| 2017-01-01|
|          2| 1501582|2017-01-11| 2017-01-01|
|          2| 1901567|2017-01-14| 2017